/*
 * tildefriends/src/ssb.tests.c
 *
 * 822 lines
 * 25 KiB
 * C
 */

#include "ssb.h"
#include "log.h"
#include "mem.h"
#include "ssb.db.h"
#include "ssb.js.h"
#include "tests.h"
#include "trace.h"
#include "util.js.h"
#include "uv.h"
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
/**
 * Round-trips a fixed feed ID string through its binary form and back and
 * asserts that the result equals the input.
 *
 * @param options Test options (not used by this test).
 */
void tf_ssb_test_id_conversion(const tf_test_options_t* options)
{
	tf_printf("Testing id conversion.\n");

	const char* k_id = "@bzRTe6hgOII2yZ1keGGoNoQgostjQc830trHc453crY=.ed25519";
	uint8_t binary[k_id_bin_len] = { 0 };
	char text[k_id_base64_len] = { 0 };

	/* Results are stored before asserting so the calls still run when asserts are compiled out. */
	bool parsed = tf_ssb_id_str_to_bin(binary, k_id);
	(void)parsed;
	assert(parsed);

	bool formatted = tf_ssb_id_bin_to_str(text, sizeof(text), binary);
	(void)formatted;
	assert(formatted);

	bool matches = strcmp(text, k_id) == 0;
	(void)matches;
	assert(matches);
}
/**
 * Shared state for the connection and room tests: up to three SSB instances
 * plus the most recent connection and broadcast counts observed for each.
 */
typedef struct _test_t
{
/* The instances under test; ssb2 is left NULL by tests that only use two. */
tf_ssb_t* ssb0;
tf_ssb_t* ssb1;
tf_ssb_t* ssb2;
/* Number of connection IDs last counted for each instance by _ssb_test_connections_changed. */
int connection_count0;
int connection_count1;
int connection_count2;
/* Number of broadcasts last visited for each instance by _broadcasts_changed. */
int broadcast_count0;
int broadcast_count1;
int broadcast_count2;
} test_t;
/**
 * Connections-changed callback: recounts the connection IDs of `ssb` and
 * records the total in the matching slot of the shared test_t.
 *
 * @param ssb The instance whose connections changed.
 * @param change The kind of change, logged as an integer.
 * @param connection The affected connection, logged by address only.
 * @param user_data The test_t shared by the test.
 */
static void _ssb_test_connections_changed(tf_ssb_t* ssb, tf_ssb_change_t change, tf_ssb_connection_t* connection, void* user_data)
{
	test_t* test = user_data;

	/* The ID list is NULL-terminated; only its length is of interest here. */
	int count = 0;
	const char** ids = tf_ssb_get_connection_ids(ssb);
	for (const char** p = ids; *p; p++)
	{
		count++;
	}
	tf_free(ids);

	/* %p requires a void* argument and %d an int, hence the explicit casts. */
	if (ssb == test->ssb0)
	{
		tf_printf("callback0 change=%d connection=%p\n", (int)change, (void*)connection);
		test->connection_count0 = count;
	}
	else if (ssb == test->ssb1)
	{
		tf_printf("callback1 change=%d connection=%p\n", (int)change, (void*)connection);
		test->connection_count1 = count;
	}
	else if (ssb == test->ssb2)
	{
		tf_printf("callback2 change=%d connection=%p\n", (int)change, (void*)connection);
		test->connection_count2 = count;
	}
	tf_printf("conns = %d %d %d\n", test->connection_count0, test->connection_count1, test->connection_count2);
}
/**
 * State for the message-count query: the instance being queried and the
 * count written by _count_messages_callback.
 */
typedef struct _count_messages_t
{
tf_ssb_t* ssb;
int count;
} count_messages_t;
/**
 * Query row visitor: reads the "count" property of the row into the
 * count_messages_t passed as user data.
 *
 * @param row The result row object.
 * @param user_data A count_messages_t receiving the count.
 */
static void _count_messages_callback(JSValue row, void* user_data)
{
	count_messages_t* count = user_data;
	JSContext* context = tf_ssb_get_context(count->ssb);

	/* JS_GetPropertyStr returns a new reference, so release it after converting. */
	JSValue value = JS_GetPropertyStr(context, row, "count");
	JS_ToInt32(context, &count->count, value);
	JS_FreeValue(context, value);
}
/**
 * Returns the number of rows in the messages table of `ssb`, or 0 if the
 * query callback never runs.
 */
static int _ssb_test_count_messages(tf_ssb_t* ssb)
{
	count_messages_t result = { 0 };
	result.ssb = ssb;
	tf_ssb_db_visit_query(ssb, "SELECT COUNT(*) AS count FROM messages", JS_UNDEFINED, _count_messages_callback, &result);
	return result.count;
}
/**
 * Message-added callback: increments the int counter passed as user data.
 */
static void _message_added(tf_ssb_t* ssb, const char* id, void* user_data)
{
	int* counter = user_data;
	*counter += 1;
}
/**
 * Idle callback: runs pending QuickJS jobs for the instance stored in
 * `idle->data`, reporting the exception state after each executed job.
 *
 * @param idle The idle handle; its data pointer is the tf_ssb_t to service.
 */
