diff --git a/src/ssb.c b/src/ssb.c index de3f974c..614ef94d 100644 --- a/src/ssb.c +++ b/src/ssb.c @@ -294,6 +294,7 @@ typedef struct _tf_ssb_connection_t uv_tcp_t tcp; uv_connect_t connect; uv_async_t async; + uv_async_t scheduled_async; uv_timer_t handshake_timer; bool closing; @@ -385,6 +386,7 @@ static bool _tf_ssb_parse_broadcast(const char* in_broadcast, tf_ssb_broadcast_t static void _tf_ssb_start_update_settings(tf_ssb_t* ssb); static void _tf_ssb_update_settings(tf_ssb_t* ssb); static void _tf_ssb_write(tf_ssb_connection_t* connection, void* data, size_t size); +static void _tf_ssb_connection_dispatch_scheduled(tf_ssb_connection_t* connection); static const char* _tf_ssb_connection_state_to_string(tf_ssb_state_t state) { @@ -663,6 +665,12 @@ static void _tf_ssb_connection_box_stream_send(tf_ssb_connection_t* connection, } } +static void _tf_ssb_connection_scheduled_async(uv_async_t* async) +{ + tf_ssb_connection_t* connection = async->data; + _tf_ssb_connection_dispatch_scheduled(connection); +} + static void _tf_ssb_connection_dispatch_scheduled(tf_ssb_connection_t* connection) { while (((connection->active_write_count == 0 && connection->read_back_pressure == 0) || connection->closing) && connection->scheduled_count && connection->scheduled) @@ -683,7 +691,7 @@ void tf_ssb_connection_schedule_idle(tf_ssb_connection_t* connection, tf_ssb_sch .callback = callback, .user_data = user_data, }; - _tf_ssb_connection_dispatch_scheduled(connection); + uv_async_send(&connection->scheduled_async); } static int _request_compare(const void* a, const void* b) @@ -1988,6 +1996,10 @@ static void _tf_ssb_connection_destroy(tf_ssb_connection_t* connection, const ch { uv_close((uv_handle_t*)&connection->async, _tf_ssb_connection_on_close); } + if (connection->scheduled_async.data && !uv_is_closing((uv_handle_t*)&connection->scheduled_async)) + { + uv_close((uv_handle_t*)&connection->scheduled_async, _tf_ssb_connection_on_close); + } if (connection->tcp.data && !uv_is_closing((uv_handle_t*)&connection->tcp)) { uv_close((uv_handle_t*)&connection->tcp, _tf_ssb_connection_on_close); @@ -1997,8 +2009,8 @@ static void _tf_ssb_connection_destroy(tf_ssb_connection_t* connection, const ch uv_close((uv_handle_t*)&connection->handshake_timer, _tf_ssb_connection_on_close); } - if (JS_IsUndefined(connection->object) && !connection->async.data && !connection->tcp.data && !connection->connect.data && !connection->handshake_timer.data && - connection->ref_count == 0) + if (JS_IsUndefined(connection->object) && !connection->async.data && !connection->scheduled_async.data && !connection->tcp.data && !connection->connect.data && + !connection->handshake_timer.data && connection->ref_count == 0) { tf_free(connection->message_requests); connection->message_requests = NULL; @@ -2792,6 +2804,8 @@ static tf_ssb_connection_t* _tf_ssb_connection_create( connection->port = ntohs(addr->sin_port); connection->async.data = connection; uv_async_init(ssb->loop, &connection->async, _tf_ssb_connection_process_message_async); + connection->scheduled_async.data = connection; + uv_async_init(ssb->loop, &connection->scheduled_async, _tf_ssb_connection_scheduled_async); connection->connect_callback = callback; connection->connect_callback_user_data = user_data; @@ -2861,6 +2875,8 @@ tf_ssb_connection_t* tf_ssb_connection_tunnel_create(tf_ssb_t* ssb, const char* tunnel->send_request_number = 1; tunnel->async.data = tunnel; uv_async_init(ssb->loop, &tunnel->async, _tf_ssb_connection_process_message_async); + tunnel->scheduled_async.data = tunnel; + uv_async_init(ssb->loop, &tunnel->scheduled_async, _tf_ssb_connection_scheduled_async); tunnel->handshake_timer.data = tunnel; uv_timer_init(ssb->loop, &tunnel->handshake_timer); @@ -2998,6 +3014,8 @@ static void _tf_ssb_on_connection(uv_stream_t* stream, int status) connection->send_request_number = 1; connection->async.data = connection; uv_async_init(ssb->loop, &connection->async, _tf_ssb_connection_process_message_async); + connection->scheduled_async.data = connection; + uv_async_init(ssb->loop, &connection->scheduled_async, _tf_ssb_connection_scheduled_async); connection->object = JS_NewObjectClass(ssb->context, _connection_class_id); JS_SetOpaque(connection->object, connection); @@ -4376,7 +4394,7 @@ void tf_ssb_connection_adjust_read_backpressure(tf_ssb_connection_t* connection, const int k_threshold = 256; int old_pressure = connection->read_back_pressure; connection->read_back_pressure += delta; - _tf_ssb_connection_dispatch_scheduled(connection); + uv_async_send(&connection->scheduled_async); if (!connection->closing) { if (old_pressure < k_threshold && connection->read_back_pressure >= k_threshold) @@ -4398,7 +4416,7 @@ void tf_ssb_connection_adjust_read_backpressure(tf_ssb_connection_t* connection, void tf_ssb_connection_adjust_write_count(tf_ssb_connection_t* connection, int delta) { connection->active_write_count += delta; - _tf_ssb_connection_dispatch_scheduled(connection); + uv_async_send(&connection->scheduled_async); } void tf_ssb_sync_start(tf_ssb_t* ssb)