Refactor most uses of uv_queue_work to go through a helper that keeps track of thread business, traces, and is generally less code.

This commit is contained in:
Cory McWilliams 2024-05-08 21:00:37 -04:00
parent 5ca5323782
commit 385524352c
6 changed files with 195 additions and 292 deletions

132
src/ssb.c
View File

@ -764,8 +764,15 @@ void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags,
assert(new_request_name);
}
else if (!_tf_ssb_connection_get_request_callback(connection, request_number, NULL, NULL))
{
if (flags & k_ssb_rpc_flag_binary)
{
tf_printf("Dropping message with no active request (%d): (%zd bytes).\n", request_number, size);
}
else
{
tf_printf("Dropping message with no active request (%d): %.*s\n", request_number, (int)size, message);
}
return;
}
@ -3714,6 +3721,8 @@ typedef struct _connection_work_t
{
uv_work_t work;
tf_ssb_connection_t* connection;
const char* name;
const char* after_name;
void (*work_callback)(tf_ssb_connection_t* connection, void* user_data);
void (*after_work_callback)(tf_ssb_connection_t* connection, int result, void* user_data);
void* user_data;
@ -3725,7 +3734,9 @@ static void _tf_ssb_connection_work_callback(uv_work_t* work)
tf_ssb_record_thread_busy(data->connection->ssb, true);
if (data->work_callback)
{
tf_trace_begin(data->connection->ssb->trace, data->name);
data->work_callback(data->connection, data->user_data);
tf_trace_end(data->connection->ssb->trace);
}
tf_ssb_record_thread_busy(data->connection->ssb, false);
}
@ -3735,13 +3746,17 @@ static void _tf_ssb_connection_after_work_callback(uv_work_t* work, int status)
connection_work_t* data = work->data;
if (data->after_work_callback)
{
tf_trace_begin(data->connection->ssb->trace, data->after_name);
data->after_work_callback(data->connection, status, data->user_data);
tf_trace_end(data->connection->ssb->trace);
}
data->connection->ref_count--;
if (data->connection->ref_count == 0 && data->connection->closing)
{
_tf_ssb_connection_destroy(data->connection, "work completed");
}
tf_free((void*)data->name);
tf_free((void*)data->after_name);
tf_free(data);
}
@ -3769,6 +3784,70 @@ void tf_ssb_connection_run_work(tf_ssb_connection_t* connection, void (*work_cal
}
}
typedef struct _ssb_work_t
{
uv_work_t work;
tf_ssb_t* ssb;
const char* name;
const char* after_name;
void (*work_callback)(tf_ssb_t* ssb, void* user_data);
void (*after_work_callback)(tf_ssb_t* ssb, int result, void* user_data);
void* user_data;
} ssb_work_t;
static void _tf_ssb_work_callback(uv_work_t* work)
{
ssb_work_t* data = work->data;
tf_ssb_record_thread_busy(data->ssb, true);
if (data->work_callback)
{
tf_trace_begin(data->ssb->trace, data->name);
data->work_callback(data->ssb, data->user_data);
tf_trace_end(data->ssb->trace);
}
tf_ssb_record_thread_busy(data->ssb, false);
}
static void _tf_ssb_after_work_callback(uv_work_t* work, int status)
{
ssb_work_t* data = work->data;
if (data->after_work_callback)
{
tf_trace_begin(data->ssb->trace, data->after_name);
data->after_work_callback(data->ssb, status, data->user_data);
tf_trace_end(data->ssb->trace);
}
tf_ssb_unref(data->ssb);
tf_free((void*)data->name);
tf_free((void*)data->after_name);
tf_free(data);
}
void tf_ssb_run_work(tf_ssb_t* ssb, void (*work_callback)(tf_ssb_t* ssb, void* user_data), void (*after_work_callback)(tf_ssb_t* ssb, int result, void* user_data), void* user_data)
{
ssb_work_t* work = tf_malloc(sizeof(ssb_work_t));
*work = (ssb_work_t)
{
.work =
{
.data = work,
},
.name = tf_util_function_to_string(work_callback),
.after_name = tf_util_function_to_string(after_work_callback),
.ssb = ssb,
.work_callback = work_callback,
.after_work_callback = after_work_callback,
.user_data = user_data,
};
tf_ssb_ref(ssb);
int result = uv_queue_work(ssb->loop, &work->work, _tf_ssb_work_callback, _tf_ssb_after_work_callback);
if (result)
{
_tf_ssb_connection_after_work_callback(&work->work, result);
}
}
bool tf_ssb_is_room(tf_ssb_t* ssb)
{
return ssb->is_room;
@ -3792,8 +3871,6 @@ void tf_ssb_set_room_name(tf_ssb_t* ssb, const char* room_name)
typedef struct _update_settings_t
{
uv_work_t work;
tf_ssb_t* ssb;
bool is_room;
char room_name[1024];
} update_settings_t;
@ -3847,58 +3924,35 @@ static bool _get_global_setting_bool(tf_ssb_t* ssb, const char* name, bool defau
return result;
}
static void _tf_ssb_update_settings_work(uv_work_t* work)
static void _tf_ssb_update_settings_work(tf_ssb_t* ssb, void* user_data)
{
update_settings_t* update = work->data;
tf_ssb_record_thread_busy(update->ssb, true);
update->is_room = _get_global_setting_bool(update->ssb, "room", true);
_get_global_setting_string(update->ssb, "room_name", update->room_name, sizeof(update->room_name));
tf_ssb_record_thread_busy(update->ssb, false);
update_settings_t* update = user_data;
update->is_room = _get_global_setting_bool(ssb, "room", true);
_get_global_setting_string(ssb, "room_name", update->room_name, sizeof(update->room_name));
}
static void _tf_ssb_update_settings_after_work(uv_work_t* work, int result)
static void _tf_ssb_update_settings_after_work(tf_ssb_t* ssb, int result, void* user_data)
{
update_settings_t* update = work->data;
tf_ssb_unref(update->ssb);
tf_ssb_set_is_room(update->ssb, update->is_room);
tf_ssb_set_room_name(update->ssb, update->room_name);
_tf_ssb_start_update_settings(update->ssb);
update_settings_t* update = user_data;
tf_ssb_set_is_room(ssb, update->is_room);
tf_ssb_set_room_name(ssb, update->room_name);
_tf_ssb_start_update_settings(ssb);
tf_free(update);
}
static void _tf_ssb_start_update_settings_timer(tf_ssb_t* ssb, void* user_data)
{
update_settings_t* update = tf_malloc(sizeof(update_settings_t));
*update = (update_settings_t)
{
.work =
{
.data = update,
},
.ssb = ssb,
};
tf_ssb_ref(ssb);
int result = uv_queue_work(tf_ssb_get_loop(ssb), &update->work, _tf_ssb_update_settings_work, _tf_ssb_update_settings_after_work);
if (result)
{
_tf_ssb_update_settings_after_work(&update->work, result);
}
*update = (update_settings_t) { 0 };
tf_ssb_run_work(ssb, _tf_ssb_update_settings_work, _tf_ssb_update_settings_after_work, update);
}
static void _tf_ssb_update_settings(tf_ssb_t* ssb)
{
update_settings_t* update = tf_malloc(sizeof(update_settings_t));
*update = (update_settings_t)
{
.work =
{
.data = update,
},
.ssb = ssb,
};
tf_ssb_ref(ssb);
_tf_ssb_update_settings_work(&update->work);
_tf_ssb_update_settings_after_work(&update->work, 0);
*update = (update_settings_t) { 0 };
_tf_ssb_update_settings_work(ssb, update);
_tf_ssb_update_settings_after_work(ssb, 0, update);
}
static void _tf_ssb_start_update_settings(tf_ssb_t* ssb)

View File

@ -76,35 +76,30 @@ static bool _tf_ssb_connections_get_next_connection(tf_ssb_connections_t* connec
typedef struct _tf_ssb_connections_get_next_t
{
uv_work_t work;
tf_ssb_connections_t* connections;
tf_ssb_t* ssb;
bool ready;
char host[256];
int port;
char key[k_id_base64_len];
} tf_ssb_connections_get_next_t;
static void _tf_ssb_connections_get_next_work(uv_work_t* work)
static void _tf_ssb_connections_get_next_work(tf_ssb_t* ssb, void* user_data)
{
tf_ssb_connections_get_next_t* next = work->data;
tf_ssb_record_thread_busy(next->ssb, true);
tf_ssb_connections_get_next_t* next = user_data;
next->ready = _tf_ssb_connections_get_next_connection(next->connections, next->host, sizeof(next->host), &next->port, next->key, sizeof(next->key));
tf_ssb_record_thread_busy(next->ssb, false);
}
static void _tf_ssb_connections_get_next_after_work(uv_work_t* work, int status)
static void _tf_ssb_connections_get_next_after_work(tf_ssb_t* ssb, int status, void* user_data)
{
tf_ssb_connections_get_next_t* next = work->data;
tf_ssb_connections_get_next_t* next = user_data;
if (next->ready)
{
uint8_t key_bin[k_id_bin_len];
if (tf_ssb_id_str_to_bin(key_bin, next->key))
{
tf_ssb_connect(next->ssb, next->host, next->port, key_bin);
tf_ssb_connect(ssb, next->host, next->port, key_bin);
}
}
tf_ssb_unref(next->ssb);
tf_free(next);
}
@ -116,21 +111,10 @@ static void _tf_ssb_connections_timer(uv_timer_t* timer)
if (count < (int)_countof(active))
{
tf_ssb_connections_get_next_t* next = tf_malloc(sizeof(tf_ssb_connections_get_next_t));
*next = (tf_ssb_connections_get_next_t)
{
.work =
{
.data = next,
},
.ssb = connections->ssb,
*next = (tf_ssb_connections_get_next_t) {
.connections = connections,
};
tf_ssb_ref(connections->ssb);
int result = uv_queue_work(tf_ssb_get_loop(connections->ssb), &next->work, _tf_ssb_connections_get_next_work, _tf_ssb_connections_get_next_after_work);
if (result)
{
_tf_ssb_connections_get_next_after_work(&next->work, result);
}
tf_ssb_run_work(connections->ssb, _tf_ssb_connections_get_next_work, _tf_ssb_connections_get_next_after_work, next);
}
}
@ -165,8 +149,6 @@ void tf_ssb_connections_destroy(tf_ssb_connections_t* connections)
typedef struct _tf_ssb_connections_update_t
{
uv_work_t work;
tf_ssb_t* ssb;
char host[256];
int port;
char key[k_id_base64_len];
@ -174,12 +156,11 @@ typedef struct _tf_ssb_connections_update_t
bool succeeded;
} tf_ssb_connections_update_t;
static void _tf_ssb_connections_update_work(uv_work_t* work)
static void _tf_ssb_connections_update_work(tf_ssb_t* ssb, void* user_data)
{
tf_ssb_connections_update_t* update = work->data;
tf_ssb_record_thread_busy(update->ssb, true);
tf_ssb_connections_update_t* update = user_data;
sqlite3_stmt* statement;
sqlite3* db = tf_ssb_acquire_db_writer(update->ssb);
sqlite3* db = tf_ssb_acquire_db_writer(ssb);
if (update->attempted)
{
if (sqlite3_prepare(db, "UPDATE connections SET last_attempt = strftime('%s', 'now') WHERE host = ?1 AND port = ?2 AND key = ?3", -1, &statement, NULL) == SQLITE_OK)
@ -226,31 +207,23 @@ static void _tf_ssb_connections_update_work(uv_work_t* work)
sqlite3_finalize(statement);
}
}
tf_ssb_release_db_writer(update->ssb, db);
tf_ssb_record_thread_busy(update->ssb, false);
tf_ssb_release_db_writer(ssb, db);
}
static void _tf_ssb_connections_update_after_work(uv_work_t* work, int status)
static void _tf_ssb_connections_update_after_work(tf_ssb_t* ssb, int status, void* user_data)
{
tf_ssb_connections_update_t* update = work->data;
tf_free(update);
tf_free(user_data);
}
static void _tf_ssb_connections_queue_update(tf_ssb_connections_t* connections, tf_ssb_connections_update_t* update)
{
update->work.data = update;
int result = uv_queue_work(tf_ssb_get_loop(connections->ssb), &update->work, _tf_ssb_connections_update_work, _tf_ssb_connections_update_after_work);
if (result)
{
_tf_ssb_connections_update_after_work(&update->work, result);
}
tf_ssb_run_work(connections->ssb, _tf_ssb_connections_update_work, _tf_ssb_connections_update_after_work, update);
}
void tf_ssb_connections_store(tf_ssb_connections_t* connections, const char* host, int port, const char* key)
{
tf_ssb_connections_update_t* update = tf_malloc(sizeof(tf_ssb_connections_update_t));
*update = (tf_ssb_connections_update_t) {
.ssb = connections->ssb,
.port = port,
};
snprintf(update->host, sizeof(update->host), "%s", host);
@ -262,7 +235,6 @@ void tf_ssb_connections_set_attempted(tf_ssb_connections_t* connections, const c
{
tf_ssb_connections_update_t* update = tf_malloc(sizeof(tf_ssb_connections_update_t));
*update = (tf_ssb_connections_update_t) {
.ssb = connections->ssb,
.port = port,
.attempted = true,
};
@ -275,7 +247,6 @@ void tf_ssb_connections_set_succeeded(tf_ssb_connections_t* connections, const c
{
tf_ssb_connections_update_t* update = tf_malloc(sizeof(tf_ssb_connections_update_t));
*update = (tf_ssb_connections_update_t) {
.ssb = connections->ssb,
.port = port,
.succeeded = true,
};

View File

@ -19,8 +19,7 @@
typedef struct _message_store_t message_store_t;
static void _tf_ssb_db_store_message_work_finish(message_store_t* store);
static void _tf_ssb_db_store_message_after_work(uv_work_t* work, int status);
static void _tf_ssb_db_store_message_after_work(tf_ssb_t* ssb, int status, void* user_data);
static void _tf_ssb_db_exec(sqlite3* db, const char* statement)
{
@ -400,8 +399,6 @@ static char* _tf_ssb_db_get_message_blob_wants(tf_ssb_t* ssb, int64_t rowid)
typedef struct _message_store_t
{
uv_work_t work;
tf_ssb_t* ssb;
char id[k_id_base64_len];
char signature[512];
int flags;
@ -421,21 +418,16 @@ typedef struct _message_store_t
message_store_t* next;
} message_store_t;
static void _tf_ssb_db_store_message_work(uv_work_t* work)
static void _tf_ssb_db_store_message_work(tf_ssb_t* ssb, void* user_data)
{
message_store_t* store = work->data;
tf_ssb_record_thread_busy(store->ssb, true);
tf_trace_t* trace = tf_ssb_get_trace(store->ssb);
tf_trace_begin(trace, "message_store_work");
int64_t last_row_id = _tf_ssb_db_store_message_raw(store->ssb, store->id, *store->previous ? store->previous : NULL, store->author, store->sequence, store->timestamp,
store->content, store->length, store->signature, store->flags);
message_store_t* store = user_data;
int64_t last_row_id = _tf_ssb_db_store_message_raw(
ssb, store->id, *store->previous ? store->previous : NULL, store->author, store->sequence, store->timestamp, store->content, store->length, store->signature, store->flags);
if (last_row_id != -1)
{
store->out_stored = true;
store->out_blob_wants = _tf_ssb_db_get_message_blob_wants(store->ssb, last_row_id);
store->out_blob_wants = _tf_ssb_db_get_message_blob_wants(ssb, last_row_id);
}
tf_trace_end(trace);
tf_ssb_record_thread_busy(store->ssb, false);
}
static void _wake_up_queue(tf_ssb_t* ssb, tf_ssb_store_queue_t* queue)
@ -452,38 +444,19 @@ static void _wake_up_queue(tf_ssb_t* ssb, tf_ssb_store_queue_t* queue)
}
next->next = NULL;
queue->running = true;
int r = uv_queue_work(tf_ssb_get_loop(ssb), &next->work, _tf_ssb_db_store_message_work, _tf_ssb_db_store_message_after_work);
if (r)
{
_tf_ssb_db_store_message_work_finish(next);
}
tf_ssb_run_work(ssb, _tf_ssb_db_store_message_work, _tf_ssb_db_store_message_after_work, next);
}
}
}
static void _tf_ssb_db_store_message_work_finish(message_store_t* store)
static void _tf_ssb_db_store_message_after_work(tf_ssb_t* ssb, int status, void* user_data)
{
JSContext* context = tf_ssb_get_context(store->ssb);
if (store->callback)
{
store->callback(store->id, store->out_stored, store->user_data);
}
JS_FreeCString(context, store->content);
tf_ssb_store_queue_t* queue = tf_ssb_get_store_queue(store->ssb);
queue->running = false;
_wake_up_queue(store->ssb, queue);
tf_free(store);
}
static void _tf_ssb_db_store_message_after_work(uv_work_t* work, int status)
{
message_store_t* store = work->data;
tf_trace_t* trace = tf_ssb_get_trace(store->ssb);
tf_trace_begin(trace, "message_store_after_work");
message_store_t* store = user_data;
tf_trace_t* trace = tf_ssb_get_trace(ssb);
if (store->out_stored)
{
tf_trace_begin(trace, "notify_message_added");
JSContext* context = tf_ssb_get_context(store->ssb);
JSContext* context = tf_ssb_get_context(ssb);
JSValue formatted =
tf_ssb_format_message(context, store->previous, store->author, store->sequence, store->timestamp, "sha256", store->content, store->signature, store->flags);
JSValue message = JS_NewObject(context);
@ -492,7 +465,7 @@ static void _tf_ssb_db_store_message_after_work(uv_work_t* work, int status)
char timestamp_string[256];
snprintf(timestamp_string, sizeof(timestamp_string), "%f", store->timestamp);
JS_SetPropertyStr(context, message, "timestamp", JS_NewString(context, timestamp_string));
tf_ssb_notify_message_added(store->ssb, store->id, message);
tf_ssb_notify_message_added(ssb, store->id, message);
JS_FreeValue(context, message);
tf_trace_end(trace);
}
@ -501,13 +474,22 @@ static void _tf_ssb_db_store_message_after_work(uv_work_t* work, int status)
tf_trace_begin(trace, "notify_blob_wants_added");
for (char* p = store->out_blob_wants; *p; p = p + strlen(p))
{
tf_ssb_notify_blob_want_added(store->ssb, p);
tf_ssb_notify_blob_want_added(ssb, p);
}
tf_free(store->out_blob_wants);
tf_trace_end(trace);
}
_tf_ssb_db_store_message_work_finish(store);
tf_trace_end(trace);
JSContext* context = tf_ssb_get_context(ssb);
if (store->callback)
{
store->callback(store->id, store->out_stored, store->user_data);
}
JS_FreeCString(context, store->content);
tf_ssb_store_queue_t* queue = tf_ssb_get_store_queue(ssb);
queue->running = false;
_wake_up_queue(ssb, queue);
tf_free(store);
}
void tf_ssb_db_store_message(
@ -539,13 +521,7 @@ void tf_ssb_db_store_message(
JS_FreeValue(context, contentval);
message_store_t* store = tf_malloc(sizeof(message_store_t));
*store = (message_store_t)
{
.work =
{
.data = store,
},
.ssb = ssb,
*store = (message_store_t) {
.sequence = sequence,
.timestamp = timestamp,
.content = contentstr,
@ -660,8 +636,6 @@ bool tf_ssb_db_blob_get(tf_ssb_t* ssb, const char* id, uint8_t** out_blob, size_
typedef struct _blob_store_work_t
{
uv_work_t work;
tf_ssb_t* ssb;
const uint8_t* blob;
size_t size;
char id[k_blob_id_len];
@ -670,25 +644,18 @@ typedef struct _blob_store_work_t
void* user_data;
} blob_store_work_t;
static void _tf_ssb_db_blob_store_work(uv_work_t* work)
static void _tf_ssb_db_blob_store_work(tf_ssb_t* ssb, void* user_data)
{
blob_store_work_t* blob_work = work->data;
tf_ssb_record_thread_busy(blob_work->ssb, true);
tf_trace_t* trace = tf_ssb_get_trace(blob_work->ssb);
tf_trace_begin(trace, "blob_store_work");
tf_ssb_db_blob_store(blob_work->ssb, blob_work->blob, blob_work->size, blob_work->id, sizeof(blob_work->id), &blob_work->is_new);
tf_trace_end(trace);
tf_ssb_record_thread_busy(blob_work->ssb, false);
blob_store_work_t* blob_work = user_data;
tf_ssb_db_blob_store(ssb, blob_work->blob, blob_work->size, blob_work->id, sizeof(blob_work->id), &blob_work->is_new);
}
static void _tf_ssb_db_blob_store_after_work(uv_work_t* work, int status)
static void _tf_ssb_db_blob_store_after_work(tf_ssb_t* ssb, int status, void* user_data)
{
blob_store_work_t* blob_work = work->data;
tf_trace_t* trace = tf_ssb_get_trace(blob_work->ssb);
tf_trace_begin(trace, "blob_store_after_work");
blob_store_work_t* blob_work = user_data;
if (status == 0 && *blob_work->id)
{
tf_ssb_notify_blob_stored(blob_work->ssb, blob_work->id);
tf_ssb_notify_blob_stored(ssb, blob_work->id);
}
if (status != 0)
{
@ -698,35 +665,19 @@ static void _tf_ssb_db_blob_store_after_work(uv_work_t* work, int status)
{
blob_work->callback(status == 0 ? blob_work->id : NULL, blob_work->is_new, blob_work->user_data);
}
tf_trace_end(trace);
tf_free(blob_work);
}
void tf_ssb_db_blob_store_async(tf_ssb_t* ssb, const uint8_t* blob, size_t size, tf_ssb_db_blob_store_callback_t* callback, void* user_data)
{
blob_store_work_t* work = tf_malloc(sizeof(blob_store_work_t));
*work = (blob_store_work_t)
{
.work =
{
.data = work,
},
.ssb = ssb,
*work = (blob_store_work_t) {
.blob = blob,
.size = size,
.callback = callback,
.user_data = user_data,
};
int r = uv_queue_work(tf_ssb_get_loop(ssb), &work->work, _tf_ssb_db_blob_store_work, _tf_ssb_db_blob_store_after_work);
if (r)
{
tf_printf("tf_ssb_db_blob_store_async -> uv_queue_work failed immediately: %s\n", uv_strerror(r));
if (callback)
{
callback(NULL, false, user_data);
}
tf_free(work);
}
tf_ssb_run_work(ssb, _tf_ssb_db_blob_store_work, _tf_ssb_db_blob_store_after_work, work);
}
bool tf_ssb_db_blob_store(tf_ssb_t* ssb, const uint8_t* blob, size_t size, char* out_id, size_t out_id_size, bool* out_new)
@ -1755,19 +1706,17 @@ bool tf_ssb_db_identity_get_active(sqlite3* db, const char* user, const char* pa
typedef struct _resolve_index_t
{
uv_work_t work;
tf_ssb_t* ssb;
const char* host;
const char* path;
void (*callback)(const char* path, void* user_data);
void* user_data;
} resolve_index_t;
static void _tf_ssb_db_resolve_index_work(uv_work_t* work)
static void _tf_ssb_db_resolve_index_work(tf_ssb_t* ssb, void* user_data)
{
resolve_index_t* request = work->data;
resolve_index_t* request = user_data;
sqlite3* db = tf_ssb_acquire_db_reader(request->ssb);
sqlite3* db = tf_ssb_acquire_db_reader(ssb);
sqlite3_stmt* statement;
if (sqlite3_prepare(db, "SELECT json_extract(value, '$.index_map') FROM properties WHERE id = 'core' AND key = 'settings'", -1, &statement, NULL) == SQLITE_OK)
{
@ -1813,12 +1762,12 @@ static void _tf_ssb_db_resolve_index_work(uv_work_t* work)
tf_printf("prepare failed: %s\n", sqlite3_errmsg(db));
}
}
tf_ssb_release_db_reader(request->ssb, db);
tf_ssb_release_db_reader(ssb, db);
}
static void _tf_ssb_db_resolve_index_after_work(uv_work_t* work, int status)
static void _tf_ssb_db_resolve_index_after_work(tf_ssb_t* ssb, int status, void* user_data)
{
resolve_index_t* request = work->data;
resolve_index_t* request = user_data;
request->callback(request->path, request->user_data);
tf_free((void*)request->host);
tf_free((void*)request->path);
@ -1829,15 +1778,9 @@ void tf_ssb_db_resolve_index_async(tf_ssb_t* ssb, const char* host, void (*callb
{
resolve_index_t* request = tf_malloc(sizeof(resolve_index_t));
*request = (resolve_index_t) {
.work = { .data = request },
.ssb = ssb,
.host = tf_strdup(host),
.callback = callback,
.user_data = user_data,
};
int r = uv_queue_work(tf_ssb_get_loop(ssb), &request->work, _tf_ssb_db_resolve_index_work, _tf_ssb_db_resolve_index_after_work);
if (r)
{
_tf_ssb_db_resolve_index_after_work(&request->work, r);
}
tf_ssb_run_work(ssb, _tf_ssb_db_resolve_index_work, _tf_ssb_db_resolve_index_after_work, request);
}

View File

@ -755,6 +755,16 @@ void tf_ssb_connection_schedule_idle(tf_ssb_connection_t* connection, tf_ssb_sch
void tf_ssb_connection_run_work(tf_ssb_connection_t* connection, void (*work_callback)(tf_ssb_connection_t* connection, void* user_data),
void (*after_work_callback)(tf_ssb_connection_t* connection, int result, void* user_data), void* user_data);
/**
** Schedule work to run on a worker thread.
** @param ssb The owning SSB instance.
** @param work_callback The callback to run on a thread.
** @param after_work_callback The callback to run on the main thread when the work is complete.
** @param user_data User data to pass to the callback.
*/
void tf_ssb_run_work(
tf_ssb_t* ssb, void (*work_callback)(tf_ssb_t* ssb, void* user_data), void (*after_work_callback)(tf_ssb_t* ssb, int result, void* user_data), void* user_data);
/**
** Register for new messages on a connection.
** @param connection The SHS connection.

View File

@ -4,7 +4,6 @@
#include "mem.h"
#include "ssb.db.h"
#include "ssb.h"
#include "trace.h"
#include "util.js.h"
#include "sodium/crypto_box.h"
@ -302,8 +301,6 @@ static JSValue _tf_ssb_getAllIdentities(JSContext* context, JSValueConst this_va
typedef struct _active_identity_work_t
{
uv_work_t request;
tf_ssb_t* ssb;
JSContext* context;
const char* name;
const char* package_owner;
@ -322,29 +319,22 @@ static void _tf_ssb_getActiveIdentity_visit(const char* identity, void* user_dat
}
}
static void _tf_ssb_getActiveIdentity_work(uv_work_t* work)
static void _tf_ssb_getActiveIdentity_work(tf_ssb_t* ssb, void* user_data)
{
active_identity_work_t* request = work->data;
tf_ssb_record_thread_busy(request->ssb, true);
tf_trace_t* trace = tf_ssb_get_trace(request->ssb);
tf_trace_begin(trace, "_tf_ssb_getActiveIdentity_work");
sqlite3* db = tf_ssb_acquire_db_reader(request->ssb);
active_identity_work_t* request = user_data;
sqlite3* db = tf_ssb_acquire_db_reader(ssb);
tf_ssb_db_identity_get_active(db, request->name, request->package_owner, request->package_name, request->identity, sizeof(request->identity));
tf_ssb_release_db_reader(request->ssb, db);
tf_ssb_release_db_reader(ssb, db);
if (!*request->identity)
{
tf_ssb_db_identity_visit(request->ssb, request->name, _tf_ssb_getActiveIdentity_visit, request);
tf_ssb_db_identity_visit(ssb, request->name, _tf_ssb_getActiveIdentity_visit, request);
}
}
tf_trace_end(trace);
tf_ssb_record_thread_busy(request->ssb, false);
}
static void _tf_ssb_getActiveIdentity_after_work(uv_work_t* work, int status)
static void _tf_ssb_getActiveIdentity_after_work(tf_ssb_t* ssb, int status, void* user_data)
{
active_identity_work_t* request = work->data;
active_identity_work_t* request = user_data;
JSContext* context = request->context;
if (request->result == 0)
{
@ -370,13 +360,12 @@ static void _tf_ssb_getActiveIdentity_after_work(uv_work_t* work, int status)
static JSValue _tf_ssb_getActiveIdentity(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv)
{
tf_ssb_t* ssb = JS_GetOpaque(this_val, _tf_ssb_classId);
const char* name = JS_ToCString(context, argv[0]);
const char* package_owner = JS_ToCString(context, argv[1]);
const char* package_name = JS_ToCString(context, argv[2]);
active_identity_work_t* work = tf_malloc(sizeof(active_identity_work_t));
*work = (active_identity_work_t) {
.request = { .data = work },
.ssb = JS_GetOpaque(this_val, _tf_ssb_classId),
.context = context,
.name = tf_strdup(name),
.package_owner = tf_strdup(package_owner),
@ -387,18 +376,12 @@ static JSValue _tf_ssb_getActiveIdentity(JSContext* context, JSValueConst this_v
JS_FreeCString(context, package_owner);
JS_FreeCString(context, package_name);
int r = uv_queue_work(tf_ssb_get_loop(work->ssb), &work->request, _tf_ssb_getActiveIdentity_work, _tf_ssb_getActiveIdentity_after_work);
if (r)
{
_tf_ssb_getActiveIdentity_after_work(&work->request, r);
}
tf_ssb_run_work(ssb, _tf_ssb_getActiveIdentity_work, _tf_ssb_getActiveIdentity_after_work, work);
return result;
}
typedef struct _identity_info_work_t
{
uv_work_t request;
tf_ssb_t* ssb;
JSContext* context;
const char* name;
const char* package_owner;
@ -422,12 +405,12 @@ static void _tf_ssb_getIdentityInfo_visit(const char* identity, void* data)
;
}
static void _tf_ssb_getIdentityInfo_work(uv_work_t* work)
static void _tf_ssb_getIdentityInfo_work(tf_ssb_t* ssb, void* user_data)
{
identity_info_work_t* request = work->data;
tf_ssb_db_identity_visit(request->ssb, request->name, _tf_ssb_getIdentityInfo_visit, request);
identity_info_work_t* request = user_data;
tf_ssb_db_identity_visit(ssb, request->name, _tf_ssb_getIdentityInfo_visit, request);
sqlite3* db = tf_ssb_acquire_db_reader(request->ssb);
sqlite3* db = tf_ssb_acquire_db_reader(ssb);
sqlite3_stmt* statement = NULL;
request->result = sqlite3_prepare(db,
"SELECT author, name FROM ( "
@ -467,12 +450,12 @@ static void _tf_ssb_getIdentityInfo_work(uv_work_t* work)
{
snprintf(request->active_identity, sizeof(request->active_identity), "%s", request->identities[0]);
}
tf_ssb_release_db_reader(request->ssb, db);
tf_ssb_release_db_reader(ssb, db);
}
static void _tf_ssb_getIdentityInfo_after_work(uv_work_t* work, int status)
static void _tf_ssb_getIdentityInfo_after_work(tf_ssb_t* ssb, int status, void* user_data)
{
identity_info_work_t* request = work->data;
identity_info_work_t* request = user_data;
JSContext* context = request->context;
JSValue result = JS_NewObject(context);
@ -514,13 +497,12 @@ static void _tf_ssb_getIdentityInfo_after_work(uv_work_t* work, int status)
static JSValue _tf_ssb_getIdentityInfo(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv)
{
tf_ssb_t* ssb = JS_GetOpaque(this_val, _tf_ssb_classId);
const char* name = JS_ToCString(context, argv[0]);
const char* package_owner = JS_ToCString(context, argv[1]);
const char* package_name = JS_ToCString(context, argv[2]);
identity_info_work_t* work = tf_malloc(sizeof(identity_info_work_t));
*work = (identity_info_work_t) {
.request = { .data = work },
.ssb = JS_GetOpaque(this_val, _tf_ssb_classId),
.context = context,
.name = tf_strdup(name),
.package_owner = tf_strdup(package_owner),
@ -531,11 +513,7 @@ static JSValue _tf_ssb_getIdentityInfo(JSContext* context, JSValueConst this_val
JS_FreeCString(context, package_owner);
JS_FreeCString(context, package_name);
int r = uv_queue_work(tf_ssb_get_loop(work->ssb), &work->request, _tf_ssb_getIdentityInfo_work, _tf_ssb_getIdentityInfo_after_work);
if (r)
{
_tf_ssb_getIdentityInfo_after_work(&work->request, r);
}
tf_ssb_run_work(ssb, _tf_ssb_getIdentityInfo_work, _tf_ssb_getIdentityInfo_after_work, work);
return result;
}
@ -845,7 +823,6 @@ typedef struct _sql_work_t
uint8_t* rows;
size_t binds_count;
size_t rows_count;
uv_work_t request;
uv_async_t async;
uv_timer_t timeout;
uv_mutex_t lock;
@ -861,13 +838,10 @@ static void _tf_ssb_sql_append(uint8_t** rows, size_t* rows_count, const void* d
*rows_count += size;
}
static void _tf_ssb_sqlAsync_work(uv_work_t* work)
static void _tf_ssb_sqlAsync_work(tf_ssb_t* ssb, void* user_data)
{
sql_work_t* sql_work = work->data;
tf_ssb_record_thread_busy(sql_work->ssb, true);
tf_trace_t* trace = tf_ssb_get_trace(sql_work->ssb);
tf_trace_begin(trace, "sql_async_work");
sqlite3* db = tf_ssb_acquire_db_reader_restricted(sql_work->ssb);
sql_work_t* sql_work = user_data;
sqlite3* db = tf_ssb_acquire_db_reader_restricted(ssb);
uv_mutex_lock(&sql_work->lock);
sql_work->db = db;
uv_mutex_unlock(&sql_work->lock);
@ -975,9 +949,7 @@ static void _tf_ssb_sqlAsync_work(uv_work_t* work)
uv_mutex_lock(&sql_work->lock);
sql_work->db = NULL;
uv_mutex_unlock(&sql_work->lock);
tf_ssb_release_db_reader(sql_work->ssb, db);
tf_ssb_record_thread_busy(sql_work->ssb, false);
tf_trace_end(trace);
tf_ssb_release_db_reader(ssb, db);
}
static void _tf_ssb_sqlAsync_handle_close(uv_handle_t* handle)
@ -1003,12 +975,10 @@ static void _tf_ssb_sqlAsync_destroy(sql_work_t* work)
uv_close((uv_handle_t*)&work->async, _tf_ssb_sqlAsync_handle_close);
}
static void _tf_ssb_sqlAsync_after_work(uv_work_t* work, int status)
static void _tf_ssb_sqlAsync_after_work(tf_ssb_t* ssb, int status, void* user_data)
{
sql_work_t* sql_work = work->data;
tf_trace_t* trace = tf_ssb_get_trace(sql_work->ssb);
tf_trace_begin(trace, "sql_async_after_work");
JSContext* context = tf_ssb_get_context(sql_work->ssb);
sql_work_t* sql_work = user_data;
JSContext* context = tf_ssb_get_context(ssb);
uint8_t* p = sql_work->rows;
while (p < sql_work->rows + sql_work->rows_count)
{
@ -1093,7 +1063,6 @@ static void _tf_ssb_sqlAsync_after_work(uv_work_t* work, int status)
JS_FreeValue(context, sql_work->callback);
JS_FreeCString(context, sql_work->query);
_tf_ssb_sqlAsync_destroy(sql_work);
tf_trace_end(trace);
}
static void _tf_ssb_sqlAsync_timeout(uv_timer_t* timer)
@ -1120,10 +1089,6 @@ static JSValue _tf_ssb_sqlAsync(JSContext* context, JSValueConst this_val, int a
sql_work_t* work = tf_malloc(sizeof(sql_work_t));
*work = (sql_work_t)
{
.request =
{
.data = work,
},
.async =
{
.data = work,
@ -1183,11 +1148,7 @@ static JSValue _tf_ssb_sqlAsync(JSContext* context, JSValueConst this_val, int a
}
JS_FreeValue(context, value);
}
int r = uv_queue_work(tf_ssb_get_loop(ssb), &work->request, _tf_ssb_sqlAsync_work, _tf_ssb_sqlAsync_after_work);
if (r)
{
error_value = JS_ThrowInternalError(context, "uv_queue_work failed: %s", uv_strerror(r));
}
tf_ssb_run_work(ssb, _tf_ssb_sqlAsync_work, _tf_ssb_sqlAsync_after_work, work);
}
if (!JS_IsUndefined(error_value))
{
@ -1809,8 +1770,6 @@ static JSValue _tf_ssb_private_message_decrypt(JSContext* context, JSValueConst
typedef struct _following_t
{
uv_work_t work;
tf_ssb_t* ssb;
JSContext* context;
JSValue promise[2];
@ -1821,17 +1780,15 @@ typedef struct _following_t
const char* ids[];
} following_t;
static void _tf_ssb_following_work(uv_work_t* work)
static void _tf_ssb_following_work(tf_ssb_t* ssb, void* user_data)
{
following_t* following = work->data;
tf_ssb_record_thread_busy(following->ssb, true);
following->out_following = tf_ssb_db_following_deep(following->ssb, following->ids, following->ids_count, following->depth);
tf_ssb_record_thread_busy(following->ssb, false);
following_t* following = user_data;
following->out_following = tf_ssb_db_following_deep(ssb, following->ids, following->ids_count, following->depth);
}
static void _tf_ssb_following_after_work(uv_work_t* work, int status)
static void _tf_ssb_following_after_work(tf_ssb_t* ssb, int status, void* user_data)
{
following_t* following = work->data;
following_t* following = user_data;
JSContext* context = following->context;
if (status == 0)
{
@ -1878,14 +1835,8 @@ static JSValue _tf_ssb_following(JSContext* context, JSValueConst this_val, int
int ids_count = tf_util_get_length(context, argv[0]);
following_t* following = tf_malloc(sizeof(following_t) + sizeof(char*) * ids_count);
*following = (following_t)
{
.work =
{
.data = following,
},
*following = (following_t) {
.context = context,
.ssb = ssb,
};
JS_ToInt32(context, &following->depth, argv[1]);
@ -1903,11 +1854,7 @@ static JSValue _tf_ssb_following(JSContext* context, JSValueConst this_val, int
}
}
int r = uv_queue_work(tf_ssb_get_loop(ssb), &following->work, _tf_ssb_following_work, _tf_ssb_following_after_work);
if (r)
{
_tf_ssb_following_after_work(&following->work, r);
}
tf_ssb_run_work(ssb, _tf_ssb_following_work, _tf_ssb_following_after_work, following);
return result;
}

View File

@ -1137,12 +1137,6 @@ static void _tf_ssb_rpc_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_chang
}
}
typedef struct _delete_blobs_work_t
{
uv_work_t work;
tf_ssb_t* ssb;
} delete_blobs_work_t;
static void _tf_ssb_rpc_checkpoint(tf_ssb_t* ssb)
{
int64_t checkpoint_start_ms = uv_hrtime();
@ -1160,16 +1154,12 @@ static void _tf_ssb_rpc_checkpoint(tf_ssb_t* ssb)
tf_ssb_release_db_writer(ssb, db);
}
static void _tf_ssb_rpc_delete_blobs_work(uv_work_t* work)
static void _tf_ssb_rpc_delete_blobs_work(tf_ssb_t* ssb, void* user_data)
{
delete_blobs_work_t* delete = work->data;
tf_ssb_t* ssb = delete->ssb;
tf_ssb_record_thread_busy(ssb, true);
int64_t age = _get_global_setting_int64(ssb, "blob_expire_age_seconds", -1);
if (age <= 0)
{
_tf_ssb_rpc_checkpoint(ssb);
tf_ssb_record_thread_busy(ssb, false);
return;
}
int64_t start_ns = uv_hrtime();
@ -1211,28 +1201,16 @@ static void _tf_ssb_rpc_delete_blobs_work(uv_work_t* work)
tf_printf("Deleted %d blobs in %d ms.\n", deleted, (int)duration_ms);
_tf_ssb_rpc_checkpoint(ssb);
_tf_ssb_rpc_start_delete_blobs(ssb, deleted ? (int)duration_ms : (15 * 60 * 1000));
tf_ssb_record_thread_busy(ssb, false);
}
static void _tf_ssb_rpc_delete_blobs_after_work(uv_work_t* work, int status)
static void _tf_ssb_rpc_delete_blobs_after_work(tf_ssb_t* ssb, int status, void* user_data)
{
delete_blobs_work_t* delete = work->data;
tf_ssb_unref(delete->ssb);
tf_free(delete);
tf_ssb_unref(ssb);
}
static void _tf_ssb_rpc_start_delete_callback(tf_ssb_t* ssb, void* user_data)
{
delete_blobs_work_t* work = tf_malloc(sizeof(delete_blobs_work_t));
*work = (delete_blobs_work_t) { .work = { .data = work }, .ssb = ssb };
tf_ssb_ref(ssb);
int r = uv_queue_work(tf_ssb_get_loop(ssb), &work->work, _tf_ssb_rpc_delete_blobs_work, _tf_ssb_rpc_delete_blobs_after_work);
if (r)
{
tf_printf("uv_queue_work: %s\n", uv_strerror(r));
tf_free(work);
tf_ssb_unref(ssb);
}
tf_ssb_run_work(ssb, _tf_ssb_rpc_delete_blobs_work, _tf_ssb_rpc_delete_blobs_after_work, NULL);
}
static void _tf_ssb_rpc_start_delete_blobs(tf_ssb_t* ssb, int delay_ms)