static void _ssb_test_idle(uv_idle_t* idle)
{
	tf_ssb_t* ssb = idle->data;
	JSRuntime* runtime = JS_GetRuntime(tf_ssb_get_context(ssb));
	while (JS_IsJobPending(runtime))
	{
		JSContext* context = NULL;
		int r = JS_ExecutePendingJob(runtime, &context);

		/* Only query the exception state once a context has actually been handed back. */
		if (context)
		{
			/* NOTE(review): whether tf_util_report_error releases the value is not visible here - confirm ownership. */
			JSValue result = JS_GetException(context);
			tf_util_report_error(context, result);
		}
		if (r == 0)
		{
			break;
		}
	}
}
/**
 * Message-stored callback: sets the bool passed as user data to true.
 */
static void _message_stored(const char* id, bool verified, bool is_new, void* user_data)
{
	bool* stored_flag = user_data;
	*stored_flag = true;
}
/**
 * Runs the loop of `ssb` one iteration at a time until `*stored` is true.
 */
static void _wait_stored(tf_ssb_t* ssb, bool* stored)
{
	for (;;)
	{
		if (*stored)
		{
			return;
		}
		uv_run(tf_ssb_get_loop(ssb), UV_RUN_ONCE);
	}
}
/**
 * Builds a message object of type "post" with the given text.
 * The caller owns the returned value.
 */
static JSValue _ssb_test_new_post(JSContext* context, const char* text)
{
	JSValue post = JS_NewObject(context);
	JS_SetPropertyStr(context, post, "type", JS_NewString(context, "post"));
	JS_SetPropertyStr(context, post, "text", JS_NewString(context, text));
	return post;
}

/**
 * Signs `message` as `id`, submits it to `ssb` for verification and storage,
 * runs the loop until the stored callback fires, and then frees `message`.
 */
static void _ssb_test_sign_and_store(tf_ssb_t* ssb, const char* id, uint8_t* priv, JSValue message)
{
	JSContext* context = tf_ssb_get_context(ssb);
	bool stored = false;
	JSValue signed_message = tf_ssb_sign_message(ssb, id, priv, message);
	tf_ssb_verify_strip_and_store_message(ssb, signed_message, _message_stored, &stored);
	JS_FreeValue(context, signed_message);
	_wait_stored(ssb, &stored);
	JS_FreeValue(context, message);
}

/**
 * Two-instance test: ssb0 stores a blob and three posts, ssb1 connects to
 * it on 127.0.0.1:12347, and the test waits for the messages and blob to
 * appear on ssb1 before checking that a fresh post triggers the
 * message-added callback on both sides.
 *
 * @param options Test options (not used by this test).
 */
