From f01f7a5ab9efe8ddc6000c25043aa9ffc7ddef87 Mon Sep 17 00:00:00 2001 From: Cory McWilliams Date: Thu, 2 May 2024 19:02:23 -0400 Subject: [PATCH] Show active RPC requests in the connections tab. Probably TMI, but I want greater introspection into what is going on, and this seemed like a positive step. --- apps/ssb.json | 2 +- apps/ssb/tf-tab-connections.js | 3 ++ src/ssb.c | 75 ++++++++++++++++++++++++++-------- src/ssb.connections.c | 1 + src/ssb.h | 17 ++++++-- src/ssb.js.c | 19 ++++++++- src/ssb.rpc.c | 56 ++++++++++++------------- src/ssb.tests.c | 6 +-- 8 files changed, 127 insertions(+), 52 deletions(-) diff --git a/apps/ssb.json b/apps/ssb.json index 16ae2ce4..0b8a1b6d 100644 --- a/apps/ssb.json +++ b/apps/ssb.json @@ -1,5 +1,5 @@ { "type": "tildefriends-app", "emoji": "🐌", - "previous": "&eapETfizGynW7oCLJmlsDwEYHqXWh2usHnDdVPxpXv0=.sha256" + "previous": "&XFhkB74Ll0f2MA24HyKNK42Dckkb+3AGmuDxJDOets8=.sha256" } diff --git a/apps/ssb/tf-tab-connections.js b/apps/ssb/tf-tab-connections.js index f4a9ac40..abdf376a 100644 --- a/apps/ssb/tf-tab-connections.js +++ b/apps/ssb/tf-tab-connections.js @@ -111,6 +111,9 @@ class TfTabConnectionsElement extends LitElement { ${this.connections .filter((x) => x.tunnel === this.connections.indexOf(connection)) .map((x) => html`
  • ${this.render_connection(x)}
  • `)} + ${connection.requests.map(x => html` + ${x.request_number > 0 ? '🟩' : '🟥'} ${x.name} + `)} ${this.render_room_peers(connection.id)} `; diff --git a/src/ssb.c b/src/ssb.c index 1b4a0776..c4783851 100644 --- a/src/ssb.c +++ b/src/ssb.c @@ -98,6 +98,7 @@ typedef struct _tf_ssb_debug_close_t typedef struct _tf_ssb_request_t { + char name[256]; tf_ssb_rpc_callback_t* callback; tf_ssb_callback_cleanup_t* cleanup; void* user_data; @@ -347,15 +348,16 @@ static JSClassID _connection_class_id; static int s_connection_index; static int s_tunnel_index; -static void _tf_ssb_connection_destroy(tf_ssb_connection_t* connection, const char* reason); static void _tf_ssb_connection_client_send_hello(tf_ssb_connection_t* connection); -static void _tf_ssb_connection_on_close(uv_handle_t* handle); static void _tf_ssb_connection_close(tf_ssb_connection_t* connection, const char* reason); -static void _tf_ssb_nonce_inc(uint8_t* nonce); -static void _tf_ssb_write(tf_ssb_connection_t* connection, void* data, size_t size); +static void _tf_ssb_connection_destroy(tf_ssb_connection_t* connection, const char* reason); static void _tf_ssb_connection_finalizer(JSRuntime* runtime, JSValue value); -static void _tf_ssb_update_settings(tf_ssb_t* ssb); +static void _tf_ssb_connection_on_close(uv_handle_t* handle); +static void _tf_ssb_nonce_inc(uint8_t* nonce); +static void _tf_ssb_notify_connections_changed(tf_ssb_t* ssb, tf_ssb_change_t change, tf_ssb_connection_t* connection); static void _tf_ssb_start_update_settings(tf_ssb_t* ssb); +static void _tf_ssb_update_settings(tf_ssb_t* ssb); +static void _tf_ssb_write(tf_ssb_connection_t* connection, void* data, size_t size); static void _tf_ssb_add_debug_close(tf_ssb_t* ssb, tf_ssb_connection_t* connection, const char* reason) { @@ -482,7 +484,7 @@ static void _tf_ssb_write(tf_ssb_connection_t* connection, void* data, size_t si } else if (connection->tunnel_connection) { - tf_ssb_connection_rpc_send(connection->tunnel_connection, k_ssb_rpc_flag_binary | k_ssb_rpc_flag_stream, -connection->tunnel_request_number, data, size, NULL, NULL, NULL); + tf_ssb_connection_rpc_send(connection->tunnel_connection, k_ssb_rpc_flag_binary | k_ssb_rpc_flag_stream, -connection->tunnel_request_number, NULL, data, size, NULL, NULL, NULL); } } @@ -640,7 +642,7 @@ static bool _tf_ssb_connection_get_request_callback(tf_ssb_connection_t* connect return false; } -void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t request_number, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data, +void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t request_number, const char* name, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data, tf_ssb_connection_t* dependent_connection) { tf_ssb_request_t* existing = @@ -666,6 +668,7 @@ void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t requ .user_data = user_data, .dependent_connection = dependent_connection, }; + snprintf(request.name, sizeof(request.name), "%s", name); int index = tf_util_insert_index(&request_number, connection->requests, connection->requests_count, sizeof(tf_ssb_request_t), _request_compare); connection->requests = tf_resize_vec(connection->requests, sizeof(tf_ssb_request_t) * (connection->requests_count + 1)); if (connection->requests_count - index) @@ -677,6 +680,7 @@ void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t requ connection->ssb->request_count++; } + _tf_ssb_notify_connections_changed(connection->ssb, k_tf_ssb_change_update, connection); } static int _message_request_compare(const void* a, const void* b) @@ -737,10 +741,11 @@ void tf_ssb_connection_remove_request(tf_ssb_connection_t* connection, int32_t r connection->requests_count--; connection->requests = tf_resize_vec(connection->requests, sizeof(tf_ssb_request_t) * connection->requests_count); connection->ssb->request_count--; + _tf_ssb_notify_connections_changed(connection->ssb, k_tf_ssb_change_update, connection); } } -void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, const uint8_t* message, size_t size, tf_ssb_rpc_callback_t* callback, +void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, const char* new_request_name, const uint8_t* message, size_t size, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data) { if (!connection) @@ -755,6 +760,7 @@ void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags, { assert(request_number > 0); assert(!_tf_ssb_connection_get_request_callback(connection, request_number, NULL, NULL)); + assert(new_request_name); } else if (!_tf_ssb_connection_get_request_callback(connection, request_number, NULL, NULL)) { @@ -785,18 +791,18 @@ void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags, } else if (flags & k_ssb_rpc_flag_new_request) { - tf_ssb_connection_add_request(connection, request_number, callback, cleanup, user_data, NULL); + tf_ssb_connection_add_request(connection, request_number, new_request_name, callback, cleanup, user_data, NULL); } } void tf_ssb_connection_rpc_send_json( - tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue message, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data) + tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, const char* new_request_name, JSValue message, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data) { JSContext* context = connection->ssb->context; JSValue json = JS_JSONStringify(context, message, JS_NULL, JS_NULL); size_t size = 0; const char* json_string = JS_ToCStringLen(context, &size, json); - tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json | (flags & ~k_ssb_rpc_mask_type), request_number, (const uint8_t*)json_string, size, callback, cleanup, user_data); + tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json | (flags & ~k_ssb_rpc_mask_type), request_number, new_request_name, (const uint8_t*)json_string, size, callback, cleanup, user_data); JS_FreeCString(context, json_string); JS_FreeValue(context, json); } @@ -810,7 +816,7 @@ void tf_ssb_connection_rpc_send_error(tf_ssb_connection_t* connection, uint8_t f JS_SetPropertyStr(context, message, "stack", JS_NewString(context, stack ? stack : "stack unavailable")); JS_SetPropertyStr(context, message, "message", JS_NewString(context, error)); tf_ssb_connection_rpc_send_json( - connection, ((flags & k_ssb_rpc_flag_stream) ? (k_ssb_rpc_flag_stream) : 0) | k_ssb_rpc_flag_end_error, request_number, message, NULL, NULL, NULL); + connection, ((flags & k_ssb_rpc_flag_stream) ? (k_ssb_rpc_flag_stream) : 0) | k_ssb_rpc_flag_end_error, request_number, NULL, message, NULL, NULL, NULL); JS_FreeValue(context, message); tf_free((void*)stack); } @@ -1550,7 +1556,30 @@ static void _tf_ssb_connection_rpc_recv(tf_ssb_connection_t* connection, uint8_t else if (JS_IsObject(val)) { bool found = false; - tf_ssb_connection_add_request(connection, -request_number, NULL, NULL, NULL, NULL); + char namebuf[256] = ""; + JSValue name = JS_GetPropertyStr(context, val, "name"); + if (JS_IsArray(context, name)) + { + int length = tf_util_get_length(context, name); + int offset = 0; + for (int i = 0; i < length; i++) + { + JSValue part = JS_GetPropertyUint32(context, name, i); + const char* part_str = JS_ToCString(context, part); + offset += snprintf(namebuf + offset, sizeof(namebuf) - offset, "%s%s", i == 0 ? "" : ".", part_str); + JS_FreeCString(context, part_str); + JS_FreeValue(context, part); + } + } + else if (JS_IsString(name)) + { + const char* part_str = JS_ToCString(context, name); + snprintf(namebuf, sizeof(namebuf), "%s", part_str); + JS_FreeCString(context, part_str); + } + JS_FreeValue(context, name); + + tf_ssb_connection_add_request(connection, -request_number, namebuf, NULL, NULL, NULL, NULL); for (tf_ssb_rpc_callback_node_t* it = connection->ssb->rpc; it; it = it->next) { if (_tf_ssb_name_equals(context, val, it->name)) @@ -2607,7 +2636,7 @@ static void _tf_ssb_connection_tunnel_callback( tf_ssb_connection_t* tunnel = user_data; if (flags & k_ssb_rpc_flag_end_error) { - tf_ssb_connection_rpc_send(connection, flags, -request_number, (const uint8_t*)"false", strlen("false"), NULL, NULL, NULL); + tf_ssb_connection_rpc_send(connection, flags, -request_number, NULL, (const uint8_t*)"false", strlen("false"), NULL, NULL, NULL); tf_ssb_connection_close(tunnel); } else @@ -2644,7 +2673,7 @@ tf_ssb_connection_t* tf_ssb_connection_tunnel_create(tf_ssb_t* ssb, const char* ssb->connections_count++; _tf_ssb_notify_connections_changed(ssb, k_tf_ssb_change_create, tunnel); - tf_ssb_connection_add_request(connection, request_number, _tf_ssb_connection_tunnel_callback, NULL, tunnel, tunnel); + tf_ssb_connection_add_request(connection, request_number, "tunnel.connect", _tf_ssb_connection_tunnel_callback, NULL, tunnel, tunnel); if (request_number > 0) { tunnel->state = k_tf_ssb_state_connected; @@ -3379,7 +3408,7 @@ void tf_ssb_notify_message_added(tf_ssb_t* ssb, const char* id, JSValue message_ if (message_request) { tf_ssb_connection_rpc_send_json( - connection, k_ssb_rpc_flag_stream, message_request->request_number, message_request->keys ? message_keys : message, NULL, NULL, NULL); + connection, k_ssb_rpc_flag_stream, message_request->request_number, NULL, message_request->keys ? message_keys : message, NULL, NULL, NULL); } } @@ -3937,3 +3966,17 @@ bool tf_ssb_hmacsha256_verify(const char* public_key, const void* payload, size_ } return result; } + +JSValue tf_ssb_connection_requests_to_object(tf_ssb_connection_t* connection) +{ + JSContext* context = connection->ssb->context; + JSValue object = JS_NewArray(context); + for (int i = 0; i < connection->requests_count; i++) + { + JSValue request = JS_NewObject(context); + JS_SetPropertyStr(context, request, "name", JS_NewString(context, connection->requests[i].name)); + JS_SetPropertyStr(context, request, "request_number", JS_NewInt32(context, connection->requests[i].request_number)); + JS_SetPropertyUint32(context, object, i, request); + } + return object; +} diff --git a/src/ssb.connections.c b/src/ssb.connections.c index c2cad022..6c240cd6 100644 --- a/src/ssb.connections.c +++ b/src/ssb.connections.c @@ -44,6 +44,7 @@ static void _tf_ssb_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_change_t } break; case k_tf_ssb_change_remove: + case k_tf_ssb_change_update: break; } } diff --git a/src/ssb.h b/src/ssb.h index c6cd733a..551e42fa 100644 --- a/src/ssb.h +++ b/src/ssb.h @@ -38,6 +38,7 @@ typedef enum _tf_ssb_change_t k_tf_ssb_change_create, k_tf_ssb_change_connect, k_tf_ssb_change_remove, + k_tf_ssb_change_update, } tf_ssb_change_t; /** @@ -658,13 +659,14 @@ void tf_ssb_remove_rpc_callback(tf_ssb_t* ssb, const char** name, tf_ssb_rpc_cal ** @param connection The connection on which to send the message. ** @param flags The message flags. ** @param request_number The request number. +** @param new_request_name The name of the request if it is new. ** @param message The message payload. ** @param size The size of the message. ** @param callback A callback to call if a response is received. ** @param cleanup A callback to call if the callback is removed. ** @param user_data User data to pass to the callback. */ -void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, const uint8_t* message, size_t size, tf_ssb_rpc_callback_t* callback, +void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, const char* new_request_name, const uint8_t* message, size_t size, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data); /** @@ -672,13 +674,14 @@ void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags, ** @param connection The connection on which to send the message. ** @param flags The message flags. ** @param request_number The request number. +** @param new_request_name The name of the request if it is new. ** @param message The JS message payload. ** @param callback A callback to call if a response is received. ** @param cleanup A callback to call if the callback is removed. ** @param user_data User data to pass to the callback. */ void tf_ssb_connection_rpc_send_json( - tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue message, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data); + tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, const char* new_request_name, JSValue message, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data); /** ** Send a MUXRPC error message. @@ -703,12 +706,13 @@ void tf_ssb_connection_rpc_send_error_method_not_allowed(tf_ssb_connection_t* co ** request number. ** @param connection The connection on which to register the callback. ** @param request_number The request number. +** @param name The name of the RPC request. ** @param callback The callback. ** @param cleanup The function to call when the callback is removed. ** @param user_data User data to pass to the callback. ** @param dependent_connection A connection, which, if removed, invalidates this request. */ -void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t request_number, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data, +void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t request_number, const char* name, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data, tf_ssb_connection_t* dependent_connection); /** @@ -719,6 +723,13 @@ void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t requ */ void tf_ssb_connection_remove_request(tf_ssb_connection_t* connection, int32_t request_number); +/** +** Get a debug representation of active requests. +** @param connection The connection. +** @return The active requests as a JS object. +*/ +JSValue tf_ssb_connection_requests_to_object(tf_ssb_connection_t* connection); + /** ** A function scheduled to be run later. ** @param connection The owning connection. diff --git a/src/ssb.js.c b/src/ssb.js.c index 5fe60b84..774345ba 100644 --- a/src/ssb.js.c +++ b/src/ssb.js.c @@ -545,6 +545,7 @@ static JSValue _tf_ssb_connections(JSContext* context, JSValueConst this_val, in } JS_SetPropertyStr(context, object, "tunnel", JS_NewInt32(context, tunnel_index)); } + JS_SetPropertyStr(context, object, "requests", tf_ssb_connection_requests_to_object(connection)); JS_SetPropertyUint32(context, result, i, object); } } @@ -1146,6 +1147,22 @@ static void _tf_ssb_on_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_change { case k_tf_ssb_change_create: break; + case k_tf_ssb_change_update: + { + JSValue object = JS_DupValue(context, tf_ssb_connection_get_object(connection)); + JSValue args[] = { + JS_NewString(context, "update"), + object, + }; + response = JS_Call(context, callback, JS_UNDEFINED, 2, args); + if (tf_util_report_error(context, response)) + { + tf_ssb_remove_connections_changed_callback(ssb, _tf_ssb_on_connections_changed_callback, user_data); + } + JS_FreeValue(context, args[0]); + JS_FreeValue(context, object); + } + break; case k_tf_ssb_change_connect: { JSValue object = JS_DupValue(context, tf_ssb_connection_get_object(connection)); @@ -1304,7 +1321,7 @@ static JSValue _tf_ssb_createTunnel(JSContext* context, JSValueConst this_val, i JS_SetPropertyUint32(context, args, 0, arg); JS_SetPropertyStr(context, message, "args", args); JS_SetPropertyStr(context, message, "type", JS_NewString(context, "duplex")); - tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, request_number, message, NULL, NULL, NULL); + tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, request_number, "tunnel.connect", message, NULL, NULL, NULL); JS_FreeValue(context, message); tf_ssb_connection_tunnel_create(ssb, portal_id, request_number, target_id); diff --git a/src/ssb.rpc.c b/src/ssb.rpc.c index ca03dff2..98437dd5 100644 --- a/src/ssb.rpc.c +++ b/src/ssb.rpc.c @@ -50,7 +50,7 @@ static void _tf_ssb_rpc_gossip_ping_callback( { char buffer[256]; snprintf(buffer, sizeof(buffer), "%" PRId64, (int64_t)time(NULL) * 1000); - tf_ssb_connection_rpc_send(connection, flags, -request_number, (const uint8_t*)buffer, strlen(buffer), NULL, NULL, NULL); + tf_ssb_connection_rpc_send(connection, flags, -request_number, NULL, (const uint8_t*)buffer, strlen(buffer), NULL, NULL, NULL); if (flags & k_ssb_rpc_flag_end_error) { tf_ssb_connection_remove_request(connection, request_number); @@ -59,7 +59,7 @@ static void _tf_ssb_rpc_gossip_ping_callback( static void _tf_ssb_rpc_gossip_ping(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { - tf_ssb_connection_add_request(connection, -request_number, _tf_ssb_rpc_gossip_ping_callback, NULL, NULL, NULL); + tf_ssb_connection_add_request(connection, -request_number, "gossip.ping", _tf_ssb_rpc_gossip_ping_callback, NULL, NULL, NULL); } static void _tf_ssb_rpc_blobs_get_callback( @@ -74,7 +74,7 @@ static void _tf_ssb_rpc_blobs_get(tf_ssb_connection_t* connection, uint8_t flags return; } - tf_ssb_connection_add_request(connection, -request_number, _tf_ssb_rpc_blobs_get_callback, NULL, NULL, NULL); + tf_ssb_connection_add_request(connection, -request_number, "blobs.get", _tf_ssb_rpc_blobs_get_callback, NULL, NULL, NULL); tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); JSContext* context = tf_ssb_connection_get_context(connection); JSValue ids = JS_GetPropertyStr(context, args, "args"); @@ -101,7 +101,7 @@ static void _tf_ssb_rpc_blobs_get(tf_ssb_connection_t* connection, uint8_t flags { for (size_t offset = 0; offset < size; offset += k_send_max) { - tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_binary | k_ssb_rpc_flag_stream, -request_number, blob + offset, + tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_binary | k_ssb_rpc_flag_stream, -request_number, NULL, blob + offset, offset + k_send_max <= size ? k_send_max : (size - offset), NULL, NULL, NULL); } success = true; @@ -111,7 +111,7 @@ static void _tf_ssb_rpc_blobs_get(tf_ssb_connection_t* connection, uint8_t flags JS_FreeValue(context, arg); } JS_FreeValue(context, ids); - tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json | k_ssb_rpc_flag_end_error | k_ssb_rpc_flag_stream, -request_number, (const uint8_t*)(success ? "true" : "false"), + tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json | k_ssb_rpc_flag_end_error | k_ssb_rpc_flag_stream, -request_number, NULL, (const uint8_t*)(success ? "true" : "false"), strlen(success ? "true" : "false"), NULL, NULL, NULL); } @@ -126,7 +126,7 @@ static void _tf_ssb_rpc_blobs_has(tf_ssb_connection_t* connection, uint8_t flags JS_FreeCString(context, id_str); JS_FreeValue(context, id); JS_FreeValue(context, ids); - tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json, -request_number, (const uint8_t*)(has ? "true" : "false"), strlen(has ? "true" : "false"), NULL, NULL, NULL); + tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json, -request_number, NULL, (const uint8_t*)(has ? "true" : "false"), strlen(has ? "true" : "false"), NULL, NULL, NULL); } static void _tf_ssb_rpc_blob_wants_added_callback(tf_ssb_t* ssb, const char* id, void* user_data) @@ -136,7 +136,7 @@ static void _tf_ssb_rpc_blob_wants_added_callback(tf_ssb_t* ssb, const char* id, JSContext* context = tf_ssb_get_context(ssb); JSValue message = JS_NewObject(context); JS_SetPropertyStr(context, message, id, JS_NewInt64(context, -1)); - tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream, -blob_wants->request_number, message, NULL, NULL, NULL); + tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream, -blob_wants->request_number, NULL, message, NULL, NULL, NULL); JS_FreeValue(context, message); } @@ -195,7 +195,7 @@ static void _tf_ssb_request_blob_wants_after_work(tf_ssb_connection_t* connectio { JSValue message = JS_NewObject(context); JS_SetPropertyStr(context, message, work->out_id[i], JS_NewInt32(context, -1)); - tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream, -blob_wants->request_number, message, NULL, NULL, NULL); + tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream, -blob_wants->request_number, NULL, message, NULL, NULL, NULL); JS_FreeValue(context, message); blob_wants->wants_sent++; } @@ -239,7 +239,7 @@ static void _tf_ssb_rpc_tunnel_callback(tf_ssb_connection_t* connection, uint8_t } else { - tf_ssb_connection_rpc_send(tun->connection, flags, tun->request_number, message, size, NULL, NULL, NULL); + tf_ssb_connection_rpc_send(tun->connection, flags, tun->request_number, NULL, message, size, NULL, NULL, NULL); } } @@ -290,7 +290,7 @@ static void _tf_ssb_rpc_tunnel_connect(tf_ssb_connection_t* connection, uint8_t JS_SetPropertyStr(context, message, "args", arg_array); JS_SetPropertyStr(context, message, "type", JS_NewString(context, "duplex")); - tf_ssb_connection_rpc_send_json(target_connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, tunnel_request_number, message, NULL, NULL, NULL); + tf_ssb_connection_rpc_send_json(target_connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, tunnel_request_number, "tunnel.connect", message, NULL, NULL, NULL); tunnel_t* data0 = tf_malloc(sizeof(tunnel_t)); *data0 = (tunnel_t) { @@ -302,8 +302,8 @@ static void _tf_ssb_rpc_tunnel_connect(tf_ssb_connection_t* connection, uint8_t .connection = connection, .request_number = -request_number, }; - tf_ssb_connection_add_request(connection, -request_number, _tf_ssb_rpc_tunnel_callback, _tf_ssb_rpc_tunnel_cleanup, data0, target_connection); - tf_ssb_connection_add_request(target_connection, tunnel_request_number, _tf_ssb_rpc_tunnel_callback, _tf_ssb_rpc_tunnel_cleanup, data1, connection); + tf_ssb_connection_add_request(connection, -request_number, "tunnel.connect", _tf_ssb_rpc_tunnel_callback, _tf_ssb_rpc_tunnel_cleanup, data0, target_connection); + tf_ssb_connection_add_request(target_connection, tunnel_request_number, "tunnel.connect", _tf_ssb_rpc_tunnel_callback, _tf_ssb_rpc_tunnel_cleanup, data1, connection); JS_FreeValue(context, message); JS_FreeCString(context, portal_str); @@ -348,7 +348,7 @@ static void _tf_ssb_rpc_room_meta(tf_ssb_connection_t* connection, uint8_t flags JS_SetPropertyUint32(context, features, 2, JS_NewString(context, "room2")); JS_SetPropertyStr(context, response, "features", features); } - tf_ssb_connection_rpc_send_json(connection, flags, -request_number, response, NULL, NULL, NULL); + tf_ssb_connection_rpc_send_json(connection, flags, -request_number, NULL, response, NULL, NULL, NULL); JS_FreeValue(context, response); } @@ -384,11 +384,11 @@ static void _tf_ssb_rpc_room_attendants(tf_ssb_connection_t* connection, uint8_t { JS_SetPropertyUint32(context, ids, id_count++, JS_NewString(context, id)); - tf_ssb_connection_rpc_send_json(connections[i], flags, -tf_ssb_connection_get_attendant_request_number(connections[i]), joined, NULL, NULL, NULL); + tf_ssb_connection_rpc_send_json(connections[i], flags, -tf_ssb_connection_get_attendant_request_number(connections[i]), NULL, joined, NULL, NULL, NULL); } } JS_SetPropertyStr(context, state, "ids", ids); - tf_ssb_connection_rpc_send_json(connection, flags, -request_number, state, NULL, NULL, NULL); + tf_ssb_connection_rpc_send_json(connection, flags, -request_number, NULL, state, NULL, NULL, NULL); JS_FreeValue(context, joined); JS_FreeValue(context, state); @@ -436,7 +436,7 @@ static void _tf_ssb_rpc_connection_blobs_get_callback( } /* TODO: Should we send the response in the callback? */ bool stored = true; - tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json | k_ssb_rpc_flag_stream | k_ssb_rpc_flag_end_error, -request_number, (const uint8_t*)(stored ? "true" : "false"), + tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json | k_ssb_rpc_flag_stream | k_ssb_rpc_flag_end_error, -request_number, NULL, (const uint8_t*)(stored ? "true" : "false"), strlen(stored ? "true" : "false"), NULL, NULL, NULL); } } @@ -470,7 +470,7 @@ static void _tf_ssb_rpc_connection_blobs_get(tf_ssb_connection_t* connection, co JS_SetPropertyUint32(context, args, 0, JS_NewString(context, blob_id)); JS_SetPropertyStr(context, message, "args", args); - tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, tf_ssb_connection_next_request_number(connection), message, + tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, tf_ssb_connection_next_request_number(connection), "blobs.get", message, _tf_ssb_rpc_connection_blobs_get_callback, _tf_ssb_rpc_connection_blobs_get_cleanup, get); JS_FreeValue(context, message); @@ -525,14 +525,14 @@ static void _tf_ssb_rpc_connection_blobs_createWants_callback( { JSValue message = JS_NewObject(context); JS_SetPropertyStr(context, message, blob_id, JS_NewInt64(context, blob_size)); - tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream, -blob_wants->request_number, message, NULL, NULL, NULL); + tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream, -blob_wants->request_number, NULL, message, NULL, NULL, NULL); JS_FreeValue(context, message); } else if (size == -1LL) { JSValue message = JS_NewObject(context); JS_SetPropertyStr(context, message, blob_id, JS_NewInt64(context, -2)); - tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream, -blob_wants->request_number, message, NULL, NULL, NULL); + tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream, -blob_wants->request_number, NULL, message, NULL, NULL, NULL); JS_FreeValue(context, message); } } @@ -646,7 +646,7 @@ static void _tf_ssb_rpc_connection_tunnel_isRoom_callback( JS_SetPropertyStr(context, message, "name", name); JS_SetPropertyStr(context, message, "type", JS_NewString(context, "source")); JS_SetPropertyStr(context, message, "args", JS_NewArray(context)); - tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, tf_ssb_connection_next_request_number(connection), message, + tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, tf_ssb_connection_next_request_number(connection), "room.attendants", message, _tf_ssb_rpc_connection_room_attendants_callback, NULL, NULL); JS_FreeValue(context, message); } @@ -744,7 +744,7 @@ static void _tf_ssb_connection_send_history_stream_after_work(tf_ssb_connection_ { for (int i = 0; i < request->out_messages_count; i++) { - tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_json, request->request_number, (const uint8_t*)request->out_messages[i], + tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_json, request->request_number, NULL, (const uint8_t*)request->out_messages[i], strlen(request->out_messages[i]), NULL, NULL, NULL); } if (!request->out_finished) @@ -753,7 +753,7 @@ static void _tf_ssb_connection_send_history_stream_after_work(tf_ssb_connection_ } else if (!request->live) { - tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json, request->request_number, (const uint8_t*)"false", strlen("false"), NULL, NULL, NULL); + tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json, request->request_number, NULL, (const uint8_t*)"false", strlen("false"), NULL, NULL, NULL); } } for (int i = 0; i < request->out_messages_count; i++) @@ -899,7 +899,7 @@ static void _tf_ssb_rpc_ebt_replicate_send_clock_after_work(tf_ssb_connection_t* if (work->out_clock) { tf_ssb_connection_rpc_send( - connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_json, -work->request_number, (const uint8_t*)work->out_clock, strlen(work->out_clock), NULL, NULL, NULL); + connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_json, -work->request_number, NULL, (const uint8_t*)work->out_clock, strlen(work->out_clock), NULL, NULL, NULL); tf_free(work->out_clock); } tf_free(work); @@ -1060,7 +1060,7 @@ static void _tf_ssb_rpc_send_ebt_replicate(tf_ssb_connection_t* connection) JS_SetPropertyStr(context, message, "args", args); JS_SetPropertyStr(context, message, "type", JS_NewString(context, "duplex")); int32_t request_number = tf_ssb_connection_next_request_number(connection); - tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, request_number, message, _tf_ssb_rpc_ebt_replicate_client, NULL, NULL); + tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, request_number, "ebt.replicate", message, _tf_ssb_rpc_ebt_replicate_client, NULL, NULL); if (!tf_ssb_connection_get_ebt_request_number(connection)) { tf_ssb_connection_set_ebt_request_number(connection, request_number); @@ -1076,7 +1076,7 @@ static void _tf_ssb_rpc_ebt_replicate_server( return; } _tf_ssb_rpc_ebt_replicate(connection, flags, request_number, args, message, size, user_data); - tf_ssb_connection_add_request(connection, -request_number, _tf_ssb_rpc_ebt_replicate, NULL, NULL, NULL); + tf_ssb_connection_add_request(connection, -request_number, "ebt.replicate", _tf_ssb_rpc_ebt_replicate, NULL, NULL, NULL); } static void _tf_ssb_rpc_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_change_t change, tf_ssb_connection_t* connection, void* user_data) @@ -1091,7 +1091,7 @@ static void _tf_ssb_rpc_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_chang JS_SetPropertyStr(context, message, "name", name); JS_SetPropertyStr(context, message, "type", JS_NewString(context, "source")); JS_SetPropertyStr(context, message, "args", JS_NewArray(context)); - tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, tf_ssb_connection_next_request_number(connection), message, + tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, tf_ssb_connection_next_request_number(connection), "blobs.createWants", message, _tf_ssb_rpc_connection_blobs_createWants_callback, NULL, NULL); JS_FreeValue(context, message); @@ -1104,7 +1104,7 @@ static void _tf_ssb_rpc_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_chang JS_SetPropertyStr(context, message, "name", name); JS_SetPropertyStr(context, message, "args", JS_NewArray(context)); tf_ssb_connection_rpc_send_json( - connection, k_ssb_rpc_flag_new_request, tf_ssb_connection_next_request_number(connection), message, _tf_ssb_rpc_connection_tunnel_isRoom_callback, NULL, NULL); + connection, k_ssb_rpc_flag_new_request, tf_ssb_connection_next_request_number(connection), "tunnel.isRoom", message, _tf_ssb_rpc_connection_tunnel_isRoom_callback, NULL, NULL); JS_FreeValue(context, message); _tf_ssb_rpc_send_ebt_replicate(connection); @@ -1126,7 +1126,7 @@ static void _tf_ssb_rpc_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_chang { if (tf_ssb_connection_is_attendant(connections[i])) { - tf_ssb_connection_rpc_send_json(connections[i], k_ssb_rpc_flag_stream, -tf_ssb_connection_get_attendant_request_number(connections[i]), left, NULL, NULL, NULL); + tf_ssb_connection_rpc_send_json(connections[i], k_ssb_rpc_flag_stream, -tf_ssb_connection_get_attendant_request_number(connections[i]), NULL, left, NULL, NULL, NULL); } } JS_FreeValue(context, left); diff --git a/src/ssb.tests.c b/src/ssb.tests.c index 387caa23..90213b3f 100644 --- a/src/ssb.tests.c +++ b/src/ssb.tests.c @@ -444,7 +444,7 @@ void tf_ssb_test_rooms(const tf_test_options_t* options) JS_SetPropertyStr(context, message, "args", args); JS_SetPropertyStr(context, message, "type", JS_NewString(context, "duplex")); - tf_ssb_connection_rpc_send_json(connections[0], k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, tunnel_request_number, message, NULL, NULL, NULL); + tf_ssb_connection_rpc_send_json(connections[0], k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, tunnel_request_number, "tunnel.connect", message, NULL, NULL, NULL); JS_FreeValue(context, message); tf_ssb_connection_t* tun0 = tf_ssb_connection_tunnel_create(ssb1, id0, tunnel_request_number, id2); @@ -716,7 +716,7 @@ static void _close_callback(uv_timer_t* timer) tf_printf("breaking %s %p\n", data->id, data->connection); const char* message = "{\"name\":\"Error\",\"message\":\"whoops\",\"stack\":\"nah\"}"; tf_ssb_connection_rpc_send( - data->connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_json | k_ssb_rpc_flag_end_error, data->request_number, (const uint8_t*)message, strlen(message), NULL, NULL, NULL); + data->connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_json | k_ssb_rpc_flag_end_error, data->request_number, NULL, (const uint8_t*)message, strlen(message), NULL, NULL, NULL); uv_close((uv_handle_t*)timer, _timer_close); } @@ -769,7 +769,7 @@ static void _ssb_test_room_broadcasts_visit(const char* host, const struct socka JS_SetPropertyStr(context, message, "args", args); JS_SetPropertyStr(context, message, "type", JS_NewString(context, "duplex")); - tf_ssb_connection_rpc_send_json(tunnel, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, tunnel_request_number, message, NULL, NULL, NULL); + tf_ssb_connection_rpc_send_json(tunnel, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, tunnel_request_number, "tunnel.connect", message, NULL, NULL, NULL); JS_FreeValue(context, message); tf_printf("tunnel create ssb=%p portal=%s rn=%d target=%s\n", ssb, portal, (int)tunnel_request_number, target);