Move sending history streams to the worker threads.

git-svn-id: https://www.unprompted.com/svn/projects/tildefriends/trunk@4499 ed5197a5-7fde-0310-b194-c3ffbd925b24
This commit is contained in:
Cory McWilliams 2023-10-08 15:14:42 +00:00
parent 11373faf23
commit 62cdc592c0
3 changed files with 123 additions and 23 deletions

View File

@ -257,6 +257,7 @@ typedef struct _tf_ssb_connection_t
uv_tcp_t tcp; uv_tcp_t tcp;
uv_connect_t connect; uv_connect_t connect;
uv_async_t async; uv_async_t async;
bool closing;
tf_ssb_connection_t* tunnel_connection; tf_ssb_connection_t* tunnel_connection;
int32_t tunnel_request_number; int32_t tunnel_request_number;
@ -318,6 +319,7 @@ typedef struct _tf_ssb_connection_t
int scheduled_count; int scheduled_count;
tf_ssb_debug_message_t* debug_messages[k_debug_close_message_count]; tf_ssb_debug_message_t* debug_messages[k_debug_close_message_count];
int ref_count;
} tf_ssb_connection_t; } tf_ssb_connection_t;
static JSClassID _connection_class_id; static JSClassID _connection_class_id;
@ -1767,6 +1769,7 @@ static void _tf_ssb_connection_dispatch_scheduled(tf_ssb_connection_t* connectio
static void _tf_ssb_connection_destroy(tf_ssb_connection_t* connection, const char* reason) static void _tf_ssb_connection_destroy(tf_ssb_connection_t* connection, const char* reason)
{ {
tf_ssb_t* ssb = connection->ssb; tf_ssb_t* ssb = connection->ssb;
connection->closing = true;
if (!connection->destroy_reason) if (!connection->destroy_reason)
{ {
connection->destroy_reason = reason; connection->destroy_reason = reason;
@ -1851,7 +1854,8 @@ static void _tf_ssb_connection_destroy(tf_ssb_connection_t* connection, const ch
if (JS_IsUndefined(connection->object) && if (JS_IsUndefined(connection->object) &&
!connection->async.data && !connection->async.data &&
!connection->tcp.data && !connection->tcp.data &&
!connection->connect.data) !connection->connect.data &&
connection->ref_count == 0)
{ {
JS_FreeValue(ssb->context, connection->ebt_send_clock); JS_FreeValue(ssb->context, connection->ebt_send_clock);
connection->ebt_send_clock = JS_UNDEFINED; connection->ebt_send_clock = JS_UNDEFINED;
@ -3646,3 +3650,66 @@ void tf_ssb_set_main_thread(tf_ssb_t* ssb)
{ {
ssb->thread_self = uv_thread_self(); ssb->thread_self = uv_thread_self();
} }
typedef struct _connection_work_t
{
uv_work_t work;
tf_ssb_connection_t* connection;
void (*work_callback)(tf_ssb_connection_t* connection, void* user_data);
void (*after_work_callback)(tf_ssb_connection_t* connection, int result, void* user_data);
void* user_data;
} connection_work_t;
static void _tf_ssb_connection_work_callback(uv_work_t* work)
{
connection_work_t* data = work->data;
tf_ssb_record_thread_busy(data->connection->ssb, true);
if (data->work_callback)
{
data->work_callback(data->connection, data->user_data);
}
tf_ssb_record_thread_busy(data->connection->ssb, false);
}
static void _tf_ssb_connection_after_work_callback(uv_work_t* work, int status)
{
connection_work_t* data = work->data;
if (data->after_work_callback)
{
data->after_work_callback(data->connection, status, data->user_data);
}
data->connection->ref_count--;
if (data->connection->ref_count == 0 &&
data->connection->closing)
{
_tf_ssb_connection_destroy(data->connection, "work completed");
}
tf_free(data);
}
void tf_ssb_connection_run_work(
tf_ssb_connection_t* connection,
void (*work_callback)(tf_ssb_connection_t* connection, void* user_data),
void (*after_work_callback)(tf_ssb_connection_t* connection, int result, void* user_data),
void* user_data)
{
connection_work_t* work = tf_malloc(sizeof(connection_work_t));
*work = (connection_work_t)
{
.work =
{
.data = work,
},
.connection = connection,
.work_callback = work_callback,
.after_work_callback = after_work_callback,
.user_data = user_data,
};
connection->ref_count++;
int result = uv_queue_work(connection->ssb->loop, &work->work, _tf_ssb_connection_work_callback, _tf_ssb_connection_after_work_callback);
if (result)
{
_tf_ssb_connection_after_work_callback(&work->work, result);
}
}

View File

@ -173,6 +173,11 @@ void tf_ssb_connection_remove_request(tf_ssb_connection_t* connection, int32_t r
typedef void (tf_ssb_scheduled_callback_t)(tf_ssb_connection_t* connection, void* user_data); typedef void (tf_ssb_scheduled_callback_t)(tf_ssb_connection_t* connection, void* user_data);
void tf_ssb_connection_schedule_idle(tf_ssb_connection_t* connection, tf_ssb_scheduled_callback_t* callback, void* user_data); void tf_ssb_connection_schedule_idle(tf_ssb_connection_t* connection, tf_ssb_scheduled_callback_t* callback, void* user_data);
void tf_ssb_connection_run_work(
tf_ssb_connection_t* connection,
void (*work_callback)(tf_ssb_connection_t* connection, void* user_data),
void (*after_work_callback)(tf_ssb_connection_t* connection, int result, void* user_data),
void* user_data);
void tf_ssb_connection_add_new_message_request(tf_ssb_connection_t* connection, const char* author, int32_t request_number, bool keys); void tf_ssb_connection_add_new_message_request(tf_ssb_connection_t* connection, const char* author, int32_t request_number, bool keys);
void tf_ssb_connection_remove_new_message_request(tf_ssb_connection_t* connection, const char* author); void tf_ssb_connection_remove_new_message_request(tf_ssb_connection_t* connection, const char* author);

View File

@ -19,7 +19,6 @@
#endif #endif
static void _tf_ssb_connection_send_history_stream(tf_ssb_connection_t* connection, int32_t request_number, const char* author, int64_t sequence, bool keys, bool live); static void _tf_ssb_connection_send_history_stream(tf_ssb_connection_t* connection, int32_t request_number, const char* author, int64_t sequence, bool keys, bool live);
static void _tf_ssb_connection_send_history_stream_internal(tf_ssb_connection_t* connection, int32_t request_number, const char* author, int64_t sequence, bool keys, bool live);
static void _tf_ssb_rpc_start_delete_blobs(tf_ssb_t* ssb, int delay_ms); static void _tf_ssb_rpc_start_delete_blobs(tf_ssb_t* ssb, int delay_ms);
static int64_t _get_global_setting_int64(tf_ssb_t* ssb, const char* name, int64_t default_value) static int64_t _get_global_setting_int64(tf_ssb_t* ssb, const char* name, int64_t default_value)
@ -765,36 +764,39 @@ typedef struct _tf_ssb_connection_send_history_stream_t
int64_t sequence; int64_t sequence;
bool keys; bool keys;
bool live; bool live;
bool out_finished;
int64_t out_max_sequence_seen;
char** out_messages;
int out_messages_count;
} tf_ssb_connection_send_history_stream_t; } tf_ssb_connection_send_history_stream_t;
static void _tf_ssb_connection_send_history_stream_callback(tf_ssb_connection_t* connection, void* user_data) static void _tf_ssb_connection_send_history_stream_work(tf_ssb_connection_t* connection, void* user_data)
{ {
tf_ssb_connection_send_history_stream_t* request = user_data; tf_ssb_connection_send_history_stream_t* request = user_data;
if (tf_ssb_connection_is_connected(connection)) if (!tf_ssb_connection_is_connected(connection))
{ {
_tf_ssb_connection_send_history_stream_internal(connection, request->request_number, request->author, request->sequence, request->keys, request->live); return;
} }
tf_free(request);
}
static void _tf_ssb_connection_send_history_stream_internal(tf_ssb_connection_t* connection, int32_t request_number, const char* author, int64_t sequence, bool keys, bool live)
{
tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
JSContext* context = tf_ssb_get_context(ssb);
sqlite3* db = tf_ssb_acquire_db_reader(ssb); sqlite3* db = tf_ssb_acquire_db_reader(ssb);
sqlite3_stmt* statement; sqlite3_stmt* statement;
const int k_max = 32; const int k_max = 32;
int64_t max_sequence_seen = 0;
if (sqlite3_prepare(db, "SELECT previous, author, id, sequence, timestamp, hash, content, signature, sequence_before_author FROM messages WHERE author = ?1 AND sequence >= ?2 AND sequence < ?3 ORDER BY sequence", -1, &statement, NULL) == SQLITE_OK) if (sqlite3_prepare(db, "SELECT previous, author, id, sequence, timestamp, hash, content, signature, sequence_before_author FROM messages WHERE author = ?1 AND sequence >= ?2 AND sequence < ?3 ORDER BY sequence", -1, &statement, NULL) == SQLITE_OK)
{ {
if (sqlite3_bind_text(statement, 1, author, -1, NULL) == SQLITE_OK && if (sqlite3_bind_text(statement, 1, request->author, -1, NULL) == SQLITE_OK &&
sqlite3_bind_int64(statement, 2, sequence) == SQLITE_OK && sqlite3_bind_int64(statement, 2, request->sequence) == SQLITE_OK &&
sqlite3_bind_int64(statement, 3, sequence + k_max) == SQLITE_OK) sqlite3_bind_int64(statement, 3, request->sequence + k_max) == SQLITE_OK)
{ {
JSMallocFunctions funcs = { 0 };
tf_get_js_malloc_functions(&funcs);
JSRuntime* runtime = JS_NewRuntime2(&funcs, NULL);
JSContext* context = JS_NewContext(runtime);
while (sqlite3_step(statement) == SQLITE_ROW) while (sqlite3_step(statement) == SQLITE_ROW)
{ {
JSValue message = JS_UNDEFINED; JSValue message = JS_UNDEFINED;
max_sequence_seen = sqlite3_column_int64(statement, 3); request->out_max_sequence_seen = sqlite3_column_int64(statement, 3);
JSValue formatted = tf_ssb_format_message( JSValue formatted = tf_ssb_format_message(
context, context,
@ -806,7 +808,7 @@ static void _tf_ssb_connection_send_history_stream_internal(tf_ssb_connection_t*
(const char*)sqlite3_column_text(statement, 6), (const char*)sqlite3_column_text(statement, 6),
(const char*)sqlite3_column_text(statement, 7), (const char*)sqlite3_column_text(statement, 7),
sqlite3_column_int(statement, 8)); sqlite3_column_int(statement, 8));
if (keys) if (request->keys)
{ {
message = JS_NewObject(context); message = JS_NewObject(context);
JS_SetPropertyStr(context, message, "key", JS_NewString(context, (const char*)sqlite3_column_text(statement, 2))); JS_SetPropertyStr(context, message, "key", JS_NewString(context, (const char*)sqlite3_column_text(statement, 2)));
@ -817,9 +819,22 @@ static void _tf_ssb_connection_send_history_stream_internal(tf_ssb_connection_t*
{ {
message = formatted; message = formatted;
} }
tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream, request_number, message, NULL, NULL, NULL); JSValue json = JS_JSONStringify(context, message, JS_NULL, JS_NULL);
size_t size = 0;
const char* string = JS_ToCStringLen(context, &size, json);
request->out_messages = tf_resize_vec(request->out_messages, sizeof(char*) * (request->out_messages_count + 1));
char* copy = tf_malloc(size + 1);
memcpy(copy, string, size + 1);
JS_FreeCString(context, string);
request->out_messages[request->out_messages_count++] = copy;
JS_FreeValue(context, json);
JS_FreeValue(context, message); JS_FreeValue(context, message);
} }
JS_FreeContext(context);
JS_FreeRuntime(runtime);
} }
sqlite3_finalize(statement); sqlite3_finalize(statement);
} }
@ -828,23 +843,36 @@ static void _tf_ssb_connection_send_history_stream_internal(tf_ssb_connection_t*
tf_printf("prepare failed: %s\n", sqlite3_errmsg(db)); tf_printf("prepare failed: %s\n", sqlite3_errmsg(db));
} }
tf_ssb_release_db_reader(ssb, db); tf_ssb_release_db_reader(ssb, db);
request->out_finished = request->out_max_sequence_seen != request->sequence + k_max - 1;
}
if (max_sequence_seen == sequence + k_max - 1) static void _tf_ssb_connection_send_history_stream_after_work(tf_ssb_connection_t* connection, int result, void* user_data)
{
tf_ssb_connection_send_history_stream_t* request = user_data;
for (int i = 0; i < request->out_messages_count; i++)
{ {
_tf_ssb_connection_send_history_stream(connection, request_number, author, max_sequence_seen + 1, keys, live); tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_json, request->request_number, (const uint8_t*)request->out_messages[i], strlen(request->out_messages[i]), NULL, NULL, NULL);
tf_free(request->out_messages[i]);
} }
else if (!live) tf_free(request->out_messages);
if (!request->out_finished)
{
_tf_ssb_connection_send_history_stream(connection, request->request_number, request->author, request->out_max_sequence_seen + 1, request->keys, request->live);
}
else if (!request->live)
{ {
tf_ssb_connection_rpc_send( tf_ssb_connection_rpc_send(
connection, connection,
k_ssb_rpc_flag_json, k_ssb_rpc_flag_json,
request_number, request->request_number,
(const uint8_t*)"false", (const uint8_t*)"false",
strlen("false"), strlen("false"),
NULL, NULL,
NULL, NULL,
NULL); NULL);
} }
tf_free(request);
} }
static void _tf_ssb_connection_send_history_stream(tf_ssb_connection_t* connection, int32_t request_number, const char* author, int64_t sequence, bool keys, bool live) static void _tf_ssb_connection_send_history_stream(tf_ssb_connection_t* connection, int32_t request_number, const char* author, int64_t sequence, bool keys, bool live)
@ -858,7 +886,7 @@ static void _tf_ssb_connection_send_history_stream(tf_ssb_connection_t* connecti
.live = live, .live = live,
}; };
snprintf(async->author, sizeof(async->author), "%s", author); snprintf(async->author, sizeof(async->author), "%s", author);
tf_ssb_connection_schedule_idle(connection, _tf_ssb_connection_send_history_stream_callback, async); tf_ssb_connection_run_work(connection, _tf_ssb_connection_send_history_stream_work, _tf_ssb_connection_send_history_stream_after_work, async);
} }
static void _tf_ssb_rpc_createHistoryStream(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) static void _tf_ssb_rpc_createHistoryStream(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)