void tf_ssb_test_ssb(const tf_test_options_t* options)
{
	tf_printf("Testing SSB.\n");
	uv_loop_t loop = { 0 };
	uv_loop_init(&loop);

	/* Remove any database left over from a previous run before creating each instance. */
	unlink("out/test_db0.sqlite");
	tf_ssb_t* ssb0 = tf_ssb_create(&loop, NULL, "file:out/test_db0.sqlite");
	tf_ssb_register(tf_ssb_get_context(ssb0), ssb0);
	unlink("out/test_db1.sqlite");
	tf_ssb_t* ssb1 = tf_ssb_create(&loop, NULL, "file:out/test_db1.sqlite");
	tf_ssb_register(tf_ssb_get_context(ssb1), ssb1);

	/* Idle handles run pending JS jobs for each instance while the loop spins. */
	uv_idle_t idle0 = { .data = ssb0 };
	uv_idle_init(&loop, &idle0);
	uv_idle_start(&idle0, _ssb_test_idle);
	uv_idle_t idle1 = { .data = ssb1 };
	uv_idle_init(&loop, &idle1);
	uv_idle_start(&idle1, _ssb_test_idle);

	test_t test = {
		.ssb0 = ssb0,
		.ssb1 = ssb1,
	};
	tf_ssb_add_connections_changed_callback(ssb0, _ssb_test_connections_changed, NULL, &test);
	tf_ssb_add_connections_changed_callback(ssb1, _ssb_test_connections_changed, NULL, &test);

	tf_ssb_generate_keys(ssb0);
	tf_ssb_generate_keys(ssb1);
	uint8_t priv0[512] = { 0 };
	uint8_t priv1[512] = { 0 };
	tf_ssb_get_private_key(ssb0, priv0, sizeof(priv0));
	tf_ssb_get_private_key(ssb1, priv1, sizeof(priv1));

	char id0[k_id_base64_len] = { 0 };
	char id1[k_id_base64_len] = { 0 };
	bool b = tf_ssb_whoami(ssb0, id0, sizeof(id0));
	(void)b;
	assert(b);
	b = tf_ssb_whoami(ssb1, id1, sizeof(id1));
	assert(b);
	tf_printf("ID %s and %s\n", id0, id1);

	char blob_id[k_id_base64_len] = { 0 };
	const char* k_blob = "Hello, blob!";
	b = tf_ssb_db_blob_store(ssb0, (const uint8_t*)k_blob, strlen(k_blob), blob_id, sizeof(blob_id), NULL);
	assert(b);

	/* Three posts from ssb0; the last one links to the blob through a mention. */
	JSContext* context0 = tf_ssb_get_context(ssb0);
	_ssb_test_sign_and_store(ssb0, id0, priv0, _ssb_test_new_post(context0, "Hello, world!"));
	_ssb_test_sign_and_store(ssb0, id0, priv0, _ssb_test_new_post(context0, "First post."));

	JSValue obj = _ssb_test_new_post(context0, "First post.");
	JSValue mentions = JS_NewArray(context0);
	JSValue mention = JS_NewObject(context0);
	JS_SetPropertyStr(context0, mention, "link", JS_NewString(context0, blob_id));
	JS_SetPropertyUint32(context0, mentions, 0, mention);
	JS_SetPropertyStr(context0, obj, "mentions", mentions);
	_ssb_test_sign_and_store(ssb0, id0, priv0, obj);

	/* The blob is readable on ssb0 and not yet present on ssb1. */
	/* Initialized so tf_free() never sees an indeterminate pointer when asserts are compiled out. */
	uint8_t* b0 = NULL;
	size_t s0 = 0;
	b = tf_ssb_db_blob_get(ssb0, blob_id, &b0, &s0);
	assert(b);
	assert(s0 == strlen(k_blob));
	assert(memcmp(b0, k_blob, strlen(k_blob)) == 0);
	tf_free(b0);
	b = !tf_ssb_db_blob_get(ssb1, blob_id, NULL, NULL);
	assert(b);

	tf_ssb_server_open(ssb0, 12347);
	uint8_t id0bin[k_id_bin_len] = { 0 };
	b = tf_ssb_id_str_to_bin(id0bin, id0);
	assert(b);
	tf_ssb_connect(ssb1, "127.0.0.1", 12347, id0bin);

	tf_printf("Waiting for connection.\n");
	while (test.connection_count0 != 1 || test.connection_count1 != 1)
	{
		uv_run(&loop, UV_RUN_ONCE);
	}
	tf_ssb_server_close(ssb0);

	tf_printf("Waiting for messages.\n");
	while (_ssb_test_count_messages(ssb1) < 3)
	{
		uv_run(&loop, UV_RUN_ONCE);
	}

	tf_printf("Waiting for blob.\n");
	while (!tf_ssb_db_blob_get(ssb1, blob_id, NULL, NULL))
	{
		uv_run(&loop, UV_RUN_ONCE);
	}
	uint8_t* b1 = NULL;
	size_t s1 = 0;
	b = tf_ssb_db_blob_get(ssb1, blob_id, &b1, &s1);
	assert(b);
	/* size_t is printed with %zu. */
	tf_printf("s1 = %zu sl = %zu\n", s1, strlen(k_blob));
	assert(s1 == strlen(k_blob));
	assert(memcmp(b1, k_blob, strlen(k_blob)) == 0);
	tf_free(b1);

	tf_printf("Waiting for message to self.\n");
	int count0 = 0;
	int count1 = 0;
	tf_ssb_add_message_added_callback(ssb0, _message_added, NULL, &count0);
	tf_ssb_add_message_added_callback(ssb1, _message_added, NULL, &count1);
	_ssb_test_sign_and_store(ssb0, id0, priv0, _ssb_test_new_post(context0, "Message to self."));
	while (count0 == 0)
	{
		uv_run(&loop, UV_RUN_ONCE);
	}
	tf_ssb_remove_message_added_callback(ssb0, _message_added, &count0);

	tf_printf("Waiting for message from other.\n");
	while (count1 == 0)
	{
		uv_run(&loop, UV_RUN_ONCE);
	}
	tf_ssb_remove_message_added_callback(ssb1, _message_added, &count1);
	tf_printf("done\n");

	/* Close the idle handles so the final UV_RUN_DEFAULT pass can finish. */
	tf_ssb_send_close(ssb1);
	uv_close((uv_handle_t*)&idle0, NULL);
	uv_close((uv_handle_t*)&idle1, NULL);
	tf_printf("final run\n");
	uv_run(&loop, UV_RUN_DEFAULT);
	tf_printf("done\n");

	tf_printf("destroy 0\n");
	tf_ssb_destroy(ssb0);
	tf_printf("destroy 1\n");
	tf_ssb_destroy(ssb1);
	tf_printf("close\n");
	uv_loop_close(&loop);
}
/**
 * Broadcast visitor: increments the int counter passed as user data once
 * per visited broadcast.
 */
static void _broadcasts_visit(const char* host, const struct sockaddr_in* addr, tf_ssb_connection_t* tunnel, const uint8_t* pub, void* user_data)
{
	int* visited = user_data;
	*visited += 1;
}
/**
 * Broadcasts-changed callback: recounts the broadcasts visible to `ssb`
 * into the matching slot of the shared test_t and logs all three counts.
 *
 * @param ssb The instance whose broadcasts changed.
 * @param user_data The test_t shared by the test.
 */
static void _broadcasts_changed(tf_ssb_t* ssb, void* user_data)
{
	test_t* test = user_data;
	int* count = NULL;
	if (ssb == test->ssb0)
	{
		count = &test->broadcast_count0;
	}
	else if (ssb == test->ssb1)
	{
		count = &test->broadcast_count1;
	}
	else if (ssb == test->ssb2)
	{
		count = &test->broadcast_count2;
	}

	/* The visitor dereferences its user data, so only visit when a counter slot was found. */
	if (count)
	{
		*count = 0;
		tf_ssb_visit_broadcasts(ssb, _broadcasts_visit, count);
	}
	tf_printf("BROADCASTS %d %d %d\n", test->broadcast_count0, test->broadcast_count1, test->broadcast_count2);
}
/**
 * Room test with three instances: ssb1 and ssb2 both connect to ssb0, and
 * ssb1 then sends a tunnel.connect request naming ssb0 as the portal and
 * ssb2 as the target. The test waits until each instance reports two
 * connections before tearing everything down.
 *
 * @param options Test options (not used by this test).
 */
