diff --git a/apps/ssb/tf-tab-connections.js b/apps/ssb/tf-tab-connections.js
index 51058a3d..82460756 100644
--- a/apps/ssb/tf-tab-connections.js
+++ b/apps/ssb/tf-tab-connections.js
@@ -109,7 +109,9 @@ class TfTabConnectionsElement extends LitElement {
${name} ${value} / ${max} (${Math.round((100.0 * value) / max)}%)
diff --git a/src/ssb.c b/src/ssb.c
index cdd5f63a..15f3ce43 100644
--- a/src/ssb.c
+++ b/src/ssb.c
@@ -3882,8 +3882,7 @@ JSValue tf_ssb_connection_get_object(tf_ssb_connection_t* connection)
return connection ? connection->object : JS_UNDEFINED;
}
-void tf_ssb_add_message_added_callback(
- tf_ssb_t* ssb, void (*callback)(tf_ssb_t* ssb, const char* id, void* user_data), void (*cleanup)(tf_ssb_t* ssb, void* user_data), void* user_data)
+void tf_ssb_add_message_added_callback(tf_ssb_t* ssb, tf_ssb_message_added_callback_t* callback, void (*cleanup)(tf_ssb_t* ssb, void* user_data), void* user_data)
{
tf_ssb_message_added_callback_node_t* node = tf_malloc(sizeof(tf_ssb_message_added_callback_node_t));
*node = (tf_ssb_message_added_callback_node_t) {
@@ -3924,7 +3923,7 @@ void tf_ssb_notify_blob_stored(tf_ssb_t* ssb, const char* id)
ssb->blobs_stored++;
}
-void tf_ssb_notify_message_added(tf_ssb_t* ssb, const char* id, JSValue message_keys)
+void tf_ssb_notify_message_added(tf_ssb_t* ssb, const char* author, int64_t sequence, const char* id, JSValue message_keys)
{
tf_ssb_message_added_callback_node_t* next = NULL;
ssb->messages_stored++;
@@ -3933,7 +3932,7 @@ void tf_ssb_notify_message_added(tf_ssb_t* ssb, const char* id, JSValue message_
next = node->next;
tf_trace_begin(ssb->trace, "message added callback");
PRE_CALLBACK(ssb, node->callback);
- node->callback(ssb, id, node->user_data);
+ node->callback(ssb, author, sequence, id, node->user_data);
POST_CALLBACK(ssb, node->callback);
tf_trace_end(ssb->trace);
}
diff --git a/src/ssb.db.c b/src/ssb.db.c
index c82ef44b..6a9b4f31 100644
--- a/src/ssb.db.c
+++ b/src/ssb.db.c
@@ -581,7 +581,7 @@ static void _tf_ssb_db_store_message_after_work(tf_ssb_t* ssb, int status, void*
char timestamp_string[256];
snprintf(timestamp_string, sizeof(timestamp_string), "%f", store->timestamp);
JS_SetPropertyStr(context, message, "timestamp", JS_NewString(context, timestamp_string));
- tf_ssb_notify_message_added(ssb, store->id, message);
+ tf_ssb_notify_message_added(ssb, store->author, store->sequence, store->id, message);
JS_FreeValue(context, message);
tf_trace_end(trace);
}
diff --git a/src/ssb.ebt.c b/src/ssb.ebt.c
index 40fed1a2..2ad56960 100644
--- a/src/ssb.ebt.c
+++ b/src/ssb.ebt.c
@@ -365,6 +365,17 @@ void tf_ssb_ebt_set_messages_sent(tf_ssb_ebt_t* ebt, const char* id, int64_t seq
uv_mutex_unlock(&ebt->mutex);
}
+void tf_ssb_ebt_set_messages_received(tf_ssb_ebt_t* ebt, const char* id, int64_t sequence)
+{
+ uv_mutex_lock(&ebt->mutex);
+ ebt_entry_t* entry = _ebt_get_entry(ebt, id);
+ if (entry)
+ {
+ entry->out = tf_max(entry->out, sequence);
+ }
+ uv_mutex_unlock(&ebt->mutex);
+}
+
int tf_ssb_ebt_get_send_clock_pending(tf_ssb_ebt_t* ebt)
{
return ebt->send_clock_pending;
diff --git a/src/ssb.ebt.h b/src/ssb.ebt.h
index 3b87d354..e0aaf857 100644
--- a/src/ssb.ebt.h
+++ b/src/ssb.ebt.h
@@ -78,6 +78,14 @@ tf_ssb_ebt_clock_t* tf_ssb_ebt_get_messages_to_send(tf_ssb_ebt_t* ebt);
*/
void tf_ssb_ebt_set_messages_sent(tf_ssb_ebt_t* ebt, const char* id, int64_t sequence);
+/**
+** Update the clock state indicating the messages that have been received for an account.
+** @param ebt The EBT instance.
+** @param id The identity to update.
+** @param sequence The maximum sequence number received.
+*/
+void tf_ssb_ebt_set_messages_received(tf_ssb_ebt_t* ebt, const char* id, int64_t sequence);
+
/**
** Destroy an EBT instance.
** @param ebt The EBT instance.
diff --git a/src/ssb.h b/src/ssb.h
index 9e0f0194..31d28847 100644
--- a/src/ssb.h
+++ b/src/ssb.h
@@ -636,10 +636,12 @@ void tf_ssb_remove_broadcasts_changed_callback(tf_ssb_t* ssb, tf_ssb_broadcasts_
/**
** A callback called when a message is added to the database.
** @param ssb The SSB instance.
+** @param author The author identity.
+** @param sequence The message sequence number.
** @param id The message identifier.
** @param user_data The user data.
*/
-typedef void(tf_ssb_message_added_callback_t)(tf_ssb_t* ssb, const char* id, void* user_data);
+typedef void(tf_ssb_message_added_callback_t)(tf_ssb_t* ssb, const char* author, int64_t sequence, const char* id, void* user_data);
/**
** Register a callback called when a message is added to the database.
@@ -661,10 +663,12 @@ void tf_ssb_remove_message_added_callback(tf_ssb_t* ssb, tf_ssb_message_added_ca
/**
** Call all callbacks registered for when a message is added to the database.
** @param ssb The SSB instance.
+** @param author The message author's identity.
+** @param sequence The message sequence number.
** @param id The message identity added.
** @param message_with_keys The message added in the format required if keys are requested.
*/
-void tf_ssb_notify_message_added(tf_ssb_t* ssb, const char* id, JSValue message_with_keys);
+void tf_ssb_notify_message_added(tf_ssb_t* ssb, const char* author, int64_t sequence, const char* id, JSValue message_with_keys);
/**
** Record that a new blob was stored.
diff --git a/src/ssb.js.c b/src/ssb.js.c
index 0b918e9a..c1ccda00 100644
--- a/src/ssb.js.c
+++ b/src/ssb.js.c
@@ -1594,7 +1594,7 @@ static void _tf_ssb_cleanup_value(tf_ssb_t* ssb, void* user_data)
JS_FreeValue(tf_ssb_get_context(ssb), callback);
}
-static void _tf_ssb_on_message_added_callback(tf_ssb_t* ssb, const char* id, void* user_data)
+static void _tf_ssb_on_message_added_callback(tf_ssb_t* ssb, const char* author, int64_t sequence, const char* id, void* user_data)
{
JSContext* context = tf_ssb_get_context(ssb);
JSValue callback = JS_MKPTR(JS_TAG_OBJECT, user_data);
diff --git a/src/ssb.rpc.c b/src/ssb.rpc.c
index edab7f0e..1e5b985f 100644
--- a/src/ssb.rpc.c
+++ b/src/ssb.rpc.c
@@ -1901,7 +1901,7 @@ static void _tf_ssb_rpc_invite_use(tf_ssb_connection_t* connection, uint8_t flag
tf_ssb_connection_run_work(connection, _tf_ssb_rpc_invite_use_work, _tf_ssb_rpc_invite_use_after_work, work);
}
-static void _tf_ssb_rpc_message_added_callback(tf_ssb_t* ssb, const char* id, void* user_data)
+static void _tf_ssb_rpc_message_added_callback(tf_ssb_t* ssb, const char* author, int64_t sequence, const char* id, void* user_data)
{
tf_ssb_connection_t* connections[256];
int count = tf_ssb_get_connections(ssb, connections, tf_countof(connections));
@@ -1910,6 +1910,7 @@ static void _tf_ssb_rpc_message_added_callback(tf_ssb_t* ssb, const char* id, vo
tf_ssb_connection_t* connection = connections[i];
if (tf_ssb_connection_is_connected(connection) && !tf_ssb_is_shutting_down(tf_ssb_connection_get_ssb(connection)) && !tf_ssb_connection_is_closing(connection))
{
+ tf_ssb_ebt_set_messages_received(tf_ssb_connection_get_ebt(connections[i]), author, sequence);
_tf_ssb_rpc_ebt_schedule_send_clock(connections[i]);
}
}
diff --git a/src/ssb.tests.c b/src/ssb.tests.c
index 58080edc..bf0951b9 100644
--- a/src/ssb.tests.c
+++ b/src/ssb.tests.c
@@ -112,7 +112,7 @@ static int _ssb_test_count_messages(tf_ssb_t* ssb)
return count.count;
}
-static void _message_added(tf_ssb_t* ssb, const char* id, void* user_data)
+static void _message_added(tf_ssb_t* ssb, const char* author, int64_t sequence, const char* id, void* user_data)
{
++*(int*)user_data;
}