diff --git a/src/ssb.c b/src/ssb.c index 83c1ab70..c7a727ba 100644 --- a/src/ssb.c +++ b/src/ssb.c @@ -289,7 +289,6 @@ static void _tf_ssb_connection_close(tf_ssb_connection_t* connection, const char static void _tf_ssb_nonce_inc(uint8_t* nonce); static void _tf_ssb_write(tf_ssb_connection_t* connection, void* data, size_t size); static void _tf_ssb_connection_finalizer(JSRuntime* runtime, JSValue value); -static void _tf_ssb_connection_remove_request(tf_ssb_connection_t* connection, int32_t request_number); static void _tf_ssb_connection_send_close(tf_ssb_connection_t* connection) { @@ -529,7 +528,7 @@ static bool _tf_ssb_connection_get_request_callback(tf_ssb_connection_t* connect void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t request_number, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data, tf_ssb_connection_t* dependent_connection) { - _tf_ssb_connection_remove_request(connection, request_number); + tf_ssb_connection_remove_request(connection, request_number); tf_ssb_request_t request = { .request_number = request_number, @@ -590,7 +589,7 @@ void tf_ssb_connection_remove_new_message_request(tf_ssb_connection_t* connectio } } -static void _tf_ssb_connection_remove_request(tf_ssb_connection_t* connection, int32_t request_number) +void tf_ssb_connection_remove_request(tf_ssb_connection_t* connection, int32_t request_number) { tf_ssb_request_t* request = bsearch(&request_number, connection->requests, connection->requests_count, sizeof(tf_ssb_request_t), _request_compare); if (request) @@ -1455,7 +1454,7 @@ static void _tf_ssb_connection_rpc_recv(tf_ssb_connection_t* connection, uint8_t if (flags & k_ssb_rpc_flag_end_error) { - _tf_ssb_connection_remove_request(connection, -request_number); + tf_ssb_connection_remove_request(connection, -request_number); } } @@ -1661,7 +1660,7 @@ void _tf_ssb_connection_destroy(tf_ssb_connection_t* connection, const char* rea connection->scheduled = NULL; while (connection->requests) { - _tf_ssb_connection_remove_request(connection, connection->requests->request_number); + tf_ssb_connection_remove_request(connection, connection->requests->request_number); } for (tf_ssb_broadcast_t* node = ssb->broadcasts; node; node = node->next) { @@ -1677,7 +1676,7 @@ void _tf_ssb_connection_destroy(tf_ssb_connection_t* connection, const char* rea { if ((*it)->requests[i].dependent_connection == connection) { - _tf_ssb_connection_remove_request(*it, (*it)->requests[i].request_number); + tf_ssb_connection_remove_request(*it, (*it)->requests[i].request_number); } } if (*it == connection) @@ -1704,7 +1703,7 @@ void _tf_ssb_connection_destroy(tf_ssb_connection_t* connection, const char* rea } else if (it == connection->tunnel_connection) { - _tf_ssb_connection_remove_request(it, connection->tunnel_request_number); + tf_ssb_connection_remove_request(it, connection->tunnel_request_number); connection->tunnel_connection = NULL; connection->tunnel_request_number = 0; } diff --git a/src/ssb.h b/src/ssb.h index 4052f980..167a9bd6 100644 --- a/src/ssb.h +++ b/src/ssb.h @@ -153,6 +153,7 @@ void tf_ssb_connection_rpc_send_json(tf_ssb_connection_t* connection, uint8_t fl void tf_ssb_connection_rpc_send_error(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, const char* error); void tf_ssb_connection_rpc_send_error_method_not_allowed(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number); void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t request_number, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data, tf_ssb_connection_t* dependent_connection); +void tf_ssb_connection_remove_request(tf_ssb_connection_t* connection, int32_t request_number); typedef void (tf_ssb_scheduled_callback_t)(tf_ssb_connection_t* connection, void* user_data); void tf_ssb_connection_schedule_idle(tf_ssb_connection_t* connection, tf_ssb_scheduled_callback_t* callback, void* user_data); diff --git a/src/ssb.rpc.c b/src/ssb.rpc.c index bccbf609..81c66f0f 100644 --- a/src/ssb.rpc.c +++ b/src/ssb.rpc.c @@ -15,7 +15,7 @@ #define _countof(a) ((int)(sizeof((a)) / sizeof(*(a)))) #endif -static void _tf_ssb_connection_send_history_stream(tf_ssb_connection_t* connection, int32_t request_number, const char* author, int64_t sequence, bool keys); +static void _tf_ssb_connection_send_history_stream(tf_ssb_connection_t* connection, int32_t request_number, const char* author, int64_t sequence, bool keys, bool live); static void _tf_ssb_rpc_gossip_ping_callback(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { @@ -30,6 +30,10 @@ static void _tf_ssb_rpc_gossip_ping_callback(tf_ssb_connection_t* connection, ui NULL, NULL, NULL); + if (flags & k_ssb_rpc_flag_end_error) + { + tf_ssb_connection_remove_request(connection, request_number); + } } static void _tf_ssb_rpc_gossip_ping(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) @@ -111,7 +115,7 @@ static void _tf_ssb_rpc_blobs_has(tf_ssb_connection_t* connection, uint8_t flags JS_FreeValue(context, ids); tf_ssb_connection_rpc_send( connection, - k_ssb_rpc_flag_json | k_ssb_rpc_flag_end_error, + k_ssb_rpc_flag_json, -request_number, (const uint8_t*)(has ? "true" : "false"), strlen(has ? "true" : "false"), @@ -314,7 +318,6 @@ static void _tf_ssb_rpc_tunnel_connect(tf_ssb_connection_t* connection, uint8_t const char* origin_str = JS_ToCString(context, origin); const char* portal_str = JS_ToCString(context, portal); const char* target_str = JS_ToCString(context, target); - printf("TUNNEL CREATE\n"); tf_ssb_connection_tunnel_create(ssb, portal_str, -request_number, origin_str); JS_FreeCString(context, origin_str); JS_FreeCString(context, portal_str); @@ -357,11 +360,6 @@ static void _tf_ssb_rpc_room_meta(tf_ssb_connection_t* connection, uint8_t flags JS_FreeValue(context, response); } -static void _tf_ssb_rpc_room_attendants_callback(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) -{ - printf("room attendants callback??\n"); -} - static void _tf_ssb_rpc_room_attendants(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); @@ -411,7 +409,6 @@ static void _tf_ssb_rpc_room_attendants(tf_ssb_connection_t* connection, uint8_t JS_FreeValue(context, state); tf_ssb_connection_set_attendant(connection, true, request_number); - tf_ssb_connection_add_request(connection, -request_number, _tf_ssb_rpc_room_attendants_callback, NULL, NULL, NULL); } typedef struct _blobs_get_t @@ -683,6 +680,7 @@ typedef struct _tf_ssb_connection_send_history_stream_t char author[k_id_base64_len]; int64_t sequence; bool keys; + bool live; } tf_ssb_connection_send_history_stream_t; static void _tf_ssb_connection_send_history_stream_callback(tf_ssb_connection_t* connection, void* user_data) @@ -690,12 +688,12 @@ static void _tf_ssb_connection_send_history_stream_callback(tf_ssb_connection_t* tf_ssb_connection_send_history_stream_t* request = user_data; if (tf_ssb_connection_is_connected(connection)) { - _tf_ssb_connection_send_history_stream(connection, request->request_number, request->author, request->sequence, request->keys); + _tf_ssb_connection_send_history_stream(connection, request->request_number, request->author, request->sequence, request->keys, request->live); } tf_free(request); } -static void _tf_ssb_connection_send_history_stream(tf_ssb_connection_t* connection, int32_t request_number, const char* author, int64_t sequence, bool keys) +static void _tf_ssb_connection_send_history_stream(tf_ssb_connection_t* connection, int32_t request_number, const char* author, int64_t sequence, bool keys, bool live) { tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); JSContext* context = tf_ssb_get_context(ssb); @@ -750,10 +748,23 @@ static void _tf_ssb_connection_send_history_stream(tf_ssb_connection_t* connecti .request_number = request_number, .sequence = max_sequence_seen + 1, .keys = keys, + .live = live, }; snprintf(async->author, sizeof(async->author), "%s", author); tf_ssb_connection_schedule_idle(connection, _tf_ssb_connection_send_history_stream_callback, async); } + else if (!live) + { + tf_ssb_connection_rpc_send( + connection, + k_ssb_rpc_flag_json, + request_number, + (const uint8_t*)"false", + strlen("false"), + NULL, + NULL, + NULL); + } } static void _tf_ssb_rpc_createHistoryStream(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) @@ -776,7 +787,7 @@ static void _tf_ssb_rpc_createHistoryStream(tf_ssb_connection_t* connection, uin JS_ToInt64(context, &sequence, seq); const char* author = JS_ToCString(context, id); - _tf_ssb_connection_send_history_stream(connection, -request_number, author, sequence, is_keys); + _tf_ssb_connection_send_history_stream(connection, -request_number, author, sequence, is_keys, is_live); if (is_live) { @@ -881,7 +892,7 @@ static void _tf_ssb_rpc_ebt_replicate_send_messages(tf_ssb_connection_t* connect if (sequence >= 0 && (sequence & 1) == 0) { int32_t request_number = tf_ssb_connection_get_ebt_request_number(connection); - _tf_ssb_connection_send_history_stream(connection, request_number, author, sequence >> 1, false); + _tf_ssb_connection_send_history_stream(connection, request_number, author, sequence >> 1, false, true); tf_ssb_connection_add_new_message_request(connection, author, request_number, false); } else @@ -1063,14 +1074,14 @@ static void _tf_ssb_rpc_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_chang void tf_ssb_rpc_register(tf_ssb_t* ssb) { tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_rpc_connections_changed_callback, NULL, NULL); - tf_ssb_add_rpc_callback(ssb, (const char*[]) { "gossip", "ping", NULL }, _tf_ssb_rpc_gossip_ping, NULL, NULL); - tf_ssb_add_rpc_callback(ssb, (const char*[]) { "blobs", "get", NULL }, _tf_ssb_rpc_blobs_get, NULL, NULL); - tf_ssb_add_rpc_callback(ssb, (const char*[]) { "blobs", "has", NULL }, _tf_ssb_rpc_blobs_has, NULL, NULL); - tf_ssb_add_rpc_callback(ssb, (const char*[]) { "blobs", "createWants", NULL }, _tf_ssb_rpc_blobs_createWants, NULL, NULL); - tf_ssb_add_rpc_callback(ssb, (const char*[]) { "tunnel", "connect", NULL }, _tf_ssb_rpc_tunnel_connect, NULL, NULL); - tf_ssb_add_rpc_callback(ssb, (const char*[]) { "tunnel", "isRoom", NULL }, _tf_ssb_rpc_room_meta, NULL, NULL); - tf_ssb_add_rpc_callback(ssb, (const char*[]) { "room", "metadata", NULL }, _tf_ssb_rpc_room_meta, NULL, NULL); - tf_ssb_add_rpc_callback(ssb, (const char*[]) { "room", "attendants", NULL }, _tf_ssb_rpc_room_attendants, NULL, NULL); - tf_ssb_add_rpc_callback(ssb, (const char*[]) { "createHistoryStream", NULL }, _tf_ssb_rpc_createHistoryStream, NULL, NULL); - tf_ssb_add_rpc_callback(ssb, (const char*[]) { "ebt", "replicate", NULL }, _tf_ssb_rpc_ebt_replicate_server, NULL, NULL); + tf_ssb_add_rpc_callback(ssb, (const char*[]) { "gossip", "ping", NULL }, _tf_ssb_rpc_gossip_ping, NULL, NULL); /* DUPLEX */ + tf_ssb_add_rpc_callback(ssb, (const char*[]) { "blobs", "get", NULL }, _tf_ssb_rpc_blobs_get, NULL, NULL); /* SOURCE */ + tf_ssb_add_rpc_callback(ssb, (const char*[]) { "blobs", "has", NULL }, _tf_ssb_rpc_blobs_has, NULL, NULL); /* ASYNC */ + tf_ssb_add_rpc_callback(ssb, (const char*[]) { "blobs", "createWants", NULL }, _tf_ssb_rpc_blobs_createWants, NULL, NULL); /* SOURCE */ + tf_ssb_add_rpc_callback(ssb, (const char*[]) { "tunnel", "connect", NULL }, _tf_ssb_rpc_tunnel_connect, NULL, NULL); /* DUPLEX */ + tf_ssb_add_rpc_callback(ssb, (const char*[]) { "tunnel", "isRoom", NULL }, _tf_ssb_rpc_room_meta, NULL, NULL); /* FAKE-ASYNC */ + tf_ssb_add_rpc_callback(ssb, (const char*[]) { "room", "metadata", NULL }, _tf_ssb_rpc_room_meta, NULL, NULL); /* ASYNC */ + tf_ssb_add_rpc_callback(ssb, (const char*[]) { "room", "attendants", NULL }, _tf_ssb_rpc_room_attendants, NULL, NULL); /* SOURCE */ + tf_ssb_add_rpc_callback(ssb, (const char*[]) { "createHistoryStream", NULL }, _tf_ssb_rpc_createHistoryStream, NULL, NULL); /* SOURCE */ + tf_ssb_add_rpc_callback(ssb, (const char*[]) { "ebt", "replicate", NULL }, _tf_ssb_rpc_ebt_replicate_server, NULL, NULL); /* DUPLEX */ }