void tf_ssb_test_rooms(const tf_test_options_t* options)
{
tf_printf("Testing Rooms.\n");
uv_loop_t loop = { 0 };
uv_loop_init(&loop);
/* Remove any database left over from a previous run before creating each instance. */
unlink("out/test_db0.sqlite");
tf_ssb_t* ssb0 = tf_ssb_create(&loop, NULL, "file:out/test_db0.sqlite");
tf_ssb_register(tf_ssb_get_context(ssb0), ssb0);
unlink("out/test_db1.sqlite");
tf_ssb_t* ssb1 = tf_ssb_create(&loop, NULL, "file:out/test_db1.sqlite");
tf_ssb_register(tf_ssb_get_context(ssb1), ssb1);
unlink("out/test_db2.sqlite");
tf_ssb_t* ssb2 = tf_ssb_create(&loop, NULL, "file:out/test_db2.sqlite");
tf_ssb_register(tf_ssb_get_context(ssb2), ssb2);
/* One idle handle per instance runs its pending JS jobs while the loop spins. */
uv_idle_t idle0 = { .data = ssb0 };
uv_idle_init(&loop, &idle0);
uv_idle_start(&idle0, _ssb_test_idle);
uv_idle_t idle1 = { .data = ssb1 };
uv_idle_init(&loop, &idle1);
uv_idle_start(&idle1, _ssb_test_idle);
uv_idle_t idle2 = { .data = ssb2 };
uv_idle_init(&loop, &idle2);
uv_idle_start(&idle2, _ssb_test_idle);
test_t test = {
.ssb0 = ssb0,
.ssb1 = ssb1,
.ssb2 = ssb2,
};
/* The callbacks below keep the connection and broadcast counts in `test` current. */
tf_ssb_add_connections_changed_callback(ssb0, _ssb_test_connections_changed, NULL, &test);
tf_ssb_add_connections_changed_callback(ssb1, _ssb_test_connections_changed, NULL, &test);
tf_ssb_add_connections_changed_callback(ssb2, _ssb_test_connections_changed, NULL, &test);
tf_ssb_add_broadcasts_changed_callback(ssb0, _broadcasts_changed, NULL, &test);
tf_ssb_add_broadcasts_changed_callback(ssb1, _broadcasts_changed, NULL, &test);
tf_ssb_add_broadcasts_changed_callback(ssb2, _broadcasts_changed, NULL, &test);
tf_ssb_generate_keys(ssb0);
tf_ssb_generate_keys(ssb1);
tf_ssb_generate_keys(ssb2);
char id0[k_id_base64_len] = { 0 };
char id1[k_id_base64_len] = { 0 };
char id2[k_id_base64_len] = { 0 };
bool b = tf_ssb_whoami(ssb0, id0, sizeof(id0));
(void)b;
assert(b);
b = tf_ssb_whoami(ssb1, id1, sizeof(id1));
assert(b);
b = tf_ssb_whoami(ssb2, id2, sizeof(id2));
assert(b);
tf_printf("ID %s, %s, %s\n", id0, id1, id2);
/* ssb0 listens on port 12347; ssb1 and ssb2 both connect to it using ssb0's binary ID. */
tf_ssb_server_open(ssb0, 12347);
uint8_t id0bin[k_id_bin_len];
tf_ssb_id_str_to_bin(id0bin, id0);
tf_ssb_connect(ssb1, "127.0.0.1", 12347, id0bin);
tf_ssb_connect(ssb2, "127.0.0.1", 12347, id0bin);
tf_printf("Waiting for connection.\n");
while (test.connection_count0 != 2 || test.connection_count1 != 1 || test.connection_count2 != 1)
{
uv_run(&loop, UV_RUN_ONCE);
}
tf_ssb_server_close(ssb0);
tf_printf("Waiting for broadcasts.\n");
while (test.broadcast_count1 != 1 || test.broadcast_count2 != 1)
{
uv_run(&loop, UV_RUN_ONCE);
}
/* ssb1 is expected to hold exactly one connection at this point; connections[0] is used below. */
tf_ssb_connection_t* connections[4];
int count = tf_ssb_get_connections(ssb1, connections, 4);
(void)count;
assert(count == 1);
int32_t tunnel_request_number = tf_ssb_connection_next_request_number(connections[0]);
/* Build { name: ["tunnel", "connect"], args: [{ portal: id0, target: id2 }], type: "duplex" }. */
JSContext* context = tf_ssb_get_context(ssb1);
JSValue message = JS_NewObject(context);
JSValue name = JS_NewArray(context);
JS_SetPropertyUint32(context, name, 0, JS_NewString(context, "tunnel"));
JS_SetPropertyUint32(context, name, 1, JS_NewString(context, "connect"));
JS_SetPropertyStr(context, message, "name", name);
JSValue args = JS_NewArray(context);
JSValue arg = JS_NewObject(context);
JS_SetPropertyStr(context, arg, "portal", JS_NewString(context, id0));
JS_SetPropertyStr(context, arg, "target", JS_NewString(context, id2));
JS_SetPropertyUint32(context, args, 0, arg);
JS_SetPropertyStr(context, message, "args", args);
JS_SetPropertyStr(context, message, "type", JS_NewString(context, "duplex"));
tf_ssb_connection_rpc_send_json(connections[0], k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, tunnel_request_number, message, NULL, NULL, NULL);
JS_FreeValue(context, message);
/* The tunnel is created with the same request number as the request just sent. */
tf_ssb_connection_t* tun0 = tf_ssb_connection_tunnel_create(ssb1, id0, tunnel_request_number, id2);
tf_printf("tun0 = %p\n", tun0);
tf_printf("Done.\n");
/* Wait until every instance reports two connections. */
while (test.connection_count0 != 2 || test.connection_count1 != 2 || test.connection_count2 != 2)
{
uv_run(&loop, UV_RUN_ONCE);
}
tf_printf("Done.\n");
uv_run(&loop, UV_RUN_NOWAIT);
tf_ssb_connection_close(tun0);
uv_run(&loop, UV_RUN_NOWAIT);
/* Teardown: close the idle handles and all connections, then run the loop until it has nothing left to do. */
uv_close((uv_handle_t*)&idle0, NULL);
uv_close((uv_handle_t*)&idle1, NULL);
uv_close((uv_handle_t*)&idle2, NULL);
tf_ssb_send_close(ssb0);
tf_ssb_send_close(ssb1);
tf_ssb_send_close(ssb2);
tf_ssb_close_all(ssb0);
tf_ssb_close_all(ssb1);
tf_ssb_close_all(ssb2);
uv_run(&loop, UV_RUN_DEFAULT);
tf_ssb_destroy(ssb0);
tf_ssb_destroy(ssb1);
tf_ssb_destroy(ssb2);
/* A second pass runs whatever the destroy calls queued on the loop. */
uv_run(&loop, UV_RUN_DEFAULT);
uv_loop_close(&loop);
}
/**
 * Asserts that `contact` is (or is not) among the IDs returned by
 * tf_ssb_db_following_deep_ids() for `id` with a depth argument of 2.
 *
 * @param ssb The instance to query.
 * @param id The feed whose follow graph is walked.
 * @param contact The ID to look for in the result.
 * @param visible Whether `contact` is expected to be present.
 */
