From 44d9f69434be360bc684e13380b7df1d904d1f6c Mon Sep 17 00:00:00 2001 From: Cory McWilliams Date: Fri, 3 Jan 2025 13:59:25 -0500 Subject: [PATCH] ssb: Refactoring EBT implementation. I think this works not worse than before and will let me schedule message replication better in a future change. #93 --- src/ssb.c | 11 ++ src/ssb.ebt.c | 302 ++++++++++++++++++++++++++++++++++++++++++++++++++ src/ssb.ebt.h | 85 ++++++++++++++ src/ssb.h | 9 ++ src/ssb.rpc.c | 235 +++++++-------------------------------- src/util.js.h | 13 +++ 6 files changed, 459 insertions(+), 196 deletions(-) create mode 100644 src/ssb.ebt.c create mode 100644 src/ssb.ebt.h diff --git a/src/ssb.c b/src/ssb.c index 2d892752..9078c139 100644 --- a/src/ssb.c +++ b/src/ssb.c @@ -4,6 +4,7 @@ #include "mem.h" #include "ssb.connections.h" #include "ssb.db.h" +#include "ssb.ebt.h" #include "ssb.rpc.h" #include "trace.h" #include "util.js.h" @@ -280,6 +281,7 @@ typedef struct _tf_ssb_connection_t tf_ssb_blob_wants_t blob_wants; bool sent_clock; int32_t ebt_request_number; + tf_ssb_ebt_t* ebt; JSValue object; @@ -1986,6 +1988,9 @@ static void _tf_ssb_connection_destroy(tf_ssb_connection_t* connection, const ch connection->message_requests = NULL; connection->message_requests_count = 0; + tf_ssb_ebt_destroy(connection->ebt); + connection->ebt = NULL; + for (tf_ssb_connection_t** it = &connection->ssb->connections; *it; it = &(*it)->next) { if (*it == connection) @@ -2744,6 +2749,7 @@ static tf_ssb_connection_t* _tf_ssb_connection_create_internal(tf_ssb_t* ssb, co connection->ssb = ssb; connection->send_request_number = 1; randombytes_buf(&connection->prng, sizeof(connection->prng)); + connection->ebt = tf_ssb_ebt_create(connection); connection->async.data = connection; uv_async_init(ssb->loop, &connection->async, _tf_ssb_connection_process_message_async); @@ -4341,3 +4347,8 @@ int tf_ssb_connection_get_flags(tf_ssb_connection_t* connection) { return connection->flags; } + +tf_ssb_ebt_t* tf_ssb_connection_get_ebt(tf_ssb_connection_t* connection) +{ + return connection ? connection->ebt : NULL; +} diff --git a/src/ssb.ebt.c b/src/ssb.ebt.c new file mode 100644 index 00000000..240a622c --- /dev/null +++ b/src/ssb.ebt.c @@ -0,0 +1,302 @@ +#include "ssb.ebt.h" + +#include "mem.h" +#include "ssb.db.h" +#include "ssb.h" +#include "util.js.h" + +#include "uv.h" + +#include + +typedef struct _ebt_entry_t +{ + char id[k_id_base64_len]; + int64_t out; + int64_t in; + bool out_replicate; + bool out_receive; + bool in_replicate; + bool in_receive; +} ebt_entry_t; + +typedef struct _tf_ssb_ebt_t +{ + tf_ssb_connection_t* connection; + uv_mutex_t mutex; + + ebt_entry_t* entries; + int entries_count; +} tf_ssb_ebt_t; + +tf_ssb_ebt_t* tf_ssb_ebt_create(tf_ssb_connection_t* connection) +{ + tf_ssb_ebt_t* ebt = tf_malloc(sizeof(tf_ssb_ebt_t)); + *ebt = (tf_ssb_ebt_t) { + .connection = connection, + }; + uv_mutex_init(&ebt->mutex); + return ebt; +} + +void tf_ssb_ebt_destroy(tf_ssb_ebt_t* ebt) +{ + uv_mutex_destroy(&ebt->mutex); + tf_free(ebt->entries); + tf_free(ebt); +} + +static int _ebt_entry_compare(const void* a, const void* b) +{ + const char* id = a; + const ebt_entry_t* entry = b; + return strcmp(id, entry->id); +} + +static ebt_entry_t* _ebt_get_entry(tf_ssb_ebt_t* ebt, const char* id) +{ + int index = tf_util_insert_index(id, ebt->entries, ebt->entries_count, sizeof(ebt_entry_t), _ebt_entry_compare); + if (index < ebt->entries_count && strcmp(id, ebt->entries[index].id) == 0) + { + return &ebt->entries[index]; + } + else + { + ebt->entries = tf_resize_vec(ebt->entries, (ebt->entries_count + 1) * sizeof(ebt_entry_t)); + if (index < ebt->entries_count) + { + memmove(ebt->entries + index + 1, ebt->entries + index, (ebt->entries_count - index) * sizeof(ebt_entry_t)); + } + ebt->entries[index] = (ebt_entry_t) { + .in = -1, + .out = -1, + }; + snprintf(ebt->entries[index].id, sizeof(ebt->entries[index].id), "%s", id); + ebt->entries_count++; + return &ebt->entries[index]; + } +} + +void tf_ssb_ebt_receive_clock(tf_ssb_ebt_t* ebt, JSContext* context, JSValue clock) +{ + JSPropertyEnum* ptab = NULL; + uint32_t plen = 0; + if (JS_GetOwnPropertyNames(context, &ptab, &plen, clock, JS_GPN_STRING_MASK) == 0) + { + uv_mutex_lock(&ebt->mutex); + for (uint32_t i = 0; i < plen; ++i) + { + JSValue in_clock = JS_UNDEFINED; + JSPropertyDescriptor desc = { 0 }; + if (JS_GetOwnProperty(context, &desc, clock, ptab[i].atom) == 1) + { + in_clock = desc.value; + JS_FreeValue(context, desc.setter); + JS_FreeValue(context, desc.getter); + } + if (!JS_IsUndefined(in_clock)) + { + JSValue key = JS_AtomToString(context, ptab[i].atom); + const char* author = JS_ToCString(context, key); + int64_t sequence = -1; + JS_ToInt64(context, &sequence, in_clock); + + ebt_entry_t* entry = _ebt_get_entry(ebt, author); + if (sequence < 0) + { + entry->in = -1; + entry->in_replicate = false; + entry->in_receive = false; + } + else + { + entry->in = sequence >> 1; + entry->in_replicate = true; + entry->in_receive = (sequence & 1) == 0; + } + JS_FreeCString(context, author); + JS_FreeValue(context, key); + } + JS_FreeValue(context, in_clock); + } + uv_mutex_unlock(&ebt->mutex); + for (uint32_t i = 0; i < plen; ++i) + { + JS_FreeAtom(context, ptab[i].atom); + } + js_free(context, ptab); + } +} + +typedef struct _ebt_get_clock_t +{ + tf_ssb_ebt_t* ebt; + int32_t request_number; + tf_ssb_ebt_clock_callback_t* callback; + tf_ssb_ebt_clock_t* clock; + void* user_data; +} ebt_get_clock_t; + +static int _ebt_compare_entry(const void* a, const void* b) +{ + const char* id = a; + const tf_ssb_ebt_clock_entry_t* entry = b; + return strcmp(id, entry->id); +} + +static void _ebt_add_to_clock(ebt_get_clock_t* work, const char* id, int64_t value, bool replicate, bool receive) +{ + int count = work->clock ? work->clock->count : 0; + ebt_entry_t* entry = _ebt_get_entry(work->ebt, id); + if (entry->out_replicate != replicate || entry->out_receive != receive || ((replicate || receive) && entry->out != value)) + { + entry->out = value; + entry->out_replicate = replicate; + entry->out_receive = receive; + + int index = tf_util_insert_index(id, count ? work->clock->entries : NULL, count, sizeof(tf_ssb_ebt_clock_entry_t), _ebt_compare_entry); + int64_t out_value = replicate ? ((value << 1) | (receive ? 0 : 1)) : -1; + if (index < count && strcmp(id, work->clock->entries[index].id) == 0) + { + work->clock->entries[index].value = out_value; + } + else + { + work->clock = tf_resize_vec(work->clock, sizeof(tf_ssb_ebt_clock_t) + (count + 1) * sizeof(tf_ssb_ebt_clock_entry_t)); + if (index < count) + { + memmove(work->clock->entries + index + 1, work->clock->entries + index, (count - index) * sizeof(tf_ssb_ebt_clock_entry_t)); + } + work->clock->entries[index] = (tf_ssb_ebt_clock_entry_t) { .value = out_value }; + snprintf(work->clock->entries[index].id, sizeof(work->clock->entries[index].id), "%s", id); + work->clock->count = count + 1; + } + } +} + +static void _tf_ssb_ebt_get_send_clock_work(tf_ssb_connection_t* connection, void* user_data) +{ + ebt_get_clock_t* work = user_data; + tf_ssb_t* ssb = tf_ssb_connection_get_ssb(work->ebt->connection); + + int64_t depth = 2; + sqlite3* db = tf_ssb_acquire_db_reader(ssb); + tf_ssb_db_get_global_setting_int64(db, "replication_hops", &depth); + tf_ssb_release_db_reader(ssb, db); + + /* Ask for every identity we know is being followed from local accounts. */ + const char** visible = tf_ssb_db_get_all_visible_identities(ssb, depth); + if (visible) + { + int64_t* sequences = NULL; + for (int i = 0; visible[i]; i++) + { + int64_t sequence = 0; + tf_ssb_db_get_latest_message_by_author(ssb, visible[i], &sequence, NULL, 0); + sequences = tf_resize_vec(sequences, (i + 1) * sizeof(int64_t)); + sequences[i] = sequence; + } + + uv_mutex_lock(&work->ebt->mutex); + for (int i = 0; visible[i]; i++) + { + _ebt_add_to_clock(work, visible[i], sequences[i], true, true); + } + uv_mutex_unlock(&work->ebt->mutex); + + tf_free(visible); + tf_free(sequences); + } + + /* Ask about the incoming connection, too. */ + char id[k_id_base64_len] = ""; + if (tf_ssb_connection_get_id(connection, id, sizeof(id))) + { + int64_t sequence = 0; + tf_ssb_db_get_latest_message_by_author(ssb, id, &sequence, NULL, 0); + uv_mutex_lock(&work->ebt->mutex); + _ebt_add_to_clock(work, id, sequence, true, true); + uv_mutex_unlock(&work->ebt->mutex); + } + + /* Also respond with what we know about all requested identities. */ + tf_ssb_ebt_clock_entry_t* requested = NULL; + int requested_count = 0; + uv_mutex_lock(&work->ebt->mutex); + for (int i = 0; i < work->ebt->entries_count; i++) + { + ebt_entry_t* entry = &work->ebt->entries[i]; + if (entry->in_replicate && !entry->out_replicate) + { + requested = tf_resize_vec(requested, (requested_count + 1) * sizeof(tf_ssb_ebt_clock_entry_t)); + requested[requested_count] = (tf_ssb_ebt_clock_entry_t) { 0 }; + snprintf(requested[requested_count].id, sizeof(requested[requested_count].id), "%s", entry->id); + requested_count++; + } + } + uv_mutex_unlock(&work->ebt->mutex); + + if (requested_count) + { + for (int i = 0; i < requested_count; i++) + { + tf_ssb_db_get_latest_message_by_author(ssb, requested[i].id, &requested[i].value, NULL, 0); + } + + uv_mutex_lock(&work->ebt->mutex); + for (int i = 0; i < requested_count; i++) + { + _ebt_add_to_clock(work, requested[i].id, requested[i].value, true, true); + } + uv_mutex_unlock(&work->ebt->mutex); + tf_free(requested); + } +} + +static void _tf_ssb_ebt_get_send_clock_after_work(tf_ssb_connection_t* connection, int status, void* user_data) +{ + ebt_get_clock_t* work = user_data; + work->callback(work->clock, work->request_number, work->user_data); + tf_free(work->clock); + tf_free(work); +} + +void tf_ssb_ebt_get_send_clock(tf_ssb_ebt_t* ebt, int32_t request_number, tf_ssb_ebt_clock_callback_t* callback, void* user_data) +{ + ebt_get_clock_t* work = tf_malloc(sizeof(ebt_get_clock_t)); + *work = (ebt_get_clock_t) { + .ebt = ebt, + .request_number = request_number, + .callback = callback, + .user_data = user_data, + }; + tf_ssb_connection_run_work(ebt->connection, _tf_ssb_ebt_get_send_clock_work, _tf_ssb_ebt_get_send_clock_after_work, work); +} + +tf_ssb_ebt_clock_t* tf_ssb_ebt_get_messages_to_send(tf_ssb_ebt_t* ebt) +{ + int count = 0; + tf_ssb_ebt_clock_t* clock = NULL; + uv_mutex_lock(&ebt->mutex); + for (int i = 0; i < ebt->entries_count; i++) + { + ebt_entry_t* entry = &ebt->entries[i]; + if (entry->in_replicate && entry->in_receive && entry->out > entry->in) + { + clock = tf_resize_vec(clock, sizeof(tf_ssb_ebt_clock_t) + (count + 1) * sizeof(tf_ssb_ebt_clock_entry_t)); + clock->entries[count] = (tf_ssb_ebt_clock_entry_t) { .value = entry->in }; + snprintf(clock->entries[count].id, sizeof(clock->entries[count].id), "%s", entry->id); + clock->count = ++count; + } + } + uv_mutex_unlock(&ebt->mutex); + return clock; +} + +void tf_ssb_ebt_set_messages_sent(tf_ssb_ebt_t* ebt, const char* id, int64_t sequence) +{ + uv_mutex_lock(&ebt->mutex); + ebt_entry_t* entry = _ebt_get_entry(ebt, id); + entry->in = tf_max(entry->in, sequence); + uv_mutex_unlock(&ebt->mutex); +} diff --git a/src/ssb.ebt.h b/src/ssb.ebt.h new file mode 100644 index 00000000..9558bd64 --- /dev/null +++ b/src/ssb.ebt.h @@ -0,0 +1,85 @@ +#pragma once + +#include "ssb.h" + +#include "quickjs.h" + +typedef struct _tf_ssb_connection_t tf_ssb_connection_t; + +/** +** SSB EBT state. +*/ +typedef struct _tf_ssb_ebt_t tf_ssb_ebt_t; + +/** +** An EBT clock entry (identity + sequence pair). +*/ +typedef struct _tf_ssb_ebt_clock_entry_t +{ + /** The identity. */ + char id[k_id_base64_len]; + /** The sequence number. */ + int64_t value; +} tf_ssb_ebt_clock_entry_t; + +/** +** A set of IDs and sequence values. +*/ +typedef struct _tf_ssb_ebt_clock_t +{ + /** Number of entries. */ + int count; + /** Clock entries. */ + tf_ssb_ebt_clock_entry_t entries[]; +} tf_ssb_ebt_clock_t; + +/** +** A callback with EBT clock state. +*/ +typedef void(tf_ssb_ebt_clock_callback_t)(const tf_ssb_ebt_clock_t* clock, int32_t request_number, void* user_data); + +/** +** Create an EBT instance. +** @param connection The SSB connection to which this EBT state applies. +** @return The EBT instance. +*/ +tf_ssb_ebt_t* tf_ssb_ebt_create(tf_ssb_connection_t* connection); + +/** +** Update the EBT state with a received clock. +** @param ebt The EBT instance. +** @param context The JS context. +** @param clock The received clock. +*/ +void tf_ssb_ebt_receive_clock(tf_ssb_ebt_t* ebt, JSContext* context, JSValue clock); + +/** +** Get the EBT clock state to send. +** @param ebt The EBT instance. +** @param request_number The request number for which the clock will be sent. +** @param callback Called with the clock when determined. +** @param user_data User data passed to the callback. +*/ +void tf_ssb_ebt_get_send_clock(tf_ssb_ebt_t* ebt, int32_t request_number, tf_ssb_ebt_clock_callback_t* callback, void* user_data); + +/** +** Get the set of messages requested to be sent. +** @param ebt The EBT instance. +** @return A clock of identities and sequence numbers indicating which messages +** are due to be sent. The caller must free with tf_free(). +*/ +tf_ssb_ebt_clock_t* tf_ssb_ebt_get_messages_to_send(tf_ssb_ebt_t* ebt); + +/** +** Update the clock state indicating the messages that have been sent for an account. +** @param ebt The EBT instance. +** @param id The identity to update. +** @param sequence The maximum sequence number sent. +*/ +void tf_ssb_ebt_set_messages_sent(tf_ssb_ebt_t* ebt, const char* id, int64_t sequence); + +/** +** Destroy an EBT instance. +** @param ebt The EBT instance. +*/ +void tf_ssb_ebt_destroy(tf_ssb_ebt_t* ebt); diff --git a/src/ssb.h b/src/ssb.h index 1dd5f913..447238b2 100644 --- a/src/ssb.h +++ b/src/ssb.h @@ -77,6 +77,8 @@ typedef enum _tf_ssb_connect_flags_t typedef struct _tf_ssb_t tf_ssb_t; /** An SSB connection. */ typedef struct _tf_ssb_connection_t tf_ssb_connection_t; +/** A connection's EBT state. */ +typedef struct _tf_ssb_ebt_t tf_ssb_ebt_t; /** A trace instance. */ typedef struct _tf_trace_t tf_trace_t; /** An SQLite database handle. */ @@ -1132,4 +1134,11 @@ void tf_ssb_sync_start(tf_ssb_t* ssb); */ int tf_ssb_connection_get_flags(tf_ssb_connection_t* connection); +/** +** Get a connection's EBT state. +** @param connection The connection. +** @return the EBT state for the connection. +*/ +tf_ssb_ebt_t* tf_ssb_connection_get_ebt(tf_ssb_connection_t* connection); + /** @} */ diff --git a/src/ssb.rpc.c b/src/ssb.rpc.c index 0857b19b..7004681c 100644 --- a/src/ssb.rpc.c +++ b/src/ssb.rpc.c @@ -3,6 +3,7 @@ #include "log.h" #include "mem.h" #include "ssb.db.h" +#include "ssb.ebt.h" #include "ssb.h" #include "util.js.h" @@ -855,6 +856,7 @@ static void _tf_ssb_connection_send_history_stream_after_work(tf_ssb_connection_ break; } } + tf_ssb_ebt_set_messages_sent(tf_ssb_connection_get_ebt(connection), request->author, request->out_max_sequence_seen); if (!request->out_finished) { _tf_ssb_connection_send_history_stream( @@ -943,197 +945,24 @@ static void _tf_ssb_rpc_createHistoryStream( JS_FreeValue(context, arg_array); } -typedef struct _ebt_clock_row_t +static void _tf_ssb_rpc_ebt_replicate_send_messages(tf_ssb_connection_t* connection) { - char id[k_id_base64_len]; - int64_t value; -} ebt_clock_row_t; - -typedef struct _ebt_replicate_send_clock_t -{ - int64_t request_number; - ebt_clock_row_t* clock; - int clock_count; - - char* out_clock; -} ebt_replicate_send_clock_t; - -static void _tf_ssb_rpc_ebt_replicate_send_clock_work(tf_ssb_connection_t* connection, void* user_data) -{ - ebt_replicate_send_clock_t* work = user_data; - - JSMallocFunctions funcs = { 0 }; - tf_get_js_malloc_functions(&funcs); - JSRuntime* runtime = JS_NewRuntime2(&funcs, NULL); - JSContext* context = JS_NewContext(runtime); - - tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); - JSValue full_clock = JS_NewObject(context); - - int64_t depth = 2; - sqlite3* db = tf_ssb_acquire_db_reader(ssb); - tf_ssb_db_get_global_setting_int64(db, "replication_hops", &depth); - tf_ssb_release_db_reader(ssb, db); - - /* Ask for every identity we know is being followed from local accounts. */ - const char** visible = tf_ssb_db_get_all_visible_identities(ssb, depth); - for (int i = 0; visible[i]; i++) + tf_ssb_ebt_t* ebt = tf_ssb_connection_get_ebt(connection); + tf_ssb_ebt_clock_t* clock = tf_ssb_ebt_get_messages_to_send(ebt); + if (clock) { - int64_t sequence = 0; - tf_ssb_db_get_latest_message_by_author(ssb, visible[i], &sequence, NULL, 0); - JS_SetPropertyStr(context, full_clock, visible[i], JS_NewInt64(context, sequence == -1 ? -1 : (sequence << 1))); - } - tf_free(visible); - - /* Ask about the incoming connection, too. */ - char id[k_id_base64_len] = ""; - if (tf_ssb_connection_get_id(connection, id, sizeof(id))) - { - JSValue in_clock = JS_GetPropertyStr(context, full_clock, id); - if (JS_IsUndefined(in_clock)) + for (int i = 0; i < clock->count; i++) { - int64_t sequence = 0; - tf_ssb_db_get_latest_message_by_author(ssb, id, &sequence, NULL, 0); - JS_SetPropertyStr(context, full_clock, id, JS_NewInt64(context, sequence == -1 ? -1 : (sequence << 1))); - } - JS_FreeValue(context, in_clock); - } - - /* Also respond with what we know about all requested identities. */ - for (int i = 0; i < work->clock_count; i++) - { - JSValue in_clock = JS_GetPropertyStr(context, full_clock, work->clock[i].id); - if (JS_IsUndefined(in_clock)) - { - int64_t sequence = -1; - tf_ssb_db_get_latest_message_by_author(ssb, work->clock[i].id, &sequence, NULL, 0); - JS_SetPropertyStr(context, full_clock, work->clock[i].id, JS_NewInt64(context, sequence == -1 ? -1 : (sequence << 1))); - } - JS_FreeValue(context, in_clock); - } - - JSValue json = JS_JSONStringify(context, full_clock, JS_NULL, JS_NULL); - size_t size = 0; - const char* string = JS_ToCStringLen(context, &size, json); - char* copy = tf_malloc(size + 1); - memcpy(copy, string, size + 1); - work->out_clock = copy; - JS_FreeCString(context, string); - JS_FreeValue(context, json); - JS_FreeValue(context, full_clock); - - JS_FreeContext(context); - JS_FreeRuntime(runtime); -} - -static void _tf_ssb_rpc_ebt_replicate_send_clock_after_work(tf_ssb_connection_t* connection, int result, void* user_data) -{ - ebt_replicate_send_clock_t* work = user_data; - tf_free(work->clock); - if (work->out_clock) - { - tf_ssb_connection_rpc_send( - connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_json, -work->request_number, NULL, (const uint8_t*)work->out_clock, strlen(work->out_clock), NULL, NULL, NULL); - tf_free(work->out_clock); - } - tf_free(work); -} - -static void _tf_ssb_rpc_ebt_replicate_send_clock(tf_ssb_connection_t* connection, int32_t request_number, JSValue message) -{ - ebt_replicate_send_clock_t* work = tf_malloc(sizeof(ebt_replicate_send_clock_t)); - *work = (ebt_replicate_send_clock_t) { - .request_number = request_number, - }; - JSContext* context = tf_ssb_connection_get_context(connection); - - if (!JS_IsUndefined(message)) - { - JSPropertyEnum* ptab = NULL; - uint32_t plen = 0; - if (JS_GetOwnPropertyNames(context, &ptab, &plen, message, JS_GPN_STRING_MASK) == 0) - { - work->clock_count = (int)plen; - work->clock = tf_malloc(sizeof(ebt_clock_row_t) * plen); - memset(work->clock, 0, sizeof(ebt_clock_row_t) * plen); - for (uint32_t i = 0; i < plen; ++i) + tf_ssb_ebt_clock_entry_t* entry = &clock->entries[i]; + int32_t request_number = tf_ssb_connection_get_ebt_request_number(connection); + bool live = (tf_ssb_connection_get_flags(connection) & k_tf_ssb_connect_flag_one_shot) == 0; + _tf_ssb_connection_send_history_stream(connection, request_number, entry->id, entry->value, false, live, false); + if (live) { - const char* id = JS_AtomToCString(context, ptab[i].atom); - snprintf(work->clock[i].id, sizeof(work->clock[i].id), "%s", id); - JS_FreeCString(context, id); - - JSPropertyDescriptor desc = { 0 }; - JSValue key_value = JS_UNDEFINED; - if (JS_GetOwnProperty(context, &desc, message, ptab[i].atom) == 1) - { - key_value = desc.value; - JS_FreeValue(context, desc.setter); - JS_FreeValue(context, desc.getter); - } - JS_ToInt64(context, &work->clock[i].value, key_value); - JS_FreeValue(context, key_value); - JS_FreeAtom(context, ptab[i].atom); + tf_ssb_connection_add_new_message_request(connection, entry->id, request_number, false); } - js_free(context, ptab); } - } - - tf_ssb_connection_run_work(connection, _tf_ssb_rpc_ebt_replicate_send_clock_work, _tf_ssb_rpc_ebt_replicate_send_clock_after_work, work); -} - -static void _tf_ssb_rpc_ebt_replicate_send_messages(tf_ssb_connection_t* connection, JSValue message) -{ - if (JS_IsUndefined(message)) - { - return; - } - - tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); - JSContext* context = tf_ssb_get_context(ssb); - JSPropertyEnum* ptab = NULL; - uint32_t plen = 0; - if (JS_GetOwnPropertyNames(context, &ptab, &plen, message, JS_GPN_STRING_MASK) == 0) - { - for (uint32_t i = 0; i < plen; ++i) - { - JSValue in_clock = JS_UNDEFINED; - JSPropertyDescriptor desc = { 0 }; - if (JS_GetOwnProperty(context, &desc, message, ptab[i].atom) == 1) - { - in_clock = desc.value; - JS_FreeValue(context, desc.setter); - JS_FreeValue(context, desc.getter); - } - if (!JS_IsUndefined(in_clock)) - { - JSValue key = JS_AtomToString(context, ptab[i].atom); - int64_t sequence = -1; - JS_ToInt64(context, &sequence, in_clock); - const char* author = JS_ToCString(context, key); - if (sequence >= 0 && (sequence & 1) == 0) - { - int32_t request_number = tf_ssb_connection_get_ebt_request_number(connection); - bool live = (tf_ssb_connection_get_flags(connection) & k_tf_ssb_connect_flag_one_shot) == 0; - _tf_ssb_connection_send_history_stream(connection, request_number, author, sequence >> 1, false, live, false); - if (live) - { - tf_ssb_connection_add_new_message_request(connection, author, request_number, false); - } - } - else - { - tf_ssb_connection_remove_new_message_request(connection, author); - } - JS_FreeCString(context, author); - JS_FreeValue(context, key); - } - JS_FreeValue(context, in_clock); - } - for (uint32_t i = 0; i < plen; ++i) - { - JS_FreeAtom(context, ptab[i].atom); - } - js_free(context, ptab); + tf_free(clock); } } @@ -1143,6 +972,25 @@ static void _tf_ssb_rpc_ebt_replicate_store_callback(const char* id, bool verifi tf_ssb_connection_adjust_read_backpressure(connection, -1); } +static void _tf_ssb_rpc_ebt_send_clock_callback(const tf_ssb_ebt_clock_t* clock, int32_t request_number, void* user_data) +{ + tf_ssb_connection_t* connection = user_data; + + if (clock && clock->count) + { + JSContext* context = tf_ssb_connection_get_context(connection); + JSValue message = JS_NewObject(context); + for (int i = 0; i < clock->count; i++) + { + JS_SetPropertyStr(context, message, clock->entries[i].id, JS_NewInt64(context, clock->entries[i].value)); + } + tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_json, -request_number, NULL, message, NULL, NULL, NULL); + JS_FreeValue(context, message); + } + + _tf_ssb_rpc_ebt_replicate_send_messages(connection); +} + typedef struct _resend_clock_t { tf_ssb_connection_t* connection; @@ -1154,8 +1002,8 @@ static void _tf_ssb_rpc_ebt_replicate_resend_clock(tf_ssb_connection_t* connecti resend_clock_t* resend = user_data; if (!skip) { - _tf_ssb_rpc_ebt_replicate_send_clock(resend->connection, resend->request_number, JS_UNDEFINED); - tf_ssb_connection_set_sent_clock(resend->connection, true); + tf_ssb_ebt_t* ebt = tf_ssb_connection_get_ebt(connection); + tf_ssb_ebt_get_send_clock(ebt, resend->request_number, _tf_ssb_rpc_ebt_send_clock_callback, connection); } tf_free(user_data); } @@ -1186,9 +1034,8 @@ static void _tf_ssb_rpc_ebt_replicate(tf_ssb_connection_t* connection, uint8_t f tf_ssb_connection_adjust_read_backpressure(connection, 1); tf_ssb_verify_strip_and_store_message(ssb, args, _tf_ssb_rpc_ebt_replicate_store_callback, connection); - if (tf_ssb_connection_get_sent_clock(connection) && !tf_ssb_is_shutting_down(ssb) && !tf_ssb_connection_is_closing(connection)) + if (!tf_ssb_is_shutting_down(ssb) && !tf_ssb_connection_is_closing(connection)) { - tf_ssb_connection_set_sent_clock(connection, false); resend_clock_t* resend = tf_malloc(sizeof(resend_clock_t)); *resend = (resend_clock_t) { .connection = connection, @@ -1199,13 +1046,9 @@ static void _tf_ssb_rpc_ebt_replicate(tf_ssb_connection_t* connection, uint8_t f } else { - /* EBT clock. */ - if (!tf_ssb_connection_get_sent_clock(connection)) - { - _tf_ssb_rpc_ebt_replicate_send_clock(connection, request_number, in_clock); - tf_ssb_connection_set_sent_clock(connection, true); - } - _tf_ssb_rpc_ebt_replicate_send_messages(connection, in_clock); + tf_ssb_ebt_t* ebt = tf_ssb_connection_get_ebt(connection); + tf_ssb_ebt_receive_clock(ebt, context, in_clock); + tf_ssb_ebt_get_send_clock(ebt, request_number, _tf_ssb_rpc_ebt_send_clock_callback, connection); } JS_FreeValue(context, name); JS_FreeValue(context, author); diff --git a/src/util.js.h b/src/util.js.h index 876640dd..228e2957 100644 --- a/src/util.js.h +++ b/src/util.js.h @@ -150,6 +150,19 @@ const char* tf_util_function_to_string(void* function); _a > _b ? _b : _a; \ }) +/** +** Get the maximum of two values. +** @param a The first value. +** @param b The second value. +** @return The maximum of a and b. +*/ +#define tf_max(a, b) \ + ({ \ + __typeof__(a) _a = (a); \ + __typeof__(b) _b = (b); \ + _a > _b ? _a : _b; \ + }) + /** ** Get the number of elements in an array. ** @param a The array.