Move blob wants to the worker threads.

git-svn-id: https://www.unprompted.com/svn/projects/tildefriends/trunk@4500 ed5197a5-7fde-0310-b194-c3ffbd925b24
This commit is contained in:
Cory McWilliams 2023-10-08 15:40:20 +00:00
parent 62cdc592c0
commit 575f6c2f0a

View File

@ -217,9 +217,15 @@ static void _tf_ssb_rpc_blob_wants_added_callback(tf_ssb_t* ssb, const char* id,
JS_FreeValue(context, message); JS_FreeValue(context, message);
} }
static void _tf_ssb_rpc_request_more_blobs(tf_ssb_connection_t* connection) typedef struct _blob_wants_work_t
{ {
JSContext* context = tf_ssb_connection_get_context(connection); char out_id[k_blob_id_len][32];
int out_id_count;
} blob_wants_work_t;
static void _tf_ssb_request_blob_wants_work(tf_ssb_connection_t* connection, void* user_data)
{
blob_wants_work_t* work = user_data;
tf_ssb_blob_wants_t* blob_wants = tf_ssb_connection_get_blob_wants_state(connection); tf_ssb_blob_wants_t* blob_wants = tf_ssb_connection_get_blob_wants_state(connection);
tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
int64_t age = _get_global_setting_int64(ssb, "blob_fetch_age_seconds", -1); int64_t age = _get_global_setting_int64(ssb, "blob_fetch_age_seconds", -1);
@ -237,20 +243,16 @@ static void _tf_ssb_rpc_request_more_blobs(tf_ssb_connection_t* connection)
sqlite3* db = tf_ssb_acquire_db_reader(ssb); sqlite3* db = tf_ssb_acquire_db_reader(ssb);
sqlite3_stmt* statement; sqlite3_stmt* statement;
if (sqlite3_prepare(db, "SELECT id FROM blob_wants_view WHERE id > ? AND timestamp > ? ORDER BY id LIMIT 32", -1, &statement, NULL) == SQLITE_OK) if (sqlite3_prepare(db, "SELECT id FROM blob_wants_view WHERE id > ? AND timestamp > ? ORDER BY id LIMIT ?", -1, &statement, NULL) == SQLITE_OK)
{ {
if (sqlite3_bind_text(statement, 1, blob_wants->last_id, -1, NULL) == SQLITE_OK && if (sqlite3_bind_text(statement, 1, blob_wants->last_id, -1, NULL) == SQLITE_OK &&
sqlite3_bind_int64(statement, 2, timestamp) == SQLITE_OK) sqlite3_bind_int64(statement, 2, timestamp) == SQLITE_OK &&
sqlite3_bind_int(statement, 3, _countof(work->out_id)) == SQLITE_OK)
{ {
while (sqlite3_step(statement) == SQLITE_ROW) while (sqlite3_step(statement) == SQLITE_ROW)
{ {
const char* blob = (const char*)sqlite3_column_text(statement, 0); snprintf(work->out_id[work->out_id_count], sizeof(work->out_id[work->out_id_count]), "%s", (const char*)sqlite3_column_text(statement, 0));
JSValue message = JS_NewObject(context); work->out_id_count++;
JS_SetPropertyStr(context, message, blob, JS_NewInt32(context, -1));
tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream, -blob_wants->request_number, message, NULL, NULL, NULL);
JS_FreeValue(context, message);
snprintf(blob_wants->last_id, sizeof(blob_wants->last_id), "%s", blob);
blob_wants->wants_sent++;
} }
} }
sqlite3_finalize(statement); sqlite3_finalize(statement);
@ -262,6 +264,33 @@ static void _tf_ssb_rpc_request_more_blobs(tf_ssb_connection_t* connection)
tf_ssb_release_db_reader(ssb, db); tf_ssb_release_db_reader(ssb, db);
} }
static void _tf_ssb_request_blob_wants_after_work(tf_ssb_connection_t* connection, int result, void* user_data)
{
blob_wants_work_t* work = user_data;
JSContext* context = tf_ssb_connection_get_context(connection);
tf_ssb_blob_wants_t* blob_wants = tf_ssb_connection_get_blob_wants_state(connection);
for (int i = 0; i < work->out_id_count; i++)
{
JSValue message = JS_NewObject(context);
JS_SetPropertyStr(context, message, work->out_id[i], JS_NewInt32(context, -1));
tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream, -blob_wants->request_number, message, NULL, NULL, NULL);
JS_FreeValue(context, message);
blob_wants->wants_sent++;
}
if (work->out_id_count)
{
snprintf(blob_wants->last_id, sizeof(blob_wants->last_id), "%s", work->out_id[work->out_id_count - 1]);
}
tf_free(work);
}
static void _tf_ssb_rpc_request_more_blobs(tf_ssb_connection_t* connection)
{
blob_wants_work_t* work = tf_malloc(sizeof(blob_wants_work_t));
memset(work, 0, sizeof(*work));
tf_ssb_connection_run_work(connection, _tf_ssb_request_blob_wants_work, _tf_ssb_request_blob_wants_after_work, work);
}
static void _tf_ssb_rpc_blobs_createWants(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) static void _tf_ssb_rpc_blobs_createWants(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{ {
tf_ssb_blob_wants_t* blob_wants = tf_ssb_connection_get_blob_wants_state(connection); tf_ssb_blob_wants_t* blob_wants = tf_ssb_connection_get_blob_wants_state(connection);
@ -849,13 +878,12 @@ static void _tf_ssb_connection_send_history_stream_work(tf_ssb_connection_t* con
static void _tf_ssb_connection_send_history_stream_after_work(tf_ssb_connection_t* connection, int result, void* user_data) static void _tf_ssb_connection_send_history_stream_after_work(tf_ssb_connection_t* connection, int result, void* user_data)
{ {
tf_ssb_connection_send_history_stream_t* request = user_data; tf_ssb_connection_send_history_stream_t* request = user_data;
if (tf_ssb_connection_is_connected(connection))
{
for (int i = 0; i < request->out_messages_count; i++) for (int i = 0; i < request->out_messages_count; i++)
{ {
tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_json, request->request_number, (const uint8_t*)request->out_messages[i], strlen(request->out_messages[i]), NULL, NULL, NULL); tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_json, request->request_number, (const uint8_t*)request->out_messages[i], strlen(request->out_messages[i]), NULL, NULL, NULL);
tf_free(request->out_messages[i]);
} }
tf_free(request->out_messages);
if (!request->out_finished) if (!request->out_finished)
{ {
_tf_ssb_connection_send_history_stream(connection, request->request_number, request->author, request->out_max_sequence_seen + 1, request->keys, request->live); _tf_ssb_connection_send_history_stream(connection, request->request_number, request->author, request->out_max_sequence_seen + 1, request->keys, request->live);
@ -872,6 +900,12 @@ static void _tf_ssb_connection_send_history_stream_after_work(tf_ssb_connection_
NULL, NULL,
NULL); NULL);
} }
}
for (int i = 0; i < request->out_messages_count; i++)
{
tf_free(request->out_messages[i]);
}
tf_free(request->out_messages);
tf_free(request); tf_free(request);
} }