static void _assert_visible(tf_ssb_t* ssb, const char* id, const char* contact, bool visible)
{
	const char** following = tf_ssb_db_following_deep_ids(ssb, &id, 1, 2);
	bool found = false;
	(void)found;

	/* The result is NULL-terminated; stop at the first match. */
	for (const char** it = following; !found && *it; it++)
	{
		found = strcmp(*it, contact) == 0;
	}

	tf_free(following);
	assert(found == visible);
}
/**
 * Follow/block visibility test. Four identities are generated; id0 follows
 * id1, and id1 follows id2 and id3. All four are expected to be visible
 * from id0 until id0 blocks id3, after which id3 must no longer be visible.
 *
 * @param options Test options (not used by this test).
 */
void tf_ssb_test_following(const tf_test_options_t* options)
{
	tf_printf("Testing following.\n");
	uv_loop_t loop = { 0 };
	uv_loop_init(&loop);
	unlink("out/test_db0.sqlite");
	tf_ssb_t* ssb0 = tf_ssb_create(&loop, NULL, "file:out/test_db0.sqlite");
	tf_ssb_generate_keys(ssb0);

	/* Each ID buffer starts with "@"; the generated key text is written after it. */
	char id0[k_id_base64_len] = { "@" };
	char id1[k_id_base64_len] = { "@" };
	char id2[k_id_base64_len] = { "@" };
	char id3[k_id_base64_len] = { "@" };
	char priv0b[512] = { 0 };
	char priv1b[512] = { 0 };
	char priv2b[512] = { 0 };
	char priv3b[512] = { 0 };
	uint8_t priv0[512] = { 0 };
	uint8_t priv1[512] = { 0 };
	uint8_t priv2[512] = { 0 };
	uint8_t priv3[512] = { 0 };
	tf_ssb_generate_keys_buffer(id0 + 1, sizeof(id0) - 1, priv0b, sizeof(priv0b));
	tf_ssb_generate_keys_buffer(id1 + 1, sizeof(id1) - 1, priv1b, sizeof(priv1b));
	tf_ssb_generate_keys_buffer(id2 + 1, sizeof(id2) - 1, priv2b, sizeof(priv2b));
	tf_ssb_generate_keys_buffer(id3 + 1, sizeof(id3) - 1, priv3b, sizeof(priv3b));

	/* The private keys come back as base64 text; decode them to bytes for signing. */
	tf_base64_decode(priv0b, strlen(priv0b), priv0, sizeof(priv0));
	tf_base64_decode(priv1b, strlen(priv1b), priv1, sizeof(priv1));
	tf_base64_decode(priv2b, strlen(priv2b), priv2, sizeof(priv2));
	tf_base64_decode(priv3b, strlen(priv3b), priv3, sizeof(priv3));

	JSContext* context = tf_ssb_get_context(ssb0);

/*
 * Publishes a "contact" message from `id` about `contact` with the given
 * following/blocking flags and waits for it to be stored. Wrapped in
 * do/while(0) so it expands to a single statement; scratch values are local.
 */
#define FOLLOW_BLOCK(id, priv, contact, follow, block) \
	do \
	{ \
		JSValue message = JS_NewObject(context); \
		JS_SetPropertyStr(context, message, "type", JS_NewString(context, "contact")); \
		JS_SetPropertyStr(context, message, "contact", JS_NewString(context, (contact))); \
		JS_SetPropertyStr(context, message, "following", (follow) ? JS_TRUE : JS_FALSE); \
		JS_SetPropertyStr(context, message, "blocking", (block) ? JS_TRUE : JS_FALSE); \
		JSValue signed_message = tf_ssb_sign_message(ssb0, (id), (priv), message); \
		bool stored = false; \
		tf_ssb_verify_strip_and_store_message(ssb0, signed_message, _message_stored, &stored); \
		_wait_stored(ssb0, &stored); \
		JS_FreeValue(context, signed_message); \
		JS_FreeValue(context, message); \
	} while (0)

	FOLLOW_BLOCK(id0, priv0, id1, true, false);
	FOLLOW_BLOCK(id1, priv1, id2, true, false);
	FOLLOW_BLOCK(id1, priv1, id3, true, false);
	_assert_visible(ssb0, id0, id0, true);
	_assert_visible(ssb0, id0, id1, true);
	_assert_visible(ssb0, id0, id2, true);
	_assert_visible(ssb0, id0, id3, true);

	/* After id0 blocks id3, id3 must drop out while the others stay visible. */
	FOLLOW_BLOCK(id0, priv0, id3, false, true);
	_assert_visible(ssb0, id0, id0, true);
	_assert_visible(ssb0, id0, id1, true);
	_assert_visible(ssb0, id0, id2, true);
	_assert_visible(ssb0, id0, id3, false);
#undef FOLLOW_BLOCK

	uv_run(&loop, UV_RUN_DEFAULT);
	tf_ssb_destroy(ssb0);
	uv_loop_close(&loop);
}
void tf_ssb_test_bench(const tf_test_options_t* options)
{
tf_printf("Testing following.\n");
uv_loop_t loop = { 0 };
uv_loop_init(&loop);
tf_trace_t* trace = tf_trace_create();
unlink("out/test_db0.sqlite");
tf_ssb_t* ssb0 = tf_ssb_create(&loop, NULL, "file:out/test_db0.sqlite");
tf_ssb_set_trace(ssb0, trace);
tf_ssb_generate_keys(ssb0);
char id0[k_id_base64_len] = { 0 };
tf_ssb_whoami(ssb0, id0, sizeof(id0));
uint8_t priv0[512];
tf_ssb_get_private_key(ssb0, priv0, sizeof(priv0));
struct timespec start_time = { 0 };
struct timespec end_time = { 0 };
clock_gettime(CLOCK_REALTIME, &start_time);
const int k_messages = 4096;
JSValue obj = JS_NewObject(tf_ssb_get_context(ssb0));
JS_SetPropertyStr(tf_ssb_get_context(ssb0), obj, "type", JS_NewString(tf_ssb_get_context(ssb0), "post"));
JS_SetPropertyStr(tf_ssb_get_context(ssb0), obj, "text", JS_NewString(tf_ssb_get_context(ssb0), "Hello, world!"));
for (int i = 0; i < k_messages; i++)
{
bool stored = false;
JSValue signed_message = tf_ssb_sign_message(ssb0, id0, priv0, obj);
tf_ssb_verify_strip_and_store_message(ssb0, signed_message, _message_stored, &stored);
JS_FreeValue(tf_ssb_get_context(ssb0), signed_message);
_wait_stored(ssb0, &stored);
}
JS_FreeValue(tf_ssb_get_context(ssb0), obj);
clock_gettime(CLOCK_REALTIME, &end_time);
tf_printf("insert = %f seconds\n", (end_time.tv_sec - start_time.tv_sec) + (end_time.tv_nsec - start_time.tv_nsec) / 1e9);
unlink("out/test_db1.sqlite");
tf_ssb_t* ssb1 = tf_ssb_create(&loop, NULL, "file:out/test_db1.sqlite");
tf_ssb_set_trace(ssb1, trace);
tf_ssb_generate_keys(ssb1);
uint8_t id0bin[k_id_bin_len];
tf_ssb_id_str_to_bin(id0bin, id0);
tf_ssb_set_main_thread(ssb0, true);
tf_ssb_set_main_thread(ssb1, true);
uv_idle_t idle0 = { .data = ssb0 };
uv_idle_init(&loop, &idle0);
uv_idle_start(&idle0, _ssb_test_idle);
uv_idle_t idle1 = { .data = ssb1 };
uv_idle_init(&loop, &idle1);
uv_idle_start(&idle1, _ssb_test_idle);
tf_ssb_register(tf_ssb_get_context(ssb0), ssb0);
tf_ssb_register(tf_ssb_get_context(ssb1), ssb1);
tf_ssb_server_open(ssb0, 12347);
tf_ssb_connect(ssb1, "127.0.0.1", 12347, id0bin);
tf_printf("Waiting for messages.\n");
clock_gettime(CLOCK_REALTIME, &start_time);
int count = 0;
tf_ssb_add_message_added_callback(ssb1, _message_added, NULL, &count);
while (count < k_messages)
{
uv_run(&loop, UV_RUN_ONCE);
}
tf_ssb_remove_message_added_callback(ssb1, _message_added, &count);
clock_gettime(CLOCK_REALTIME, &end_time);
tf_ssb_set_main_thread(ssb0, false);
tf_ssb_set_main_thread(ssb1, false);
count = _ssb_test_count_messages(ssb1);
if (count < k_messages)
{
abort();
}
tf_printf("Done.\n");
tf_printf("replicate = %f seconds\n", (end_time.tv_sec - start_time.tv_sec) + (end_time.tv_nsec - start_time.tv_nsec) / 1e9);
tf_ssb_send_close(ssb1);
tf_ssb_server_close(ssb0);
uv_close((uv_handle_t*)&idle0, NULL);
uv_close((uv_handle_t*)&idle1, NULL);
uv_run(&loop, UV_RUN_DEFAULT);
char* trace_data = tf_trace_export(trace);
if (trace_data)
{
FILE* file = fopen("out/trace.json", "wb");
if (file)
{
fwrite(trace_data, 1, strlen(trace_data), file);
fclose(file);
}
tf_free(trace_data);
}
tf_ssb_destroy(ssb1);
tf_ssb_destroy(ssb0);
tf_trace_destroy(trace);
uv_loop_close(&loop);
}
/**
 * Connections-changed callback for the room test: logs the change kind
 * together with the connection's address, host and port.
 *
 * @param ssb The instance whose connections changed (unused).
 * @param change The kind of change; values outside the name table log as "unknown".
 * @param connection The affected connection.
 * @param user_data Unused.
 */
