From e92c439724f2428f88b2c82e96f7c19c3156510a Mon Sep 17 00:00:00 2001 From: Cory McWilliams Date: Tue, 18 Mar 2025 12:49:47 -0400 Subject: [PATCH] ssb: Commit messages in transactions for performance. --- src/ssb.db.c | 125 ++++++++++++++++++++++++++++++++------------------- 1 file changed, 79 insertions(+), 46 deletions(-) diff --git a/src/ssb.db.c b/src/ssb.db.c index 6a9b4f31..7841c544 100644 --- a/src/ssb.db.c +++ b/src/ssb.db.c @@ -23,6 +23,17 @@ typedef struct _message_store_t message_store_t; static void _tf_ssb_db_store_message_after_work(tf_ssb_t* ssb, int status, void* user_data); +static int _tf_ssb_db_try_exec(sqlite3* db, const char* statement) +{ + char* error = NULL; + int result = sqlite3_exec(db, statement, NULL, NULL, &error); + if (result != SQLITE_OK) + { + tf_printf("Error running '%s': %s.\n", statement, error ? error : sqlite3_errmsg(db)); + } + return result; +} + static void _tf_ssb_db_exec(sqlite3* db, const char* statement) { char* error = NULL; @@ -412,10 +423,9 @@ static bool _tf_ssb_db_previous_message_exists(sqlite3* db, const char* author, return exists; } -static int64_t _tf_ssb_db_store_message_raw(tf_ssb_t* ssb, const char* id, const char* previous, const char* author, int64_t sequence, double timestamp, const char* content, +static int64_t _tf_ssb_db_store_message_raw(sqlite3* db, const char* id, const char* previous, const char* author, int64_t sequence, double timestamp, const char* content, size_t content_len, const char* signature, int flags) { - sqlite3* db = tf_ssb_acquire_db_writer(ssb); int64_t last_row_id = -1; bool id_mismatch = false; @@ -464,7 +474,6 @@ static int64_t _tf_ssb_db_store_message_raw(tf_ssb_t* ssb, const char* id, const */ tf_printf("%p: Previous message doesn't exist for author=%s sequence=%" PRId64 " previous=%s.\n", db, author, sequence, previous); } - tf_ssb_release_db_writer(ssb, db); return last_row_id; } @@ -537,30 +546,47 @@ typedef struct _message_store_t static void _tf_ssb_db_store_message_work(tf_ssb_t* ssb, void* user_data) { message_store_t* store = user_data; - int64_t last_row_id = _tf_ssb_db_store_message_raw( - ssb, store->id, *store->previous ? store->previous : NULL, store->author, store->sequence, store->timestamp, store->content, store->length, store->signature, store->flags); - if (last_row_id != -1) + sqlite3* db = tf_ssb_acquire_db_writer(ssb); + bool in_transaction = _tf_ssb_db_try_exec(db, "BEGIN TRANSACTION") == SQLITE_OK; + + while (store) { - store->out_stored = true; - store->out_blob_wants = _tf_ssb_db_get_message_blob_wants(ssb, last_row_id); + int64_t last_row_id = _tf_ssb_db_store_message_raw(db, store->id, *store->previous ? store->previous : NULL, store->author, store->sequence, store->timestamp, + store->content, store->length, store->signature, store->flags); + if (last_row_id != -1) + { + store->out_stored = true; + store->out_blob_wants = _tf_ssb_db_get_message_blob_wants(ssb, last_row_id); + } + store = store->next; } + + if (in_transaction) + { + if (_tf_ssb_db_try_exec(db, "COMMIT TRANSACTION") != SQLITE_OK) + { + store = user_data; + while (store) + { + store->out_stored = false; + store = store->next; + } + } + } + tf_ssb_release_db_writer(ssb, db); } static void _wake_up_queue(tf_ssb_t* ssb, tf_ssb_store_queue_t* queue) { if (!queue->running) { - message_store_t* next = queue->head; - if (next) + message_store_t* stores = queue->head; + queue->head = NULL; + queue->tail = NULL; + if (stores) { - queue->head = next->next; - if (queue->tail == next) - { - queue->tail = NULL; - } - next->next = NULL; queue->running = true; - tf_ssb_run_work(ssb, _tf_ssb_db_store_message_work, _tf_ssb_db_store_message_after_work, next); + tf_ssb_run_work(ssb, _tf_ssb_db_store_message_work, _tf_ssb_db_store_message_after_work, stores); } } } @@ -569,43 +595,50 @@ static void _tf_ssb_db_store_message_after_work(tf_ssb_t* ssb, int status, void* { message_store_t* store = user_data; tf_trace_t* trace = tf_ssb_get_trace(ssb); - if (store->out_stored) + + while (store) { - tf_trace_begin(trace, "notify_message_added"); - JSContext* context = tf_ssb_get_context(ssb); - JSValue formatted = - tf_ssb_format_message(context, store->previous, store->author, store->sequence, store->timestamp, "sha256", store->content, store->signature, store->flags); - JSValue message = JS_NewObject(context); - JS_SetPropertyStr(context, message, "key", JS_NewString(context, store->id)); - JS_SetPropertyStr(context, message, "value", formatted); - char timestamp_string[256]; - snprintf(timestamp_string, sizeof(timestamp_string), "%f", store->timestamp); - JS_SetPropertyStr(context, message, "timestamp", JS_NewString(context, timestamp_string)); - tf_ssb_notify_message_added(ssb, store->author, store->sequence, store->id, message); - JS_FreeValue(context, message); - tf_trace_end(trace); - } - if (store->out_blob_wants) - { - tf_trace_begin(trace, "notify_blob_wants_added"); - for (char* p = store->out_blob_wants; *p; p = p + strlen(p)) + if (store->out_stored) { - tf_ssb_notify_blob_want_added(ssb, p); + tf_trace_begin(trace, "notify_message_added"); + JSContext* context = tf_ssb_get_context(ssb); + JSValue formatted = + tf_ssb_format_message(context, store->previous, store->author, store->sequence, store->timestamp, "sha256", store->content, store->signature, store->flags); + JSValue message = JS_NewObject(context); + JS_SetPropertyStr(context, message, "key", JS_NewString(context, store->id)); + JS_SetPropertyStr(context, message, "value", formatted); + char timestamp_string[256]; + snprintf(timestamp_string, sizeof(timestamp_string), "%f", store->timestamp); + JS_SetPropertyStr(context, message, "timestamp", JS_NewString(context, timestamp_string)); + tf_ssb_notify_message_added(ssb, store->author, store->sequence, store->id, message); + JS_FreeValue(context, message); + tf_trace_end(trace); } - tf_free(store->out_blob_wants); - tf_trace_end(trace); + if (store->out_blob_wants) + { + tf_trace_begin(trace, "notify_blob_wants_added"); + for (char* p = store->out_blob_wants; *p; p = p + strlen(p)) + { + tf_ssb_notify_blob_want_added(ssb, p); + } + tf_free(store->out_blob_wants); + tf_trace_end(trace); + } + + JSContext* context = tf_ssb_get_context(ssb); + if (store->callback) + { + store->callback(store->id, store->out_stored, store->user_data); + } + JS_FreeCString(context, store->content); + message_store_t* last = store; + store = store->next; + tf_free(last); } - JSContext* context = tf_ssb_get_context(ssb); - if (store->callback) - { - store->callback(store->id, store->out_stored, store->user_data); - } - JS_FreeCString(context, store->content); tf_ssb_store_queue_t* queue = tf_ssb_get_store_queue(ssb); queue->running = false; _wake_up_queue(ssb, queue); - tf_free(store); } void tf_ssb_db_store_message(