ssb: Ease up on excessively re-hitting the database for ebt.replicate even more.
All checks were successful
Build Tilde Friends / Build-All (push) Successful in 23m20s

This commit is contained in:
Cory McWilliams 2025-01-04 09:58:16 -05:00
parent 2223245861
commit 2328f3afb5
3 changed files with 68 additions and 17 deletions

View File

@ -27,6 +27,8 @@ typedef struct _tf_ssb_ebt_t
ebt_entry_t* entries;
int entries_count;
int send_clock_pending;
} tf_ssb_ebt_t;
tf_ssb_ebt_t* tf_ssb_ebt_create(tf_ssb_connection_t* connection)
@ -308,3 +310,13 @@ void tf_ssb_ebt_set_messages_sent(tf_ssb_ebt_t* ebt, const char* id, int64_t seq
}
uv_mutex_unlock(&ebt->mutex);
}
int tf_ssb_ebt_get_send_clock_pending(tf_ssb_ebt_t* ebt)
{
return ebt->send_clock_pending;
}
void tf_ssb_ebt_set_send_clock_pending(tf_ssb_ebt_t* ebt, int pending)
{
ebt->send_clock_pending = pending;
}

View File

@ -83,3 +83,17 @@ void tf_ssb_ebt_set_messages_sent(tf_ssb_ebt_t* ebt, const char* id, int64_t seq
** @param ebt The EBT instance.
*/
void tf_ssb_ebt_destroy(tf_ssb_ebt_t* ebt);
/**
** Get whether sending the clock is pending.
** @param ebt The EBT instance.
** @return The last value set by tf_ssb_ebt_set_send_clock_pending().
*/
int tf_ssb_ebt_get_send_clock_pending(tf_ssb_ebt_t* ebt);
/**
** Set whether sending the clock is pending.
** @param ebt The EBT instance.
** @param pending A value representing the pending status.
*/
void tf_ssb_ebt_set_send_clock_pending(tf_ssb_ebt_t* ebt, int pending);

View File

@ -20,6 +20,7 @@ static void _tf_ssb_connection_send_history_stream(
static void _tf_ssb_rpc_send_peers_exchange(tf_ssb_connection_t* connection);
static void _tf_ssb_rpc_start_delete_blobs(tf_ssb_t* ssb, int delay_ms);
static void _tf_ssb_rpc_start_delete_feeds(tf_ssb_t* ssb, int delay_ms);
static void _tf_ssb_rpc_ebt_replicate_resend_clock(tf_ssb_connection_t* connection, bool skip, void* user_data);
static void _tf_ssb_rpc_gossip_ping_callback(
tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
@ -968,9 +969,17 @@ static void _tf_ssb_rpc_ebt_replicate_store_callback(const char* id, bool verifi
tf_ssb_connection_adjust_read_backpressure(connection, -1);
}
typedef struct _resend_clock_t
{
tf_ssb_connection_t* connection;
int32_t request_number;
int pending;
} resend_clock_t;
static void _tf_ssb_rpc_ebt_send_clock_callback(const tf_ssb_ebt_clock_t* clock, int32_t request_number, void* user_data)
{
tf_ssb_connection_t* connection = user_data;
resend_clock_t* resend = user_data;
tf_ssb_connection_t* connection = resend->connection;
if (clock && clock->count)
{
@ -984,24 +993,34 @@ static void _tf_ssb_rpc_ebt_send_clock_callback(const tf_ssb_ebt_clock_t* clock,
JS_FreeValue(context, message);
}
tf_ssb_ebt_t* ebt = tf_ssb_connection_get_ebt(connection);
if (resend->pending != tf_ssb_ebt_get_send_clock_pending(ebt) && tf_ssb_connection_is_connected(connection) &&
!tf_ssb_is_shutting_down(tf_ssb_connection_get_ssb(connection)) && !tf_ssb_connection_is_closing(connection))
{
resend->pending = tf_ssb_ebt_get_send_clock_pending(ebt);
tf_ssb_connection_schedule_idle(connection, "ebt.clock", _tf_ssb_rpc_ebt_replicate_resend_clock, resend);
}
else
{
tf_ssb_ebt_set_send_clock_pending(ebt, 0);
tf_free(resend);
}
_tf_ssb_rpc_ebt_replicate_send_messages(connection);
}
typedef struct _resend_clock_t
{
tf_ssb_connection_t* connection;
int32_t request_number;
} resend_clock_t;
static void _tf_ssb_rpc_ebt_replicate_resend_clock(tf_ssb_connection_t* connection, bool skip, void* user_data)
{
resend_clock_t* resend = user_data;
if (!skip)
{
tf_ssb_ebt_t* ebt = tf_ssb_connection_get_ebt(connection);
tf_ssb_ebt_get_send_clock(ebt, resend->request_number, _tf_ssb_rpc_ebt_send_clock_callback, connection);
tf_ssb_ebt_get_send_clock(ebt, resend->request_number, _tf_ssb_rpc_ebt_send_clock_callback, resend);
}
else
{
tf_free(resend);
}
tf_free(user_data);
}
static void _tf_ssb_rpc_ebt_replicate(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
@ -1026,6 +1045,7 @@ static void _tf_ssb_rpc_ebt_replicate(tf_ssb_connection_t* connection, uint8_t f
bool resend_clock = false;
tf_ssb_ebt_t* ebt = tf_ssb_connection_get_ebt(connection);
if (!JS_IsUndefined(author))
{
/* Looks like a message. */
@ -1036,19 +1056,24 @@ static void _tf_ssb_rpc_ebt_replicate(tf_ssb_connection_t* connection, uint8_t f
}
else
{
tf_ssb_ebt_t* ebt = tf_ssb_connection_get_ebt(connection);
tf_ssb_ebt_receive_clock(ebt, context, in_clock);
resend_clock = true;
}
if (resend_clock)
if (resend_clock && tf_ssb_connection_is_connected(connection) && !tf_ssb_is_shutting_down(tf_ssb_connection_get_ssb(connection)) && !tf_ssb_connection_is_closing(connection))
{
resend_clock_t* resend = tf_malloc(sizeof(resend_clock_t));
*resend = (resend_clock_t) {
.connection = connection,
.request_number = request_number,
};
tf_ssb_connection_schedule_idle(connection, "ebt.clock", _tf_ssb_rpc_ebt_replicate_resend_clock, resend);
int pending = tf_ssb_ebt_get_send_clock_pending(ebt) + 1;
tf_ssb_ebt_set_send_clock_pending(ebt, pending);
if (pending == 1)
{
resend_clock_t* resend = tf_malloc(sizeof(resend_clock_t));
*resend = (resend_clock_t) {
.connection = connection,
.request_number = request_number,
.pending = pending,
};
tf_ssb_connection_schedule_idle(connection, "ebt.clock", _tf_ssb_rpc_ebt_replicate_resend_clock, resend);
}
}
JS_FreeValue(context, name);
JS_FreeValue(context, author);