static void _ssb_test_room_connections_changed(tf_ssb_t* ssb, tf_ssb_change_t change, tf_ssb_connection_t* connection, void* user_data)
{
	static const char* const k_changes[] = { "create", "connect", "remove" };
	const size_t k_changes_count = sizeof(k_changes) / sizeof(k_changes[0]);

	/* Guard the lookup so a change value without a name cannot index past the table. */
	size_t index = (size_t)change;
	const char* change_name = index < k_changes_count ? k_changes[index] : "unknown";

	tf_printf("change=%s %p connection=%s:%d\n", change_name, (void*)connection, tf_ssb_connection_get_host(connection), tf_ssb_connection_get_port(connection));
}
/**
 * Heap-allocated state for a delayed error sent on a connection; created by
 * _break_in_a_bit and freed by _timer_close.
 */
typedef struct _close_t
{
tf_ssb_t* ssb;
/* The connection the error message is sent on. */
tf_ssb_connection_t* connection;
/* The timer's data pointer refers back to this struct. */
uv_timer_t timer;
/* The request number the error message is sent with. */
int32_t request_number;
/* Copy of the ID string, used only for logging. */
char id[k_id_base64_len];
} close_t;
/**
 * Close callback for the delayed-break timer: frees the allocation stored
 * in the handle's data pointer.
 */
