diff --git a/src/ssb.rpc.c b/src/ssb.rpc.c index 651a41748..acc13bc45 100644 --- a/src/ssb.rpc.c +++ b/src/ssb.rpc.c @@ -1101,6 +1101,23 @@ static void _tf_ssb_rpc_ebt_replicate_resend_clock(tf_ssb_connection_t* connecti } } +static void _tf_ssb_rpc_ebt_schedule_send_clock(tf_ssb_connection_t* connection) +{ + tf_ssb_ebt_t* ebt = tf_ssb_connection_get_ebt(connection); + int pending = tf_ssb_ebt_get_send_clock_pending(ebt) + 1; + tf_ssb_ebt_set_send_clock_pending(ebt, pending); + if (pending == 1) + { + resend_clock_t* resend = tf_malloc(sizeof(resend_clock_t)); + *resend = (resend_clock_t) { + .connection = connection, + .request_number = -tf_ssb_connection_get_ebt_request_number(connection), + .pending = pending, + }; + tf_ssb_connection_schedule_idle(connection, "ebt.clock", _tf_ssb_rpc_ebt_replicate_resend_clock, resend); + } +} + static void _tf_ssb_rpc_ebt_replicate(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); @@ -1140,18 +1157,7 @@ static void _tf_ssb_rpc_ebt_replicate(tf_ssb_connection_t* connection, uint8_t f if (resend_clock && tf_ssb_connection_is_connected(connection) && !tf_ssb_is_shutting_down(tf_ssb_connection_get_ssb(connection)) && !tf_ssb_connection_is_closing(connection)) { - int pending = tf_ssb_ebt_get_send_clock_pending(ebt) + 1; - tf_ssb_ebt_set_send_clock_pending(ebt, pending); - if (pending == 1) - { - resend_clock_t* resend = tf_malloc(sizeof(resend_clock_t)); - *resend = (resend_clock_t) { - .connection = connection, - .request_number = request_number, - .pending = pending, - }; - tf_ssb_connection_schedule_idle(connection, "ebt.clock", _tf_ssb_rpc_ebt_replicate_resend_clock, resend); - } + _tf_ssb_rpc_ebt_schedule_send_clock(connection); } JS_FreeValue(context, name); JS_FreeValue(context, author); @@ -1855,10 +1861,25 @@ static void _tf_ssb_rpc_invite_use(tf_ssb_connection_t* connection, uint8_t flag tf_ssb_connection_run_work(connection, _tf_ssb_rpc_invite_use_work, _tf_ssb_rpc_invite_use_after_work, work); } +static void _tf_ssb_rpc_message_added_callback(tf_ssb_t* ssb, const char* id, void* user_data) +{ + tf_ssb_connection_t* connections[256]; + int count = tf_ssb_get_connections(ssb, connections, tf_countof(connections)); + for (int i = 0; i < count; i++) + { + tf_ssb_connection_t* connection = connections[i]; + if (tf_ssb_connection_is_connected(connection) && !tf_ssb_is_shutting_down(tf_ssb_connection_get_ssb(connection)) && !tf_ssb_connection_is_closing(connection)) + { + _tf_ssb_rpc_ebt_schedule_send_clock(connections[i]); + } + } +} + void tf_ssb_rpc_register(tf_ssb_t* ssb) { tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_rpc_connections_changed_callback, NULL, NULL); tf_ssb_add_broadcasts_changed_callback(ssb, _tf_ssb_rpc_broadcasts_changed_callback, NULL, NULL); + tf_ssb_add_message_added_callback(ssb, _tf_ssb_rpc_message_added_callback, NULL, NULL); tf_ssb_add_rpc_callback(ssb, "gossip.ping", _tf_ssb_rpc_gossip_ping, NULL, NULL); /* DUPLEX */ tf_ssb_add_rpc_callback(ssb, "blobs.get", _tf_ssb_rpc_blobs_get, NULL, NULL); /* SOURCE */ tf_ssb_add_rpc_callback(ssb, "blobs.has", _tf_ssb_rpc_blobs_has, NULL, NULL); /* ASYNC */