Show active RPC requests in the connections tab. Probably TMI, but I want greater introspection into what is going on, and this seemed like a positive step.

This commit is contained in:
Cory McWilliams 2024-05-02 19:02:23 -04:00
parent a2dce833f8
commit f01f7a5ab9
8 changed files with 127 additions and 52 deletions

View File

@ -1,5 +1,5 @@
{
"type": "tildefriends-app",
"emoji": "🐌",
"previous": "&eapETfizGynW7oCLJmlsDwEYHqXWh2usHnDdVPxpXv0=.sha256"
"previous": "&XFhkB74Ll0f2MA24HyKNK42Dckkb+3AGmuDxJDOets8=.sha256"
}

View File

@ -111,6 +111,9 @@ class TfTabConnectionsElement extends LitElement {
${this.connections
.filter((x) => x.tunnel === this.connections.indexOf(connection))
.map((x) => html`<li>${this.render_connection(x)}</li>`)}
${connection.requests.map(x => html`
<span class="w3-tag w3-small">${x.request_number > 0 ? '🟩' : '🟥'} ${x.name}</span>
`)}
${this.render_room_peers(connection.id)}
</ul>
`;

View File

@ -98,6 +98,7 @@ typedef struct _tf_ssb_debug_close_t
typedef struct _tf_ssb_request_t
{
char name[256];
tf_ssb_rpc_callback_t* callback;
tf_ssb_callback_cleanup_t* cleanup;
void* user_data;
@ -347,15 +348,16 @@ static JSClassID _connection_class_id;
static int s_connection_index;
static int s_tunnel_index;
static void _tf_ssb_connection_destroy(tf_ssb_connection_t* connection, const char* reason);
static void _tf_ssb_connection_client_send_hello(tf_ssb_connection_t* connection);
static void _tf_ssb_connection_on_close(uv_handle_t* handle);
static void _tf_ssb_connection_close(tf_ssb_connection_t* connection, const char* reason);
static void _tf_ssb_nonce_inc(uint8_t* nonce);
static void _tf_ssb_write(tf_ssb_connection_t* connection, void* data, size_t size);
static void _tf_ssb_connection_destroy(tf_ssb_connection_t* connection, const char* reason);
static void _tf_ssb_connection_finalizer(JSRuntime* runtime, JSValue value);
static void _tf_ssb_update_settings(tf_ssb_t* ssb);
static void _tf_ssb_connection_on_close(uv_handle_t* handle);
static void _tf_ssb_nonce_inc(uint8_t* nonce);
static void _tf_ssb_notify_connections_changed(tf_ssb_t* ssb, tf_ssb_change_t change, tf_ssb_connection_t* connection);
static void _tf_ssb_start_update_settings(tf_ssb_t* ssb);
static void _tf_ssb_update_settings(tf_ssb_t* ssb);
static void _tf_ssb_write(tf_ssb_connection_t* connection, void* data, size_t size);
static void _tf_ssb_add_debug_close(tf_ssb_t* ssb, tf_ssb_connection_t* connection, const char* reason)
{
@ -482,7 +484,7 @@ static void _tf_ssb_write(tf_ssb_connection_t* connection, void* data, size_t si
}
else if (connection->tunnel_connection)
{
tf_ssb_connection_rpc_send(connection->tunnel_connection, k_ssb_rpc_flag_binary | k_ssb_rpc_flag_stream, -connection->tunnel_request_number, data, size, NULL, NULL, NULL);
tf_ssb_connection_rpc_send(connection->tunnel_connection, k_ssb_rpc_flag_binary | k_ssb_rpc_flag_stream, -connection->tunnel_request_number, NULL, data, size, NULL, NULL, NULL);
}
}
@ -640,7 +642,7 @@ static bool _tf_ssb_connection_get_request_callback(tf_ssb_connection_t* connect
return false;
}
void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t request_number, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data,
void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t request_number, const char* name, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data,
tf_ssb_connection_t* dependent_connection)
{
tf_ssb_request_t* existing =
@ -666,6 +668,7 @@ void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t requ
.user_data = user_data,
.dependent_connection = dependent_connection,
};
snprintf(request.name, sizeof(request.name), "%s", name);
int index = tf_util_insert_index(&request_number, connection->requests, connection->requests_count, sizeof(tf_ssb_request_t), _request_compare);
connection->requests = tf_resize_vec(connection->requests, sizeof(tf_ssb_request_t) * (connection->requests_count + 1));
if (connection->requests_count - index)
@ -677,6 +680,7 @@ void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t requ
connection->ssb->request_count++;
}
_tf_ssb_notify_connections_changed(connection->ssb, k_tf_ssb_change_update, connection);
}
static int _message_request_compare(const void* a, const void* b)
@ -737,10 +741,11 @@ void tf_ssb_connection_remove_request(tf_ssb_connection_t* connection, int32_t r
connection->requests_count--;
connection->requests = tf_resize_vec(connection->requests, sizeof(tf_ssb_request_t) * connection->requests_count);
connection->ssb->request_count--;
_tf_ssb_notify_connections_changed(connection->ssb, k_tf_ssb_change_update, connection);
}
}
void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, const uint8_t* message, size_t size, tf_ssb_rpc_callback_t* callback,
void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, const char* new_request_name, const uint8_t* message, size_t size, tf_ssb_rpc_callback_t* callback,
tf_ssb_callback_cleanup_t* cleanup, void* user_data)
{
if (!connection)
@ -755,6 +760,7 @@ void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags,
{
assert(request_number > 0);
assert(!_tf_ssb_connection_get_request_callback(connection, request_number, NULL, NULL));
assert(new_request_name);
}
else if (!_tf_ssb_connection_get_request_callback(connection, request_number, NULL, NULL))
{
@ -785,18 +791,18 @@ void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags,
}
else if (flags & k_ssb_rpc_flag_new_request)
{
tf_ssb_connection_add_request(connection, request_number, callback, cleanup, user_data, NULL);
tf_ssb_connection_add_request(connection, request_number, new_request_name, callback, cleanup, user_data, NULL);
}
}
void tf_ssb_connection_rpc_send_json(
tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue message, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data)
tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, const char* new_request_name, JSValue message, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data)
{
JSContext* context = connection->ssb->context;
JSValue json = JS_JSONStringify(context, message, JS_NULL, JS_NULL);
size_t size = 0;
const char* json_string = JS_ToCStringLen(context, &size, json);
tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json | (flags & ~k_ssb_rpc_mask_type), request_number, (const uint8_t*)json_string, size, callback, cleanup, user_data);
tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json | (flags & ~k_ssb_rpc_mask_type), request_number, new_request_name, (const uint8_t*)json_string, size, callback, cleanup, user_data);
JS_FreeCString(context, json_string);
JS_FreeValue(context, json);
}
@ -810,7 +816,7 @@ void tf_ssb_connection_rpc_send_error(tf_ssb_connection_t* connection, uint8_t f
JS_SetPropertyStr(context, message, "stack", JS_NewString(context, stack ? stack : "stack unavailable"));
JS_SetPropertyStr(context, message, "message", JS_NewString(context, error));
tf_ssb_connection_rpc_send_json(
connection, ((flags & k_ssb_rpc_flag_stream) ? (k_ssb_rpc_flag_stream) : 0) | k_ssb_rpc_flag_end_error, request_number, message, NULL, NULL, NULL);
connection, ((flags & k_ssb_rpc_flag_stream) ? (k_ssb_rpc_flag_stream) : 0) | k_ssb_rpc_flag_end_error, request_number, NULL, message, NULL, NULL, NULL);
JS_FreeValue(context, message);
tf_free((void*)stack);
}
@ -1550,7 +1556,30 @@ static void _tf_ssb_connection_rpc_recv(tf_ssb_connection_t* connection, uint8_t
else if (JS_IsObject(val))
{
bool found = false;
tf_ssb_connection_add_request(connection, -request_number, NULL, NULL, NULL, NULL);
char namebuf[256] = "";
JSValue name = JS_GetPropertyStr(context, val, "name");
if (JS_IsArray(context, name))
{
int length = tf_util_get_length(context, name);
int offset = 0;
for (int i = 0; i < length; i++)
{
JSValue part = JS_GetPropertyUint32(context, name, i);
const char* part_str = JS_ToCString(context, part);
offset += snprintf(namebuf + offset, sizeof(namebuf) - offset, "%s%s", i == 0 ? "" : ".", part_str);
JS_FreeCString(context, part_str);
JS_FreeValue(context, part);
}
}
else if (JS_IsString(name))
{
const char* part_str = JS_ToCString(context, name);
snprintf(namebuf, sizeof(namebuf), "%s", part_str);
JS_FreeCString(context, part_str);
}
JS_FreeValue(context, name);
tf_ssb_connection_add_request(connection, -request_number, namebuf, NULL, NULL, NULL, NULL);
for (tf_ssb_rpc_callback_node_t* it = connection->ssb->rpc; it; it = it->next)
{
if (_tf_ssb_name_equals(context, val, it->name))
@ -2607,7 +2636,7 @@ static void _tf_ssb_connection_tunnel_callback(
tf_ssb_connection_t* tunnel = user_data;
if (flags & k_ssb_rpc_flag_end_error)
{
tf_ssb_connection_rpc_send(connection, flags, -request_number, (const uint8_t*)"false", strlen("false"), NULL, NULL, NULL);
tf_ssb_connection_rpc_send(connection, flags, -request_number, NULL, (const uint8_t*)"false", strlen("false"), NULL, NULL, NULL);
tf_ssb_connection_close(tunnel);
}
else
@ -2644,7 +2673,7 @@ tf_ssb_connection_t* tf_ssb_connection_tunnel_create(tf_ssb_t* ssb, const char*
ssb->connections_count++;
_tf_ssb_notify_connections_changed(ssb, k_tf_ssb_change_create, tunnel);
tf_ssb_connection_add_request(connection, request_number, _tf_ssb_connection_tunnel_callback, NULL, tunnel, tunnel);
tf_ssb_connection_add_request(connection, request_number, "tunnel.connect", _tf_ssb_connection_tunnel_callback, NULL, tunnel, tunnel);
if (request_number > 0)
{
tunnel->state = k_tf_ssb_state_connected;
@ -3379,7 +3408,7 @@ void tf_ssb_notify_message_added(tf_ssb_t* ssb, const char* id, JSValue message_
if (message_request)
{
tf_ssb_connection_rpc_send_json(
connection, k_ssb_rpc_flag_stream, message_request->request_number, message_request->keys ? message_keys : message, NULL, NULL, NULL);
connection, k_ssb_rpc_flag_stream, message_request->request_number, NULL, message_request->keys ? message_keys : message, NULL, NULL, NULL);
}
}
@ -3937,3 +3966,17 @@ bool tf_ssb_hmacsha256_verify(const char* public_key, const void* payload, size_
}
return result;
}
JSValue tf_ssb_connection_requests_to_object(tf_ssb_connection_t* connection)
{
JSContext* context = connection->ssb->context;
JSValue object = JS_NewArray(context);
for (int i = 0; i < connection->requests_count; i++)
{
JSValue request = JS_NewObject(context);
JS_SetPropertyStr(context, request, "name", JS_NewString(context, connection->requests[i].name));
JS_SetPropertyStr(context, request, "request_number", JS_NewInt32(context, connection->requests[i].request_number));
JS_SetPropertyUint32(context, object, i, request);
}
return object;
}

View File

@ -44,6 +44,7 @@ static void _tf_ssb_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_change_t
}
break;
case k_tf_ssb_change_remove:
case k_tf_ssb_change_update:
break;
}
}

View File

@ -38,6 +38,7 @@ typedef enum _tf_ssb_change_t
k_tf_ssb_change_create,
k_tf_ssb_change_connect,
k_tf_ssb_change_remove,
k_tf_ssb_change_update,
} tf_ssb_change_t;
/**
@ -658,13 +659,14 @@ void tf_ssb_remove_rpc_callback(tf_ssb_t* ssb, const char** name, tf_ssb_rpc_cal
** @param connection The connection on which to send the message.
** @param flags The message flags.
** @param request_number The request number.
** @param new_request_name The name of the request if it is new.
** @param message The message payload.
** @param size The size of the message.
** @param callback A callback to call if a response is received.
** @param cleanup A callback to call if the callback is removed.
** @param user_data User data to pass to the callback.
*/
void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, const uint8_t* message, size_t size, tf_ssb_rpc_callback_t* callback,
void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, const char* new_request_name, const uint8_t* message, size_t size, tf_ssb_rpc_callback_t* callback,
tf_ssb_callback_cleanup_t* cleanup, void* user_data);
/**
@ -672,13 +674,14 @@ void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags,
** @param connection The connection on which to send the message.
** @param flags The message flags.
** @param request_number The request number.
** @param new_request_name The name of the request if it is new.
** @param message The JS message payload.
** @param callback A callback to call if a response is received.
** @param cleanup A callback to call if the callback is removed.
** @param user_data User data to pass to the callback.
*/
void tf_ssb_connection_rpc_send_json(
tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue message, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data);
tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, const char* new_request_name, JSValue message, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data);
/**
** Send a MUXRPC error message.
@ -703,12 +706,13 @@ void tf_ssb_connection_rpc_send_error_method_not_allowed(tf_ssb_connection_t* co
** request number.
** @param connection The connection on which to register the callback.
** @param request_number The request number.
** @param name The name of the RPC request.
** @param callback The callback.
** @param cleanup The function to call when the callback is removed.
** @param user_data User data to pass to the callback.
** @param dependent_connection A connection, which, if removed, invalidates this request.
*/
void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t request_number, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data,
void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t request_number, const char* name, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data,
tf_ssb_connection_t* dependent_connection);
/**
@ -719,6 +723,13 @@ void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t requ
*/
void tf_ssb_connection_remove_request(tf_ssb_connection_t* connection, int32_t request_number);
/**
** Get a debug representation of active requests.
** @param connection The connection.
** @return The active requests as a JS object.
*/
JSValue tf_ssb_connection_requests_to_object(tf_ssb_connection_t* connection);
/**
** A function scheduled to be run later.
** @param connection The owning connection.

View File

@ -545,6 +545,7 @@ static JSValue _tf_ssb_connections(JSContext* context, JSValueConst this_val, in
}
JS_SetPropertyStr(context, object, "tunnel", JS_NewInt32(context, tunnel_index));
}
JS_SetPropertyStr(context, object, "requests", tf_ssb_connection_requests_to_object(connection));
JS_SetPropertyUint32(context, result, i, object);
}
}
@ -1146,6 +1147,22 @@ static void _tf_ssb_on_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_change
{
case k_tf_ssb_change_create:
break;
case k_tf_ssb_change_update:
{
JSValue object = JS_DupValue(context, tf_ssb_connection_get_object(connection));
JSValue args[] = {
JS_NewString(context, "update"),
object,
};
response = JS_Call(context, callback, JS_UNDEFINED, 2, args);
if (tf_util_report_error(context, response))
{
tf_ssb_remove_connections_changed_callback(ssb, _tf_ssb_on_connections_changed_callback, user_data);
}
JS_FreeValue(context, args[0]);
JS_FreeValue(context, object);
}
break;
case k_tf_ssb_change_connect:
{
JSValue object = JS_DupValue(context, tf_ssb_connection_get_object(connection));
@ -1304,7 +1321,7 @@ static JSValue _tf_ssb_createTunnel(JSContext* context, JSValueConst this_val, i
JS_SetPropertyUint32(context, args, 0, arg);
JS_SetPropertyStr(context, message, "args", args);
JS_SetPropertyStr(context, message, "type", JS_NewString(context, "duplex"));
tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, request_number, message, NULL, NULL, NULL);
tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, request_number, "tunnel.connect", message, NULL, NULL, NULL);
JS_FreeValue(context, message);
tf_ssb_connection_tunnel_create(ssb, portal_id, request_number, target_id);

View File

@ -50,7 +50,7 @@ static void _tf_ssb_rpc_gossip_ping_callback(
{
char buffer[256];
snprintf(buffer, sizeof(buffer), "%" PRId64, (int64_t)time(NULL) * 1000);
tf_ssb_connection_rpc_send(connection, flags, -request_number, (const uint8_t*)buffer, strlen(buffer), NULL, NULL, NULL);
tf_ssb_connection_rpc_send(connection, flags, -request_number, NULL, (const uint8_t*)buffer, strlen(buffer), NULL, NULL, NULL);
if (flags & k_ssb_rpc_flag_end_error)
{
tf_ssb_connection_remove_request(connection, request_number);
@ -59,7 +59,7 @@ static void _tf_ssb_rpc_gossip_ping_callback(
static void _tf_ssb_rpc_gossip_ping(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
tf_ssb_connection_add_request(connection, -request_number, _tf_ssb_rpc_gossip_ping_callback, NULL, NULL, NULL);
tf_ssb_connection_add_request(connection, -request_number, "gossip.ping", _tf_ssb_rpc_gossip_ping_callback, NULL, NULL, NULL);
}
static void _tf_ssb_rpc_blobs_get_callback(
@ -74,7 +74,7 @@ static void _tf_ssb_rpc_blobs_get(tf_ssb_connection_t* connection, uint8_t flags
return;
}
tf_ssb_connection_add_request(connection, -request_number, _tf_ssb_rpc_blobs_get_callback, NULL, NULL, NULL);
tf_ssb_connection_add_request(connection, -request_number, "blobs.get", _tf_ssb_rpc_blobs_get_callback, NULL, NULL, NULL);
tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
JSContext* context = tf_ssb_connection_get_context(connection);
JSValue ids = JS_GetPropertyStr(context, args, "args");
@ -101,7 +101,7 @@ static void _tf_ssb_rpc_blobs_get(tf_ssb_connection_t* connection, uint8_t flags
{
for (size_t offset = 0; offset < size; offset += k_send_max)
{
tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_binary | k_ssb_rpc_flag_stream, -request_number, blob + offset,
tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_binary | k_ssb_rpc_flag_stream, -request_number, NULL, blob + offset,
offset + k_send_max <= size ? k_send_max : (size - offset), NULL, NULL, NULL);
}
success = true;
@ -111,7 +111,7 @@ static void _tf_ssb_rpc_blobs_get(tf_ssb_connection_t* connection, uint8_t flags
JS_FreeValue(context, arg);
}
JS_FreeValue(context, ids);
tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json | k_ssb_rpc_flag_end_error | k_ssb_rpc_flag_stream, -request_number, (const uint8_t*)(success ? "true" : "false"),
tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json | k_ssb_rpc_flag_end_error | k_ssb_rpc_flag_stream, -request_number, NULL, (const uint8_t*)(success ? "true" : "false"),
strlen(success ? "true" : "false"), NULL, NULL, NULL);
}
@ -126,7 +126,7 @@ static void _tf_ssb_rpc_blobs_has(tf_ssb_connection_t* connection, uint8_t flags
JS_FreeCString(context, id_str);
JS_FreeValue(context, id);
JS_FreeValue(context, ids);
tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json, -request_number, (const uint8_t*)(has ? "true" : "false"), strlen(has ? "true" : "false"), NULL, NULL, NULL);
tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json, -request_number, NULL, (const uint8_t*)(has ? "true" : "false"), strlen(has ? "true" : "false"), NULL, NULL, NULL);
}
static void _tf_ssb_rpc_blob_wants_added_callback(tf_ssb_t* ssb, const char* id, void* user_data)
@ -136,7 +136,7 @@ static void _tf_ssb_rpc_blob_wants_added_callback(tf_ssb_t* ssb, const char* id,
JSContext* context = tf_ssb_get_context(ssb);
JSValue message = JS_NewObject(context);
JS_SetPropertyStr(context, message, id, JS_NewInt64(context, -1));
tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream, -blob_wants->request_number, message, NULL, NULL, NULL);
tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream, -blob_wants->request_number, NULL, message, NULL, NULL, NULL);
JS_FreeValue(context, message);
}
@ -195,7 +195,7 @@ static void _tf_ssb_request_blob_wants_after_work(tf_ssb_connection_t* connectio
{
JSValue message = JS_NewObject(context);
JS_SetPropertyStr(context, message, work->out_id[i], JS_NewInt32(context, -1));
tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream, -blob_wants->request_number, message, NULL, NULL, NULL);
tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream, -blob_wants->request_number, NULL, message, NULL, NULL, NULL);
JS_FreeValue(context, message);
blob_wants->wants_sent++;
}
@ -239,7 +239,7 @@ static void _tf_ssb_rpc_tunnel_callback(tf_ssb_connection_t* connection, uint8_t
}
else
{
tf_ssb_connection_rpc_send(tun->connection, flags, tun->request_number, message, size, NULL, NULL, NULL);
tf_ssb_connection_rpc_send(tun->connection, flags, tun->request_number, NULL, message, size, NULL, NULL, NULL);
}
}
@ -290,7 +290,7 @@ static void _tf_ssb_rpc_tunnel_connect(tf_ssb_connection_t* connection, uint8_t
JS_SetPropertyStr(context, message, "args", arg_array);
JS_SetPropertyStr(context, message, "type", JS_NewString(context, "duplex"));
tf_ssb_connection_rpc_send_json(target_connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, tunnel_request_number, message, NULL, NULL, NULL);
tf_ssb_connection_rpc_send_json(target_connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, tunnel_request_number, "tunnel.connect", message, NULL, NULL, NULL);
tunnel_t* data0 = tf_malloc(sizeof(tunnel_t));
*data0 = (tunnel_t) {
@ -302,8 +302,8 @@ static void _tf_ssb_rpc_tunnel_connect(tf_ssb_connection_t* connection, uint8_t
.connection = connection,
.request_number = -request_number,
};
tf_ssb_connection_add_request(connection, -request_number, _tf_ssb_rpc_tunnel_callback, _tf_ssb_rpc_tunnel_cleanup, data0, target_connection);
tf_ssb_connection_add_request(target_connection, tunnel_request_number, _tf_ssb_rpc_tunnel_callback, _tf_ssb_rpc_tunnel_cleanup, data1, connection);
tf_ssb_connection_add_request(connection, -request_number, "tunnel.connect", _tf_ssb_rpc_tunnel_callback, _tf_ssb_rpc_tunnel_cleanup, data0, target_connection);
tf_ssb_connection_add_request(target_connection, tunnel_request_number, "tunnel.connect", _tf_ssb_rpc_tunnel_callback, _tf_ssb_rpc_tunnel_cleanup, data1, connection);
JS_FreeValue(context, message);
JS_FreeCString(context, portal_str);
@ -348,7 +348,7 @@ static void _tf_ssb_rpc_room_meta(tf_ssb_connection_t* connection, uint8_t flags
JS_SetPropertyUint32(context, features, 2, JS_NewString(context, "room2"));
JS_SetPropertyStr(context, response, "features", features);
}
tf_ssb_connection_rpc_send_json(connection, flags, -request_number, response, NULL, NULL, NULL);
tf_ssb_connection_rpc_send_json(connection, flags, -request_number, NULL, response, NULL, NULL, NULL);
JS_FreeValue(context, response);
}
@ -384,11 +384,11 @@ static void _tf_ssb_rpc_room_attendants(tf_ssb_connection_t* connection, uint8_t
{
JS_SetPropertyUint32(context, ids, id_count++, JS_NewString(context, id));
tf_ssb_connection_rpc_send_json(connections[i], flags, -tf_ssb_connection_get_attendant_request_number(connections[i]), joined, NULL, NULL, NULL);
tf_ssb_connection_rpc_send_json(connections[i], flags, -tf_ssb_connection_get_attendant_request_number(connections[i]), NULL, joined, NULL, NULL, NULL);
}
}
JS_SetPropertyStr(context, state, "ids", ids);
tf_ssb_connection_rpc_send_json(connection, flags, -request_number, state, NULL, NULL, NULL);
tf_ssb_connection_rpc_send_json(connection, flags, -request_number, NULL, state, NULL, NULL, NULL);
JS_FreeValue(context, joined);
JS_FreeValue(context, state);
@ -436,7 +436,7 @@ static void _tf_ssb_rpc_connection_blobs_get_callback(
}
/* TODO: Should we send the response in the callback? */
bool stored = true;
tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json | k_ssb_rpc_flag_stream | k_ssb_rpc_flag_end_error, -request_number, (const uint8_t*)(stored ? "true" : "false"),
tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json | k_ssb_rpc_flag_stream | k_ssb_rpc_flag_end_error, -request_number, NULL, (const uint8_t*)(stored ? "true" : "false"),
strlen(stored ? "true" : "false"), NULL, NULL, NULL);
}
}
@ -470,7 +470,7 @@ static void _tf_ssb_rpc_connection_blobs_get(tf_ssb_connection_t* connection, co
JS_SetPropertyUint32(context, args, 0, JS_NewString(context, blob_id));
JS_SetPropertyStr(context, message, "args", args);
tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, tf_ssb_connection_next_request_number(connection), message,
tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, tf_ssb_connection_next_request_number(connection), "blobs.get", message,
_tf_ssb_rpc_connection_blobs_get_callback, _tf_ssb_rpc_connection_blobs_get_cleanup, get);
JS_FreeValue(context, message);
@ -525,14 +525,14 @@ static void _tf_ssb_rpc_connection_blobs_createWants_callback(
{
JSValue message = JS_NewObject(context);
JS_SetPropertyStr(context, message, blob_id, JS_NewInt64(context, blob_size));
tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream, -blob_wants->request_number, message, NULL, NULL, NULL);
tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream, -blob_wants->request_number, NULL, message, NULL, NULL, NULL);
JS_FreeValue(context, message);
}
else if (size == -1LL)
{
JSValue message = JS_NewObject(context);
JS_SetPropertyStr(context, message, blob_id, JS_NewInt64(context, -2));
tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream, -blob_wants->request_number, message, NULL, NULL, NULL);
tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream, -blob_wants->request_number, NULL, message, NULL, NULL, NULL);
JS_FreeValue(context, message);
}
}
@ -646,7 +646,7 @@ static void _tf_ssb_rpc_connection_tunnel_isRoom_callback(
JS_SetPropertyStr(context, message, "name", name);
JS_SetPropertyStr(context, message, "type", JS_NewString(context, "source"));
JS_SetPropertyStr(context, message, "args", JS_NewArray(context));
tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, tf_ssb_connection_next_request_number(connection), message,
tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, tf_ssb_connection_next_request_number(connection), "room.attendants", message,
_tf_ssb_rpc_connection_room_attendants_callback, NULL, NULL);
JS_FreeValue(context, message);
}
@ -744,7 +744,7 @@ static void _tf_ssb_connection_send_history_stream_after_work(tf_ssb_connection_
{
for (int i = 0; i < request->out_messages_count; i++)
{
tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_json, request->request_number, (const uint8_t*)request->out_messages[i],
tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_json, request->request_number, NULL, (const uint8_t*)request->out_messages[i],
strlen(request->out_messages[i]), NULL, NULL, NULL);
}
if (!request->out_finished)
@ -753,7 +753,7 @@ static void _tf_ssb_connection_send_history_stream_after_work(tf_ssb_connection_
}
else if (!request->live)
{
tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json, request->request_number, (const uint8_t*)"false", strlen("false"), NULL, NULL, NULL);
tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json, request->request_number, NULL, (const uint8_t*)"false", strlen("false"), NULL, NULL, NULL);
}
}
for (int i = 0; i < request->out_messages_count; i++)
@ -899,7 +899,7 @@ static void _tf_ssb_rpc_ebt_replicate_send_clock_after_work(tf_ssb_connection_t*
if (work->out_clock)
{
tf_ssb_connection_rpc_send(
connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_json, -work->request_number, (const uint8_t*)work->out_clock, strlen(work->out_clock), NULL, NULL, NULL);
connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_json, -work->request_number, NULL, (const uint8_t*)work->out_clock, strlen(work->out_clock), NULL, NULL, NULL);
tf_free(work->out_clock);
}
tf_free(work);
@ -1060,7 +1060,7 @@ static void _tf_ssb_rpc_send_ebt_replicate(tf_ssb_connection_t* connection)
JS_SetPropertyStr(context, message, "args", args);
JS_SetPropertyStr(context, message, "type", JS_NewString(context, "duplex"));
int32_t request_number = tf_ssb_connection_next_request_number(connection);
tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, request_number, message, _tf_ssb_rpc_ebt_replicate_client, NULL, NULL);
tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, request_number, "ebt.replicate", message, _tf_ssb_rpc_ebt_replicate_client, NULL, NULL);
if (!tf_ssb_connection_get_ebt_request_number(connection))
{
tf_ssb_connection_set_ebt_request_number(connection, request_number);
@ -1076,7 +1076,7 @@ static void _tf_ssb_rpc_ebt_replicate_server(
return;
}
_tf_ssb_rpc_ebt_replicate(connection, flags, request_number, args, message, size, user_data);
tf_ssb_connection_add_request(connection, -request_number, _tf_ssb_rpc_ebt_replicate, NULL, NULL, NULL);
tf_ssb_connection_add_request(connection, -request_number, "ebt.replicate", _tf_ssb_rpc_ebt_replicate, NULL, NULL, NULL);
}
static void _tf_ssb_rpc_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_change_t change, tf_ssb_connection_t* connection, void* user_data)
@ -1091,7 +1091,7 @@ static void _tf_ssb_rpc_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_chang
JS_SetPropertyStr(context, message, "name", name);
JS_SetPropertyStr(context, message, "type", JS_NewString(context, "source"));
JS_SetPropertyStr(context, message, "args", JS_NewArray(context));
tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, tf_ssb_connection_next_request_number(connection), message,
tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, tf_ssb_connection_next_request_number(connection), "blobs.createWants", message,
_tf_ssb_rpc_connection_blobs_createWants_callback, NULL, NULL);
JS_FreeValue(context, message);
@ -1104,7 +1104,7 @@ static void _tf_ssb_rpc_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_chang
JS_SetPropertyStr(context, message, "name", name);
JS_SetPropertyStr(context, message, "args", JS_NewArray(context));
tf_ssb_connection_rpc_send_json(
connection, k_ssb_rpc_flag_new_request, tf_ssb_connection_next_request_number(connection), message, _tf_ssb_rpc_connection_tunnel_isRoom_callback, NULL, NULL);
connection, k_ssb_rpc_flag_new_request, tf_ssb_connection_next_request_number(connection), "tunnel.isRoom", message, _tf_ssb_rpc_connection_tunnel_isRoom_callback, NULL, NULL);
JS_FreeValue(context, message);
_tf_ssb_rpc_send_ebt_replicate(connection);
@ -1126,7 +1126,7 @@ static void _tf_ssb_rpc_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_chang
{
if (tf_ssb_connection_is_attendant(connections[i]))
{
tf_ssb_connection_rpc_send_json(connections[i], k_ssb_rpc_flag_stream, -tf_ssb_connection_get_attendant_request_number(connections[i]), left, NULL, NULL, NULL);
tf_ssb_connection_rpc_send_json(connections[i], k_ssb_rpc_flag_stream, -tf_ssb_connection_get_attendant_request_number(connections[i]), NULL, left, NULL, NULL, NULL);
}
}
JS_FreeValue(context, left);

View File

@ -444,7 +444,7 @@ void tf_ssb_test_rooms(const tf_test_options_t* options)
JS_SetPropertyStr(context, message, "args", args);
JS_SetPropertyStr(context, message, "type", JS_NewString(context, "duplex"));
tf_ssb_connection_rpc_send_json(connections[0], k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, tunnel_request_number, message, NULL, NULL, NULL);
tf_ssb_connection_rpc_send_json(connections[0], k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, tunnel_request_number, "tunnel.connect", message, NULL, NULL, NULL);
JS_FreeValue(context, message);
tf_ssb_connection_t* tun0 = tf_ssb_connection_tunnel_create(ssb1, id0, tunnel_request_number, id2);
@ -716,7 +716,7 @@ static void _close_callback(uv_timer_t* timer)
tf_printf("breaking %s %p\n", data->id, data->connection);
const char* message = "{\"name\":\"Error\",\"message\":\"whoops\",\"stack\":\"nah\"}";
tf_ssb_connection_rpc_send(
data->connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_json | k_ssb_rpc_flag_end_error, data->request_number, (const uint8_t*)message, strlen(message), NULL, NULL, NULL);
data->connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_json | k_ssb_rpc_flag_end_error, data->request_number, NULL, (const uint8_t*)message, strlen(message), NULL, NULL, NULL);
uv_close((uv_handle_t*)timer, _timer_close);
}
@ -769,7 +769,7 @@ static void _ssb_test_room_broadcasts_visit(const char* host, const struct socka
JS_SetPropertyStr(context, message, "args", args);
JS_SetPropertyStr(context, message, "type", JS_NewString(context, "duplex"));
tf_ssb_connection_rpc_send_json(tunnel, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, tunnel_request_number, message, NULL, NULL, NULL);
tf_ssb_connection_rpc_send_json(tunnel, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, tunnel_request_number, "tunnel.connect", message, NULL, NULL, NULL);
JS_FreeValue(context, message);
tf_printf("tunnel create ssb=%p portal=%s rn=%d target=%s\n", ssb, portal, (int)tunnel_request_number, target);