static void _timer_close(uv_handle_t* handle)
{
	void* owner = handle->data;
	tf_free(owner);
}
/**
 * Timer callback: sends a JSON error message with the end/error flag on the
 * recorded connection and request number, then closes the timer.
 * _timer_close frees the close_t once the handle is closed.
 */
static void _close_callback(uv_timer_t* timer)
{
	close_t* pending = timer->data;
	tf_printf("breaking %s %p\n", pending->id, pending->connection);

	const char* message = "{\"name\":\"Error\",\"message\":\"whoops\",\"stack\":\"nah\"}";
	const uint8_t* payload = (const uint8_t*)message;
	size_t payload_size = strlen(message);
	tf_ssb_connection_rpc_send(
		pending->connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_json | k_ssb_rpc_flag_end_error, pending->request_number, payload, payload_size, NULL, NULL, NULL);

	uv_close((uv_handle_t*)timer, _timer_close);
}
/**
 * Schedules _close_callback to fire once, 3000 ms from now, for the given
 * connection and request number.
 *
 * @param ssb The instance whose loop hosts the timer.
 * @param connection The connection the error will be sent on.
 * @param id An ID string copied for logging.
 * @param request_number The request number the error will be sent with.
 */
static void _break_in_a_bit(tf_ssb_t* ssb, tf_ssb_connection_t* connection, const char* id, int32_t request_number)
{
	close_t* pending = tf_malloc(sizeof(close_t));

	/* Zero everything first, then fill in the fields that matter. */
	memset(pending, 0, sizeof(*pending));
	pending->ssb = ssb;
	pending->connection = connection;
	pending->request_number = request_number;
	/* The timer points back at its owner so the callbacks can find and free it. */
	pending->timer.data = pending;
	snprintf(pending->id, sizeof(pending->id), "%s", id);

	uv_timer_init(tf_ssb_get_loop(ssb), &pending->timer);
	uv_timer_start(&pending->timer, _close_callback, 3000, 0);
}
/**
 * Broadcast visitor for the room test. For a tunneled broadcast whose ID
 * is not the hard-coded one below, and only while the instance has exactly
 * one connection, it sends a tunnel.connect request over the tunnel
 * connection, creates the tunnel, and schedules a delayed error on it.
 *
 * @param host Host of the broadcast (logged only).
 * @param addr Address of the broadcast (unused).
 * @param tunnel The connection the broadcast arrived through, or NULL.
 * @param pub Binary public key of the broadcasting peer.
 * @param user_data The tf_ssb_t doing the visiting.
 */
