diff --git a/src/ssb.db.c b/src/ssb.db.c index d9510626..c3928dc3 100644 --- a/src/ssb.db.c +++ b/src/ssb.db.c @@ -125,7 +125,7 @@ void tf_ssb_db_init(tf_ssb_t* ssb) "CREATE TABLE IF NOT EXISTS messages_stats (" " author TEXT PRIMARY KEY," " max_sequence INTEGER," - " max_timestamp READ" + " max_timestamp REAL" ")"); _tf_ssb_db_exec( db, "INSERT OR REPLACE INTO messages_stats (author, max_sequence, max_timestamp) SELECT author, MAX(sequence), MAX(timestamp) FROM messages GROUP BY author"); @@ -596,23 +596,13 @@ static void _tf_ssb_db_store_message_after_work(tf_ssb_t* ssb, int status, void* message_store_t* store = user_data; tf_trace_t* trace = tf_ssb_get_trace(ssb); + message_store_t* last_stored = NULL; + while (store) { if (store->out_stored) { - tf_trace_begin(trace, "notify_message_added"); - JSContext* context = tf_ssb_get_context(ssb); - JSValue formatted = - tf_ssb_format_message(context, store->previous, store->author, store->sequence, store->timestamp, "sha256", store->content, store->signature, store->flags); - JSValue message = JS_NewObject(context); - JS_SetPropertyStr(context, message, "key", JS_NewString(context, store->id)); - JS_SetPropertyStr(context, message, "value", formatted); - char timestamp_string[256]; - snprintf(timestamp_string, sizeof(timestamp_string), "%f", store->timestamp); - JS_SetPropertyStr(context, message, "timestamp", JS_NewString(context, timestamp_string)); - tf_ssb_notify_message_added(ssb, store->author, store->sequence, store->id, message); - JS_FreeValue(context, message); - tf_trace_end(trace); + last_stored = store; } if (store->out_blob_wants) { @@ -625,11 +615,34 @@ static void _tf_ssb_db_store_message_after_work(tf_ssb_t* ssb, int status, void* tf_trace_end(trace); } - JSContext* context = tf_ssb_get_context(ssb); if (store->callback) { store->callback(store->id, store->out_stored, store->user_data); } + store = store->next; + } + + if (last_stored) + { + tf_trace_begin(trace, "notify_message_added"); + JSContext* context = tf_ssb_get_context(ssb); + JSValue formatted = tf_ssb_format_message(context, last_stored->previous, last_stored->author, last_stored->sequence, last_stored->timestamp, "sha256", + last_stored->content, last_stored->signature, last_stored->flags); + JSValue message = JS_NewObject(context); + JS_SetPropertyStr(context, message, "key", JS_NewString(context, last_stored->id)); + JS_SetPropertyStr(context, message, "value", formatted); + char timestamp_string[256]; + snprintf(timestamp_string, sizeof(timestamp_string), "%f", last_stored->timestamp); + JS_SetPropertyStr(context, message, "timestamp", JS_NewString(context, timestamp_string)); + tf_ssb_notify_message_added(ssb, last_stored->author, last_stored->sequence, last_stored->id, message); + JS_FreeValue(context, message); + tf_trace_end(trace); + } + + JSContext* context = tf_ssb_get_context(ssb); + store = user_data; + while (store) + { JS_FreeCString(context, store->content); message_store_t* last = store; store = store->next; diff --git a/src/ssb.tests.c b/src/ssb.tests.c index bf0951b9..191d4186 100644 --- a/src/ssb.tests.c +++ b/src/ssb.tests.c @@ -720,24 +720,21 @@ void tf_ssb_test_bench(const tf_test_options_t* options) tf_ssb_server_open(ssb0, 12347); tf_ssb_connect(ssb1, "127.0.0.1", 12347, id0bin, 0, NULL, NULL); - tf_printf("Waiting for messages.\n"); - clock_gettime(CLOCK_REALTIME, &start_time); - int count = 0; - tf_ssb_add_message_added_callback(ssb1, _message_added, NULL, &count); - while (count < k_messages) - { - uv_run(&loop, UV_RUN_ONCE); - } - tf_ssb_remove_message_added_callback(ssb1, _message_added, &count); - clock_gettime(CLOCK_REALTIME, &end_time); - tf_ssb_set_main_thread(ssb0, false); tf_ssb_set_main_thread(ssb1, false); - count = _ssb_test_count_messages(ssb1); - if (count < k_messages) + + tf_printf("Waiting for messages.\n"); + clock_gettime(CLOCK_REALTIME, &start_time); + while (_ssb_test_count_messages(ssb1) < k_messages) { - abort(); + tf_ssb_set_main_thread(ssb0, true); + tf_ssb_set_main_thread(ssb1, true); + uv_run(&loop, UV_RUN_ONCE); + tf_ssb_set_main_thread(ssb0, false); + tf_ssb_set_main_thread(ssb1, false); } + clock_gettime(CLOCK_REALTIME, &end_time); + tf_printf("Done.\n"); tf_printf("replicate = %f seconds\n", (end_time.tv_sec - start_time.tv_sec) + (end_time.tv_nsec - start_time.tv_nsec) / 1e9);