diff --git a/src/ssb.rpc.c b/src/ssb.rpc.c index 9428867e..2088f617 100644 --- a/src/ssb.rpc.c +++ b/src/ssb.rpc.c @@ -217,9 +217,15 @@ static void _tf_ssb_rpc_blob_wants_added_callback(tf_ssb_t* ssb, const char* id, JS_FreeValue(context, message); } -static void _tf_ssb_rpc_request_more_blobs(tf_ssb_connection_t* connection) +typedef struct _blob_wants_work_t { - JSContext* context = tf_ssb_connection_get_context(connection); + char out_id[k_blob_id_len][32]; + int out_id_count; +} blob_wants_work_t; + +static void _tf_ssb_request_blob_wants_work(tf_ssb_connection_t* connection, void* user_data) +{ + blob_wants_work_t* work = user_data; tf_ssb_blob_wants_t* blob_wants = tf_ssb_connection_get_blob_wants_state(connection); tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); int64_t age = _get_global_setting_int64(ssb, "blob_fetch_age_seconds", -1); @@ -237,20 +243,16 @@ static void _tf_ssb_rpc_request_more_blobs(tf_ssb_connection_t* connection) sqlite3* db = tf_ssb_acquire_db_reader(ssb); sqlite3_stmt* statement; - if (sqlite3_prepare(db, "SELECT id FROM blob_wants_view WHERE id > ? AND timestamp > ? ORDER BY id LIMIT 32", -1, &statement, NULL) == SQLITE_OK) + if (sqlite3_prepare(db, "SELECT id FROM blob_wants_view WHERE id > ? AND timestamp > ? ORDER BY id LIMIT ?", -1, &statement, NULL) == SQLITE_OK) { if (sqlite3_bind_text(statement, 1, blob_wants->last_id, -1, NULL) == SQLITE_OK && - sqlite3_bind_int64(statement, 2, timestamp) == SQLITE_OK) + sqlite3_bind_int64(statement, 2, timestamp) == SQLITE_OK && + sqlite3_bind_int(statement, 3, _countof(work->out_id)) == SQLITE_OK) { while (sqlite3_step(statement) == SQLITE_ROW) { - const char* blob = (const char*)sqlite3_column_text(statement, 0); - JSValue message = JS_NewObject(context); - JS_SetPropertyStr(context, message, blob, JS_NewInt32(context, -1)); - tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream, -blob_wants->request_number, message, NULL, NULL, NULL); - JS_FreeValue(context, message); - snprintf(blob_wants->last_id, sizeof(blob_wants->last_id), "%s", blob); - blob_wants->wants_sent++; + snprintf(work->out_id[work->out_id_count], sizeof(work->out_id[work->out_id_count]), "%s", (const char*)sqlite3_column_text(statement, 0)); + work->out_id_count++; } } sqlite3_finalize(statement); @@ -262,6 +264,33 @@ static void _tf_ssb_rpc_request_more_blobs(tf_ssb_connection_t* connection) tf_ssb_release_db_reader(ssb, db); } +static void _tf_ssb_request_blob_wants_after_work(tf_ssb_connection_t* connection, int result, void* user_data) +{ + blob_wants_work_t* work = user_data; + JSContext* context = tf_ssb_connection_get_context(connection); + tf_ssb_blob_wants_t* blob_wants = tf_ssb_connection_get_blob_wants_state(connection); + for (int i = 0; i < work->out_id_count; i++) + { + JSValue message = JS_NewObject(context); + JS_SetPropertyStr(context, message, work->out_id[i], JS_NewInt32(context, -1)); + tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream, -blob_wants->request_number, message, NULL, NULL, NULL); + JS_FreeValue(context, message); + blob_wants->wants_sent++; + } + if (work->out_id_count) + { + snprintf(blob_wants->last_id, sizeof(blob_wants->last_id), "%s", work->out_id[work->out_id_count - 1]); + } + tf_free(work); +} + +static void _tf_ssb_rpc_request_more_blobs(tf_ssb_connection_t* connection) +{ + blob_wants_work_t* work = tf_malloc(sizeof(blob_wants_work_t)); + memset(work, 0, sizeof(*work)); + tf_ssb_connection_run_work(connection, _tf_ssb_request_blob_wants_work, _tf_ssb_request_blob_wants_after_work, work); +} + static void _tf_ssb_rpc_blobs_createWants(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { tf_ssb_blob_wants_t* blob_wants = tf_ssb_connection_get_blob_wants_state(connection); @@ -849,29 +878,34 @@ static void _tf_ssb_connection_send_history_stream_work(tf_ssb_connection_t* con static void _tf_ssb_connection_send_history_stream_after_work(tf_ssb_connection_t* connection, int result, void* user_data) { tf_ssb_connection_send_history_stream_t* request = user_data; + if (tf_ssb_connection_is_connected(connection)) + { + for (int i = 0; i < request->out_messages_count; i++) + { + tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_json, request->request_number, (const uint8_t*)request->out_messages[i], strlen(request->out_messages[i]), NULL, NULL, NULL); + } + if (!request->out_finished) + { + _tf_ssb_connection_send_history_stream(connection, request->request_number, request->author, request->out_max_sequence_seen + 1, request->keys, request->live); + } + else if (!request->live) + { + tf_ssb_connection_rpc_send( + connection, + k_ssb_rpc_flag_json, + request->request_number, + (const uint8_t*)"false", + strlen("false"), + NULL, + NULL, + NULL); + } + } for (int i = 0; i < request->out_messages_count; i++) { - tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_json, request->request_number, (const uint8_t*)request->out_messages[i], strlen(request->out_messages[i]), NULL, NULL, NULL); tf_free(request->out_messages[i]); } tf_free(request->out_messages); - - if (!request->out_finished) - { - _tf_ssb_connection_send_history_stream(connection, request->request_number, request->author, request->out_max_sequence_seen + 1, request->keys, request->live); - } - else if (!request->live) - { - tf_ssb_connection_rpc_send( - connection, - k_ssb_rpc_flag_json, - request->request_number, - (const uint8_t*)"false", - strlen("false"), - NULL, - NULL, - NULL); - } tf_free(request); }