From bb7d2d7ae0f0605f64cf659f62c8462871cc6afd Mon Sep 17 00:00:00 2001 From: Cory McWilliams Date: Sun, 16 Mar 2025 22:04:06 -0400 Subject: [PATCH] ssb: Update ebt received value on the fly. --- apps/ssb/tf-tab-connections.js | 4 +++- src/ssb.c | 7 +++---- src/ssb.db.c | 2 +- src/ssb.ebt.c | 11 +++++++++++ src/ssb.ebt.h | 8 ++++++++ src/ssb.h | 8 ++++++-- src/ssb.js.c | 2 +- src/ssb.rpc.c | 3 ++- src/ssb.tests.c | 2 +- 9 files changed, 36 insertions(+), 11 deletions(-) diff --git a/apps/ssb/tf-tab-connections.js b/apps/ssb/tf-tab-connections.js index 51058a3d..82460756 100644 --- a/apps/ssb/tf-tab-connections.js +++ b/apps/ssb/tf-tab-connections.js @@ -109,7 +109,9 @@ class TfTabConnectionsElement extends LitElement {
${name} ${value} / ${max} (${Math.round((100.0 * value) / max)}%)
diff --git a/src/ssb.c b/src/ssb.c index cdd5f63a..15f3ce43 100644 --- a/src/ssb.c +++ b/src/ssb.c @@ -3882,8 +3882,7 @@ JSValue tf_ssb_connection_get_object(tf_ssb_connection_t* connection) return connection ? connection->object : JS_UNDEFINED; } -void tf_ssb_add_message_added_callback( - tf_ssb_t* ssb, void (*callback)(tf_ssb_t* ssb, const char* id, void* user_data), void (*cleanup)(tf_ssb_t* ssb, void* user_data), void* user_data) +void tf_ssb_add_message_added_callback(tf_ssb_t* ssb, tf_ssb_message_added_callback_t* callback, void (*cleanup)(tf_ssb_t* ssb, void* user_data), void* user_data) { tf_ssb_message_added_callback_node_t* node = tf_malloc(sizeof(tf_ssb_message_added_callback_node_t)); *node = (tf_ssb_message_added_callback_node_t) { @@ -3924,7 +3923,7 @@ void tf_ssb_notify_blob_stored(tf_ssb_t* ssb, const char* id) ssb->blobs_stored++; } -void tf_ssb_notify_message_added(tf_ssb_t* ssb, const char* id, JSValue message_keys) +void tf_ssb_notify_message_added(tf_ssb_t* ssb, const char* author, int64_t sequence, const char* id, JSValue message_keys) { tf_ssb_message_added_callback_node_t* next = NULL; ssb->messages_stored++; @@ -3933,7 +3932,7 @@ void tf_ssb_notify_message_added(tf_ssb_t* ssb, const char* id, JSValue message_ next = node->next; tf_trace_begin(ssb->trace, "message added callback"); PRE_CALLBACK(ssb, node->callback); - node->callback(ssb, id, node->user_data); + node->callback(ssb, author, sequence, id, node->user_data); POST_CALLBACK(ssb, node->callback); tf_trace_end(ssb->trace); } diff --git a/src/ssb.db.c b/src/ssb.db.c index c82ef44b..6a9b4f31 100644 --- a/src/ssb.db.c +++ b/src/ssb.db.c @@ -581,7 +581,7 @@ static void _tf_ssb_db_store_message_after_work(tf_ssb_t* ssb, int status, void* char timestamp_string[256]; snprintf(timestamp_string, sizeof(timestamp_string), "%f", store->timestamp); JS_SetPropertyStr(context, message, "timestamp", JS_NewString(context, timestamp_string)); - tf_ssb_notify_message_added(ssb, store->id, message); + tf_ssb_notify_message_added(ssb, store->author, store->sequence, store->id, message); JS_FreeValue(context, message); tf_trace_end(trace); } diff --git a/src/ssb.ebt.c b/src/ssb.ebt.c index 40fed1a2..2ad56960 100644 --- a/src/ssb.ebt.c +++ b/src/ssb.ebt.c @@ -365,6 +365,17 @@ void tf_ssb_ebt_set_messages_sent(tf_ssb_ebt_t* ebt, const char* id, int64_t seq uv_mutex_unlock(&ebt->mutex); } +void tf_ssb_ebt_set_messages_received(tf_ssb_ebt_t* ebt, const char* id, int64_t sequence) +{ + uv_mutex_lock(&ebt->mutex); + ebt_entry_t* entry = _ebt_get_entry(ebt, id); + if (entry) + { + entry->out = tf_max(entry->out, sequence); + } + uv_mutex_unlock(&ebt->mutex); +} + int tf_ssb_ebt_get_send_clock_pending(tf_ssb_ebt_t* ebt) { return ebt->send_clock_pending; diff --git a/src/ssb.ebt.h b/src/ssb.ebt.h index 3b87d354..e0aaf857 100644 --- a/src/ssb.ebt.h +++ b/src/ssb.ebt.h @@ -78,6 +78,14 @@ tf_ssb_ebt_clock_t* tf_ssb_ebt_get_messages_to_send(tf_ssb_ebt_t* ebt); */ void tf_ssb_ebt_set_messages_sent(tf_ssb_ebt_t* ebt, const char* id, int64_t sequence); +/** +** Update the clock state indicating the messages that have been received for an account. +** @param ebt The EBT instance. +** @param id The identity to update. +** @param sequence The maximum sequence number received. +*/ +void tf_ssb_ebt_set_messages_received(tf_ssb_ebt_t* ebt, const char* id, int64_t sequence); + /** ** Destroy an EBT instance. ** @param ebt The EBT instance. diff --git a/src/ssb.h b/src/ssb.h index 9e0f0194..31d28847 100644 --- a/src/ssb.h +++ b/src/ssb.h @@ -636,10 +636,12 @@ void tf_ssb_remove_broadcasts_changed_callback(tf_ssb_t* ssb, tf_ssb_broadcasts_ /** ** A callback called when a message is added to the database. ** @param ssb The SSB instance. +** @param author The author identity. +** @param sequence The message sequence number. ** @param id The message identifier. ** @param user_data The user data. */ -typedef void(tf_ssb_message_added_callback_t)(tf_ssb_t* ssb, const char* id, void* user_data); +typedef void(tf_ssb_message_added_callback_t)(tf_ssb_t* ssb, const char* author, int64_t sequence, const char* id, void* user_data); /** ** Register a callback called when a message is added to the database. @@ -661,10 +663,12 @@ void tf_ssb_remove_message_added_callback(tf_ssb_t* ssb, tf_ssb_message_added_ca /** ** Call all callbacks registered for when a message is added to the database. ** @param ssb The SSB instance. +** @param author The message author's identity. +** @param sequence The message sequence number. ** @param id The message identity added. ** @param message_with_keys The message added in the format required if keys are requested. */ -void tf_ssb_notify_message_added(tf_ssb_t* ssb, const char* id, JSValue message_with_keys); +void tf_ssb_notify_message_added(tf_ssb_t* ssb, const char* author, int64_t sequence, const char* id, JSValue message_with_keys); /** ** Record that a new blob was stored. diff --git a/src/ssb.js.c b/src/ssb.js.c index 0b918e9a..c1ccda00 100644 --- a/src/ssb.js.c +++ b/src/ssb.js.c @@ -1594,7 +1594,7 @@ static void _tf_ssb_cleanup_value(tf_ssb_t* ssb, void* user_data) JS_FreeValue(tf_ssb_get_context(ssb), callback); } -static void _tf_ssb_on_message_added_callback(tf_ssb_t* ssb, const char* id, void* user_data) +static void _tf_ssb_on_message_added_callback(tf_ssb_t* ssb, const char* author, int64_t sequence, const char* id, void* user_data) { JSContext* context = tf_ssb_get_context(ssb); JSValue callback = JS_MKPTR(JS_TAG_OBJECT, user_data); diff --git a/src/ssb.rpc.c b/src/ssb.rpc.c index edab7f0e..1e5b985f 100644 --- a/src/ssb.rpc.c +++ b/src/ssb.rpc.c @@ -1901,7 +1901,7 @@ static void _tf_ssb_rpc_invite_use(tf_ssb_connection_t* connection, uint8_t flag tf_ssb_connection_run_work(connection, _tf_ssb_rpc_invite_use_work, _tf_ssb_rpc_invite_use_after_work, work); } -static void _tf_ssb_rpc_message_added_callback(tf_ssb_t* ssb, const char* id, void* user_data) +static void _tf_ssb_rpc_message_added_callback(tf_ssb_t* ssb, const char* author, int64_t sequence, const char* id, void* user_data) { tf_ssb_connection_t* connections[256]; int count = tf_ssb_get_connections(ssb, connections, tf_countof(connections)); @@ -1910,6 +1910,7 @@ static void _tf_ssb_rpc_message_added_callback(tf_ssb_t* ssb, const char* id, vo tf_ssb_connection_t* connection = connections[i]; if (tf_ssb_connection_is_connected(connection) && !tf_ssb_is_shutting_down(tf_ssb_connection_get_ssb(connection)) && !tf_ssb_connection_is_closing(connection)) { + tf_ssb_ebt_set_messages_received(tf_ssb_connection_get_ebt(connections[i]), author, sequence); _tf_ssb_rpc_ebt_schedule_send_clock(connections[i]); } } diff --git a/src/ssb.tests.c b/src/ssb.tests.c index 58080edc..bf0951b9 100644 --- a/src/ssb.tests.c +++ b/src/ssb.tests.c @@ -112,7 +112,7 @@ static int _ssb_test_count_messages(tf_ssb_t* ssb) return count.count; } -static void _message_added(tf_ssb_t* ssb, const char* id, void* user_data) +static void _message_added(tf_ssb_t* ssb, const char* author, int64_t sequence, const char* id, void* user_data) { ++*(int*)user_data; }