diff --git a/core/ssb.js b/core/ssb.js index dc3a7b8b..020b514e 100644 --- a/core/ssb.js +++ b/core/ssb.js @@ -1,6 +1,7 @@ "use strict"; var g_wants_requests = {}; var g_database = new Database('core'); +let g_attendants = {}; const k_use_create_history_stream = false; const k_blobs_concurrent_target = 8; @@ -72,7 +73,28 @@ function storeMessage(message) { } } -ssb.addEventListener('connections', function(change, connection) { +function tunnel_attendants(request) { + if (request.message.type !== 'state') { + throw Error('Unexpected type: ' + request.message.type); + } + let state = new Set(request.message.ids); + for (let id of state) { + request.add_room_attendant(id); + } + request.more(function attendants(message) { + if (message.message.type === 'joined') { + request.add_room_attendant(message.message.id); + state.add(message.message.id); + } else if (message.message.type === 'left') { + request.remove_room_attendant(message.message.id); + state.delete(message.message.id); + } else { + throw Error('Unexpected type: ' + message.type); + } + }); +} + +ssb.addEventListener('connections', function on_connections_changed(change, connection) { if (change == 'add') { var sequence = get_latest_sequence_for_author(connection.id); if (k_use_create_history_stream) { @@ -89,12 +111,17 @@ ssb.addEventListener('connections', function(change, connection) { }); } else { if (connection.is_client) { + connection.send_json({'name': ['tunnel', 'isRoom'], 'args': [], 'type': 'source'}, function tunnel_is_room(request) { + if (request.message) { + connection.send_json({'name': ['room', 'attendants'], 'args': [], 'type': 'source'}, tunnel_attendants); + } + }); connection.send_json({"name": ["ebt", "replicate"], "args": [{"version": 3, "format": "classic"}], "type": "duplex"}, ebtReplicateClient); } } connection.active_blob_wants = {}; - connection.send_json({'name': ['blobs', 'createWants'], 'type': 'source', 'args': []}, function(message) { + connection.send_json({'name': ['blobs', 'createWants'], 'type': 'source', 'args': []}, function on_blob_create_wants(message) { Object.keys(message.message).forEach(function(id) { if (message.message[id] < 0) { if (g_wants_requests[connection.id]) { @@ -129,6 +156,8 @@ ssb.addEventListener('connections', function(change, connection) { }); } else if (change == 'remove') { print('REMOVE', connection.id); + notify_attendant_changed(connection.id, 'left'); + delete g_attendants[connection.id]; delete g_wants_requests[connection.id]; } else { print('CHANGE', change); @@ -191,13 +220,59 @@ ssb.addRpc(['blobs', 'get'], function(request) { }); ssb.addRpc(['gossip', 'ping'], function(request) { - request.more(function(message) { + request.more(function ping(message) { message.send_json(Date.now()); }); }); ssb.addRpc(['tunnel', 'isRoom'], function(request) { - request.send_json(false); + request.send_json(true); +}); + +function notify_attendant_changed(id, type) { + for (let r of Object.values(g_attendants)) { + try { + r.send_json({ + type: type, + id: id, + }); + } catch (e) { + print(`Removing ${r.connection.id} from g_attendants in ${type}.`, e); + delete g_attendants[r.connection.id]; + } + } +} + +ssb.addRpc(['room', 'attendants'], function(request) { + let ids = Object.keys(g_attendants).sort(); + request.send_json({ + type: 'state', + ids: ids, + }); + notify_attendant_changed(request.connection.id, 'joined'); + g_attendants[request.connection.id] = request; +}); + +ssb.addRpc(['tunnel', 'connect'], function(request) { + if (!request.args[0].origin && + request.args[0].portal && + request.args[0].target) { + let target_connection = ssb.getConnection(request.args[0].target); + let target_request_number = target_connection.send_json({ + 'name': ['tunnel', 'connect'], + 'args': [{ + 'origin': request.connection.id, + 'portal': request.args[0].portal, + 'target': request.args[0].target, + }], + 'type': 'duplex', + }); + ssb.tunnel(request.connection, -request.request_number, target_connection, target_request_number); + } else if (request.args[0].origin && + request.args[0].portal && + request.args[0].target) { + ssb.createTunnel(request.connection, -request.request_number, request.args[0].origin); + } }); function ebtReplicateSendClock(request, have) { diff --git a/src/ssb.c b/src/ssb.c index 96bf5e47..b787652c 100644 --- a/src/ssb.c +++ b/src/ssb.c @@ -28,6 +28,11 @@ #define _countof(a) ((int)(sizeof((a)) / sizeof(*(a)))) #endif +#define GREEN "\e[1;32m" +#define MAGENTA "\e[1;35m" +#define CYAN "\e[1;36m" +#define RESET "\e[0m" + static_assert(k_id_base64_len == sodium_base64_ENCODED_LEN(9 + crypto_box_PUBLICKEYBYTES, sodium_base64_VARIANT_ORIGINAL), "k_id_base64_len"); static_assert(k_id_bin_len == crypto_box_PUBLICKEYBYTES, "k_id_bin_len"); static_assert(k_blob_id_len == (sodium_base64_ENCODED_LEN(crypto_hash_sha256_BYTES, sodium_base64_VARIANT_ORIGINAL) + 8), "k_blob_id_len"); @@ -66,6 +71,7 @@ typedef struct _tf_ssb_request_t { int32_t request_number; tf_ssb_rpc_callback_t* callback; + tf_ssb_callback_cleanup_t* cleanup; void* user_data; } tf_ssb_request_t; @@ -76,6 +82,7 @@ typedef struct _tf_ssb_broadcast_t time_t mtime; char host[256]; struct sockaddr_in addr; + tf_ssb_connection_t* tunnel_connection; uint8_t pub[crypto_sign_PUBLICKEYBYTES]; } tf_ssb_broadcast_t; @@ -184,8 +191,13 @@ typedef struct _tf_ssb_connection_t uv_connect_t connect; uv_async_t async; + tf_ssb_connection_t* tunnel_connection; + int32_t tunnel_request_number; + JSValue object; + char name[32]; + char host[256]; int port; @@ -224,14 +236,17 @@ typedef struct _tf_ssb_connection_t } tf_ssb_connection_t; static JSClassID _connection_class_id; +static int s_connection_index; +static int s_tunnel_index; static void _tf_ssb_connection_destroy(tf_ssb_connection_t* connection, const char* reason); -static void _tf_ssb_connection_client_send_hello(uv_stream_t* stream); +static void _tf_ssb_connection_client_send_hello(tf_ssb_connection_t* connection); static void _tf_ssb_connection_on_close(uv_handle_t* handle); static void _tf_ssb_connection_close(tf_ssb_connection_t* connection, const char* reason); static void _tf_ssb_nonce_inc(uint8_t* nonce); static void _tf_ssb_write(tf_ssb_connection_t* connection, void* data, size_t size); static void _tf_ssb_connection_finalizer(JSRuntime* runtime, JSValue value); +static void _tf_ssb_connection_remove_request(tf_ssb_connection_t* connection, int32_t request_number); static void _tf_ssb_connection_send_close(tf_ssb_connection_t* connection) { @@ -285,14 +300,29 @@ static void _tf_ssb_connection_on_write(uv_write_t* req, int status) static void _tf_ssb_write(tf_ssb_connection_t* connection, void* data, size_t size) { - uv_write_t* write = tf_malloc(sizeof(uv_write_t) + size); - *write = (uv_write_t) { .data = connection }; - memcpy(write + 1, data, size); - int result = uv_write(write, (uv_stream_t*)&connection->tcp, &(uv_buf_t) { .base = (char*)(write + 1), .len = size }, 1, _tf_ssb_connection_on_write); - if (result) + if (connection->tcp.data) { - _tf_ssb_connection_close(connection, "write failed"); - tf_free(write); + uv_write_t* write = tf_malloc(sizeof(uv_write_t) + size); + *write = (uv_write_t) { .data = connection }; + memcpy(write + 1, data, size); + int result = uv_write(write, (uv_stream_t*)&connection->tcp, &(uv_buf_t) { .base = (char*)(write + 1), .len = size }, 1, _tf_ssb_connection_on_write); + if (result) + { + _tf_ssb_connection_close(connection, "write failed"); + tf_free(write); + } + } + else if (connection->tunnel_connection) + { + tf_ssb_connection_rpc_send( + connection->tunnel_connection, + k_ssb_rpc_flag_binary | k_ssb_rpc_flag_stream, + -connection->tunnel_request_number, + data, + size, + NULL, + NULL, + NULL); } } @@ -436,18 +466,14 @@ static bool _tf_ssb_connection_get_request_callback(tf_ssb_connection_t* connect return false; } -void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t request_number, tf_ssb_rpc_callback_t* callback, void* user_data) +void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t request_number, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data) { - if (_tf_ssb_connection_get_request_callback(connection, request_number, NULL, NULL)) - { - /* TODO: This leaks the callback. */ - printf("Adding a request %d that is already registered.\n", request_number); - return; - } + _tf_ssb_connection_remove_request(connection, request_number); tf_ssb_request_t request = { .request_number = request_number, .callback = callback, + .cleanup = cleanup, .user_data = user_data, }; int index = tf_util_insert_index(&request_number, connection->requests, connection->requests_count, sizeof(tf_ssb_request_t), _request_compare); @@ -467,9 +493,9 @@ static void _tf_ssb_connection_remove_request(tf_ssb_connection_t* connection, i tf_ssb_request_t* request = bsearch(&request_number, connection->requests, connection->requests_count, sizeof(tf_ssb_request_t), _request_compare); if (request) { - if (request->user_data) + if (request->cleanup) { - JS_FreeValue(tf_ssb_connection_get_context(connection), JS_MKPTR(JS_TAG_OBJECT, request->user_data)); + request->cleanup(connection->ssb, request->user_data); } int index = request - connection->requests; memmove(request, request + 1, sizeof(tf_ssb_request_t) * (connection->requests_count - index - 1)); @@ -479,7 +505,7 @@ static void _tf_ssb_connection_remove_request(tf_ssb_connection_t* connection, i } } -void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, const uint8_t* message, size_t size, tf_ssb_rpc_callback_t* callback, void* user_data) +void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, const uint8_t* message, size_t size, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data) { if (!connection) { @@ -487,7 +513,7 @@ void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags, } if (request_number > 0 && callback) { - tf_ssb_connection_add_request(connection, request_number, callback, user_data); + tf_ssb_connection_add_request(connection, request_number, callback, cleanup, user_data); } uint8_t* combined = tf_malloc(9 + size); *combined = flags; @@ -496,9 +522,9 @@ void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags, uint32_t rn = htonl((uint32_t)request_number); memcpy(combined + 1 + sizeof(uint32_t), &rn, sizeof(rn)); memcpy(combined + 1 + 2 * sizeof(uint32_t), message, size); + printf(MAGENTA "%s RPC SEND" RESET " flags=%x RN=%d: %.*s\n", connection->name, flags, request_number, (flags & k_ssb_rpc_mask_type) == k_ssb_rpc_flag_binary? 0 : (int)size, message); _tf_ssb_connection_box_stream_send(connection, combined, 1 + 2 * sizeof(uint32_t) + size); tf_free(combined); - printf("RPC SEND flags=%x RN=%d: %.*s\n", flags, request_number, (int)size, message); connection->ssb->rpc_out++; } @@ -852,17 +878,20 @@ bool tf_ssb_connection_get_id(tf_ssb_connection_t* connection, char* out_id, siz return tf_ssb_id_bin_to_str(out_id, out_id_size, connection->serverpub); } -static bool _tf_ssb_is_already_connected(tf_ssb_t* ssb, uint8_t* id) +static bool _tf_ssb_is_already_connected(tf_ssb_t* ssb, uint8_t* id, tf_ssb_connection_t* ignore_connection) { for (tf_ssb_connection_t* connection = ssb->connections; connection; connection = connection->next) { - if (memcmp(connection->serverpub, id, k_id_bin_len) == 0) + if (!ignore_connection || connection != ignore_connection) { - return true; - } - else if (memcmp(ssb->pub, id, k_id_bin_len) == 0) - { - return true; + if (memcmp(connection->serverpub, id, k_id_bin_len) == 0) + { + return true; + } + else if (memcmp(ssb->pub, id, k_id_bin_len) == 0) + { + return true; + } } } return false; @@ -937,9 +966,13 @@ static void _tf_ssb_connection_verify_client_identity(tf_ssb_connection_t* conne } uint8_t* detached_signature_A = m; - if (_tf_ssb_is_already_connected(connection->ssb, m + 64)) + if (_tf_ssb_is_already_connected(connection->ssb, m + 64, connection)) { - _tf_ssb_connection_close(connection, "already connected"); + char id_base64[k_id_base64_len] = { 0 }; + tf_ssb_id_bin_to_str(id_base64, sizeof(id_base64), m + 64); + char reason[256]; + snprintf(reason, sizeof(reason), "already connected: %s\n", id_base64); + _tf_ssb_connection_close(connection, reason); return; } @@ -1101,11 +1134,16 @@ static bool _tf_ssb_name_equals(JSContext* context, JSValue object, const char** static void _tf_ssb_connection_rpc_recv(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, const uint8_t* message, size_t size) { connection->ssb->rpc_in++; - if (flags & k_ssb_rpc_flag_json) + if (size == 0) + { + _tf_ssb_connection_close(connection, "read zero"); + return; + } + else if (flags & k_ssb_rpc_flag_json) { char id[k_id_base64_len] = ""; tf_ssb_id_bin_to_str(id, sizeof(id), connection->serverpub); - printf("RPC RECV from %s flags=%x RN=%d: %.*s\n", id, flags, request_number, (int)size, message); + printf(CYAN "%s RPC RECV" RESET " from %s flags=%x RN=%d: %.*s\n", connection->name, id, flags, request_number, (int)size, message); JSContext* context = connection->ssb->context; JSValue val = JS_ParseJSON(context, (const char*)message, size, NULL); @@ -1139,7 +1177,7 @@ static void _tf_ssb_connection_rpc_recv(tf_ssb_connection_t* connection, uint8_t { const char* k_unsupported = "{\"message\": \"method: is not in list of allowed methods\", \"name\": \"Error\", \"stack\": \"none\"}"; tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json | k_ssb_rpc_flag_end_error | (flags & k_ssb_rpc_flag_stream), -request_number, - (const uint8_t*)k_unsupported, strlen(k_unsupported), NULL, NULL); + (const uint8_t*)k_unsupported, strlen(k_unsupported), NULL, NULL, NULL); } } } @@ -1152,7 +1190,7 @@ static void _tf_ssb_connection_rpc_recv(tf_ssb_connection_t* connection, uint8_t } else if ((flags & k_ssb_rpc_mask_type) == k_ssb_rpc_flag_binary) { - printf("RPC RECV flags=%x RN=%d: %zd bytes\n", flags, request_number, size); + printf(CYAN "%s RPC RECV" RESET " flags=%x RN=%d: %zd bytes\n", connection->name, flags, request_number, size); tf_ssb_rpc_callback_t* callback = NULL; void* user_data = NULL; if (_tf_ssb_connection_get_request_callback(connection, -request_number, &callback, &user_data)) @@ -1162,6 +1200,10 @@ static void _tf_ssb_connection_rpc_recv(tf_ssb_connection_t* connection, uint8_t callback(connection, flags, request_number, JS_UNDEFINED, message, size, user_data); } } + else + { + printf("No request callback for %p %d\n", connection, -request_number); + } } if (request_number < 0 && @@ -1348,7 +1390,11 @@ void _tf_ssb_connection_destroy(tf_ssb_connection_t* connection, const char* rea if (!connection->destroy_reason) { connection->destroy_reason = reason; - printf("destroying connection: %s\n", reason); + printf("destroying connection %p obj=%p: %s\n", connection, JS_VALUE_GET_PTR(connection->object), reason); + } + while (connection->requests) + { + _tf_ssb_connection_remove_request(connection, connection->requests->request_number); } for (tf_ssb_connection_t** it = &connection->ssb->connections; *it; it = &(*it)->next) { @@ -1361,9 +1407,27 @@ void _tf_ssb_connection_destroy(tf_ssb_connection_t* connection, const char* rea break; } } - while (connection->requests) + bool again = true; + while (again) { - _tf_ssb_connection_remove_request(connection, connection->requests->request_number); + again = false; + for (tf_ssb_connection_t* it = connection->ssb->connections; it; it = it->next) + { + if (it->tunnel_connection == connection) + { + it->tunnel_connection = NULL; + _tf_ssb_connection_close(it, "tunnel closed"); + again = true; + break; + } + else if (connection->tunnel_connection == it) + { + connection->tunnel_connection = NULL; + _tf_ssb_connection_close(it, "tunnel closed"); + again = true; + break; + } + } } if (!JS_IsUndefined(connection->object)) { @@ -1404,18 +1468,16 @@ static void _tf_ssb_connection_on_close(uv_handle_t* handle) } } -static void _tf_ssb_connection_on_tcp_recv(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) +static void _tf_ssb_connection_on_tcp_recv_internal(tf_ssb_connection_t* connection, const void* data, ssize_t nread) { - tf_ssb_connection_t* connection = stream->data; if (nread >= 0) { if (connection->recv_size + nread > sizeof(connection->recv_buffer)) { _tf_ssb_connection_close(connection, "recv buffer overflow"); - tf_free(buf->base); return; } - memcpy(connection->recv_buffer + connection->recv_size, buf->base, nread); + memcpy(connection->recv_buffer + connection->recv_size, data, nread); connection->recv_size += nread; switch (connection->state) @@ -1461,7 +1523,7 @@ static void _tf_ssb_connection_on_tcp_recv(uv_stream_t* stream, ssize_t nread, c } else { - _tf_ssb_connection_client_send_hello((uv_stream_t*)&connection->tcp); + _tf_ssb_connection_client_send_hello(connection); connection->state = k_tf_ssb_state_server_wait_client_identity; } } @@ -1492,12 +1554,17 @@ static void _tf_ssb_connection_on_tcp_recv(uv_stream_t* stream, ssize_t nread, c { _tf_ssb_connection_close(connection, "read zero"); } +} + +static void _tf_ssb_connection_on_tcp_recv(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) +{ + tf_ssb_connection_t* connection = stream->data; + _tf_ssb_connection_on_tcp_recv_internal(connection, buf->base, nread); tf_free(buf->base); } -static void _tf_ssb_connection_client_send_hello(uv_stream_t* stream) +static void _tf_ssb_connection_client_send_hello(tf_ssb_connection_t* connection) { - tf_ssb_connection_t* connection = stream->data; char write[crypto_auth_BYTES + crypto_box_PUBLICKEYBYTES]; if (crypto_box_keypair(connection->epub, connection->epriv) != 0) @@ -1536,7 +1603,7 @@ static void _tf_ssb_connection_on_connect(uv_connect_t* connect, int status) } else { - _tf_ssb_connection_client_send_hello(connect->handle); + _tf_ssb_connection_client_send_hello(connection); } } else @@ -1788,10 +1855,6 @@ void tf_ssb_destroy(tf_ssb_t* ssb) uv_run(ssb->loop, UV_RUN_ONCE); } - if (ssb->loop == &ssb->own_loop) - { - uv_loop_close(ssb->loop); - } while (ssb->rpc) { tf_ssb_rpc_callback_node_t* node = ssb->rpc; @@ -1848,6 +1911,10 @@ void tf_ssb_destroy(tf_ssb_t* ssb) } tf_free(node); } + if (ssb->loop == &ssb->own_loop) + { + uv_loop_close(ssb->loop); + } if (ssb->own_context) { JS_FreeContext(ssb->context); @@ -1893,6 +1960,15 @@ static void _tf_ssb_connection_send_json_response(tf_ssb_connection_t* connectio _tf_ssb_on_rpc(connection, flags, request_number, args, message, size, user_data); } +static void _tf_ssb_connection_cleanup_value(tf_ssb_t* ssb, void* user_data) +{ + if (user_data) + { + JSValue callback = JS_MKPTR(JS_TAG_OBJECT, user_data); + JS_FreeValue(tf_ssb_get_context(ssb), callback); + } +} + static JSValue _tf_ssb_connection_send_json(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) { tf_ssb_connection_t* connection = JS_GetOpaque(this_val, _connection_class_id); @@ -1915,9 +1991,10 @@ static JSValue _tf_ssb_connection_send_json(JSContext* context, JSValueConst thi (const uint8_t*)message, size, _tf_ssb_connection_send_json_response, + _tf_ssb_connection_cleanup_value, JS_IsFunction(context, argv[1]) ? JS_VALUE_GET_PTR(JS_DupValue(context, argv[1])) : NULL); JS_FreeCString(context, message); - return JS_UNDEFINED; + return JS_NewInt32(context, request_number); } static void _tf_ssb_connection_process_message_async(uv_async_t* async) @@ -1952,6 +2029,7 @@ tf_ssb_connection_t* tf_ssb_connection_create(tf_ssb_t* ssb, const char* host, c JSContext* context = ssb->context; tf_ssb_connection_t* connection = tf_malloc(sizeof(tf_ssb_connection_t)); memset(connection, 0, sizeof(*connection)); + snprintf(connection->name, sizeof(connection->name), "cli%d", s_connection_index++); connection->ssb = ssb; connection->tcp.data = connection; connection->connect.data = connection; @@ -1962,6 +2040,7 @@ tf_ssb_connection_t* tf_ssb_connection_create(tf_ssb_t* ssb, const char* host, c uv_async_init(ssb->loop, &connection->async, _tf_ssb_connection_process_message_async); connection->object = JS_NewObjectClass(ssb->context, _connection_class_id); + printf("%s = %p\n", connection->name, JS_VALUE_GET_PTR(connection->object)); JS_SetOpaque(connection->object, connection); JS_SetPropertyStr(context, connection->object, "send_json", JS_NewCFunction(context, _tf_ssb_connection_send_json, "send_json", 2)); char public_key_str[k_id_base64_len] = { 0 }; @@ -1992,6 +2071,67 @@ tf_ssb_connection_t* tf_ssb_connection_create(tf_ssb_t* ssb, const char* host, c return connection; } +static void _tf_ssb_connection_tunnel_callback( + tf_ssb_connection_t* connection, + uint8_t flags, + int32_t request_number, + JSValue args, + const uint8_t* message, + size_t size, + void* user_data) +{ + tf_ssb_connection_t* tunnel = user_data; + _tf_ssb_connection_on_tcp_recv_internal(tunnel, message, size); +} + +tf_ssb_connection_t* tf_ssb_connection_tunnel_create(tf_ssb_connection_t* connection, int32_t request_number, const char* target_id) +{ + tf_ssb_t* ssb = connection->ssb; + + JSContext* context = ssb->context; + tf_ssb_connection_t* tunnel = tf_malloc(sizeof(tf_ssb_connection_t)); + memset(tunnel, 0, sizeof(*tunnel)); + snprintf(tunnel->name, sizeof(tunnel->name), "tun%d", s_tunnel_index++); + tunnel->ssb = ssb; + tunnel->tunnel_connection = connection; + tunnel->tunnel_request_number = -request_number; + tunnel->send_request_number = 1; + tunnel->async.data = tunnel; + uv_async_init(ssb->loop, &tunnel->async, _tf_ssb_connection_process_message_async); + + tunnel->object = JS_NewObjectClass(ssb->context, _connection_class_id); + printf("%s = %p\n", tunnel->name, JS_VALUE_GET_PTR(connection->object)); + JS_SetOpaque(tunnel->object, tunnel); + JS_SetPropertyStr(context, tunnel->object, "send_json", JS_NewCFunction(context, _tf_ssb_connection_send_json, "send_json", 2)); + JS_SetPropertyStr(context, tunnel->object, "id", JS_NewString(context, target_id)); + JS_SetPropertyStr(context, tunnel->object, "is_client", JS_TRUE); + + tf_ssb_id_str_to_bin(tunnel->serverpub, target_id); + + tunnel->next = ssb->connections; + ssb->connections = tunnel; + ssb->connections_count++; + _tf_ssb_notify_connections_changed(ssb, k_tf_ssb_change_create, tunnel); + + tf_ssb_connection_add_request( + connection, + request_number, + _tf_ssb_connection_tunnel_callback, + NULL, + tunnel); + if (request_number < 0) + { + tunnel->state = k_tf_ssb_state_connected; + _tf_ssb_connection_client_send_hello(tunnel); + } + else + { + tunnel->state = k_tf_ssb_state_server_wait_hello; + } + + return tunnel; +} + typedef struct _connect_t { tf_ssb_t* ssb; uv_getaddrinfo_t req; @@ -2047,6 +2187,7 @@ static void _tf_ssb_on_connection(uv_stream_t* stream, int status) tf_ssb_connection_t* connection = tf_malloc(sizeof(tf_ssb_connection_t)); memset(connection, 0, sizeof(*connection)); + snprintf(connection->name, sizeof(connection->name), "srv%d", s_connection_index++); connection->ssb = ssb; connection->tcp.data = connection; connection->send_request_number = 1; @@ -2054,6 +2195,7 @@ static void _tf_ssb_on_connection(uv_stream_t* stream, int status) uv_async_init(ssb->loop, &connection->async, _tf_ssb_connection_process_message_async); connection->object = JS_NewObjectClass(ssb->context, _connection_class_id); + printf("%s = %p\n", connection->name, JS_VALUE_GET_PTR(connection->object)); JS_SetPropertyStr(ssb->context, connection->object, "send_json", JS_NewCFunction(ssb->context, _tf_ssb_connection_send_json, "send_json", 2)); JS_SetOpaque(connection->object, connection); @@ -2204,7 +2346,7 @@ static bool _tf_ssb_parse_broadcast(const char* in_broadcast, tf_ssb_broadcast_t printf("pton failed\n"); } } - else if (strncmp(in_broadcast, "ws:", 3)) + else if (strncmp(in_broadcast, "ws:", 3) == 0) { printf("Unsupported broadcast: %s\n", in_broadcast); } @@ -2250,25 +2392,40 @@ static void _tf_ssb_add_broadcast(tf_ssb_t* ssb, const tf_ssb_broadcast_t* broad return; } - for (tf_ssb_broadcast_t* node = ssb->broadcasts; node; node = node->next) + if (broadcast->tunnel_connection) { - if (node->addr.sin_family == broadcast->addr.sin_family && - node->addr.sin_port == broadcast->addr.sin_port && - node->addr.sin_addr.s_addr == broadcast->addr.sin_addr.s_addr && - memcmp(node->pub, broadcast->pub, sizeof(node->pub)) == 0) + for (tf_ssb_broadcast_t* node = ssb->broadcasts; node; node = node->next) { - node->mtime = time(NULL); - return; + if (node->tunnel_connection == broadcast->tunnel_connection && + memcmp(node->pub, broadcast->pub, sizeof(node->pub)) == 0) + { + node->mtime = time(NULL); + return; + } } } - - char key[k_id_base64_len]; - if (tf_ssb_id_bin_to_str(key, sizeof(key), broadcast->pub)) + else { - tf_ssb_connections_store(ssb->connections_tracker, broadcast->host, ntohs(broadcast->addr.sin_port), key); + for (tf_ssb_broadcast_t* node = ssb->broadcasts; node; node = node->next) + { + if (node->addr.sin_family == broadcast->addr.sin_family && + node->addr.sin_port == broadcast->addr.sin_port && + node->addr.sin_addr.s_addr == broadcast->addr.sin_addr.s_addr && + memcmp(node->pub, broadcast->pub, sizeof(node->pub)) == 0) + { + node->mtime = time(NULL); + return; + } + } + + char key[k_id_base64_len]; + if (tf_ssb_id_bin_to_str(key, sizeof(key), broadcast->pub)) + { + tf_ssb_connections_store(ssb->connections_tracker, broadcast->host, ntohs(broadcast->addr.sin_port), key); + } + printf("Received new broadcast: host=%s, pub=%s.\n", broadcast->host, key); } - printf("Received new broadcast: host=%s, pub=%s.\n", broadcast->host, key); tf_ssb_broadcast_t* node = tf_malloc(sizeof(tf_ssb_broadcast_t)); *node = *broadcast; node->next = ssb->broadcasts; @@ -2306,7 +2463,7 @@ static void _tf_ssb_on_broadcast_listener_recv(uv_udp_t* handle, ssize_t nread, tf_free(buf->base); } -void tf_ssb_visit_broadcasts(tf_ssb_t* ssb, void (*callback)(const struct sockaddr_in* addr, const uint8_t* pub, void* user_data), void* user_data) +void tf_ssb_visit_broadcasts(tf_ssb_t* ssb, void (*callback)(const struct sockaddr_in* addr, tf_ssb_connection_t* tunnel, const uint8_t* pub, void* user_data), void* user_data) { time_t now = time(NULL); tf_ssb_broadcast_t* next = NULL; @@ -2315,7 +2472,7 @@ void tf_ssb_visit_broadcasts(tf_ssb_t* ssb, void (*callback)(const struct sockad next = node->next; if (node->mtime - now < 60) { - callback(&node->addr, node->pub, user_data); + callback(&node->addr, node->tunnel_connection, node->pub, user_data); } } } @@ -2392,6 +2549,24 @@ void tf_ssb_append_post(tf_ssb_t* ssb, const char* text) JS_FreeValue(ssb->context, obj); } +tf_ssb_connection_t* tf_ssb_connection_get(tf_ssb_t* ssb, const char* id) +{ + uint8_t pub[k_id_bin_len] = { 0 }; + tf_ssb_id_str_to_bin(pub, id); + for (tf_ssb_connection_t* connection = ssb->connections; connection; connection = connection->next) + { + if (memcmp(connection->serverpub, pub, k_id_bin_len) == 0) + { + return connection; + } + else if (memcmp(ssb->pub, pub, k_id_bin_len) == 0) + { + return connection; + } + } + return NULL; +} + const char** tf_ssb_get_connection_ids(tf_ssb_t* ssb) { int count = 0; @@ -2561,6 +2736,11 @@ JSClassID tf_ssb_get_connection_class_id() JSValue tf_ssb_connection_get_object(tf_ssb_connection_t* connection) { + if (connection && !JS_IsUndefined(connection->object)) + { + JSRefCountHeader *p = (JSRefCountHeader *)JS_VALUE_GET_PTR(connection->object); + printf("%p _get_object count=%d\nn", JS_VALUE_GET_PTR(connection->object), p->ref_count); + } return connection ? connection->object : JS_UNDEFINED; } @@ -2660,3 +2840,42 @@ void tf_ssb_notify_blob_want_added(tf_ssb_t* ssb, const char* id) node->callback(ssb, id, node->user_data); } } + +void tf_ssb_connection_add_room_attendant(tf_ssb_connection_t* connection, const char* id) +{ + tf_ssb_broadcast_t broadcast = + { + .tunnel_connection = connection, + }; + tf_ssb_id_str_to_bin(broadcast.pub, id); + _tf_ssb_add_broadcast(connection->ssb, &broadcast); +} + +void tf_ssb_connection_remove_room_attendant(tf_ssb_connection_t* connection, const char* id) +{ + uint8_t pub[k_id_bin_len] = { 0 }; + tf_ssb_id_str_to_bin(pub, id); + + int modified = 0; + for (tf_ssb_broadcast_t** it = &connection->ssb->broadcasts; *it;) + { + if ((*it)->tunnel_connection == connection && + memcmp((*it)->pub, pub, k_id_bin_len) == 0) + { + tf_ssb_broadcast_t* node = *it; + *it = node->next; + tf_free(node); + connection->ssb->broadcasts_count--; + modified++; + } + else + { + it = &(*it)->next; + } + } + + if (modified) + { + _tf_ssb_notify_broadcasts_changed(connection->ssb); + } +} diff --git a/src/ssb.db.c b/src/ssb.db.c index 670d7824..b8961fa4 100644 --- a/src/ssb.db.c +++ b/src/ssb.db.c @@ -114,7 +114,7 @@ void tf_ssb_db_init(tf_ssb_t* ssb) populate_fts = true; } - if (!populate_fts) + if (!populate_fts && /* HACK */ false) { printf("Checking FTS5 integrity...\n"); if (sqlite3_exec(db, "INSERT INTO messages_fts(messages_fts, rank) VALUES ('integrity-check', 0)", NULL, NULL, NULL) == SQLITE_CORRUPT_VTAB) @@ -279,6 +279,7 @@ bool tf_ssb_db_store_message(tf_ssb_t* ssb, JSContext* context, const char* id, { printf("%s\n", sqlite3_errmsg(db)); } + printf("changes = %d\n", sqlite3_changes(db)); stored = r == SQLITE_DONE && sqlite3_changes(db) != 0; if (stored) { diff --git a/src/ssb.h b/src/ssb.h index 86cc648d..9a2457df 100644 --- a/src/ssb.h +++ b/src/ssb.h @@ -80,8 +80,9 @@ void tf_ssb_run(tf_ssb_t* ssb); void tf_ssb_append_message_with_keys(tf_ssb_t* ssb, const char* author, const uint8_t* private_key, JSValue message); bool tf_ssb_whoami(tf_ssb_t* ssb, char* out_id, size_t out_id_size); -void tf_ssb_visit_broadcasts(tf_ssb_t* ssb, void (*callback)(const struct sockaddr_in* addr, const uint8_t* pub, void* user_data), void* user_data); +void tf_ssb_visit_broadcasts(tf_ssb_t* ssb, void (*callback)(const struct sockaddr_in* addr, tf_ssb_connection_t* tunnel, const uint8_t* pub, void* user_data), void* user_data); +tf_ssb_connection_t* tf_ssb_get_connection(tf_ssb_t* ssb, const char* id); const char** tf_ssb_get_connection_ids(tf_ssb_t* ssb); int tf_ssb_get_connections(tf_ssb_t* ssb, tf_ssb_connection_t** out_connections, int out_connections_count); void tf_ssb_connect(tf_ssb_t* ssb, const char* host, int port, const uint8_t* key); @@ -105,6 +106,7 @@ sqlite3* tf_ssb_connection_get_db(tf_ssb_connection_t* connection); int32_t tf_ssb_connection_next_request_number(tf_ssb_connection_t* connection); +tf_ssb_connection_t* tf_ssb_connection_get(tf_ssb_t* ssb, const char* id); bool tf_ssb_connection_get_id(tf_ssb_connection_t* connection, char* out_id, size_t out_id_size); JSValue tf_ssb_connection_get_object(tf_ssb_connection_t* connection); @@ -132,8 +134,13 @@ typedef void (tf_ssb_rpc_callback_t)(tf_ssb_connection_t* connection, uint8_t fl void tf_ssb_add_rpc_callback(tf_ssb_t* ssb, const char** name, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data); void tf_ssb_remove_rpc_callback(tf_ssb_t* ssb, const char** name, tf_ssb_rpc_callback_t* callback, void* user_data); -void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, const uint8_t* message, size_t size, tf_ssb_rpc_callback_t* callback, void* user_data); -void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t request_number, tf_ssb_rpc_callback_t* callback, void* user_data); +void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, const uint8_t* message, size_t size, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data); +void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t request_number, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data); + +void tf_ssb_connection_add_room_attendant(tf_ssb_connection_t* connection, const char* id); +void tf_ssb_connection_remove_room_attendant(tf_ssb_connection_t* connection, const char* id); + +tf_ssb_connection_t* tf_ssb_connection_tunnel_create(tf_ssb_connection_t* connection, int32_t request_number, const char* target_id); JSClassID tf_ssb_get_connection_class_id(); diff --git a/src/ssb.js.c b/src/ssb.js.c index a62d1fad..a4f4f9e8 100644 --- a/src/ssb.js.c +++ b/src/ssb.js.c @@ -253,6 +253,15 @@ static JSValue _tf_ssb_connections(JSContext* context, JSValueConst this_val, in return result; } +static JSValue _tf_ssb_getConnection(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) +{ + tf_ssb_t* ssb = JS_GetOpaque(this_val, _tf_ssb_classId); + const char* id = JS_ToCString(context, argv[0]); + tf_ssb_connection_t* connection = tf_ssb_connection_get(ssb, id); + JS_FreeCString(context, id); + return JS_DupValue(context, tf_ssb_connection_get_object(connection)); +} + typedef struct _sqlStream_callback_t { JSContext* context; @@ -319,7 +328,7 @@ typedef struct _broadcasts_t int length; } broadcasts_t; -static void _tf_ssb_broadcasts_visit(const struct sockaddr_in* addr, const uint8_t* pub, void* user_data) +static void _tf_ssb_broadcasts_visit(const struct sockaddr_in* addr, tf_ssb_connection_t* tunnel, const uint8_t* pub, void* user_data) { broadcasts_t* broadcasts = user_data; JSValue entry = JS_NewObject(broadcasts->context); @@ -327,8 +336,15 @@ static void _tf_ssb_broadcasts_visit(const struct sockaddr_in* addr, const uint8 char pubkey[k_id_base64_len]; uv_ip4_name(addr, address, sizeof(address)); tf_ssb_id_bin_to_str(pubkey, sizeof(pubkey), pub); - JS_SetPropertyStr(broadcasts->context, entry, "address", JS_NewString(broadcasts->context, address)); - JS_SetPropertyStr(broadcasts->context, entry, "port", JS_NewInt32(broadcasts->context, ntohs(addr->sin_port))); + if (tunnel) + { + JS_SetPropertyStr(broadcasts->context, entry, "tunnel", JS_DupValue(broadcasts->context, tf_ssb_connection_get_object(tunnel))); + } + else + { + JS_SetPropertyStr(broadcasts->context, entry, "address", JS_NewString(broadcasts->context, address)); + JS_SetPropertyStr(broadcasts->context, entry, "port", JS_NewInt32(broadcasts->context, ntohs(addr->sin_port))); + } JS_SetPropertyStr(broadcasts->context, entry, "pubkey", JS_NewString(broadcasts->context, pubkey)); JS_SetPropertyUint32(broadcasts->context, broadcasts->array, broadcasts->length++, entry); } @@ -411,6 +427,7 @@ static JSValue _tf_ssb_rpc_send_json(JSContext* context, JSValueConst this_val, (const uint8_t*)message, size, NULL, + NULL, NULL); JS_FreeValue(context, connection_val); JS_FreeCString(context, message); @@ -443,6 +460,7 @@ static JSValue _tf_ssb_rpc_send_json_end(JSContext* context, JSValueConst this_v (const uint8_t*)message, size, NULL, + NULL, NULL); JS_FreeValue(context, connection_val); JS_FreeCString(context, message); @@ -450,6 +468,12 @@ static JSValue _tf_ssb_rpc_send_json_end(JSContext* context, JSValueConst this_v return JS_UNDEFINED; } +static void _tf_ssb_cleanup_value(tf_ssb_t* ssb, void* user_data) +{ + JSValue callback = JS_MKPTR(JS_TAG_OBJECT, user_data); + JS_FreeValue(tf_ssb_get_context(ssb), callback); +} + static JSValue _tf_ssb_rpc_more(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) { JSValue connection_val = JS_GetPropertyStr(context, this_val, "connection"); @@ -459,7 +483,7 @@ static JSValue _tf_ssb_rpc_more(JSContext* context, JSValueConst this_val, int a JS_ToInt32(context, &request_number, request_val); JS_FreeValue(context, request_val); - tf_ssb_connection_add_request(connection, -request_number, _tf_ssb_on_rpc, JS_VALUE_GET_PTR(JS_DupValue(context, argv[0]))); + tf_ssb_connection_add_request(connection, -request_number, _tf_ssb_on_rpc, _tf_ssb_cleanup_value, JS_VALUE_GET_PTR(JS_DupValue(context, argv[0]))); JS_FreeValue(context, connection_val); return JS_UNDEFINED; @@ -485,6 +509,7 @@ static JSValue _tf_ssb_rpc_send_binary(JSContext* context, JSValueConst this_val (const uint8_t*)message, size, NULL, + NULL, NULL); } else @@ -505,6 +530,7 @@ static JSValue _tf_ssb_rpc_send_binary(JSContext* context, JSValueConst this_val (const uint8_t*)message + offset, size, NULL, + NULL, NULL); } } @@ -514,6 +540,28 @@ static JSValue _tf_ssb_rpc_send_binary(JSContext* context, JSValueConst this_val return JS_UNDEFINED; } +static JSValue _tf_ssb_rpc_add_room_attendant(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) +{ + JSValue connection_val = JS_GetPropertyStr(context, this_val, "connection"); + tf_ssb_connection_t* connection = JS_GetOpaque(connection_val, tf_ssb_get_connection_class_id()); + const char* id = JS_ToCString(context, argv[0]); + tf_ssb_connection_add_room_attendant(connection, id); + JS_FreeCString(context, id); + JS_FreeValue(context, connection_val); + return JS_UNDEFINED; +} + +static JSValue _tf_ssb_rpc_remove_room_attendant(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) +{ + JSValue connection_val = JS_GetPropertyStr(context, this_val, "connection"); + tf_ssb_connection_t* connection = JS_GetOpaque(connection_val, tf_ssb_get_connection_class_id()); + const char* id = JS_ToCString(context, argv[0]); + tf_ssb_connection_remove_room_attendant(connection, id); + JS_FreeCString(context, id); + JS_FreeValue(context, connection_val); + return JS_UNDEFINED; +} + void _tf_ssb_on_rpc(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); @@ -530,6 +578,8 @@ void _tf_ssb_on_rpc(tf_ssb_connection_t* connection, uint8_t flags, int32_t requ JS_SetPropertyStr(context, object, "send_binary", JS_NewCFunction(context, _tf_ssb_rpc_send_binary, "send_binary", 1)); JS_SetPropertyStr(context, object, "send_json_end", JS_NewCFunction(context, _tf_ssb_rpc_send_json_end, "send_json_end", 1)); JS_SetPropertyStr(context, object, "more", JS_NewCFunction(context, _tf_ssb_rpc_more, "more", 1)); + JS_SetPropertyStr(context, object, "add_room_attendant", JS_NewCFunction(context, _tf_ssb_rpc_add_room_attendant, "add_room_attendant", 1)); + JS_SetPropertyStr(context, object, "remove_room_attendant", JS_NewCFunction(context, _tf_ssb_rpc_remove_room_attendant, "remove_room_attendant", 1)); JSValue result = JS_Call(context, callback, JS_UNDEFINED, 1, &object); tf_util_report_error(context, result); @@ -537,12 +587,6 @@ void _tf_ssb_on_rpc(tf_ssb_connection_t* connection, uint8_t flags, int32_t requ JS_FreeValue(context, object); } -static void _tf_ssb_cleanup_value(tf_ssb_t* ssb, void* user_data) -{ - JSValue callback = JS_MKPTR(JS_TAG_OBJECT, user_data); - JS_FreeValue(tf_ssb_get_context(ssb), callback); -} - static JSValue _tf_ssb_add_rpc(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) { tf_ssb_t* ssb = JS_GetOpaque(this_val, _tf_ssb_classId); @@ -888,6 +932,67 @@ static JSValue _tf_ssb_hmacsha256_verify(JSContext* context, JSValueConst this_v return result; } +static JSValue _tf_ssb_createTunnel(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) +{ + tf_ssb_connection_t* connection = JS_GetOpaque(argv[0], tf_ssb_get_connection_class_id()); + int32_t request_number = 0; + JS_ToInt32(context, &request_number, argv[1]); + const char* target_id = JS_ToCString(context, argv[2]); + tf_ssb_connection_tunnel_create(connection, request_number, target_id); + JS_FreeCString(context, target_id); + return JS_UNDEFINED; +} + +typedef struct tunnel_t +{ + tf_ssb_connection_t* connection; + int32_t request_number; +} tunnel_t; + +void _tf_ssb_tunnel_rpc_callback(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) +{ + tunnel_t* tun = user_data; + tf_ssb_connection_rpc_send(tun->connection, flags, tun->request_number, message, size, NULL, NULL, NULL); +} + +void _tf_ssb_tunnel_cleanup(tf_ssb_t* ssb, void* user_data) +{ + tf_free(user_data); +} + +static JSValue _tf_ssb_tunnel(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) +{ + tf_ssb_connection_t* connection0 = JS_GetOpaque(argv[0], tf_ssb_get_connection_class_id()); + int32_t request_number0 = 0; + JS_ToInt32(context, &request_number0, argv[1]); + + tf_ssb_connection_t* connection1 = JS_GetOpaque(argv[2], tf_ssb_get_connection_class_id()); + int32_t request_number1 = 0; + JS_ToInt32(context, &request_number1, argv[3]); + + printf("TUNNEL %p %d <=> %p %d\n", connection0, request_number0, connection1, request_number1); + + tunnel_t* data0 = tf_malloc(sizeof(tunnel_t)); + *data0 = (tunnel_t) + { + .connection = connection1, + .request_number = request_number1, + }; + tunnel_t* data1 = tf_malloc(sizeof(tunnel_t)); + *data1 = (tunnel_t) + { + .connection = connection0, + .request_number = request_number0, + }; + + printf("ADD REQUEST %p %d\n", connection0, request_number0); + printf("ADD REQUEST %p %d\n", connection1, request_number1); + tf_ssb_connection_add_request(connection0, request_number0, _tf_ssb_tunnel_rpc_callback, _tf_ssb_tunnel_cleanup, data0); + tf_ssb_connection_add_request(connection1, request_number1, _tf_ssb_tunnel_rpc_callback, _tf_ssb_tunnel_cleanup, data1); + + return JS_UNDEFINED; +} + void tf_ssb_register(JSContext* context, tf_ssb_t* ssb) { JS_NewClassID(&_tf_ssb_classId); @@ -919,10 +1024,13 @@ void tf_ssb_register(JSContext* context, tf_ssb_t* ssb) JS_SetPropertyStr(context, object, "blobStore", JS_NewCFunction(context, _tf_ssb_blobStore, "blobStore", 2)); JS_SetPropertyStr(context, object, "messageContentGet", JS_NewCFunction(context, _tf_ssb_messageContentGet, "messageContentGet", 1)); JS_SetPropertyStr(context, object, "connections", JS_NewCFunction(context, _tf_ssb_connections, "connections", 0)); + JS_SetPropertyStr(context, object, "getConnection", JS_NewCFunction(context, _tf_ssb_getConnection, "getConnection", 1)); JS_SetPropertyStr(context, object, "sqlStream", JS_NewCFunction(context, _tf_ssb_sqlStream, "sqlStream", 3)); JS_SetPropertyStr(context, object, "storeMessage", JS_NewCFunction(context, _tf_ssb_storeMessage, "storeMessage", 1)); JS_SetPropertyStr(context, object, "getBroadcasts", JS_NewCFunction(context, _tf_ssb_getBroadcasts, "getBroadcasts", 0)); JS_SetPropertyStr(context, object, "connect", JS_NewCFunction(context, _tf_ssb_connect, "connect", 1)); + JS_SetPropertyStr(context, object, "createTunnel", JS_NewCFunction(context, _tf_ssb_createTunnel, "createTunnel", 3)); + JS_SetPropertyStr(context, object, "tunnel", JS_NewCFunction(context, _tf_ssb_tunnel, "tunnel", 4)); /* Should be trusted only. */ JS_SetPropertyStr(context, object, "addRpc", JS_NewCFunction(context, _tf_ssb_add_rpc, "addRpc", 2)); diff --git a/src/ssb.tests.c b/src/ssb.tests.c index a2eb9c14..d39de25c 100644 --- a/src/ssb.tests.c +++ b/src/ssb.tests.c @@ -35,8 +35,13 @@ void tf_ssb_test_id_conversion(const tf_test_options_t* options) typedef struct _test_t { tf_ssb_t* ssb0; tf_ssb_t* ssb1; + tf_ssb_t* ssb2; int connection_count0; int connection_count1; + int connection_count2; + int broadcast_count0; + int broadcast_count1; + int broadcast_count2; } test_t; static void _ssb_test_connections_changed(tf_ssb_t* ssb, tf_ssb_change_t change, tf_ssb_connection_t* connection, void* user_data) @@ -61,7 +66,12 @@ static void _ssb_test_connections_changed(tf_ssb_t* ssb, tf_ssb_change_t change, printf("callback1 change=%d connection=%p\n", change, connection); test->connection_count1 = count; } - printf("conns = %d %d\n", test->connection_count0, test->connection_count1); + else if (ssb == test->ssb2) + { + printf("callback2 change=%d connection=%p\n", change, connection); + test->connection_count2 = count; + } + printf("conns = %d %d %d\n", test->connection_count0, test->connection_count1, test->connection_count2); } static void _count_messages_callback(JSValue row, void* user_data) @@ -240,6 +250,193 @@ void tf_ssb_test_ssb(const tf_test_options_t* options) sqlite3_close(db1); } +static void _broadcasts_visit(const struct sockaddr_in* addr, tf_ssb_connection_t* tunnel, const uint8_t* pub, void* user_data) +{ + int* count = user_data; + (*count)++; +} + +static void _broadcasts_changed(tf_ssb_t* ssb, void* user_data) +{ + int* count = NULL; + test_t* test = user_data; + if (ssb == test->ssb0) + { + count = &test->broadcast_count0; + } + else if (ssb == test->ssb1) + { + count = &test->broadcast_count1; + } + else if (ssb == test->ssb2) + { + count = &test->broadcast_count2; + } + *count = 0; + tf_ssb_visit_broadcasts(ssb, _broadcasts_visit, count); + printf("BROADCASTS %d %d %d\n", test->broadcast_count0, test->broadcast_count1, test->broadcast_count2); +} + +void tf_ssb_test_rooms(const tf_test_options_t* options) +{ + printf("Testing Rooms.\n"); + sqlite3* db0 = NULL; + sqlite3* db1 = NULL; + sqlite3* db2 = NULL; + + int r = sqlite3_open(":memory:", &db0); + (void)r; + assert(r == SQLITE_OK); + r = sqlite3_open(":memory:", &db1); + assert(r == SQLITE_OK); + r = sqlite3_open(":memory:", &db2); + assert(r == SQLITE_OK); + + uv_loop_t loop = { 0 }; + uv_loop_init(&loop); + + tf_ssb_t* ssb0 = tf_ssb_create(&loop, NULL, db0); + tf_ssb_register(tf_ssb_get_context(ssb0), ssb0); + tf_ssb_t* ssb1 = tf_ssb_create(&loop, NULL, db1); + tf_ssb_register(tf_ssb_get_context(ssb1), ssb1); + tf_ssb_t* ssb2 = tf_ssb_create(&loop, NULL, db2); + tf_ssb_register(tf_ssb_get_context(ssb2), ssb2); + + uv_idle_t idle0 = { .data = ssb0 }; + uv_idle_init(&loop, &idle0); + uv_idle_start(&idle0, _ssb_test_idle); + + uv_idle_t idle1 = { .data = ssb1 }; + uv_idle_init(&loop, &idle1); + uv_idle_start(&idle1, _ssb_test_idle); + + uv_idle_t idle2 = { .data = ssb2 }; + uv_idle_init(&loop, &idle2); + uv_idle_start(&idle2, _ssb_test_idle); + + test_t test = + { + .ssb0 = ssb0, + .ssb1 = ssb1, + .ssb2 = ssb2, + }; + + tf_ssb_add_connections_changed_callback(ssb0, _ssb_test_connections_changed, NULL, &test); + tf_ssb_add_connections_changed_callback(ssb1, _ssb_test_connections_changed, NULL, &test); + tf_ssb_add_connections_changed_callback(ssb2, _ssb_test_connections_changed, NULL, &test); + + tf_ssb_add_broadcasts_changed_callback(ssb0, _broadcasts_changed, NULL, &test); + tf_ssb_add_broadcasts_changed_callback(ssb1, _broadcasts_changed, NULL, &test); + tf_ssb_add_broadcasts_changed_callback(ssb2, _broadcasts_changed, NULL, &test); + + tf_ssb_generate_keys(ssb0); + tf_ssb_generate_keys(ssb1); + tf_ssb_generate_keys(ssb2); + + char id0[k_id_base64_len] = { 0 }; + char id1[k_id_base64_len] = { 0 }; + char id2[k_id_base64_len] = { 0 }; + bool b = tf_ssb_whoami(ssb0, id0, sizeof(id0)); + (void)b; + assert(b); + b = tf_ssb_whoami(ssb1, id1, sizeof(id1)); + assert(b); + b = tf_ssb_whoami(ssb2, id2, sizeof(id2)); + assert(b); + printf("ID %s, %s, %s\n", id0, id1, id2); + + tf_ssb_server_open(ssb0, 12347); + + uint8_t id0bin[k_id_bin_len]; + tf_ssb_id_str_to_bin(id0bin, id0); + tf_ssb_connect(ssb1, "127.0.0.1", 12347, id0bin); + tf_ssb_connect(ssb2, "127.0.0.1", 12347, id0bin); + + printf("Waiting for connection.\n"); + while (test.connection_count0 != 2 || + test.connection_count1 != 1 || + test.connection_count2 != 1) + { + uv_run(&loop, UV_RUN_ONCE); + } + tf_ssb_server_close(ssb0); + + while (test.broadcast_count1 != 1 || + test.broadcast_count2 != 1) + { + uv_run(&loop, UV_RUN_ONCE); + } + + tf_ssb_connection_t* connections[4]; + int count = tf_ssb_get_connections(ssb1, connections, 4); + assert(count == 1); + + int32_t tunnel_request_number = tf_ssb_connection_next_request_number(connections[0]); + + JSContext* context = tf_ssb_get_context(ssb1); + JSValue message = JS_NewObject(context); + JSValue name = JS_NewArray(context); + JS_SetPropertyUint32(context, name, 0, JS_NewString(context, "tunnel")); + JS_SetPropertyUint32(context, name, 1, JS_NewString(context, "connect")); + JS_SetPropertyStr(context, message, "name", name); + JSValue args = JS_NewArray(context); + JSValue arg = JS_NewObject(context); + JS_SetPropertyStr(context, arg, "portal", JS_NewString(context, id0)); + JS_SetPropertyStr(context, arg, "target", JS_NewString(context, id2)); + JS_SetPropertyUint32(context, args, 0, arg); + JS_SetPropertyStr(context, message, "args", args); + JS_SetPropertyStr(context, message, "type", JS_NewString(context, "duplex")); + + JSValue message_json = JS_JSONStringify(context, message, JS_NULL, JS_NULL); + size_t size; + const char* raw = JS_ToCStringLen(context, &size, message_json); + tf_ssb_connection_rpc_send( + connections[0], + k_ssb_rpc_flag_json | k_ssb_rpc_flag_stream, + tunnel_request_number, + (const uint8_t*)raw, + size, + NULL, + NULL, + NULL); + JS_FreeCString(context, raw); + JS_FreeValue(context, message_json); + JS_FreeValue(context, message); + + tf_ssb_connection_t* tun0 = tf_ssb_connection_tunnel_create(connections[0], tunnel_request_number, id2); + printf("tun0 = %p\n", tun0); + + printf("Done.\n"); + + while (test.connection_count0 != 2 || + test.connection_count1 != 2 || + test.connection_count2 != 2) + { + uv_run(&loop, UV_RUN_ONCE); + } + + printf("Done.\n"); + + tf_ssb_send_close(ssb1); + tf_ssb_send_close(ssb2); + + uv_close((uv_handle_t*)&idle0, NULL); + uv_close((uv_handle_t*)&idle1, NULL); + uv_close((uv_handle_t*)&idle2, NULL); + + uv_run(&loop, UV_RUN_DEFAULT); + + tf_ssb_destroy(ssb0); + tf_ssb_destroy(ssb1); + tf_ssb_destroy(ssb2); + + uv_loop_close(&loop); + + sqlite3_close(db0); + sqlite3_close(db1); + sqlite3_close(db2); +} + void tf_ssb_test_following(const tf_test_options_t* options) { printf("Testing following.\n"); diff --git a/src/ssb.tests.h b/src/ssb.tests.h index 11ce688c..631ec754 100644 --- a/src/ssb.tests.h +++ b/src/ssb.tests.h @@ -5,3 +5,4 @@ typedef struct _tf_test_options_t tf_test_options_t; void tf_ssb_test_id_conversion(const tf_test_options_t* options); void tf_ssb_test_ssb(const tf_test_options_t* options); void tf_ssb_test_following(const tf_test_options_t* options); +void tf_ssb_test_rooms(const tf_test_options_t* options); diff --git a/src/tests.c b/src/tests.c index 4f6e4908..ffb573ad 100644 --- a/src/tests.c +++ b/src/tests.c @@ -696,5 +696,6 @@ void tf_tests(const tf_test_options_t* options) _tf_test_run(options, "file", _test_file); _tf_test_run(options, "sign", _test_sign); _tf_test_run(options, "b64", _test_b64); + _tf_test_run(options, "rooms", tf_ssb_test_rooms); printf("Tests completed.\n"); }