ssb: Notifying apps of every single message added isn't practical for big replications. Only notify on the completion of each chunk. Update the tests. The apps need some treatment, but refreshing works around any issues for now.
Some checks failed
Build Tilde Friends / Build-All (push) Has been cancelled
Some checks failed
Build Tilde Friends / Build-All (push) Has been cancelled
This commit is contained in:
parent
d692734e55
commit
8928e8722b
43
src/ssb.db.c
43
src/ssb.db.c
@ -125,7 +125,7 @@ void tf_ssb_db_init(tf_ssb_t* ssb)
|
||||
"CREATE TABLE IF NOT EXISTS messages_stats ("
|
||||
" author TEXT PRIMARY KEY,"
|
||||
" max_sequence INTEGER,"
|
||||
" max_timestamp READ"
|
||||
" max_timestamp REAL"
|
||||
")");
|
||||
_tf_ssb_db_exec(
|
||||
db, "INSERT OR REPLACE INTO messages_stats (author, max_sequence, max_timestamp) SELECT author, MAX(sequence), MAX(timestamp) FROM messages GROUP BY author");
|
||||
@ -596,23 +596,13 @@ static void _tf_ssb_db_store_message_after_work(tf_ssb_t* ssb, int status, void*
|
||||
message_store_t* store = user_data;
|
||||
tf_trace_t* trace = tf_ssb_get_trace(ssb);
|
||||
|
||||
message_store_t* last_stored = NULL;
|
||||
|
||||
while (store)
|
||||
{
|
||||
if (store->out_stored)
|
||||
{
|
||||
tf_trace_begin(trace, "notify_message_added");
|
||||
JSContext* context = tf_ssb_get_context(ssb);
|
||||
JSValue formatted =
|
||||
tf_ssb_format_message(context, store->previous, store->author, store->sequence, store->timestamp, "sha256", store->content, store->signature, store->flags);
|
||||
JSValue message = JS_NewObject(context);
|
||||
JS_SetPropertyStr(context, message, "key", JS_NewString(context, store->id));
|
||||
JS_SetPropertyStr(context, message, "value", formatted);
|
||||
char timestamp_string[256];
|
||||
snprintf(timestamp_string, sizeof(timestamp_string), "%f", store->timestamp);
|
||||
JS_SetPropertyStr(context, message, "timestamp", JS_NewString(context, timestamp_string));
|
||||
tf_ssb_notify_message_added(ssb, store->author, store->sequence, store->id, message);
|
||||
JS_FreeValue(context, message);
|
||||
tf_trace_end(trace);
|
||||
last_stored = store;
|
||||
}
|
||||
if (store->out_blob_wants)
|
||||
{
|
||||
@ -625,11 +615,34 @@ static void _tf_ssb_db_store_message_after_work(tf_ssb_t* ssb, int status, void*
|
||||
tf_trace_end(trace);
|
||||
}
|
||||
|
||||
JSContext* context = tf_ssb_get_context(ssb);
|
||||
if (store->callback)
|
||||
{
|
||||
store->callback(store->id, store->out_stored, store->user_data);
|
||||
}
|
||||
store = store->next;
|
||||
}
|
||||
|
||||
if (last_stored)
|
||||
{
|
||||
tf_trace_begin(trace, "notify_message_added");
|
||||
JSContext* context = tf_ssb_get_context(ssb);
|
||||
JSValue formatted = tf_ssb_format_message(context, last_stored->previous, last_stored->author, last_stored->sequence, last_stored->timestamp, "sha256",
|
||||
last_stored->content, last_stored->signature, last_stored->flags);
|
||||
JSValue message = JS_NewObject(context);
|
||||
JS_SetPropertyStr(context, message, "key", JS_NewString(context, last_stored->id));
|
||||
JS_SetPropertyStr(context, message, "value", formatted);
|
||||
char timestamp_string[256];
|
||||
snprintf(timestamp_string, sizeof(timestamp_string), "%f", last_stored->timestamp);
|
||||
JS_SetPropertyStr(context, message, "timestamp", JS_NewString(context, timestamp_string));
|
||||
tf_ssb_notify_message_added(ssb, last_stored->author, last_stored->sequence, last_stored->id, message);
|
||||
JS_FreeValue(context, message);
|
||||
tf_trace_end(trace);
|
||||
}
|
||||
|
||||
JSContext* context = tf_ssb_get_context(ssb);
|
||||
store = user_data;
|
||||
while (store)
|
||||
{
|
||||
JS_FreeCString(context, store->content);
|
||||
message_store_t* last = store;
|
||||
store = store->next;
|
||||
|
@ -720,24 +720,21 @@ void tf_ssb_test_bench(const tf_test_options_t* options)
|
||||
tf_ssb_server_open(ssb0, 12347);
|
||||
tf_ssb_connect(ssb1, "127.0.0.1", 12347, id0bin, 0, NULL, NULL);
|
||||
|
||||
tf_printf("Waiting for messages.\n");
|
||||
clock_gettime(CLOCK_REALTIME, &start_time);
|
||||
int count = 0;
|
||||
tf_ssb_add_message_added_callback(ssb1, _message_added, NULL, &count);
|
||||
while (count < k_messages)
|
||||
{
|
||||
uv_run(&loop, UV_RUN_ONCE);
|
||||
}
|
||||
tf_ssb_remove_message_added_callback(ssb1, _message_added, &count);
|
||||
clock_gettime(CLOCK_REALTIME, &end_time);
|
||||
|
||||
tf_ssb_set_main_thread(ssb0, false);
|
||||
tf_ssb_set_main_thread(ssb1, false);
|
||||
count = _ssb_test_count_messages(ssb1);
|
||||
if (count < k_messages)
|
||||
|
||||
tf_printf("Waiting for messages.\n");
|
||||
clock_gettime(CLOCK_REALTIME, &start_time);
|
||||
while (_ssb_test_count_messages(ssb1) < k_messages)
|
||||
{
|
||||
abort();
|
||||
tf_ssb_set_main_thread(ssb0, true);
|
||||
tf_ssb_set_main_thread(ssb1, true);
|
||||
uv_run(&loop, UV_RUN_ONCE);
|
||||
tf_ssb_set_main_thread(ssb0, false);
|
||||
tf_ssb_set_main_thread(ssb1, false);
|
||||
}
|
||||
clock_gettime(CLOCK_REALTIME, &end_time);
|
||||
|
||||
tf_printf("Done.\n");
|
||||
tf_printf("replicate = %f seconds\n", (end_time.tv_sec - start_time.tv_sec) + (end_time.tv_nsec - start_time.tv_nsec) / 1e9);
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user