diff --git a/src/ssb.c b/src/ssb.c index d1358b48..ddd4ec50 100644 --- a/src/ssb.c +++ b/src/ssb.c @@ -342,6 +342,8 @@ typedef struct _tf_ssb_connection_t tf_ssb_debug_message_t* debug_messages[k_debug_close_message_count]; int ref_count; + + int read_back_pressure; } tf_ssb_connection_t; static JSClassID _connection_class_id; @@ -2062,6 +2064,30 @@ static void _tf_ssb_connection_client_send_hello(tf_ssb_connection_t* connection connection->state = k_tf_ssb_state_sent_hello; } +static bool _tf_ssb_connection_read_start(tf_ssb_connection_t* connection) +{ + int result = uv_read_start((uv_stream_t*)&connection->tcp, _tf_ssb_connection_on_tcp_alloc, _tf_ssb_connection_on_tcp_recv); + if (result && result != UV_EALREADY) + { + tf_printf("uv_read_start => %s\n", uv_strerror(result)); + _tf_ssb_connection_close(connection, "uv_read_start failed"); + return false; + } + return true; +} + +static bool _tf_ssb_connection_read_stop(tf_ssb_connection_t* connection) +{ + int result = uv_read_stop((uv_stream_t*)&connection->tcp); + if (result && result != UV_EALREADY) + { + tf_printf("uv_read_stop => %s\n", uv_strerror(result)); + _tf_ssb_connection_close(connection, "uv_read_stop failed"); + return false; + } + return true; +} + static void _tf_ssb_connection_on_connect(uv_connect_t* connect, int status) { tf_ssb_connection_t* connection = connect->data; @@ -2069,13 +2095,7 @@ static void _tf_ssb_connection_on_connect(uv_connect_t* connect, int status) if (status == 0) { connection->state = k_tf_ssb_state_connected; - int result = uv_read_start(connect->handle, _tf_ssb_connection_on_tcp_alloc, _tf_ssb_connection_on_tcp_recv); - if (result) - { - tf_printf("uv_read_start => %s\n", uv_strerror(status)); - _tf_ssb_connection_close(connection, "uv_read_start failed"); - } - else + if (_tf_ssb_connection_read_start(connection)) { _tf_ssb_connection_client_send_hello(connection); } @@ -2826,7 +2846,7 @@ static void _tf_ssb_on_connection(uv_stream_t* stream, int status) _tf_ssb_notify_connections_changed(ssb, k_tf_ssb_change_create, connection); connection->state = k_tf_ssb_state_server_wait_hello; - uv_read_start((uv_stream_t*)&connection->tcp, _tf_ssb_connection_on_tcp_alloc, _tf_ssb_connection_on_tcp_recv); + _tf_ssb_connection_read_start(connection); } static void _tf_ssb_send_broadcast(tf_ssb_t* ssb, struct sockaddr_in* address, struct sockaddr_in* netmask) @@ -4065,3 +4085,26 @@ JSValue tf_ssb_connection_requests_to_object(tf_ssb_connection_t* connection) } return object; } + +void tf_ssb_connection_adjust_read_backpressure(tf_ssb_connection_t* connection, int delta) +{ + const int k_threshold = 256; + int old_pressure = connection->read_back_pressure; + connection->read_back_pressure += delta; + if (!connection->closing) + { + if (old_pressure < k_threshold && connection->read_back_pressure >= k_threshold) + { + _tf_ssb_connection_read_stop(connection); + } + else if (old_pressure >= k_threshold && connection->read_back_pressure < k_threshold) + { + _tf_ssb_connection_read_start(connection); + } + } + connection->ref_count += delta; + if (connection->ref_count == 0 && connection->closing) + { + _tf_ssb_connection_destroy(connection, "backpressure released"); + } +} diff --git a/src/ssb.h b/src/ssb.h index 4e24980b..e0f6c7bd 100644 --- a/src/ssb.h +++ b/src/ssb.h @@ -996,4 +996,13 @@ void tf_ssb_schedule_work(tf_ssb_t* ssb, int delay_ms, void (*callback)(tf_ssb_t */ bool tf_ssb_hmacsha256_verify(const char* public_key, const void* payload, size_t payload_length, const char* signature, bool signature_is_urlb64); +/** +** Adjust read backpressure. If it gets too high, TCP receive will be paused +** until it lowers. +** @param connection The connection on which to affect backpressure. +** @param delta The change in backpressure. Higher will eventually pause +** receive. Lower will resume it. +*/ +void tf_ssb_connection_adjust_read_backpressure(tf_ssb_connection_t* connection, int delta); + /** @} */ diff --git a/src/ssb.rpc.c b/src/ssb.rpc.c index 9fa6cf8b..1f36568f 100644 --- a/src/ssb.rpc.c +++ b/src/ssb.rpc.c @@ -404,6 +404,7 @@ typedef struct _blobs_get_t bool done; bool storing; tf_ssb_t* ssb; + tf_ssb_connection_t* connection; uint8_t buffer[]; } blobs_get_t; @@ -411,6 +412,7 @@ static void _tf_ssb_rpc_blob_store_callback(const char* id, bool is_new, void* u { blobs_get_t* get = user_data; get->storing = false; + tf_ssb_connection_adjust_read_backpressure(get->connection, -1); if (get->done) { tf_free(get); @@ -433,6 +435,7 @@ static void _tf_ssb_rpc_connection_blobs_get_callback( if (JS_ToBool(context, args)) { get->storing = true; + tf_ssb_connection_adjust_read_backpressure(connection, 1); tf_ssb_db_blob_store_async(ssb, get->buffer, get->received, _tf_ssb_rpc_blob_store_callback, get); } /* TODO: Should we send the response in the callback? */ @@ -455,7 +458,7 @@ static void _tf_ssb_rpc_connection_blobs_get_cleanup(tf_ssb_t* ssb, void* user_d static void _tf_ssb_rpc_connection_blobs_get(tf_ssb_connection_t* connection, const char* blob_id, size_t size) { blobs_get_t* get = tf_malloc(sizeof(blobs_get_t) + size); - *get = (blobs_get_t) { .ssb = tf_ssb_connection_get_ssb(connection), .expected_size = size }; + *get = (blobs_get_t) { .ssb = tf_ssb_connection_get_ssb(connection), .connection = connection, .expected_size = size }; snprintf(get->id, sizeof(get->id), "%s", blob_id); memset(get->buffer, 0, size); @@ -1000,6 +1003,12 @@ static void _tf_ssb_rpc_ebt_replicate_send_messages(tf_ssb_connection_t* connect } } +static void _tf_ssb_rpc_ebt_replicate_store_callback(const char* id, bool verified, bool is_new, void* user_data) +{ + tf_ssb_connection_t* connection = user_data; + tf_ssb_connection_adjust_read_backpressure(connection, -1); +} + static void _tf_ssb_rpc_ebt_replicate(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); @@ -1022,7 +1031,8 @@ static void _tf_ssb_rpc_ebt_replicate(tf_ssb_connection_t* connection, uint8_t f if (!JS_IsUndefined(author)) { /* Looks like a message. */ - tf_ssb_verify_strip_and_store_message(ssb, args, NULL, NULL); + tf_ssb_connection_adjust_read_backpressure(connection, 1); + tf_ssb_verify_strip_and_store_message(ssb, args, _tf_ssb_rpc_ebt_replicate_store_callback, connection); } else {