diff --git a/src/ssb.c b/src/ssb.c index 45707b96..01d74d5e 100644 --- a/src/ssb.c +++ b/src/ssb.c @@ -624,25 +624,40 @@ static bool _tf_ssb_connection_get_request_callback(tf_ssb_connection_t* connect void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t request_number, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data, tf_ssb_connection_t* dependent_connection) { - tf_ssb_connection_remove_request(connection, request_number); - tf_ssb_request_t request = + tf_ssb_request_t* existing = connection->requests_count ? bsearch(&request_number, connection->requests, connection->requests_count, sizeof(tf_ssb_request_t), _request_compare) : NULL; + if (existing) { - .request_number = request_number, - .callback = callback, - .cleanup = cleanup, - .user_data = user_data, - .dependent_connection = dependent_connection, - }; - int index = tf_util_insert_index(&request_number, connection->requests, connection->requests_count, sizeof(tf_ssb_request_t), _request_compare); - connection->requests = tf_resize_vec(connection->requests, sizeof(tf_ssb_request_t) * (connection->requests_count + 1)); - if (connection->requests_count - index) - { - memmove(connection->requests + index + 1, connection->requests + index, sizeof(tf_ssb_request_t) * (connection->requests_count - index)); + assert(!existing->callback); + assert(!existing->cleanup); + assert(!existing->user_data); + assert(!existing->dependent_connection); + existing->callback = callback; + existing->cleanup = cleanup; + existing->user_data = user_data; + existing->dependent_connection = dependent_connection; } - connection->requests[index] = request; - connection->requests_count++; + else + { + tf_ssb_connection_remove_request(connection, request_number); + tf_ssb_request_t request = + { + .request_number = request_number, + .callback = callback, + .cleanup = cleanup, + .user_data = user_data, + .dependent_connection = dependent_connection, + }; + int index = tf_util_insert_index(&request_number, connection->requests, connection->requests_count, sizeof(tf_ssb_request_t), _request_compare); + connection->requests = tf_resize_vec(connection->requests, sizeof(tf_ssb_request_t) * (connection->requests_count + 1)); + if (connection->requests_count - index) + { + memmove(connection->requests + index + 1, connection->requests + index, sizeof(tf_ssb_request_t) * (connection->requests_count - index)); + } + connection->requests[index] = request; + connection->requests_count++; - connection->ssb->request_count++; + connection->ssb->request_count++; + } } static int _message_request_compare(const void* a, const void* b) @@ -714,8 +729,19 @@ void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags, } return; } + if (flags & k_ssb_rpc_flag_new_request) + { + assert(request_number > 0); + assert(!_tf_ssb_connection_get_request_callback(connection, request_number, NULL, NULL)); + } + else if (!_tf_ssb_connection_get_request_callback(connection, request_number, NULL, NULL)) + { + tf_printf("Dropping message with no active request (%d): %.*s\n", request_number, (int)size, message); + return; + } + uint8_t* combined = tf_malloc(9 + size); - *combined = flags; + *combined = flags & k_ssb_rpc_mask_send; uint32_t u32size = htonl((uint32_t)size); memcpy(combined + 1, &u32size, sizeof(u32size)); uint32_t rn = htonl((uint32_t)request_number); @@ -723,20 +749,20 @@ void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags, memcpy(combined + 1 + 2 * sizeof(uint32_t), message, size); if (connection->ssb->verbose) { - tf_printf(MAGENTA "%s RPC SEND" RESET " flags=%x RN=%d: [%zd B] %.*s\n", connection->name, flags, request_number, size, (flags & k_ssb_rpc_mask_type) == k_ssb_rpc_flag_binary ? 0 : (int)size, message); + tf_printf(MAGENTA "%s RPC SEND" RESET " flags=%x RN=%d: [%zd B] %.*s\n", connection->name, flags & k_ssb_rpc_mask_send, request_number, size, (flags & k_ssb_rpc_mask_type) == k_ssb_rpc_flag_binary ? 0 : (int)size, message); } - _tf_ssb_connection_add_debug_message(connection, true, flags, request_number, message, size); + _tf_ssb_connection_add_debug_message(connection, true, flags & k_ssb_rpc_mask_send, request_number, message, size); _tf_ssb_connection_box_stream_send(connection, combined, 1 + 2 * sizeof(uint32_t) + size); tf_free(combined); connection->ssb->rpc_out++; - if (request_number > 0 && callback) + if (flags & k_ssb_rpc_flag_end_error) + { + tf_ssb_connection_remove_request(connection, request_number); + } + else if (flags & k_ssb_rpc_flag_new_request) { tf_ssb_connection_add_request(connection, request_number, callback, cleanup, user_data, NULL); } - else if (cleanup) - { - cleanup(connection->ssb, user_data); - } } void tf_ssb_connection_rpc_send_json(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue message, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data) @@ -747,7 +773,7 @@ void tf_ssb_connection_rpc_send_json(tf_ssb_connection_t* connection, uint8_t fl const char* json_string = JS_ToCStringLen(context, &size, json); tf_ssb_connection_rpc_send( connection, - k_ssb_rpc_flag_json | (flags & k_ssb_rpc_flag_stream) | (flags & k_ssb_rpc_flag_end_error), + k_ssb_rpc_flag_json | (flags & ~k_ssb_rpc_mask_type), request_number, (const uint8_t*)json_string, size, @@ -1474,6 +1500,7 @@ static void _tf_ssb_connection_rpc_recv(tf_ssb_connection_t* connection, uint8_t { connection->ssb->rpc_in++; _tf_ssb_connection_add_debug_message(connection, false, flags, request_number, message, size); + bool close_connection = false; if (size == 0) { _tf_ssb_connection_close(connection, "rpc recv zero"); @@ -1492,9 +1519,25 @@ static void _tf_ssb_connection_rpc_recv(tf_ssb_connection_t* connection, uint8_t if (!JS_IsUndefined(val)) { - bool found = false; - if (JS_IsObject(val)) + tf_ssb_rpc_callback_t* callback = NULL; + void* user_data = NULL; + if (_tf_ssb_connection_get_request_callback(connection, -request_number, &callback, &user_data)) { + if (callback) + { + char buffer[64]; + snprintf(buffer, sizeof(buffer), "request %d", request_number); + tf_trace_begin(connection->ssb->trace, buffer); + PRE_CALLBACK(connection->ssb, callback); + callback(connection, flags, request_number, val, message, size, user_data); + POST_CALLBACK(connection->ssb, callback); + tf_trace_end(connection->ssb->trace); + } + } + else if (JS_IsObject(val)) + { + bool found = false; + tf_ssb_connection_add_request(connection, -request_number, NULL, NULL, NULL, NULL); for (tf_ssb_rpc_callback_node_t* it = connection->ssb->rpc; it; it = it->next) { if (_tf_ssb_name_equals(context, val, it->name)) @@ -1508,35 +1551,19 @@ static void _tf_ssb_connection_rpc_recv(tf_ssb_connection_t* connection, uint8_t break; } } - } - if (!found) - { - tf_ssb_rpc_callback_t* callback = NULL; - void* user_data = NULL; - if (_tf_ssb_connection_get_request_callback(connection, -request_number, &callback, &user_data)) - { - if (callback) - { - char buffer[64]; - snprintf(buffer, sizeof(buffer), "request %d", request_number); - tf_trace_begin(connection->ssb->trace, buffer); - PRE_CALLBACK(connection->ssb, callback); - callback(connection, flags, request_number, val, message, size, user_data); - POST_CALLBACK(connection->ssb, callback); - tf_trace_end(connection->ssb->trace); - } - } - else if (!_tf_ssb_name_equals(context, val, (const char*[]) { "Error", NULL })) + if (!found) { char buffer[256]; _tf_ssb_name_to_string(context, val, buffer, sizeof(buffer)); tf_ssb_connection_rpc_send_error_method_not_allowed(connection, flags, -request_number, buffer); } } + } else { tf_printf("Failed to parse %.*s\n", (int)size, message); + close_connection = true; } JS_FreeValue(context, val); @@ -1568,9 +1595,9 @@ static void _tf_ssb_connection_rpc_recv(tf_ssb_connection_t* connection, uint8_t } } - if (flags & k_ssb_rpc_flag_end_error) + if (close_connection) { - tf_ssb_connection_remove_request(connection, -request_number); + tf_ssb_connection_close(connection); } } diff --git a/src/ssb.h b/src/ssb.h index 9716aede..8c1a3438 100644 --- a/src/ssb.h +++ b/src/ssb.h @@ -15,6 +15,11 @@ enum k_ssb_rpc_flag_end_error = 0x4, k_ssb_rpc_flag_stream = 0x8, + k_ssb_rpc_mask_message = 0xC, + + k_ssb_rpc_mask_send = 0xf, + + k_ssb_rpc_flag_new_request = 0x10, k_ssb_blob_bytes_max = 5 * 1024 * 1024, }; diff --git a/src/ssb.js.c b/src/ssb.js.c index dd2081d1..13b92b7f 100644 --- a/src/ssb.js.c +++ b/src/ssb.js.c @@ -1096,7 +1096,7 @@ static JSValue _tf_ssb_createTunnel(JSContext* context, JSValueConst this_val, i JS_SetPropertyUint32(context, args, 0, arg); JS_SetPropertyStr(context, message, "args", args); JS_SetPropertyStr(context, message, "type", JS_NewString(context, "duplex")); - tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream, request_number, message, NULL, NULL, NULL); + tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, request_number, message, NULL, NULL, NULL); JS_FreeValue(context, message); tf_ssb_connection_tunnel_create(ssb, portal_id, request_number, target_id); diff --git a/src/ssb.rpc.c b/src/ssb.rpc.c index e08252ce..67a55124 100644 --- a/src/ssb.rpc.c +++ b/src/ssb.rpc.c @@ -351,7 +351,7 @@ static void _tf_ssb_rpc_tunnel_connect(tf_ssb_connection_t* connection, uint8_t tf_ssb_connection_rpc_send_json( target_connection, - k_ssb_rpc_flag_stream, + k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, tunnel_request_number, message, NULL, @@ -565,7 +565,7 @@ static void _tf_ssb_rpc_connection_blobs_get(tf_ssb_connection_t* connection, co tf_ssb_connection_rpc_send_json( connection, - k_ssb_rpc_flag_stream, + k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, tf_ssb_connection_next_request_number(connection), message, _tf_ssb_rpc_connection_blobs_get_callback, @@ -756,7 +756,7 @@ static void _tf_ssb_rpc_connection_tunnel_isRoom_callback(tf_ssb_connection_t* c JS_SetPropertyStr(context, message, "args", JS_NewArray(context)); tf_ssb_connection_rpc_send_json( connection, - k_ssb_rpc_flag_stream, + k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, tf_ssb_connection_next_request_number(connection), message, _tf_ssb_rpc_connection_room_attendants_callback, @@ -1074,7 +1074,7 @@ static void _tf_ssb_rpc_send_ebt_replicate(tf_ssb_connection_t* connection) int32_t request_number = tf_ssb_connection_next_request_number(connection); tf_ssb_connection_rpc_send_json( connection, - k_ssb_rpc_flag_stream, + k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, request_number, message, _tf_ssb_rpc_ebt_replicate_client, @@ -1111,7 +1111,7 @@ static void _tf_ssb_rpc_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_chang JS_SetPropertyStr(context, message, "args", JS_NewArray(context)); tf_ssb_connection_rpc_send_json( connection, - k_ssb_rpc_flag_stream, + k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, tf_ssb_connection_next_request_number(connection), message, _tf_ssb_rpc_connection_blobs_createWants_callback, @@ -1129,7 +1129,7 @@ static void _tf_ssb_rpc_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_chang JS_SetPropertyStr(context, message, "args", JS_NewArray(context)); tf_ssb_connection_rpc_send_json( connection, - 0, + k_ssb_rpc_flag_new_request, tf_ssb_connection_next_request_number(connection), message, _tf_ssb_rpc_connection_tunnel_isRoom_callback, diff --git a/src/ssb.tests.c b/src/ssb.tests.c index e2f64b4a..b875fa0b 100644 --- a/src/ssb.tests.c +++ b/src/ssb.tests.c @@ -443,7 +443,7 @@ void tf_ssb_test_rooms(const tf_test_options_t* options) tf_ssb_connection_rpc_send_json( connections[0], - k_ssb_rpc_flag_stream, + k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, tunnel_request_number, message, NULL,