diff --git a/src/ssb.c b/src/ssb.c index 3d079455..0e52b5d3 100644 --- a/src/ssb.c +++ b/src/ssb.c @@ -765,7 +765,14 @@ void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags, } else if (!_tf_ssb_connection_get_request_callback(connection, request_number, NULL, NULL)) { - tf_printf("Dropping message with no active request (%d): %.*s\n", request_number, (int)size, message); + if (flags & k_ssb_rpc_flag_binary) + { + tf_printf("Dropping message with no active request (%d): (%zd bytes).\n", request_number, size); + } + else + { + tf_printf("Dropping message with no active request (%d): %.*s\n", request_number, (int)size, message); + } return; } @@ -3714,6 +3721,8 @@ typedef struct _connection_work_t { uv_work_t work; tf_ssb_connection_t* connection; + const char* name; + const char* after_name; void (*work_callback)(tf_ssb_connection_t* connection, void* user_data); void (*after_work_callback)(tf_ssb_connection_t* connection, int result, void* user_data); void* user_data; @@ -3725,7 +3734,9 @@ static void _tf_ssb_connection_work_callback(uv_work_t* work) tf_ssb_record_thread_busy(data->connection->ssb, true); if (data->work_callback) { + tf_trace_begin(data->connection->ssb->trace, data->name); data->work_callback(data->connection, data->user_data); + tf_trace_end(data->connection->ssb->trace); } tf_ssb_record_thread_busy(data->connection->ssb, false); } @@ -3735,13 +3746,17 @@ static void _tf_ssb_connection_after_work_callback(uv_work_t* work, int status) connection_work_t* data = work->data; if (data->after_work_callback) { + tf_trace_begin(data->connection->ssb->trace, data->after_name); data->after_work_callback(data->connection, status, data->user_data); + tf_trace_end(data->connection->ssb->trace); } data->connection->ref_count--; if (data->connection->ref_count == 0 && data->connection->closing) { _tf_ssb_connection_destroy(data->connection, "work completed"); } + tf_free((void*)data->name); + tf_free((void*)data->after_name); tf_free(data); } @@ -3769,6 +3784,70 @@ void tf_ssb_connection_run_work(tf_ssb_connection_t* connection, void (*work_cal } } +typedef struct _ssb_work_t +{ + uv_work_t work; + tf_ssb_t* ssb; + const char* name; + const char* after_name; + void (*work_callback)(tf_ssb_t* ssb, void* user_data); + void (*after_work_callback)(tf_ssb_t* ssb, int result, void* user_data); + void* user_data; +} ssb_work_t; + +static void _tf_ssb_work_callback(uv_work_t* work) +{ + ssb_work_t* data = work->data; + tf_ssb_record_thread_busy(data->ssb, true); + if (data->work_callback) + { + tf_trace_begin(data->ssb->trace, data->name); + data->work_callback(data->ssb, data->user_data); + tf_trace_end(data->ssb->trace); + } + tf_ssb_record_thread_busy(data->ssb, false); +} + +static void _tf_ssb_after_work_callback(uv_work_t* work, int status) +{ + ssb_work_t* data = work->data; + if (data->after_work_callback) + { + tf_trace_begin(data->ssb->trace, data->after_name); + data->after_work_callback(data->ssb, status, data->user_data); + tf_trace_end(data->ssb->trace); + } + tf_ssb_unref(data->ssb); + tf_free((void*)data->name); + tf_free((void*)data->after_name); + tf_free(data); +} + +void tf_ssb_run_work(tf_ssb_t* ssb, void (*work_callback)(tf_ssb_t* ssb, void* user_data), void (*after_work_callback)(tf_ssb_t* ssb, int result, void* user_data), void* user_data) +{ + ssb_work_t* work = tf_malloc(sizeof(ssb_work_t)); + *work = (ssb_work_t) + { + .work = + { + .data = work, + }, + .name = tf_util_function_to_string(work_callback), + .after_name = tf_util_function_to_string(after_work_callback), + .ssb = ssb, + .work_callback = work_callback, + .after_work_callback = after_work_callback, + .user_data = user_data, + }; + + tf_ssb_ref(ssb); + int result = uv_queue_work(ssb->loop, &work->work, _tf_ssb_work_callback, _tf_ssb_after_work_callback); + if (result) + { + _tf_ssb_connection_after_work_callback(&work->work, result); + } +} + bool tf_ssb_is_room(tf_ssb_t* ssb) { return ssb->is_room; @@ -3792,8 +3871,6 @@ void tf_ssb_set_room_name(tf_ssb_t* ssb, const char* room_name) typedef struct _update_settings_t { - uv_work_t work; - tf_ssb_t* ssb; bool is_room; char room_name[1024]; } update_settings_t; @@ -3847,58 +3924,35 @@ static bool _get_global_setting_bool(tf_ssb_t* ssb, const char* name, bool defau return result; } -static void _tf_ssb_update_settings_work(uv_work_t* work) +static void _tf_ssb_update_settings_work(tf_ssb_t* ssb, void* user_data) { - update_settings_t* update = work->data; - tf_ssb_record_thread_busy(update->ssb, true); - update->is_room = _get_global_setting_bool(update->ssb, "room", true); - _get_global_setting_string(update->ssb, "room_name", update->room_name, sizeof(update->room_name)); - tf_ssb_record_thread_busy(update->ssb, false); + update_settings_t* update = user_data; + update->is_room = _get_global_setting_bool(ssb, "room", true); + _get_global_setting_string(ssb, "room_name", update->room_name, sizeof(update->room_name)); } -static void _tf_ssb_update_settings_after_work(uv_work_t* work, int result) +static void _tf_ssb_update_settings_after_work(tf_ssb_t* ssb, int result, void* user_data) { - update_settings_t* update = work->data; - tf_ssb_unref(update->ssb); - tf_ssb_set_is_room(update->ssb, update->is_room); - tf_ssb_set_room_name(update->ssb, update->room_name); - _tf_ssb_start_update_settings(update->ssb); + update_settings_t* update = user_data; + tf_ssb_set_is_room(ssb, update->is_room); + tf_ssb_set_room_name(ssb, update->room_name); + _tf_ssb_start_update_settings(ssb); tf_free(update); } static void _tf_ssb_start_update_settings_timer(tf_ssb_t* ssb, void* user_data) { update_settings_t* update = tf_malloc(sizeof(update_settings_t)); - *update = (update_settings_t) - { - .work = - { - .data = update, - }, - .ssb = ssb, - }; - tf_ssb_ref(ssb); - int result = uv_queue_work(tf_ssb_get_loop(ssb), &update->work, _tf_ssb_update_settings_work, _tf_ssb_update_settings_after_work); - if (result) - { - _tf_ssb_update_settings_after_work(&update->work, result); - } + *update = (update_settings_t) { 0 }; + tf_ssb_run_work(ssb, _tf_ssb_update_settings_work, _tf_ssb_update_settings_after_work, update); } static void _tf_ssb_update_settings(tf_ssb_t* ssb) { update_settings_t* update = tf_malloc(sizeof(update_settings_t)); - *update = (update_settings_t) - { - .work = - { - .data = update, - }, - .ssb = ssb, - }; - tf_ssb_ref(ssb); - _tf_ssb_update_settings_work(&update->work); - _tf_ssb_update_settings_after_work(&update->work, 0); + *update = (update_settings_t) { 0 }; + _tf_ssb_update_settings_work(ssb, update); + _tf_ssb_update_settings_after_work(ssb, 0, update); } static void _tf_ssb_start_update_settings(tf_ssb_t* ssb) diff --git a/src/ssb.connections.c b/src/ssb.connections.c index 6c240cd6..0e8def28 100644 --- a/src/ssb.connections.c +++ b/src/ssb.connections.c @@ -76,35 +76,30 @@ static bool _tf_ssb_connections_get_next_connection(tf_ssb_connections_t* connec typedef struct _tf_ssb_connections_get_next_t { - uv_work_t work; tf_ssb_connections_t* connections; - tf_ssb_t* ssb; bool ready; char host[256]; int port; char key[k_id_base64_len]; } tf_ssb_connections_get_next_t; -static void _tf_ssb_connections_get_next_work(uv_work_t* work) +static void _tf_ssb_connections_get_next_work(tf_ssb_t* ssb, void* user_data) { - tf_ssb_connections_get_next_t* next = work->data; - tf_ssb_record_thread_busy(next->ssb, true); + tf_ssb_connections_get_next_t* next = user_data; next->ready = _tf_ssb_connections_get_next_connection(next->connections, next->host, sizeof(next->host), &next->port, next->key, sizeof(next->key)); - tf_ssb_record_thread_busy(next->ssb, false); } -static void _tf_ssb_connections_get_next_after_work(uv_work_t* work, int status) +static void _tf_ssb_connections_get_next_after_work(tf_ssb_t* ssb, int status, void* user_data) { - tf_ssb_connections_get_next_t* next = work->data; + tf_ssb_connections_get_next_t* next = user_data; if (next->ready) { uint8_t key_bin[k_id_bin_len]; if (tf_ssb_id_str_to_bin(key_bin, next->key)) { - tf_ssb_connect(next->ssb, next->host, next->port, key_bin); + tf_ssb_connect(ssb, next->host, next->port, key_bin); } } - tf_ssb_unref(next->ssb); tf_free(next); } @@ -116,21 +111,10 @@ static void _tf_ssb_connections_timer(uv_timer_t* timer) if (count < (int)_countof(active)) { tf_ssb_connections_get_next_t* next = tf_malloc(sizeof(tf_ssb_connections_get_next_t)); - *next = (tf_ssb_connections_get_next_t) - { - .work = - { - .data = next, - }, - .ssb = connections->ssb, + *next = (tf_ssb_connections_get_next_t) { .connections = connections, }; - tf_ssb_ref(connections->ssb); - int result = uv_queue_work(tf_ssb_get_loop(connections->ssb), &next->work, _tf_ssb_connections_get_next_work, _tf_ssb_connections_get_next_after_work); - if (result) - { - _tf_ssb_connections_get_next_after_work(&next->work, result); - } + tf_ssb_run_work(connections->ssb, _tf_ssb_connections_get_next_work, _tf_ssb_connections_get_next_after_work, next); } } @@ -165,8 +149,6 @@ void tf_ssb_connections_destroy(tf_ssb_connections_t* connections) typedef struct _tf_ssb_connections_update_t { - uv_work_t work; - tf_ssb_t* ssb; char host[256]; int port; char key[k_id_base64_len]; @@ -174,12 +156,11 @@ typedef struct _tf_ssb_connections_update_t bool succeeded; } tf_ssb_connections_update_t; -static void _tf_ssb_connections_update_work(uv_work_t* work) +static void _tf_ssb_connections_update_work(tf_ssb_t* ssb, void* user_data) { - tf_ssb_connections_update_t* update = work->data; - tf_ssb_record_thread_busy(update->ssb, true); + tf_ssb_connections_update_t* update = user_data; sqlite3_stmt* statement; - sqlite3* db = tf_ssb_acquire_db_writer(update->ssb); + sqlite3* db = tf_ssb_acquire_db_writer(ssb); if (update->attempted) { if (sqlite3_prepare(db, "UPDATE connections SET last_attempt = strftime('%s', 'now') WHERE host = ?1 AND port = ?2 AND key = ?3", -1, &statement, NULL) == SQLITE_OK) @@ -226,31 +207,23 @@ static void _tf_ssb_connections_update_work(uv_work_t* work) sqlite3_finalize(statement); } } - tf_ssb_release_db_writer(update->ssb, db); - tf_ssb_record_thread_busy(update->ssb, false); + tf_ssb_release_db_writer(ssb, db); } -static void _tf_ssb_connections_update_after_work(uv_work_t* work, int status) +static void _tf_ssb_connections_update_after_work(tf_ssb_t* ssb, int status, void* user_data) { - tf_ssb_connections_update_t* update = work->data; - tf_free(update); + tf_free(user_data); } static void _tf_ssb_connections_queue_update(tf_ssb_connections_t* connections, tf_ssb_connections_update_t* update) { - update->work.data = update; - int result = uv_queue_work(tf_ssb_get_loop(connections->ssb), &update->work, _tf_ssb_connections_update_work, _tf_ssb_connections_update_after_work); - if (result) - { - _tf_ssb_connections_update_after_work(&update->work, result); - } + tf_ssb_run_work(connections->ssb, _tf_ssb_connections_update_work, _tf_ssb_connections_update_after_work, update); } void tf_ssb_connections_store(tf_ssb_connections_t* connections, const char* host, int port, const char* key) { tf_ssb_connections_update_t* update = tf_malloc(sizeof(tf_ssb_connections_update_t)); *update = (tf_ssb_connections_update_t) { - .ssb = connections->ssb, .port = port, }; snprintf(update->host, sizeof(update->host), "%s", host); @@ -262,7 +235,6 @@ void tf_ssb_connections_set_attempted(tf_ssb_connections_t* connections, const c { tf_ssb_connections_update_t* update = tf_malloc(sizeof(tf_ssb_connections_update_t)); *update = (tf_ssb_connections_update_t) { - .ssb = connections->ssb, .port = port, .attempted = true, }; @@ -275,7 +247,6 @@ void tf_ssb_connections_set_succeeded(tf_ssb_connections_t* connections, const c { tf_ssb_connections_update_t* update = tf_malloc(sizeof(tf_ssb_connections_update_t)); *update = (tf_ssb_connections_update_t) { - .ssb = connections->ssb, .port = port, .succeeded = true, }; diff --git a/src/ssb.db.c b/src/ssb.db.c index 86b9cd11..de8b6eab 100644 --- a/src/ssb.db.c +++ b/src/ssb.db.c @@ -19,8 +19,7 @@ typedef struct _message_store_t message_store_t; -static void _tf_ssb_db_store_message_work_finish(message_store_t* store); -static void _tf_ssb_db_store_message_after_work(uv_work_t* work, int status); +static void _tf_ssb_db_store_message_after_work(tf_ssb_t* ssb, int status, void* user_data); static void _tf_ssb_db_exec(sqlite3* db, const char* statement) { @@ -400,8 +399,6 @@ static char* _tf_ssb_db_get_message_blob_wants(tf_ssb_t* ssb, int64_t rowid) typedef struct _message_store_t { - uv_work_t work; - tf_ssb_t* ssb; char id[k_id_base64_len]; char signature[512]; int flags; @@ -421,21 +418,16 @@ typedef struct _message_store_t message_store_t* next; } message_store_t; -static void _tf_ssb_db_store_message_work(uv_work_t* work) +static void _tf_ssb_db_store_message_work(tf_ssb_t* ssb, void* user_data) { - message_store_t* store = work->data; - tf_ssb_record_thread_busy(store->ssb, true); - tf_trace_t* trace = tf_ssb_get_trace(store->ssb); - tf_trace_begin(trace, "message_store_work"); - int64_t last_row_id = _tf_ssb_db_store_message_raw(store->ssb, store->id, *store->previous ? store->previous : NULL, store->author, store->sequence, store->timestamp, - store->content, store->length, store->signature, store->flags); + message_store_t* store = user_data; + int64_t last_row_id = _tf_ssb_db_store_message_raw( + ssb, store->id, *store->previous ? store->previous : NULL, store->author, store->sequence, store->timestamp, store->content, store->length, store->signature, store->flags); if (last_row_id != -1) { store->out_stored = true; - store->out_blob_wants = _tf_ssb_db_get_message_blob_wants(store->ssb, last_row_id); + store->out_blob_wants = _tf_ssb_db_get_message_blob_wants(ssb, last_row_id); } - tf_trace_end(trace); - tf_ssb_record_thread_busy(store->ssb, false); } static void _wake_up_queue(tf_ssb_t* ssb, tf_ssb_store_queue_t* queue) @@ -452,38 +444,19 @@ static void _wake_up_queue(tf_ssb_t* ssb, tf_ssb_store_queue_t* queue) } next->next = NULL; queue->running = true; - int r = uv_queue_work(tf_ssb_get_loop(ssb), &next->work, _tf_ssb_db_store_message_work, _tf_ssb_db_store_message_after_work); - if (r) - { - _tf_ssb_db_store_message_work_finish(next); - } + tf_ssb_run_work(ssb, _tf_ssb_db_store_message_work, _tf_ssb_db_store_message_after_work, next); } } } -static void _tf_ssb_db_store_message_work_finish(message_store_t* store) +static void _tf_ssb_db_store_message_after_work(tf_ssb_t* ssb, int status, void* user_data) { - JSContext* context = tf_ssb_get_context(store->ssb); - if (store->callback) - { - store->callback(store->id, store->out_stored, store->user_data); - } - JS_FreeCString(context, store->content); - tf_ssb_store_queue_t* queue = tf_ssb_get_store_queue(store->ssb); - queue->running = false; - _wake_up_queue(store->ssb, queue); - tf_free(store); -} - -static void _tf_ssb_db_store_message_after_work(uv_work_t* work, int status) -{ - message_store_t* store = work->data; - tf_trace_t* trace = tf_ssb_get_trace(store->ssb); - tf_trace_begin(trace, "message_store_after_work"); + message_store_t* store = user_data; + tf_trace_t* trace = tf_ssb_get_trace(ssb); if (store->out_stored) { tf_trace_begin(trace, "notify_message_added"); - JSContext* context = tf_ssb_get_context(store->ssb); + JSContext* context = tf_ssb_get_context(ssb); JSValue formatted = tf_ssb_format_message(context, store->previous, store->author, store->sequence, store->timestamp, "sha256", store->content, store->signature, store->flags); JSValue message = JS_NewObject(context); @@ -492,7 +465,7 @@ static void _tf_ssb_db_store_message_after_work(uv_work_t* work, int status) char timestamp_string[256]; snprintf(timestamp_string, sizeof(timestamp_string), "%f", store->timestamp); JS_SetPropertyStr(context, message, "timestamp", JS_NewString(context, timestamp_string)); - tf_ssb_notify_message_added(store->ssb, store->id, message); + tf_ssb_notify_message_added(ssb, store->id, message); JS_FreeValue(context, message); tf_trace_end(trace); } @@ -501,13 +474,22 @@ static void _tf_ssb_db_store_message_after_work(uv_work_t* work, int status) tf_trace_begin(trace, "notify_blob_wants_added"); for (char* p = store->out_blob_wants; *p; p = p + strlen(p)) { - tf_ssb_notify_blob_want_added(store->ssb, p); + tf_ssb_notify_blob_want_added(ssb, p); } tf_free(store->out_blob_wants); tf_trace_end(trace); } - _tf_ssb_db_store_message_work_finish(store); - tf_trace_end(trace); + + JSContext* context = tf_ssb_get_context(ssb); + if (store->callback) + { + store->callback(store->id, store->out_stored, store->user_data); + } + JS_FreeCString(context, store->content); + tf_ssb_store_queue_t* queue = tf_ssb_get_store_queue(ssb); + queue->running = false; + _wake_up_queue(ssb, queue); + tf_free(store); } void tf_ssb_db_store_message( @@ -539,13 +521,7 @@ void tf_ssb_db_store_message( JS_FreeValue(context, contentval); message_store_t* store = tf_malloc(sizeof(message_store_t)); - *store = (message_store_t) - { - .work = - { - .data = store, - }, - .ssb = ssb, + *store = (message_store_t) { .sequence = sequence, .timestamp = timestamp, .content = contentstr, @@ -660,8 +636,6 @@ bool tf_ssb_db_blob_get(tf_ssb_t* ssb, const char* id, uint8_t** out_blob, size_ typedef struct _blob_store_work_t { - uv_work_t work; - tf_ssb_t* ssb; const uint8_t* blob; size_t size; char id[k_blob_id_len]; @@ -670,25 +644,18 @@ typedef struct _blob_store_work_t void* user_data; } blob_store_work_t; -static void _tf_ssb_db_blob_store_work(uv_work_t* work) +static void _tf_ssb_db_blob_store_work(tf_ssb_t* ssb, void* user_data) { - blob_store_work_t* blob_work = work->data; - tf_ssb_record_thread_busy(blob_work->ssb, true); - tf_trace_t* trace = tf_ssb_get_trace(blob_work->ssb); - tf_trace_begin(trace, "blob_store_work"); - tf_ssb_db_blob_store(blob_work->ssb, blob_work->blob, blob_work->size, blob_work->id, sizeof(blob_work->id), &blob_work->is_new); - tf_trace_end(trace); - tf_ssb_record_thread_busy(blob_work->ssb, false); + blob_store_work_t* blob_work = user_data; + tf_ssb_db_blob_store(ssb, blob_work->blob, blob_work->size, blob_work->id, sizeof(blob_work->id), &blob_work->is_new); } -static void _tf_ssb_db_blob_store_after_work(uv_work_t* work, int status) +static void _tf_ssb_db_blob_store_after_work(tf_ssb_t* ssb, int status, void* user_data) { - blob_store_work_t* blob_work = work->data; - tf_trace_t* trace = tf_ssb_get_trace(blob_work->ssb); - tf_trace_begin(trace, "blob_store_after_work"); + blob_store_work_t* blob_work = user_data; if (status == 0 && *blob_work->id) { - tf_ssb_notify_blob_stored(blob_work->ssb, blob_work->id); + tf_ssb_notify_blob_stored(ssb, blob_work->id); } if (status != 0) { @@ -698,35 +665,19 @@ static void _tf_ssb_db_blob_store_after_work(uv_work_t* work, int status) { blob_work->callback(status == 0 ? blob_work->id : NULL, blob_work->is_new, blob_work->user_data); } - tf_trace_end(trace); tf_free(blob_work); } void tf_ssb_db_blob_store_async(tf_ssb_t* ssb, const uint8_t* blob, size_t size, tf_ssb_db_blob_store_callback_t* callback, void* user_data) { blob_store_work_t* work = tf_malloc(sizeof(blob_store_work_t)); - *work = (blob_store_work_t) - { - .work = - { - .data = work, - }, - .ssb = ssb, + *work = (blob_store_work_t) { .blob = blob, .size = size, .callback = callback, .user_data = user_data, }; - int r = uv_queue_work(tf_ssb_get_loop(ssb), &work->work, _tf_ssb_db_blob_store_work, _tf_ssb_db_blob_store_after_work); - if (r) - { - tf_printf("tf_ssb_db_blob_store_async -> uv_queue_work failed immediately: %s\n", uv_strerror(r)); - if (callback) - { - callback(NULL, false, user_data); - } - tf_free(work); - } + tf_ssb_run_work(ssb, _tf_ssb_db_blob_store_work, _tf_ssb_db_blob_store_after_work, work); } bool tf_ssb_db_blob_store(tf_ssb_t* ssb, const uint8_t* blob, size_t size, char* out_id, size_t out_id_size, bool* out_new) @@ -1755,19 +1706,17 @@ bool tf_ssb_db_identity_get_active(sqlite3* db, const char* user, const char* pa typedef struct _resolve_index_t { - uv_work_t work; - tf_ssb_t* ssb; const char* host; const char* path; void (*callback)(const char* path, void* user_data); void* user_data; } resolve_index_t; -static void _tf_ssb_db_resolve_index_work(uv_work_t* work) +static void _tf_ssb_db_resolve_index_work(tf_ssb_t* ssb, void* user_data) { - resolve_index_t* request = work->data; + resolve_index_t* request = user_data; - sqlite3* db = tf_ssb_acquire_db_reader(request->ssb); + sqlite3* db = tf_ssb_acquire_db_reader(ssb); sqlite3_stmt* statement; if (sqlite3_prepare(db, "SELECT json_extract(value, '$.index_map') FROM properties WHERE id = 'core' AND key = 'settings'", -1, &statement, NULL) == SQLITE_OK) { @@ -1813,12 +1762,12 @@ static void _tf_ssb_db_resolve_index_work(uv_work_t* work) tf_printf("prepare failed: %s\n", sqlite3_errmsg(db)); } } - tf_ssb_release_db_reader(request->ssb, db); + tf_ssb_release_db_reader(ssb, db); } -static void _tf_ssb_db_resolve_index_after_work(uv_work_t* work, int status) +static void _tf_ssb_db_resolve_index_after_work(tf_ssb_t* ssb, int status, void* user_data) { - resolve_index_t* request = work->data; + resolve_index_t* request = user_data; request->callback(request->path, request->user_data); tf_free((void*)request->host); tf_free((void*)request->path); @@ -1829,15 +1778,9 @@ void tf_ssb_db_resolve_index_async(tf_ssb_t* ssb, const char* host, void (*callb { resolve_index_t* request = tf_malloc(sizeof(resolve_index_t)); *request = (resolve_index_t) { - .work = { .data = request }, - .ssb = ssb, .host = tf_strdup(host), .callback = callback, .user_data = user_data, }; - int r = uv_queue_work(tf_ssb_get_loop(ssb), &request->work, _tf_ssb_db_resolve_index_work, _tf_ssb_db_resolve_index_after_work); - if (r) - { - _tf_ssb_db_resolve_index_after_work(&request->work, r); - } + tf_ssb_run_work(ssb, _tf_ssb_db_resolve_index_work, _tf_ssb_db_resolve_index_after_work, request); } diff --git a/src/ssb.h b/src/ssb.h index 678c3f90..0be2401c 100644 --- a/src/ssb.h +++ b/src/ssb.h @@ -755,6 +755,16 @@ void tf_ssb_connection_schedule_idle(tf_ssb_connection_t* connection, tf_ssb_sch void tf_ssb_connection_run_work(tf_ssb_connection_t* connection, void (*work_callback)(tf_ssb_connection_t* connection, void* user_data), void (*after_work_callback)(tf_ssb_connection_t* connection, int result, void* user_data), void* user_data); +/** +** Schedule work to run on a worker thread. +** @param ssb The owning SSB instance. +** @param work_callback The callback to run on a thread. +** @param after_work_callback The callback to run on the main thread when the work is complete. +** @param user_data User data to pass to the callback. +*/ +void tf_ssb_run_work( + tf_ssb_t* ssb, void (*work_callback)(tf_ssb_t* ssb, void* user_data), void (*after_work_callback)(tf_ssb_t* ssb, int result, void* user_data), void* user_data); + /** ** Register for new messages on a connection. ** @param connection The SHS connection. diff --git a/src/ssb.js.c b/src/ssb.js.c index 91969c76..32d15373 100644 --- a/src/ssb.js.c +++ b/src/ssb.js.c @@ -4,7 +4,6 @@ #include "mem.h" #include "ssb.db.h" #include "ssb.h" -#include "trace.h" #include "util.js.h" #include "sodium/crypto_box.h" @@ -302,8 +301,6 @@ static JSValue _tf_ssb_getAllIdentities(JSContext* context, JSValueConst this_va typedef struct _active_identity_work_t { - uv_work_t request; - tf_ssb_t* ssb; JSContext* context; const char* name; const char* package_owner; @@ -322,29 +319,22 @@ static void _tf_ssb_getActiveIdentity_visit(const char* identity, void* user_dat } } -static void _tf_ssb_getActiveIdentity_work(uv_work_t* work) +static void _tf_ssb_getActiveIdentity_work(tf_ssb_t* ssb, void* user_data) { - active_identity_work_t* request = work->data; - tf_ssb_record_thread_busy(request->ssb, true); - tf_trace_t* trace = tf_ssb_get_trace(request->ssb); - tf_trace_begin(trace, "_tf_ssb_getActiveIdentity_work"); - - sqlite3* db = tf_ssb_acquire_db_reader(request->ssb); + active_identity_work_t* request = user_data; + sqlite3* db = tf_ssb_acquire_db_reader(ssb); tf_ssb_db_identity_get_active(db, request->name, request->package_owner, request->package_name, request->identity, sizeof(request->identity)); - tf_ssb_release_db_reader(request->ssb, db); + tf_ssb_release_db_reader(ssb, db); if (!*request->identity) { - tf_ssb_db_identity_visit(request->ssb, request->name, _tf_ssb_getActiveIdentity_visit, request); + tf_ssb_db_identity_visit(ssb, request->name, _tf_ssb_getActiveIdentity_visit, request); } - - tf_trace_end(trace); - tf_ssb_record_thread_busy(request->ssb, false); } -static void _tf_ssb_getActiveIdentity_after_work(uv_work_t* work, int status) +static void _tf_ssb_getActiveIdentity_after_work(tf_ssb_t* ssb, int status, void* user_data) { - active_identity_work_t* request = work->data; + active_identity_work_t* request = user_data; JSContext* context = request->context; if (request->result == 0) { @@ -370,13 +360,12 @@ static void _tf_ssb_getActiveIdentity_after_work(uv_work_t* work, int status) static JSValue _tf_ssb_getActiveIdentity(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) { + tf_ssb_t* ssb = JS_GetOpaque(this_val, _tf_ssb_classId); const char* name = JS_ToCString(context, argv[0]); const char* package_owner = JS_ToCString(context, argv[1]); const char* package_name = JS_ToCString(context, argv[2]); active_identity_work_t* work = tf_malloc(sizeof(active_identity_work_t)); *work = (active_identity_work_t) { - .request = { .data = work }, - .ssb = JS_GetOpaque(this_val, _tf_ssb_classId), .context = context, .name = tf_strdup(name), .package_owner = tf_strdup(package_owner), @@ -387,18 +376,12 @@ static JSValue _tf_ssb_getActiveIdentity(JSContext* context, JSValueConst this_v JS_FreeCString(context, package_owner); JS_FreeCString(context, package_name); - int r = uv_queue_work(tf_ssb_get_loop(work->ssb), &work->request, _tf_ssb_getActiveIdentity_work, _tf_ssb_getActiveIdentity_after_work); - if (r) - { - _tf_ssb_getActiveIdentity_after_work(&work->request, r); - } + tf_ssb_run_work(ssb, _tf_ssb_getActiveIdentity_work, _tf_ssb_getActiveIdentity_after_work, work); return result; } typedef struct _identity_info_work_t { - uv_work_t request; - tf_ssb_t* ssb; JSContext* context; const char* name; const char* package_owner; @@ -422,12 +405,12 @@ static void _tf_ssb_getIdentityInfo_visit(const char* identity, void* data) ; } -static void _tf_ssb_getIdentityInfo_work(uv_work_t* work) +static void _tf_ssb_getIdentityInfo_work(tf_ssb_t* ssb, void* user_data) { - identity_info_work_t* request = work->data; - tf_ssb_db_identity_visit(request->ssb, request->name, _tf_ssb_getIdentityInfo_visit, request); + identity_info_work_t* request = user_data; + tf_ssb_db_identity_visit(ssb, request->name, _tf_ssb_getIdentityInfo_visit, request); - sqlite3* db = tf_ssb_acquire_db_reader(request->ssb); + sqlite3* db = tf_ssb_acquire_db_reader(ssb); sqlite3_stmt* statement = NULL; request->result = sqlite3_prepare(db, "SELECT author, name FROM ( " @@ -467,12 +450,12 @@ static void _tf_ssb_getIdentityInfo_work(uv_work_t* work) { snprintf(request->active_identity, sizeof(request->active_identity), "%s", request->identities[0]); } - tf_ssb_release_db_reader(request->ssb, db); + tf_ssb_release_db_reader(ssb, db); } -static void _tf_ssb_getIdentityInfo_after_work(uv_work_t* work, int status) +static void _tf_ssb_getIdentityInfo_after_work(tf_ssb_t* ssb, int status, void* user_data) { - identity_info_work_t* request = work->data; + identity_info_work_t* request = user_data; JSContext* context = request->context; JSValue result = JS_NewObject(context); @@ -514,13 +497,12 @@ static void _tf_ssb_getIdentityInfo_after_work(uv_work_t* work, int status) static JSValue _tf_ssb_getIdentityInfo(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) { + tf_ssb_t* ssb = JS_GetOpaque(this_val, _tf_ssb_classId); const char* name = JS_ToCString(context, argv[0]); const char* package_owner = JS_ToCString(context, argv[1]); const char* package_name = JS_ToCString(context, argv[2]); identity_info_work_t* work = tf_malloc(sizeof(identity_info_work_t)); *work = (identity_info_work_t) { - .request = { .data = work }, - .ssb = JS_GetOpaque(this_val, _tf_ssb_classId), .context = context, .name = tf_strdup(name), .package_owner = tf_strdup(package_owner), @@ -531,11 +513,7 @@ static JSValue _tf_ssb_getIdentityInfo(JSContext* context, JSValueConst this_val JS_FreeCString(context, package_owner); JS_FreeCString(context, package_name); - int r = uv_queue_work(tf_ssb_get_loop(work->ssb), &work->request, _tf_ssb_getIdentityInfo_work, _tf_ssb_getIdentityInfo_after_work); - if (r) - { - _tf_ssb_getIdentityInfo_after_work(&work->request, r); - } + tf_ssb_run_work(ssb, _tf_ssb_getIdentityInfo_work, _tf_ssb_getIdentityInfo_after_work, work); return result; } @@ -845,7 +823,6 @@ typedef struct _sql_work_t uint8_t* rows; size_t binds_count; size_t rows_count; - uv_work_t request; uv_async_t async; uv_timer_t timeout; uv_mutex_t lock; @@ -861,13 +838,10 @@ static void _tf_ssb_sql_append(uint8_t** rows, size_t* rows_count, const void* d *rows_count += size; } -static void _tf_ssb_sqlAsync_work(uv_work_t* work) +static void _tf_ssb_sqlAsync_work(tf_ssb_t* ssb, void* user_data) { - sql_work_t* sql_work = work->data; - tf_ssb_record_thread_busy(sql_work->ssb, true); - tf_trace_t* trace = tf_ssb_get_trace(sql_work->ssb); - tf_trace_begin(trace, "sql_async_work"); - sqlite3* db = tf_ssb_acquire_db_reader_restricted(sql_work->ssb); + sql_work_t* sql_work = user_data; + sqlite3* db = tf_ssb_acquire_db_reader_restricted(ssb); uv_mutex_lock(&sql_work->lock); sql_work->db = db; uv_mutex_unlock(&sql_work->lock); @@ -975,9 +949,7 @@ static void _tf_ssb_sqlAsync_work(uv_work_t* work) uv_mutex_lock(&sql_work->lock); sql_work->db = NULL; uv_mutex_unlock(&sql_work->lock); - tf_ssb_release_db_reader(sql_work->ssb, db); - tf_ssb_record_thread_busy(sql_work->ssb, false); - tf_trace_end(trace); + tf_ssb_release_db_reader(ssb, db); } static void _tf_ssb_sqlAsync_handle_close(uv_handle_t* handle) @@ -1003,12 +975,10 @@ static void _tf_ssb_sqlAsync_destroy(sql_work_t* work) uv_close((uv_handle_t*)&work->async, _tf_ssb_sqlAsync_handle_close); } -static void _tf_ssb_sqlAsync_after_work(uv_work_t* work, int status) +static void _tf_ssb_sqlAsync_after_work(tf_ssb_t* ssb, int status, void* user_data) { - sql_work_t* sql_work = work->data; - tf_trace_t* trace = tf_ssb_get_trace(sql_work->ssb); - tf_trace_begin(trace, "sql_async_after_work"); - JSContext* context = tf_ssb_get_context(sql_work->ssb); + sql_work_t* sql_work = user_data; + JSContext* context = tf_ssb_get_context(ssb); uint8_t* p = sql_work->rows; while (p < sql_work->rows + sql_work->rows_count) { @@ -1093,7 +1063,6 @@ static void _tf_ssb_sqlAsync_after_work(uv_work_t* work, int status) JS_FreeValue(context, sql_work->callback); JS_FreeCString(context, sql_work->query); _tf_ssb_sqlAsync_destroy(sql_work); - tf_trace_end(trace); } static void _tf_ssb_sqlAsync_timeout(uv_timer_t* timer) @@ -1120,10 +1089,6 @@ static JSValue _tf_ssb_sqlAsync(JSContext* context, JSValueConst this_val, int a sql_work_t* work = tf_malloc(sizeof(sql_work_t)); *work = (sql_work_t) { - .request = - { - .data = work, - }, .async = { .data = work, @@ -1183,11 +1148,7 @@ static JSValue _tf_ssb_sqlAsync(JSContext* context, JSValueConst this_val, int a } JS_FreeValue(context, value); } - int r = uv_queue_work(tf_ssb_get_loop(ssb), &work->request, _tf_ssb_sqlAsync_work, _tf_ssb_sqlAsync_after_work); - if (r) - { - error_value = JS_ThrowInternalError(context, "uv_queue_work failed: %s", uv_strerror(r)); - } + tf_ssb_run_work(ssb, _tf_ssb_sqlAsync_work, _tf_ssb_sqlAsync_after_work, work); } if (!JS_IsUndefined(error_value)) { @@ -1809,8 +1770,6 @@ static JSValue _tf_ssb_private_message_decrypt(JSContext* context, JSValueConst typedef struct _following_t { - uv_work_t work; - tf_ssb_t* ssb; JSContext* context; JSValue promise[2]; @@ -1821,17 +1780,15 @@ typedef struct _following_t const char* ids[]; } following_t; -static void _tf_ssb_following_work(uv_work_t* work) +static void _tf_ssb_following_work(tf_ssb_t* ssb, void* user_data) { - following_t* following = work->data; - tf_ssb_record_thread_busy(following->ssb, true); - following->out_following = tf_ssb_db_following_deep(following->ssb, following->ids, following->ids_count, following->depth); - tf_ssb_record_thread_busy(following->ssb, false); + following_t* following = user_data; + following->out_following = tf_ssb_db_following_deep(ssb, following->ids, following->ids_count, following->depth); } -static void _tf_ssb_following_after_work(uv_work_t* work, int status) +static void _tf_ssb_following_after_work(tf_ssb_t* ssb, int status, void* user_data) { - following_t* following = work->data; + following_t* following = user_data; JSContext* context = following->context; if (status == 0) { @@ -1878,14 +1835,8 @@ static JSValue _tf_ssb_following(JSContext* context, JSValueConst this_val, int int ids_count = tf_util_get_length(context, argv[0]); following_t* following = tf_malloc(sizeof(following_t) + sizeof(char*) * ids_count); - *following = (following_t) - { - .work = - { - .data = following, - }, + *following = (following_t) { .context = context, - .ssb = ssb, }; JS_ToInt32(context, &following->depth, argv[1]); @@ -1903,11 +1854,7 @@ static JSValue _tf_ssb_following(JSContext* context, JSValueConst this_val, int } } - int r = uv_queue_work(tf_ssb_get_loop(ssb), &following->work, _tf_ssb_following_work, _tf_ssb_following_after_work); - if (r) - { - _tf_ssb_following_after_work(&following->work, r); - } + tf_ssb_run_work(ssb, _tf_ssb_following_work, _tf_ssb_following_after_work, following); return result; } diff --git a/src/ssb.rpc.c b/src/ssb.rpc.c index fe37b84d..9fa6cf8b 100644 --- a/src/ssb.rpc.c +++ b/src/ssb.rpc.c @@ -1137,12 +1137,6 @@ static void _tf_ssb_rpc_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_chang } } -typedef struct _delete_blobs_work_t -{ - uv_work_t work; - tf_ssb_t* ssb; -} delete_blobs_work_t; - static void _tf_ssb_rpc_checkpoint(tf_ssb_t* ssb) { int64_t checkpoint_start_ms = uv_hrtime(); @@ -1160,16 +1154,12 @@ static void _tf_ssb_rpc_checkpoint(tf_ssb_t* ssb) tf_ssb_release_db_writer(ssb, db); } -static void _tf_ssb_rpc_delete_blobs_work(uv_work_t* work) +static void _tf_ssb_rpc_delete_blobs_work(tf_ssb_t* ssb, void* user_data) { - delete_blobs_work_t* delete = work->data; - tf_ssb_t* ssb = delete->ssb; - tf_ssb_record_thread_busy(ssb, true); int64_t age = _get_global_setting_int64(ssb, "blob_expire_age_seconds", -1); if (age <= 0) { _tf_ssb_rpc_checkpoint(ssb); - tf_ssb_record_thread_busy(ssb, false); return; } int64_t start_ns = uv_hrtime(); @@ -1211,28 +1201,16 @@ static void _tf_ssb_rpc_delete_blobs_work(uv_work_t* work) tf_printf("Deleted %d blobs in %d ms.\n", deleted, (int)duration_ms); _tf_ssb_rpc_checkpoint(ssb); _tf_ssb_rpc_start_delete_blobs(ssb, deleted ? (int)duration_ms : (15 * 60 * 1000)); - tf_ssb_record_thread_busy(ssb, false); } -static void _tf_ssb_rpc_delete_blobs_after_work(uv_work_t* work, int status) +static void _tf_ssb_rpc_delete_blobs_after_work(tf_ssb_t* ssb, int status, void* user_data) { - delete_blobs_work_t* delete = work->data; - tf_ssb_unref(delete->ssb); - tf_free(delete); + tf_ssb_unref(ssb); } static void _tf_ssb_rpc_start_delete_callback(tf_ssb_t* ssb, void* user_data) { - delete_blobs_work_t* work = tf_malloc(sizeof(delete_blobs_work_t)); - *work = (delete_blobs_work_t) { .work = { .data = work }, .ssb = ssb }; - tf_ssb_ref(ssb); - int r = uv_queue_work(tf_ssb_get_loop(ssb), &work->work, _tf_ssb_rpc_delete_blobs_work, _tf_ssb_rpc_delete_blobs_after_work); - if (r) - { - tf_printf("uv_queue_work: %s\n", uv_strerror(r)); - tf_free(work); - tf_ssb_unref(ssb); - } + tf_ssb_run_work(ssb, _tf_ssb_rpc_delete_blobs_work, _tf_ssb_rpc_delete_blobs_after_work, NULL); } static void _tf_ssb_rpc_start_delete_blobs(tf_ssb_t* ssb, int delay_ms)