ssb: Schedule the clock for reevaluation any time new messages are added. I think this will improve initial replication.

Cory McWilliams 2025-02-21 22:30:14 -05:00
parent 6192f1b94d
commit 9c9efb845c
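
The heart of the change is the new _tf_ssb_rpc_ebt_schedule_send_clock helper, which coalesces EBT clock sends: every newly added message bumps a per-connection pending counter, but only the 0 -> 1 transition actually schedules the idle-time resend, so a burst of messages arriving during initial replication produces a single clock send rather than one per message. Below is a minimal, self-contained sketch of that coalescing pattern; peer_t, schedule_send_clock, and run_idle are hypothetical stand-ins for illustration, not the real tf_* APIs.

#include <stdbool.h>
#include <stdio.h>

/* Per-connection state, analogous to the send-clock pending counter this
 * commit tracks on the EBT state. */
typedef struct peer_t
{
	int send_clock_pending;
	bool idle_scheduled; /* stand-in for the event loop's idle queue */
} peer_t;

static void schedule_send_clock(peer_t* peer)
{
	/* Count every new message, but only the 0 -> 1 transition schedules
	 * work, so repeated calls before the loop goes idle coalesce. */
	if (++peer->send_clock_pending == 1)
	{
		peer->idle_scheduled = true;
	}
}

/* Stand-in for the event loop draining its idle queue. */
static void run_idle(peer_t* peer)
{
	if (peer->idle_scheduled)
	{
		peer->idle_scheduled = false;
		printf("one clock send covering %d new messages\n", peer->send_clock_pending);
		peer->send_clock_pending = 0;
	}
}

int main(void)
{
	peer_t peer = { 0 };
	schedule_send_clock(&peer); /* first message schedules the send */
	schedule_send_clock(&peer); /* later messages only bump the counter */
	schedule_send_clock(&peer);
	run_idle(&peer); /* prints: one clock send covering 3 new messages */
	return 0;
}

Deferring the send to idle time keeps message ingestion cheap while still ensuring the peer's clock is reevaluated promptly after each batch of new messages, which is what the commit message is aiming at for initial replication.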


@@ -1101,6 +1101,23 @@ static void _tf_ssb_rpc_ebt_replicate_resend_clock(tf_ssb_connection_t* connecti
 	}
 }
 
+static void _tf_ssb_rpc_ebt_schedule_send_clock(tf_ssb_connection_t* connection)
+{
+	tf_ssb_ebt_t* ebt = tf_ssb_connection_get_ebt(connection);
+	int pending = tf_ssb_ebt_get_send_clock_pending(ebt) + 1;
+	tf_ssb_ebt_set_send_clock_pending(ebt, pending);
+	if (pending == 1)
+	{
+		resend_clock_t* resend = tf_malloc(sizeof(resend_clock_t));
+		*resend = (resend_clock_t) {
+			.connection = connection,
+			.request_number = -tf_ssb_connection_get_ebt_request_number(connection),
+			.pending = pending,
+		};
+		tf_ssb_connection_schedule_idle(connection, "ebt.clock", _tf_ssb_rpc_ebt_replicate_resend_clock, resend);
+	}
+}
+
 static void _tf_ssb_rpc_ebt_replicate(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
 {
 	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
@@ -1140,18 +1157,7 @@ static void _tf_ssb_rpc_ebt_replicate(tf_ssb_connection_t* connection, uint8_t f
 	if (resend_clock && tf_ssb_connection_is_connected(connection) && !tf_ssb_is_shutting_down(tf_ssb_connection_get_ssb(connection)) && !tf_ssb_connection_is_closing(connection))
 	{
-		int pending = tf_ssb_ebt_get_send_clock_pending(ebt) + 1;
-		tf_ssb_ebt_set_send_clock_pending(ebt, pending);
-		if (pending == 1)
-		{
-			resend_clock_t* resend = tf_malloc(sizeof(resend_clock_t));
-			*resend = (resend_clock_t) {
-				.connection = connection,
-				.request_number = request_number,
-				.pending = pending,
-			};
-			tf_ssb_connection_schedule_idle(connection, "ebt.clock", _tf_ssb_rpc_ebt_replicate_resend_clock, resend);
-		}
+		_tf_ssb_rpc_ebt_schedule_send_clock(connection);
 	}
 	JS_FreeValue(context, name);
 	JS_FreeValue(context, author);
@@ -1855,10 +1861,25 @@ static void _tf_ssb_rpc_invite_use(tf_ssb_connection_t* connection, uint8_t flag
 	tf_ssb_connection_run_work(connection, _tf_ssb_rpc_invite_use_work, _tf_ssb_rpc_invite_use_after_work, work);
 }
 
+static void _tf_ssb_rpc_message_added_callback(tf_ssb_t* ssb, const char* id, void* user_data)
+{
+	tf_ssb_connection_t* connections[256];
+	int count = tf_ssb_get_connections(ssb, connections, tf_countof(connections));
+	for (int i = 0; i < count; i++)
+	{
+		tf_ssb_connection_t* connection = connections[i];
+		if (tf_ssb_connection_is_connected(connection) && !tf_ssb_is_shutting_down(ssb) && !tf_ssb_connection_is_closing(connection))
+		{
+			_tf_ssb_rpc_ebt_schedule_send_clock(connection);
+		}
+	}
+}
+
 void tf_ssb_rpc_register(tf_ssb_t* ssb)
 {
 	tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_rpc_connections_changed_callback, NULL, NULL);
 	tf_ssb_add_broadcasts_changed_callback(ssb, _tf_ssb_rpc_broadcasts_changed_callback, NULL, NULL);
+	tf_ssb_add_message_added_callback(ssb, _tf_ssb_rpc_message_added_callback, NULL, NULL);
 	tf_ssb_add_rpc_callback(ssb, "gossip.ping", _tf_ssb_rpc_gossip_ping, NULL, NULL); /* DUPLEX */
 	tf_ssb_add_rpc_callback(ssb, "blobs.get", _tf_ssb_rpc_blobs_get, NULL, NULL); /* SOURCE */
 	tf_ssb_add_rpc_callback(ssb, "blobs.has", _tf_ssb_rpc_blobs_has, NULL, NULL); /* ASYNC */