static void _ssb_test_room_broadcasts_visit(const char* host, const struct sockaddr_in* addr, tf_ssb_connection_t* tunnel, const uint8_t* pub, void* user_data)
{
	tf_ssb_t* ssb = user_data;
	char id[k_id_base64_len] = { 0 };
	tf_ssb_id_bin_to_str(id, sizeof(id), pub);
	tf_ssb_connection_t* connections[8];

	/* Guard clauses, evaluated in the same order as a short-circuit && chain. */
	if (!tunnel)
	{
		return;
	}
	if (strcmp(id, "@Jqm63iKumgaWfUI6mXtmQCDHiQJhzMiEWXYUqtcGs9o=.ed25519") == 0)
	{
		return;
	}
	if (tf_ssb_get_connections(ssb, connections, 8) != 1)
	{
		return;
	}

	tf_printf("%s %p %s\n", host, tunnel, id);
	int32_t tunnel_request_number = tf_ssb_connection_next_request_number(tunnel);

	/* The portal is the ID of the connection carrying the broadcast; the target is the broadcasting peer. */
	char portal[k_id_base64_len] = { 0 };
	char target[k_id_base64_len] = { 0 };
	tf_ssb_connection_get_id(tunnel, portal, sizeof(portal));
	tf_ssb_id_bin_to_str(target, sizeof(target), pub);

	/* Build { name: ["tunnel", "connect"], args: [{ portal, target }], type: "duplex" }. */
	JSContext* context = tf_ssb_get_context(ssb);
	JSValue request = JS_NewObject(context);

	JSValue method = JS_NewArray(context);
	JS_SetPropertyUint32(context, method, 0, JS_NewString(context, "tunnel"));
	JS_SetPropertyUint32(context, method, 1, JS_NewString(context, "connect"));
	JS_SetPropertyStr(context, request, "name", method);

	JSValue arguments = JS_NewArray(context);
	JSValue endpoints = JS_NewObject(context);
	JS_SetPropertyStr(context, endpoints, "portal", JS_NewString(context, portal));
	JS_SetPropertyStr(context, endpoints, "target", JS_NewString(context, target));
	JS_SetPropertyUint32(context, arguments, 0, endpoints);
	JS_SetPropertyStr(context, request, "args", arguments);

	JS_SetPropertyStr(context, request, "type", JS_NewString(context, "duplex"));
	tf_ssb_connection_rpc_send_json(tunnel, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, tunnel_request_number, request, NULL, NULL, NULL);
	JS_FreeValue(context, request);

	tf_printf("tunnel create ssb=%p portal=%s rn=%d target=%s\n", ssb, portal, (int)tunnel_request_number, target);
	tf_ssb_connection_tunnel_create(ssb, portal, tunnel_request_number, target);
	_break_in_a_bit(ssb, tunnel, target, tunnel_request_number);
}
/**
 * Broadcasts-changed callback for the room test: visits every broadcast of
 * `ssb`, handing the instance itself to the visitor as user data.
 */
static void _ssb_test_room_broadcasts_changed(tf_ssb_t* ssb, void* user_data)
{
	void* visitor_data = ssb;
	tf_ssb_visit_broadcasts(ssb, _ssb_test_room_broadcasts_visit, visitor_data);
}
/**
 * Manual test against an external room server: two local instances connect
 * to the same hard-coded room address, and the loop then runs until it has
 * no more work.
 *
 * @param options Test options (not used by this test).
 */
void tf_ssb_test_go_ssb_room(const tf_test_options_t* options)
{
	/* Both instances dial the same room. */
	const char* k_room_address = "net:linode.unprompted.com:8008~shs:Q0pc/7kXQJGIlqJxuwayL2huayzddgkVDoGkYVWQS1Y=:SSB+Room+PSK3TLYC2T86EHQCUHBUHASCASE18JBV24=";

	tf_printf("Testing go_ssb_room.\n");

	uv_loop_t loop = { 0 };
	uv_loop_init(&loop);
	tf_trace_t* trace = tf_trace_create();

	unlink("out/test_db0.sqlite");
	tf_ssb_t* ssb0 = tf_ssb_create(&loop, NULL, "file:out/test_db0.sqlite");
	tf_ssb_set_trace(ssb0, trace);
	tf_ssb_generate_keys(ssb0);

	unlink("out/test_db1.sqlite");
	tf_ssb_t* ssb1 = tf_ssb_create(&loop, NULL, "file:out/test_db1.sqlite");
	tf_ssb_set_trace(ssb1, trace);
	tf_ssb_generate_keys(ssb1);

	tf_ssb_add_connections_changed_callback(ssb0, _ssb_test_room_connections_changed, NULL, NULL);
	tf_ssb_add_connections_changed_callback(ssb1, _ssb_test_room_connections_changed, NULL, NULL);

	/* Only ssb0 reacts to broadcasts. */
	tf_ssb_add_broadcasts_changed_callback(ssb0, _ssb_test_room_broadcasts_changed, NULL, NULL);

	tf_ssb_connect_str(ssb0, k_room_address);
	tf_ssb_connect_str(ssb1, k_room_address);

	uv_run(&loop, UV_RUN_DEFAULT);

	tf_ssb_destroy(ssb0);
	tf_ssb_destroy(ssb1);
	tf_trace_destroy(trace);
	uv_loop_close(&loop);
}