From 8928e8722b5805bd8e56ff2df0f63cc509ad424f Mon Sep 17 00:00:00 2001 From: Cory McWilliams Date: Fri, 21 Mar 2025 20:45:49 -0400 Subject: [PATCH] ssb: Notifying apps of every single message added isn't practical for big replications. Only notify on the completion of each chunk. Update the tests. The apps need some treatment, but refreshing works around any issues for now. --- src/ssb.db.c | 43 ++++++++++++++++++++++++++++--------------- src/ssb.tests.c | 25 +++++++++++-------------- 2 files changed, 39 insertions(+), 29 deletions(-) diff --git a/src/ssb.db.c b/src/ssb.db.c index d9510626..c3928dc3 100644 --- a/src/ssb.db.c +++ b/src/ssb.db.c @@ -125,7 +125,7 @@ void tf_ssb_db_init(tf_ssb_t* ssb) "CREATE TABLE IF NOT EXISTS messages_stats (" " author TEXT PRIMARY KEY," " max_sequence INTEGER," - " max_timestamp READ" + " max_timestamp REAL" ")"); _tf_ssb_db_exec( db, "INSERT OR REPLACE INTO messages_stats (author, max_sequence, max_timestamp) SELECT author, MAX(sequence), MAX(timestamp) FROM messages GROUP BY author"); @@ -596,23 +596,13 @@ static void _tf_ssb_db_store_message_after_work(tf_ssb_t* ssb, int status, void* message_store_t* store = user_data; tf_trace_t* trace = tf_ssb_get_trace(ssb); + message_store_t* last_stored = NULL; + while (store) { if (store->out_stored) { - tf_trace_begin(trace, "notify_message_added"); - JSContext* context = tf_ssb_get_context(ssb); - JSValue formatted = - tf_ssb_format_message(context, store->previous, store->author, store->sequence, store->timestamp, "sha256", store->content, store->signature, store->flags); - JSValue message = JS_NewObject(context); - JS_SetPropertyStr(context, message, "key", JS_NewString(context, store->id)); - JS_SetPropertyStr(context, message, "value", formatted); - char timestamp_string[256]; - snprintf(timestamp_string, sizeof(timestamp_string), "%f", store->timestamp); - JS_SetPropertyStr(context, message, "timestamp", JS_NewString(context, timestamp_string)); - tf_ssb_notify_message_added(ssb, store->author, store->sequence, store->id, message); - JS_FreeValue(context, message); - tf_trace_end(trace); + last_stored = store; } if (store->out_blob_wants) { @@ -625,11 +615,34 @@ static void _tf_ssb_db_store_message_after_work(tf_ssb_t* ssb, int status, void* tf_trace_end(trace); } - JSContext* context = tf_ssb_get_context(ssb); if (store->callback) { store->callback(store->id, store->out_stored, store->user_data); } + store = store->next; + } + + if (last_stored) + { + tf_trace_begin(trace, "notify_message_added"); + JSContext* context = tf_ssb_get_context(ssb); + JSValue formatted = tf_ssb_format_message(context, last_stored->previous, last_stored->author, last_stored->sequence, last_stored->timestamp, "sha256", + last_stored->content, last_stored->signature, last_stored->flags); + JSValue message = JS_NewObject(context); + JS_SetPropertyStr(context, message, "key", JS_NewString(context, last_stored->id)); + JS_SetPropertyStr(context, message, "value", formatted); + char timestamp_string[256]; + snprintf(timestamp_string, sizeof(timestamp_string), "%f", last_stored->timestamp); + JS_SetPropertyStr(context, message, "timestamp", JS_NewString(context, timestamp_string)); + tf_ssb_notify_message_added(ssb, last_stored->author, last_stored->sequence, last_stored->id, message); + JS_FreeValue(context, message); + tf_trace_end(trace); + } + + JSContext* context = tf_ssb_get_context(ssb); + store = user_data; + while (store) + { JS_FreeCString(context, store->content); message_store_t* last = store; store = store->next; diff --git a/src/ssb.tests.c b/src/ssb.tests.c index bf0951b9..191d4186 100644 --- a/src/ssb.tests.c +++ b/src/ssb.tests.c @@ -720,24 +720,21 @@ void tf_ssb_test_bench(const tf_test_options_t* options) tf_ssb_server_open(ssb0, 12347); tf_ssb_connect(ssb1, "127.0.0.1", 12347, id0bin, 0, NULL, NULL); - tf_printf("Waiting for messages.\n"); - clock_gettime(CLOCK_REALTIME, &start_time); - int count = 0; - tf_ssb_add_message_added_callback(ssb1, _message_added, NULL, &count); - while (count < k_messages) - { - uv_run(&loop, UV_RUN_ONCE); - } - tf_ssb_remove_message_added_callback(ssb1, _message_added, &count); - clock_gettime(CLOCK_REALTIME, &end_time); - tf_ssb_set_main_thread(ssb0, false); tf_ssb_set_main_thread(ssb1, false); - count = _ssb_test_count_messages(ssb1); - if (count < k_messages) + + tf_printf("Waiting for messages.\n"); + clock_gettime(CLOCK_REALTIME, &start_time); + while (_ssb_test_count_messages(ssb1) < k_messages) { - abort(); + tf_ssb_set_main_thread(ssb0, true); + tf_ssb_set_main_thread(ssb1, true); + uv_run(&loop, UV_RUN_ONCE); + tf_ssb_set_main_thread(ssb0, false); + tf_ssb_set_main_thread(ssb1, false); } + clock_gettime(CLOCK_REALTIME, &end_time); + tf_printf("Done.\n"); tf_printf("replicate = %f seconds\n", (end_time.tv_sec - start_time.tv_sec) + (end_time.tv_nsec - start_time.tv_nsec) / 1e9);