ssb: Commit messages in transactions for performance.
All checks were successful
Build Tilde Friends / Build-All (push) Successful in 30m39s
All checks were successful
Build Tilde Friends / Build-All (push) Successful in 30m39s
This commit is contained in:
parent
7f34b585d3
commit
e92c439724
125
src/ssb.db.c
125
src/ssb.db.c
@ -23,6 +23,17 @@ typedef struct _message_store_t message_store_t;
|
|||||||
|
|
||||||
static void _tf_ssb_db_store_message_after_work(tf_ssb_t* ssb, int status, void* user_data);
|
static void _tf_ssb_db_store_message_after_work(tf_ssb_t* ssb, int status, void* user_data);
|
||||||
|
|
||||||
|
static int _tf_ssb_db_try_exec(sqlite3* db, const char* statement)
|
||||||
|
{
|
||||||
|
char* error = NULL;
|
||||||
|
int result = sqlite3_exec(db, statement, NULL, NULL, &error);
|
||||||
|
if (result != SQLITE_OK)
|
||||||
|
{
|
||||||
|
tf_printf("Error running '%s': %s.\n", statement, error ? error : sqlite3_errmsg(db));
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
static void _tf_ssb_db_exec(sqlite3* db, const char* statement)
|
static void _tf_ssb_db_exec(sqlite3* db, const char* statement)
|
||||||
{
|
{
|
||||||
char* error = NULL;
|
char* error = NULL;
|
||||||
@ -412,10 +423,9 @@ static bool _tf_ssb_db_previous_message_exists(sqlite3* db, const char* author,
|
|||||||
return exists;
|
return exists;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int64_t _tf_ssb_db_store_message_raw(tf_ssb_t* ssb, const char* id, const char* previous, const char* author, int64_t sequence, double timestamp, const char* content,
|
static int64_t _tf_ssb_db_store_message_raw(sqlite3* db, const char* id, const char* previous, const char* author, int64_t sequence, double timestamp, const char* content,
|
||||||
size_t content_len, const char* signature, int flags)
|
size_t content_len, const char* signature, int flags)
|
||||||
{
|
{
|
||||||
sqlite3* db = tf_ssb_acquire_db_writer(ssb);
|
|
||||||
int64_t last_row_id = -1;
|
int64_t last_row_id = -1;
|
||||||
bool id_mismatch = false;
|
bool id_mismatch = false;
|
||||||
|
|
||||||
@ -464,7 +474,6 @@ static int64_t _tf_ssb_db_store_message_raw(tf_ssb_t* ssb, const char* id, const
|
|||||||
*/
|
*/
|
||||||
tf_printf("%p: Previous message doesn't exist for author=%s sequence=%" PRId64 " previous=%s.\n", db, author, sequence, previous);
|
tf_printf("%p: Previous message doesn't exist for author=%s sequence=%" PRId64 " previous=%s.\n", db, author, sequence, previous);
|
||||||
}
|
}
|
||||||
tf_ssb_release_db_writer(ssb, db);
|
|
||||||
return last_row_id;
|
return last_row_id;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -537,30 +546,47 @@ typedef struct _message_store_t
|
|||||||
static void _tf_ssb_db_store_message_work(tf_ssb_t* ssb, void* user_data)
|
static void _tf_ssb_db_store_message_work(tf_ssb_t* ssb, void* user_data)
|
||||||
{
|
{
|
||||||
message_store_t* store = user_data;
|
message_store_t* store = user_data;
|
||||||
int64_t last_row_id = _tf_ssb_db_store_message_raw(
|
sqlite3* db = tf_ssb_acquire_db_writer(ssb);
|
||||||
ssb, store->id, *store->previous ? store->previous : NULL, store->author, store->sequence, store->timestamp, store->content, store->length, store->signature, store->flags);
|
bool in_transaction = _tf_ssb_db_try_exec(db, "BEGIN TRANSACTION") == SQLITE_OK;
|
||||||
if (last_row_id != -1)
|
|
||||||
|
while (store)
|
||||||
{
|
{
|
||||||
store->out_stored = true;
|
int64_t last_row_id = _tf_ssb_db_store_message_raw(db, store->id, *store->previous ? store->previous : NULL, store->author, store->sequence, store->timestamp,
|
||||||
store->out_blob_wants = _tf_ssb_db_get_message_blob_wants(ssb, last_row_id);
|
store->content, store->length, store->signature, store->flags);
|
||||||
|
if (last_row_id != -1)
|
||||||
|
{
|
||||||
|
store->out_stored = true;
|
||||||
|
store->out_blob_wants = _tf_ssb_db_get_message_blob_wants(ssb, last_row_id);
|
||||||
|
}
|
||||||
|
store = store->next;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (in_transaction)
|
||||||
|
{
|
||||||
|
if (_tf_ssb_db_try_exec(db, "COMMIT TRANSACTION") != SQLITE_OK)
|
||||||
|
{
|
||||||
|
store = user_data;
|
||||||
|
while (store)
|
||||||
|
{
|
||||||
|
store->out_stored = false;
|
||||||
|
store = store->next;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tf_ssb_release_db_writer(ssb, db);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void _wake_up_queue(tf_ssb_t* ssb, tf_ssb_store_queue_t* queue)
|
static void _wake_up_queue(tf_ssb_t* ssb, tf_ssb_store_queue_t* queue)
|
||||||
{
|
{
|
||||||
if (!queue->running)
|
if (!queue->running)
|
||||||
{
|
{
|
||||||
message_store_t* next = queue->head;
|
message_store_t* stores = queue->head;
|
||||||
if (next)
|
queue->head = NULL;
|
||||||
|
queue->tail = NULL;
|
||||||
|
if (stores)
|
||||||
{
|
{
|
||||||
queue->head = next->next;
|
|
||||||
if (queue->tail == next)
|
|
||||||
{
|
|
||||||
queue->tail = NULL;
|
|
||||||
}
|
|
||||||
next->next = NULL;
|
|
||||||
queue->running = true;
|
queue->running = true;
|
||||||
tf_ssb_run_work(ssb, _tf_ssb_db_store_message_work, _tf_ssb_db_store_message_after_work, next);
|
tf_ssb_run_work(ssb, _tf_ssb_db_store_message_work, _tf_ssb_db_store_message_after_work, stores);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -569,43 +595,50 @@ static void _tf_ssb_db_store_message_after_work(tf_ssb_t* ssb, int status, void*
|
|||||||
{
|
{
|
||||||
message_store_t* store = user_data;
|
message_store_t* store = user_data;
|
||||||
tf_trace_t* trace = tf_ssb_get_trace(ssb);
|
tf_trace_t* trace = tf_ssb_get_trace(ssb);
|
||||||
if (store->out_stored)
|
|
||||||
|
while (store)
|
||||||
{
|
{
|
||||||
tf_trace_begin(trace, "notify_message_added");
|
if (store->out_stored)
|
||||||
JSContext* context = tf_ssb_get_context(ssb);
|
|
||||||
JSValue formatted =
|
|
||||||
tf_ssb_format_message(context, store->previous, store->author, store->sequence, store->timestamp, "sha256", store->content, store->signature, store->flags);
|
|
||||||
JSValue message = JS_NewObject(context);
|
|
||||||
JS_SetPropertyStr(context, message, "key", JS_NewString(context, store->id));
|
|
||||||
JS_SetPropertyStr(context, message, "value", formatted);
|
|
||||||
char timestamp_string[256];
|
|
||||||
snprintf(timestamp_string, sizeof(timestamp_string), "%f", store->timestamp);
|
|
||||||
JS_SetPropertyStr(context, message, "timestamp", JS_NewString(context, timestamp_string));
|
|
||||||
tf_ssb_notify_message_added(ssb, store->author, store->sequence, store->id, message);
|
|
||||||
JS_FreeValue(context, message);
|
|
||||||
tf_trace_end(trace);
|
|
||||||
}
|
|
||||||
if (store->out_blob_wants)
|
|
||||||
{
|
|
||||||
tf_trace_begin(trace, "notify_blob_wants_added");
|
|
||||||
for (char* p = store->out_blob_wants; *p; p = p + strlen(p))
|
|
||||||
{
|
{
|
||||||
tf_ssb_notify_blob_want_added(ssb, p);
|
tf_trace_begin(trace, "notify_message_added");
|
||||||
|
JSContext* context = tf_ssb_get_context(ssb);
|
||||||
|
JSValue formatted =
|
||||||
|
tf_ssb_format_message(context, store->previous, store->author, store->sequence, store->timestamp, "sha256", store->content, store->signature, store->flags);
|
||||||
|
JSValue message = JS_NewObject(context);
|
||||||
|
JS_SetPropertyStr(context, message, "key", JS_NewString(context, store->id));
|
||||||
|
JS_SetPropertyStr(context, message, "value", formatted);
|
||||||
|
char timestamp_string[256];
|
||||||
|
snprintf(timestamp_string, sizeof(timestamp_string), "%f", store->timestamp);
|
||||||
|
JS_SetPropertyStr(context, message, "timestamp", JS_NewString(context, timestamp_string));
|
||||||
|
tf_ssb_notify_message_added(ssb, store->author, store->sequence, store->id, message);
|
||||||
|
JS_FreeValue(context, message);
|
||||||
|
tf_trace_end(trace);
|
||||||
}
|
}
|
||||||
tf_free(store->out_blob_wants);
|
if (store->out_blob_wants)
|
||||||
tf_trace_end(trace);
|
{
|
||||||
|
tf_trace_begin(trace, "notify_blob_wants_added");
|
||||||
|
for (char* p = store->out_blob_wants; *p; p = p + strlen(p))
|
||||||
|
{
|
||||||
|
tf_ssb_notify_blob_want_added(ssb, p);
|
||||||
|
}
|
||||||
|
tf_free(store->out_blob_wants);
|
||||||
|
tf_trace_end(trace);
|
||||||
|
}
|
||||||
|
|
||||||
|
JSContext* context = tf_ssb_get_context(ssb);
|
||||||
|
if (store->callback)
|
||||||
|
{
|
||||||
|
store->callback(store->id, store->out_stored, store->user_data);
|
||||||
|
}
|
||||||
|
JS_FreeCString(context, store->content);
|
||||||
|
message_store_t* last = store;
|
||||||
|
store = store->next;
|
||||||
|
tf_free(last);
|
||||||
}
|
}
|
||||||
|
|
||||||
JSContext* context = tf_ssb_get_context(ssb);
|
|
||||||
if (store->callback)
|
|
||||||
{
|
|
||||||
store->callback(store->id, store->out_stored, store->user_data);
|
|
||||||
}
|
|
||||||
JS_FreeCString(context, store->content);
|
|
||||||
tf_ssb_store_queue_t* queue = tf_ssb_get_store_queue(ssb);
|
tf_ssb_store_queue_t* queue = tf_ssb_get_store_queue(ssb);
|
||||||
queue->running = false;
|
queue->running = false;
|
||||||
_wake_up_queue(ssb, queue);
|
_wake_up_queue(ssb, queue);
|
||||||
tf_free(store);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void tf_ssb_db_store_message(
|
void tf_ssb_db_store_message(
|
||||||
|
Loading…
x
Reference in New Issue
Block a user