ssb: Try to go easier on the main thread, still.

This commit is contained in:
Cory McWilliams 2024-12-14 21:36:33 -05:00
parent 68ae45dd58
commit 9c0f6481c0

View File

@ -294,6 +294,7 @@ typedef struct _tf_ssb_connection_t
uv_tcp_t tcp; uv_tcp_t tcp;
uv_connect_t connect; uv_connect_t connect;
uv_async_t async; uv_async_t async;
uv_async_t scheduled_async;
uv_timer_t handshake_timer; uv_timer_t handshake_timer;
bool closing; bool closing;
@ -385,6 +386,7 @@ static bool _tf_ssb_parse_broadcast(const char* in_broadcast, tf_ssb_broadcast_t
static void _tf_ssb_start_update_settings(tf_ssb_t* ssb); static void _tf_ssb_start_update_settings(tf_ssb_t* ssb);
static void _tf_ssb_update_settings(tf_ssb_t* ssb); static void _tf_ssb_update_settings(tf_ssb_t* ssb);
static void _tf_ssb_write(tf_ssb_connection_t* connection, void* data, size_t size); static void _tf_ssb_write(tf_ssb_connection_t* connection, void* data, size_t size);
static void _tf_ssb_connection_dispatch_scheduled(tf_ssb_connection_t* connection);
static const char* _tf_ssb_connection_state_to_string(tf_ssb_state_t state) static const char* _tf_ssb_connection_state_to_string(tf_ssb_state_t state)
{ {
@ -663,6 +665,12 @@ static void _tf_ssb_connection_box_stream_send(tf_ssb_connection_t* connection,
} }
} }
static void _tf_ssb_connection_scheduled_async(uv_async_t* async)
{
tf_ssb_connection_t* connection = async->data;
_tf_ssb_connection_dispatch_scheduled(connection);
}
static void _tf_ssb_connection_dispatch_scheduled(tf_ssb_connection_t* connection) static void _tf_ssb_connection_dispatch_scheduled(tf_ssb_connection_t* connection)
{ {
while (((connection->active_write_count == 0 && connection->read_back_pressure == 0) || connection->closing) && connection->scheduled_count && connection->scheduled) while (((connection->active_write_count == 0 && connection->read_back_pressure == 0) || connection->closing) && connection->scheduled_count && connection->scheduled)
@ -683,7 +691,7 @@ void tf_ssb_connection_schedule_idle(tf_ssb_connection_t* connection, tf_ssb_sch
.callback = callback, .callback = callback,
.user_data = user_data, .user_data = user_data,
}; };
_tf_ssb_connection_dispatch_scheduled(connection); uv_async_send(&connection->scheduled_async);
} }
static int _request_compare(const void* a, const void* b) static int _request_compare(const void* a, const void* b)
@ -1988,6 +1996,10 @@ static void _tf_ssb_connection_destroy(tf_ssb_connection_t* connection, const ch
{ {
uv_close((uv_handle_t*)&connection->async, _tf_ssb_connection_on_close); uv_close((uv_handle_t*)&connection->async, _tf_ssb_connection_on_close);
} }
if (connection->scheduled_async.data && !uv_is_closing((uv_handle_t*)&connection->scheduled_async))
{
uv_close((uv_handle_t*)&connection->scheduled_async, _tf_ssb_connection_on_close);
}
if (connection->tcp.data && !uv_is_closing((uv_handle_t*)&connection->tcp)) if (connection->tcp.data && !uv_is_closing((uv_handle_t*)&connection->tcp))
{ {
uv_close((uv_handle_t*)&connection->tcp, _tf_ssb_connection_on_close); uv_close((uv_handle_t*)&connection->tcp, _tf_ssb_connection_on_close);
@ -1997,8 +2009,8 @@ static void _tf_ssb_connection_destroy(tf_ssb_connection_t* connection, const ch
uv_close((uv_handle_t*)&connection->handshake_timer, _tf_ssb_connection_on_close); uv_close((uv_handle_t*)&connection->handshake_timer, _tf_ssb_connection_on_close);
} }
if (JS_IsUndefined(connection->object) && !connection->async.data && !connection->tcp.data && !connection->connect.data && !connection->handshake_timer.data && if (JS_IsUndefined(connection->object) && !connection->async.data && !connection->scheduled_async.data && !connection->tcp.data && !connection->connect.data &&
connection->ref_count == 0) !connection->handshake_timer.data && connection->ref_count == 0)
{ {
tf_free(connection->message_requests); tf_free(connection->message_requests);
connection->message_requests = NULL; connection->message_requests = NULL;
@ -2792,6 +2804,8 @@ static tf_ssb_connection_t* _tf_ssb_connection_create(
connection->port = ntohs(addr->sin_port); connection->port = ntohs(addr->sin_port);
connection->async.data = connection; connection->async.data = connection;
uv_async_init(ssb->loop, &connection->async, _tf_ssb_connection_process_message_async); uv_async_init(ssb->loop, &connection->async, _tf_ssb_connection_process_message_async);
connection->scheduled_async.data = connection;
uv_async_init(ssb->loop, &connection->scheduled_async, _tf_ssb_connection_scheduled_async);
connection->connect_callback = callback; connection->connect_callback = callback;
connection->connect_callback_user_data = user_data; connection->connect_callback_user_data = user_data;
@ -2861,6 +2875,8 @@ tf_ssb_connection_t* tf_ssb_connection_tunnel_create(tf_ssb_t* ssb, const char*
tunnel->send_request_number = 1; tunnel->send_request_number = 1;
tunnel->async.data = tunnel; tunnel->async.data = tunnel;
uv_async_init(ssb->loop, &tunnel->async, _tf_ssb_connection_process_message_async); uv_async_init(ssb->loop, &tunnel->async, _tf_ssb_connection_process_message_async);
tunnel->scheduled_async.data = tunnel;
uv_async_init(ssb->loop, &tunnel->scheduled_async, _tf_ssb_connection_scheduled_async);
tunnel->handshake_timer.data = tunnel; tunnel->handshake_timer.data = tunnel;
uv_timer_init(ssb->loop, &tunnel->handshake_timer); uv_timer_init(ssb->loop, &tunnel->handshake_timer);
@ -2998,6 +3014,8 @@ static void _tf_ssb_on_connection(uv_stream_t* stream, int status)
connection->send_request_number = 1; connection->send_request_number = 1;
connection->async.data = connection; connection->async.data = connection;
uv_async_init(ssb->loop, &connection->async, _tf_ssb_connection_process_message_async); uv_async_init(ssb->loop, &connection->async, _tf_ssb_connection_process_message_async);
connection->scheduled_async.data = connection;
uv_async_init(ssb->loop, &connection->scheduled_async, _tf_ssb_connection_scheduled_async);
connection->object = JS_NewObjectClass(ssb->context, _connection_class_id); connection->object = JS_NewObjectClass(ssb->context, _connection_class_id);
JS_SetOpaque(connection->object, connection); JS_SetOpaque(connection->object, connection);
@ -4376,7 +4394,7 @@ void tf_ssb_connection_adjust_read_backpressure(tf_ssb_connection_t* connection,
const int k_threshold = 256; const int k_threshold = 256;
int old_pressure = connection->read_back_pressure; int old_pressure = connection->read_back_pressure;
connection->read_back_pressure += delta; connection->read_back_pressure += delta;
_tf_ssb_connection_dispatch_scheduled(connection); uv_async_send(&connection->scheduled_async);
if (!connection->closing) if (!connection->closing)
{ {
if (old_pressure < k_threshold && connection->read_back_pressure >= k_threshold) if (old_pressure < k_threshold && connection->read_back_pressure >= k_threshold)
@ -4398,7 +4416,7 @@ void tf_ssb_connection_adjust_read_backpressure(tf_ssb_connection_t* connection,
void tf_ssb_connection_adjust_write_count(tf_ssb_connection_t* connection, int delta) void tf_ssb_connection_adjust_write_count(tf_ssb_connection_t* connection, int delta)
{ {
connection->active_write_count += delta; connection->active_write_count += delta;
_tf_ssb_connection_dispatch_scheduled(connection); uv_async_send(&connection->scheduled_async);
} }
void tf_ssb_sync_start(tf_ssb_t* ssb) void tf_ssb_sync_start(tf_ssb_t* ssb)