diff --git a/src/ssb.ebt.c b/src/ssb.ebt.c index 88256749..d9cf06bf 100644 --- a/src/ssb.ebt.c +++ b/src/ssb.ebt.c @@ -27,6 +27,8 @@ typedef struct _tf_ssb_ebt_t ebt_entry_t* entries; int entries_count; + + int send_clock_pending; } tf_ssb_ebt_t; tf_ssb_ebt_t* tf_ssb_ebt_create(tf_ssb_connection_t* connection) @@ -308,3 +310,13 @@ void tf_ssb_ebt_set_messages_sent(tf_ssb_ebt_t* ebt, const char* id, int64_t seq } uv_mutex_unlock(&ebt->mutex); } + +int tf_ssb_ebt_get_send_clock_pending(tf_ssb_ebt_t* ebt) +{ + return ebt->send_clock_pending; +} + +void tf_ssb_ebt_set_send_clock_pending(tf_ssb_ebt_t* ebt, int pending) +{ + ebt->send_clock_pending = pending; +} diff --git a/src/ssb.ebt.h b/src/ssb.ebt.h index 9558bd64..91e76dfd 100644 --- a/src/ssb.ebt.h +++ b/src/ssb.ebt.h @@ -83,3 +83,17 @@ void tf_ssb_ebt_set_messages_sent(tf_ssb_ebt_t* ebt, const char* id, int64_t seq ** @param ebt The EBT instance. */ void tf_ssb_ebt_destroy(tf_ssb_ebt_t* ebt); + +/** +** Get whether sending the clock is pending. +** @param ebt The EBT instance. +** @return The last value set by tf_ssb_ebt_set_send_clock_pending(). +*/ +int tf_ssb_ebt_get_send_clock_pending(tf_ssb_ebt_t* ebt); + +/** +** Set whether sending the clock is pending. +** @param ebt The EBT instance. +** @param pending A value representing the pending status. +*/ +void tf_ssb_ebt_set_send_clock_pending(tf_ssb_ebt_t* ebt, int pending); diff --git a/src/ssb.rpc.c b/src/ssb.rpc.c index 3710b350..f450f26a 100644 --- a/src/ssb.rpc.c +++ b/src/ssb.rpc.c @@ -20,6 +20,7 @@ static void _tf_ssb_connection_send_history_stream( static void _tf_ssb_rpc_send_peers_exchange(tf_ssb_connection_t* connection); static void _tf_ssb_rpc_start_delete_blobs(tf_ssb_t* ssb, int delay_ms); static void _tf_ssb_rpc_start_delete_feeds(tf_ssb_t* ssb, int delay_ms); +static void _tf_ssb_rpc_ebt_replicate_resend_clock(tf_ssb_connection_t* connection, bool skip, void* user_data); static void _tf_ssb_rpc_gossip_ping_callback( tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) @@ -968,9 +969,17 @@ static void _tf_ssb_rpc_ebt_replicate_store_callback(const char* id, bool verifi tf_ssb_connection_adjust_read_backpressure(connection, -1); } +typedef struct _resend_clock_t +{ + tf_ssb_connection_t* connection; + int32_t request_number; + int pending; +} resend_clock_t; + static void _tf_ssb_rpc_ebt_send_clock_callback(const tf_ssb_ebt_clock_t* clock, int32_t request_number, void* user_data) { - tf_ssb_connection_t* connection = user_data; + resend_clock_t* resend = user_data; + tf_ssb_connection_t* connection = resend->connection; if (clock && clock->count) { @@ -984,24 +993,34 @@ static void _tf_ssb_rpc_ebt_send_clock_callback(const tf_ssb_ebt_clock_t* clock, JS_FreeValue(context, message); } + tf_ssb_ebt_t* ebt = tf_ssb_connection_get_ebt(connection); + if (resend->pending != tf_ssb_ebt_get_send_clock_pending(ebt) && tf_ssb_connection_is_connected(connection) && + !tf_ssb_is_shutting_down(tf_ssb_connection_get_ssb(connection)) && !tf_ssb_connection_is_closing(connection)) + { + resend->pending = tf_ssb_ebt_get_send_clock_pending(ebt); + tf_ssb_connection_schedule_idle(connection, "ebt.clock", _tf_ssb_rpc_ebt_replicate_resend_clock, resend); + } + else + { + tf_ssb_ebt_set_send_clock_pending(ebt, 0); + tf_free(resend); + } + _tf_ssb_rpc_ebt_replicate_send_messages(connection); } -typedef struct _resend_clock_t -{ - tf_ssb_connection_t* connection; - int32_t request_number; -} resend_clock_t; - static void _tf_ssb_rpc_ebt_replicate_resend_clock(tf_ssb_connection_t* connection, bool skip, void* user_data) { resend_clock_t* resend = user_data; if (!skip) { tf_ssb_ebt_t* ebt = tf_ssb_connection_get_ebt(connection); - tf_ssb_ebt_get_send_clock(ebt, resend->request_number, _tf_ssb_rpc_ebt_send_clock_callback, connection); + tf_ssb_ebt_get_send_clock(ebt, resend->request_number, _tf_ssb_rpc_ebt_send_clock_callback, resend); + } + else + { + tf_free(resend); } - tf_free(user_data); } static void _tf_ssb_rpc_ebt_replicate(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) @@ -1026,6 +1045,7 @@ static void _tf_ssb_rpc_ebt_replicate(tf_ssb_connection_t* connection, uint8_t f bool resend_clock = false; + tf_ssb_ebt_t* ebt = tf_ssb_connection_get_ebt(connection); if (!JS_IsUndefined(author)) { /* Looks like a message. */ @@ -1036,19 +1056,24 @@ static void _tf_ssb_rpc_ebt_replicate(tf_ssb_connection_t* connection, uint8_t f } else { - tf_ssb_ebt_t* ebt = tf_ssb_connection_get_ebt(connection); tf_ssb_ebt_receive_clock(ebt, context, in_clock); resend_clock = true; } - if (resend_clock) + if (resend_clock && tf_ssb_connection_is_connected(connection) && !tf_ssb_is_shutting_down(tf_ssb_connection_get_ssb(connection)) && !tf_ssb_connection_is_closing(connection)) { - resend_clock_t* resend = tf_malloc(sizeof(resend_clock_t)); - *resend = (resend_clock_t) { - .connection = connection, - .request_number = request_number, - }; - tf_ssb_connection_schedule_idle(connection, "ebt.clock", _tf_ssb_rpc_ebt_replicate_resend_clock, resend); + int pending = tf_ssb_ebt_get_send_clock_pending(ebt) + 1; + tf_ssb_ebt_set_send_clock_pending(ebt, pending); + if (pending == 1) + { + resend_clock_t* resend = tf_malloc(sizeof(resend_clock_t)); + *resend = (resend_clock_t) { + .connection = connection, + .request_number = request_number, + .pending = pending, + }; + tf_ssb_connection_schedule_idle(connection, "ebt.clock", _tf_ssb_rpc_ebt_replicate_resend_clock, resend); + } } JS_FreeValue(context, name); JS_FreeValue(context, author);