ssb: Don't schedule duplicate history stream requests for the same account. Changes how we schedule idle work. Let's see if this is better.

This commit is contained in:
Cory McWilliams 2024-12-27 15:02:52 -05:00
parent 4127898655
commit 9cddd93dad
3 changed files with 67 additions and 25 deletions

View File

@ -261,6 +261,7 @@ typedef struct _tf_ssb_connection_message_request_t
typedef struct _tf_ssb_connection_scheduled_t typedef struct _tf_ssb_connection_scheduled_t
{ {
char key[k_id_base64_len];
tf_ssb_scheduled_callback_t* callback; tf_ssb_scheduled_callback_t* callback;
void* user_data; void* user_data;
} tf_ssb_connection_scheduled_t; } tf_ssb_connection_scheduled_t;
@ -595,24 +596,55 @@ static void _tf_ssb_connection_dispatch_scheduled(tf_ssb_connection_t* connectio
{ {
while (((connection->active_write_count == 0 && connection->read_back_pressure == 0) || connection->closing) && connection->scheduled_count && connection->scheduled) while (((connection->active_write_count == 0 && connection->read_back_pressure == 0) || connection->closing) && connection->scheduled_count && connection->scheduled)
{ {
tf_ssb_connection_scheduled_t scheduled = connection->scheduled[0]; int index = uv_hrtime() % connection->scheduled_count;
memmove(connection->scheduled, connection->scheduled + 1, sizeof(tf_ssb_connection_scheduled_t) * (connection->scheduled_count - 1)); tf_ssb_connection_scheduled_t scheduled = connection->scheduled[index];
if (index != connection->scheduled_count - 1)
{
memmove(connection->scheduled + index, connection->scheduled + index + 1, sizeof(tf_ssb_connection_scheduled_t) * (connection->scheduled_count - index - 1));
}
connection->scheduled_count--; connection->scheduled_count--;
tf_trace_begin(connection->ssb->trace, "scheduled callback"); tf_trace_begin(connection->ssb->trace, "scheduled callback");
PRE_CALLBACK(connection->ssb, scheduled.callback); PRE_CALLBACK(connection->ssb, scheduled.callback);
scheduled.callback(connection, scheduled.user_data); scheduled.callback(connection, false, scheduled.user_data);
POST_CALLBACK(connection->ssb, scheduled.callback); POST_CALLBACK(connection->ssb, scheduled.callback);
tf_trace_end(connection->ssb->trace); tf_trace_end(connection->ssb->trace);
} }
} }
void tf_ssb_connection_schedule_idle(tf_ssb_connection_t* connection, tf_ssb_scheduled_callback_t* callback, void* user_data) static int _tf_ssb_connection_scheduled_compare(const void* a, const void* b)
{
const char* key = a;
const tf_ssb_connection_scheduled_t* scheduled = b;
return strcmp(key, scheduled->key);
}
void tf_ssb_connection_schedule_idle(tf_ssb_connection_t* connection, const char* key, tf_ssb_scheduled_callback_t* callback, void* user_data)
{
int index = tf_util_insert_index(key, connection->scheduled, connection->scheduled_count, sizeof(tf_ssb_connection_scheduled_t), _tf_ssb_connection_scheduled_compare);
if (index != connection->scheduled_count && strcmp(key, connection->scheduled[index].key) == 0)
{
tf_ssb_connection_scheduled_t scheduled = connection->scheduled[index];
connection->scheduled[index].callback = callback;
connection->scheduled[index].user_data = user_data;
tf_trace_begin(connection->ssb->trace, "scheduled callback (skip)");
PRE_CALLBACK(connection->ssb, scheduled.callback);
scheduled.callback(connection, true, scheduled.user_data);
POST_CALLBACK(connection->ssb, scheduled.callback);
tf_trace_end(connection->ssb->trace);
}
else
{ {
connection->scheduled = tf_resize_vec(connection->scheduled, sizeof(tf_ssb_connection_scheduled_t) * (connection->scheduled_count + 1)); connection->scheduled = tf_resize_vec(connection->scheduled, sizeof(tf_ssb_connection_scheduled_t) * (connection->scheduled_count + 1));
connection->scheduled[connection->scheduled_count++] = (tf_ssb_connection_scheduled_t) { memmove(connection->scheduled + index + 1, connection->scheduled + index, sizeof(tf_ssb_connection_scheduled_t) * (connection->scheduled_count - index));
connection->scheduled[index] = (tf_ssb_connection_scheduled_t) {
.callback = callback, .callback = callback,
.user_data = user_data, .user_data = user_data,
}; };
snprintf(connection->scheduled[index].key, sizeof(connection->scheduled[index].key), "%s", key);
connection->scheduled_count++;
}
uv_async_send(&connection->scheduled_async); uv_async_send(&connection->scheduled_async);
} }

View File

@ -799,17 +799,19 @@ JSValue tf_ssb_connection_requests_to_object(tf_ssb_connection_t* connection);
/** /**
** A function scheduled to be run later. ** A function scheduled to be run later.
** @param connection The owning connection. ** @param connection The owning connection.
** @param skip Whether the work ought to be skipped, because it is being replaced.
** @param user_data User data registered with the callback. ** @param user_data User data registered with the callback.
*/ */
typedef void(tf_ssb_scheduled_callback_t)(tf_ssb_connection_t* connection, void* user_data); typedef void(tf_ssb_scheduled_callback_t)(tf_ssb_connection_t* connection, bool skip, void* user_data);
/** /**
** Schedule work to be run when the connection is next idle. ** Schedule work to be run when the connection is next idle.
** @param connection The owning connection. ** @param connection The owning connection.
** @param key A key identifying the work. If work by the same key already exists, it will be replaced.
** @param callback The callback to call. ** @param callback The callback to call.
** @param user_data User data to pass to the callback. ** @param user_data User data to pass to the callback.
*/ */
void tf_ssb_connection_schedule_idle(tf_ssb_connection_t* connection, tf_ssb_scheduled_callback_t* callback, void* user_data); void tf_ssb_connection_schedule_idle(tf_ssb_connection_t* connection, const char* key, tf_ssb_scheduled_callback_t* callback, void* user_data);
/** /**
** Schedule work to run on a worker thread. ** Schedule work to run on a worker thread.

View File

@ -855,6 +855,16 @@ static void _tf_ssb_connection_send_history_stream_work(tf_ssb_connection_t* con
request->out_finished = request->out_max_sequence_seen != request->sequence + k_max - 1; request->out_finished = request->out_max_sequence_seen != request->sequence + k_max - 1;
} }
static void _tf_ssb_connection_send_history_stream_destroy(tf_ssb_connection_send_history_stream_t* request)
{
for (int i = 0; i < request->out_messages_count; i++)
{
tf_free(request->out_messages[i]);
}
tf_free(request->out_messages);
tf_free(request);
}
static void _tf_ssb_connection_send_history_stream_after_work(tf_ssb_connection_t* connection, int result, void* user_data) static void _tf_ssb_connection_send_history_stream_after_work(tf_ssb_connection_t* connection, int result, void* user_data)
{ {
tf_ssb_connection_send_history_stream_t* request = user_data; tf_ssb_connection_send_history_stream_t* request = user_data;
@ -879,24 +889,19 @@ static void _tf_ssb_connection_send_history_stream_after_work(tf_ssb_connection_
tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json, request->request_number, NULL, (const uint8_t*)"false", strlen("false"), NULL, NULL, NULL); tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json, request->request_number, NULL, (const uint8_t*)"false", strlen("false"), NULL, NULL, NULL);
} }
} }
for (int i = 0; i < request->out_messages_count; i++) _tf_ssb_connection_send_history_stream_destroy(request);
{
tf_free(request->out_messages[i]);
}
tf_free(request->out_messages);
tf_free(request);
} }
static void _tf_ssb_connection_send_history_stream_callback(tf_ssb_connection_t* connection, void* user_data) static void _tf_ssb_connection_send_history_stream_callback(tf_ssb_connection_t* connection, bool skip, void* user_data)
{ {
tf_ssb_connection_adjust_write_count(connection, 1); tf_ssb_connection_adjust_write_count(connection, 1);
if (tf_ssb_connection_is_connected(connection) && !tf_ssb_is_shutting_down(tf_ssb_connection_get_ssb(connection))) if (!skip && tf_ssb_connection_is_connected(connection) && !tf_ssb_is_shutting_down(tf_ssb_connection_get_ssb(connection)))
{ {
tf_ssb_connection_run_work(connection, _tf_ssb_connection_send_history_stream_work, _tf_ssb_connection_send_history_stream_after_work, user_data); tf_ssb_connection_run_work(connection, _tf_ssb_connection_send_history_stream_work, _tf_ssb_connection_send_history_stream_after_work, user_data);
} }
else else
{ {
_tf_ssb_connection_send_history_stream_after_work(connection, -1, user_data); _tf_ssb_connection_send_history_stream_destroy(user_data);
} }
} }
@ -914,7 +919,7 @@ static void _tf_ssb_connection_send_history_stream(
.end_request = end_request, .end_request = end_request,
}; };
snprintf(async->author, sizeof(async->author), "%s", author); snprintf(async->author, sizeof(async->author), "%s", author);
tf_ssb_connection_schedule_idle(connection, _tf_ssb_connection_send_history_stream_callback, async); tf_ssb_connection_schedule_idle(connection, author, _tf_ssb_connection_send_history_stream_callback, async);
} }
} }
@ -1166,11 +1171,14 @@ typedef struct _resend_clock_t
int32_t request_number; int32_t request_number;
} resend_clock_t; } resend_clock_t;
static void _tf_ssb_rpc_ebt_replicate_resend_clock(tf_ssb_connection_t* connection, void* user_data) static void _tf_ssb_rpc_ebt_replicate_resend_clock(tf_ssb_connection_t* connection, bool skip, void* user_data)
{ {
resend_clock_t* resend = user_data; resend_clock_t* resend = user_data;
if (!skip)
{
_tf_ssb_rpc_ebt_replicate_send_clock(resend->connection, resend->request_number, JS_UNDEFINED); _tf_ssb_rpc_ebt_replicate_send_clock(resend->connection, resend->request_number, JS_UNDEFINED);
tf_ssb_connection_set_sent_clock(resend->connection, true); tf_ssb_connection_set_sent_clock(resend->connection, true);
}
tf_free(user_data); tf_free(user_data);
} }
@ -1208,7 +1216,7 @@ static void _tf_ssb_rpc_ebt_replicate(tf_ssb_connection_t* connection, uint8_t f
.connection = connection, .connection = connection,
.request_number = request_number, .request_number = request_number,
}; };
tf_ssb_connection_schedule_idle(connection, _tf_ssb_rpc_ebt_replicate_resend_clock, resend); tf_ssb_connection_schedule_idle(connection, "ebt.clock", _tf_ssb_rpc_ebt_replicate_resend_clock, resend);
} }
} }
else else