diff --git a/src/ssb.c b/src/ssb.c index ddd4ec50..a7d9e398 100644 --- a/src/ssb.c +++ b/src/ssb.c @@ -344,6 +344,7 @@ typedef struct _tf_ssb_connection_t int ref_count; int read_back_pressure; + int active_write_count; } tf_ssb_connection_t; static JSClassID _connection_class_id; @@ -460,9 +461,10 @@ static void _tf_ssb_connection_on_tcp_alloc(uv_handle_t* handle, size_t suggeste static void _tf_ssb_connection_on_write(uv_write_t* req, int status) { + tf_ssb_connection_t* connection = req->data; + tf_ssb_connection_adjust_write_count(connection, -1); if (status) { - tf_ssb_connection_t* connection = req->data; char buffer[256]; snprintf(buffer, sizeof(buffer), "write failed asynchronously: %s", uv_strerror(status)); _tf_ssb_connection_close(connection, buffer); @@ -477,9 +479,11 @@ static void _tf_ssb_write(tf_ssb_connection_t* connection, void* data, size_t si uv_write_t* write = tf_malloc(sizeof(uv_write_t) + size); *write = (uv_write_t) { .data = connection }; memcpy(write + 1, data, size); + tf_ssb_connection_adjust_write_count(connection, 1); int result = uv_write(write, (uv_stream_t*)&connection->tcp, &(uv_buf_t) { .base = (char*)(write + 1), .len = size }, 1, _tf_ssb_connection_on_write); if (result) { + tf_ssb_connection_adjust_write_count(connection, -1); _tf_ssb_connection_close(connection, "write failed"); tf_free(write); } @@ -606,6 +610,21 @@ static void _tf_ssb_connection_box_stream_send(tf_ssb_connection_t* connection, } } +static void _tf_ssb_connection_dispatch_scheduled(tf_ssb_connection_t* connection) +{ + while ((connection->active_write_count == 0 || connection->closing) && + connection->scheduled_count && + connection->scheduled) + { + tf_ssb_connection_scheduled_t scheduled = connection->scheduled[0]; + memmove(connection->scheduled, connection->scheduled + 1, sizeof(tf_ssb_connection_scheduled_t) * (connection->scheduled_count - 1)); + connection->scheduled_count--; + tf_trace_begin(connection->ssb->trace, "scheduled callback"); + scheduled.callback(connection, scheduled.user_data); + tf_trace_end(connection->ssb->trace); + } +} + void tf_ssb_connection_schedule_idle(tf_ssb_connection_t* connection, tf_ssb_scheduled_callback_t* callback, void* user_data) { connection->scheduled = tf_resize_vec(connection->scheduled, sizeof(tf_ssb_connection_scheduled_t) * (connection->scheduled_count + 1)); @@ -613,7 +632,7 @@ void tf_ssb_connection_schedule_idle(tf_ssb_connection_t* connection, tf_ssb_sch .callback = callback, .user_data = user_data, }; - uv_async_send(&connection->async); + _tf_ssb_connection_dispatch_scheduled(connection); } static int _request_compare(const void* a, const void* b) @@ -1815,24 +1834,6 @@ JSValue tf_ssb_sign_message(tf_ssb_t* ssb, const char* author, const uint8_t* pr return root; } -static void _tf_ssb_connection_dispatch_scheduled(tf_ssb_connection_t* connection) -{ - const int k_scheduled_batch_count = 8; - for (int i = 0; i < k_scheduled_batch_count && connection->scheduled_count && connection->scheduled; i++) - { - tf_ssb_connection_scheduled_t scheduled = connection->scheduled[0]; - memmove(connection->scheduled, connection->scheduled + 1, sizeof(tf_ssb_connection_scheduled_t) * (connection->scheduled_count - 1)); - connection->scheduled_count--; - tf_trace_begin(connection->ssb->trace, "scheduled callback"); - scheduled.callback(connection, scheduled.user_data); - tf_trace_end(connection->ssb->trace); - } - if (connection->scheduled_count) - { - uv_async_send(&connection->async); - } -} - static void _tf_ssb_connection_destroy(tf_ssb_connection_t* connection, const char* reason) { tf_ssb_t* ssb = connection->ssb; @@ -1841,10 +1842,7 @@ static void _tf_ssb_connection_destroy(tf_ssb_connection_t* connection, const ch { connection->destroy_reason = reason; } - while (connection->scheduled_count) - { - _tf_ssb_connection_dispatch_scheduled(connection); - } + _tf_ssb_connection_dispatch_scheduled(connection); tf_free(connection->scheduled); connection->scheduled = NULL; while (connection->requests) @@ -2613,7 +2611,6 @@ static void _tf_ssb_connection_process_message_async(uv_async_t* async) { uv_async_send(&connection->async); } - _tf_ssb_connection_dispatch_scheduled(connection); } tf_ssb_connection_t* tf_ssb_connection_create(tf_ssb_t* ssb, const char* host, const struct sockaddr_in* addr, const uint8_t* public_key) @@ -4108,3 +4105,9 @@ void tf_ssb_connection_adjust_read_backpressure(tf_ssb_connection_t* connection, _tf_ssb_connection_destroy(connection, "backpressure released"); } } + +void tf_ssb_connection_adjust_write_count(tf_ssb_connection_t* connection, int delta) +{ + connection->active_write_count += delta; + _tf_ssb_connection_dispatch_scheduled(connection); +} diff --git a/src/ssb.h b/src/ssb.h index e0f6c7bd..2e7720dd 100644 --- a/src/ssb.h +++ b/src/ssb.h @@ -745,7 +745,7 @@ JSValue tf_ssb_connection_requests_to_object(tf_ssb_connection_t* connection); typedef void(tf_ssb_scheduled_callback_t)(tf_ssb_connection_t* connection, void* user_data); /** -** Schedule work to be run when the server is next idle. +** Schedule work to be run when the connection is next idle. ** @param connection The owning connection. ** @param callback The callback to call. ** @param user_data User data to pass to the callback. @@ -1005,4 +1005,13 @@ bool tf_ssb_hmacsha256_verify(const char* public_key, const void* payload, size_ */ void tf_ssb_connection_adjust_read_backpressure(tf_ssb_connection_t* connection, int delta); +/** +** Adjust write count. Work scheduled by tf_ssb_connection_schedule_idle will +** only start when this reaches zero. +** @param connection The connection on which to affect backpressure. +** @param delta The change in write count. Higher will pause processing +** scheduled idle work queue. Lower will resume it. +*/ +void tf_ssb_connection_adjust_write_count(tf_ssb_connection_t* connection, int delta); + /** @} */ diff --git a/src/ssb.rpc.c b/src/ssb.rpc.c index 1f36568f..339871fe 100644 --- a/src/ssb.rpc.c +++ b/src/ssb.rpc.c @@ -744,6 +744,7 @@ static void _tf_ssb_connection_send_history_stream_work(tf_ssb_connection_t* con static void _tf_ssb_connection_send_history_stream_after_work(tf_ssb_connection_t* connection, int result, void* user_data) { tf_ssb_connection_send_history_stream_t* request = user_data; + tf_ssb_connection_adjust_write_count(connection, -1); if (tf_ssb_connection_is_connected(connection)) { for (int i = 0; i < request->out_messages_count; i++) @@ -768,6 +769,15 @@ static void _tf_ssb_connection_send_history_stream_after_work(tf_ssb_connection_ tf_free(request); } +static void _tf_ssb_connection_send_history_stream_callback(tf_ssb_connection_t* connection, void* user_data) +{ + tf_ssb_connection_adjust_write_count(connection, 1); + if (tf_ssb_connection_is_connected(connection)) + { + tf_ssb_connection_run_work(connection, _tf_ssb_connection_send_history_stream_work, _tf_ssb_connection_send_history_stream_after_work, user_data); + } +} + static void _tf_ssb_connection_send_history_stream(tf_ssb_connection_t* connection, int32_t request_number, const char* author, int64_t sequence, bool keys, bool live) { tf_ssb_connection_send_history_stream_t* async = tf_malloc(sizeof(tf_ssb_connection_send_history_stream_t)); @@ -778,7 +788,7 @@ static void _tf_ssb_connection_send_history_stream(tf_ssb_connection_t* connecti .live = live, }; snprintf(async->author, sizeof(async->author), "%s", author); - tf_ssb_connection_run_work(connection, _tf_ssb_connection_send_history_stream_work, _tf_ssb_connection_send_history_stream_after_work, async); + tf_ssb_connection_schedule_idle(connection, _tf_ssb_connection_send_history_stream_callback, async); } static void _tf_ssb_rpc_createHistoryStream(