An experiment in controlling memory usage when syncing. uv_read_stop when we have too active message/blob writes to the database and uv_read_start when we're back under control. #64
This commit is contained in:
parent
1be94ae0be
commit
d974a5e044
59
src/ssb.c
59
src/ssb.c
@ -342,6 +342,8 @@ typedef struct _tf_ssb_connection_t
|
|||||||
|
|
||||||
tf_ssb_debug_message_t* debug_messages[k_debug_close_message_count];
|
tf_ssb_debug_message_t* debug_messages[k_debug_close_message_count];
|
||||||
int ref_count;
|
int ref_count;
|
||||||
|
|
||||||
|
int read_back_pressure;
|
||||||
} tf_ssb_connection_t;
|
} tf_ssb_connection_t;
|
||||||
|
|
||||||
static JSClassID _connection_class_id;
|
static JSClassID _connection_class_id;
|
||||||
@ -2062,6 +2064,30 @@ static void _tf_ssb_connection_client_send_hello(tf_ssb_connection_t* connection
|
|||||||
connection->state = k_tf_ssb_state_sent_hello;
|
connection->state = k_tf_ssb_state_sent_hello;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool _tf_ssb_connection_read_start(tf_ssb_connection_t* connection)
|
||||||
|
{
|
||||||
|
int result = uv_read_start((uv_stream_t*)&connection->tcp, _tf_ssb_connection_on_tcp_alloc, _tf_ssb_connection_on_tcp_recv);
|
||||||
|
if (result && result != UV_EALREADY)
|
||||||
|
{
|
||||||
|
tf_printf("uv_read_start => %s\n", uv_strerror(result));
|
||||||
|
_tf_ssb_connection_close(connection, "uv_read_start failed");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool _tf_ssb_connection_read_stop(tf_ssb_connection_t* connection)
|
||||||
|
{
|
||||||
|
int result = uv_read_stop((uv_stream_t*)&connection->tcp);
|
||||||
|
if (result && result != UV_EALREADY)
|
||||||
|
{
|
||||||
|
tf_printf("uv_read_stop => %s\n", uv_strerror(result));
|
||||||
|
_tf_ssb_connection_close(connection, "uv_read_stop failed");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
static void _tf_ssb_connection_on_connect(uv_connect_t* connect, int status)
|
static void _tf_ssb_connection_on_connect(uv_connect_t* connect, int status)
|
||||||
{
|
{
|
||||||
tf_ssb_connection_t* connection = connect->data;
|
tf_ssb_connection_t* connection = connect->data;
|
||||||
@ -2069,13 +2095,7 @@ static void _tf_ssb_connection_on_connect(uv_connect_t* connect, int status)
|
|||||||
if (status == 0)
|
if (status == 0)
|
||||||
{
|
{
|
||||||
connection->state = k_tf_ssb_state_connected;
|
connection->state = k_tf_ssb_state_connected;
|
||||||
int result = uv_read_start(connect->handle, _tf_ssb_connection_on_tcp_alloc, _tf_ssb_connection_on_tcp_recv);
|
if (_tf_ssb_connection_read_start(connection))
|
||||||
if (result)
|
|
||||||
{
|
|
||||||
tf_printf("uv_read_start => %s\n", uv_strerror(status));
|
|
||||||
_tf_ssb_connection_close(connection, "uv_read_start failed");
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
{
|
||||||
_tf_ssb_connection_client_send_hello(connection);
|
_tf_ssb_connection_client_send_hello(connection);
|
||||||
}
|
}
|
||||||
@ -2826,7 +2846,7 @@ static void _tf_ssb_on_connection(uv_stream_t* stream, int status)
|
|||||||
_tf_ssb_notify_connections_changed(ssb, k_tf_ssb_change_create, connection);
|
_tf_ssb_notify_connections_changed(ssb, k_tf_ssb_change_create, connection);
|
||||||
|
|
||||||
connection->state = k_tf_ssb_state_server_wait_hello;
|
connection->state = k_tf_ssb_state_server_wait_hello;
|
||||||
uv_read_start((uv_stream_t*)&connection->tcp, _tf_ssb_connection_on_tcp_alloc, _tf_ssb_connection_on_tcp_recv);
|
_tf_ssb_connection_read_start(connection);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void _tf_ssb_send_broadcast(tf_ssb_t* ssb, struct sockaddr_in* address, struct sockaddr_in* netmask)
|
static void _tf_ssb_send_broadcast(tf_ssb_t* ssb, struct sockaddr_in* address, struct sockaddr_in* netmask)
|
||||||
@ -4065,3 +4085,26 @@ JSValue tf_ssb_connection_requests_to_object(tf_ssb_connection_t* connection)
|
|||||||
}
|
}
|
||||||
return object;
|
return object;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void tf_ssb_connection_adjust_read_backpressure(tf_ssb_connection_t* connection, int delta)
|
||||||
|
{
|
||||||
|
const int k_threshold = 256;
|
||||||
|
int old_pressure = connection->read_back_pressure;
|
||||||
|
connection->read_back_pressure += delta;
|
||||||
|
if (!connection->closing)
|
||||||
|
{
|
||||||
|
if (old_pressure < k_threshold && connection->read_back_pressure >= k_threshold)
|
||||||
|
{
|
||||||
|
_tf_ssb_connection_read_stop(connection);
|
||||||
|
}
|
||||||
|
else if (old_pressure >= k_threshold && connection->read_back_pressure < k_threshold)
|
||||||
|
{
|
||||||
|
_tf_ssb_connection_read_start(connection);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
connection->ref_count += delta;
|
||||||
|
if (connection->ref_count == 0 && connection->closing)
|
||||||
|
{
|
||||||
|
_tf_ssb_connection_destroy(connection, "backpressure released");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -996,4 +996,13 @@ void tf_ssb_schedule_work(tf_ssb_t* ssb, int delay_ms, void (*callback)(tf_ssb_t
|
|||||||
*/
|
*/
|
||||||
bool tf_ssb_hmacsha256_verify(const char* public_key, const void* payload, size_t payload_length, const char* signature, bool signature_is_urlb64);
|
bool tf_ssb_hmacsha256_verify(const char* public_key, const void* payload, size_t payload_length, const char* signature, bool signature_is_urlb64);
|
||||||
|
|
||||||
|
/**
|
||||||
|
** Adjust read backpressure. If it gets too high, TCP receive will be paused
|
||||||
|
** until it lowers.
|
||||||
|
** @param connection The connection on which to affect backpressure.
|
||||||
|
** @param delta The change in backpressure. Higher will eventually pause
|
||||||
|
** receive. Lower will resume it.
|
||||||
|
*/
|
||||||
|
void tf_ssb_connection_adjust_read_backpressure(tf_ssb_connection_t* connection, int delta);
|
||||||
|
|
||||||
/** @} */
|
/** @} */
|
||||||
|
@ -404,6 +404,7 @@ typedef struct _blobs_get_t
|
|||||||
bool done;
|
bool done;
|
||||||
bool storing;
|
bool storing;
|
||||||
tf_ssb_t* ssb;
|
tf_ssb_t* ssb;
|
||||||
|
tf_ssb_connection_t* connection;
|
||||||
uint8_t buffer[];
|
uint8_t buffer[];
|
||||||
} blobs_get_t;
|
} blobs_get_t;
|
||||||
|
|
||||||
@ -411,6 +412,7 @@ static void _tf_ssb_rpc_blob_store_callback(const char* id, bool is_new, void* u
|
|||||||
{
|
{
|
||||||
blobs_get_t* get = user_data;
|
blobs_get_t* get = user_data;
|
||||||
get->storing = false;
|
get->storing = false;
|
||||||
|
tf_ssb_connection_adjust_read_backpressure(get->connection, -1);
|
||||||
if (get->done)
|
if (get->done)
|
||||||
{
|
{
|
||||||
tf_free(get);
|
tf_free(get);
|
||||||
@ -433,6 +435,7 @@ static void _tf_ssb_rpc_connection_blobs_get_callback(
|
|||||||
if (JS_ToBool(context, args))
|
if (JS_ToBool(context, args))
|
||||||
{
|
{
|
||||||
get->storing = true;
|
get->storing = true;
|
||||||
|
tf_ssb_connection_adjust_read_backpressure(connection, 1);
|
||||||
tf_ssb_db_blob_store_async(ssb, get->buffer, get->received, _tf_ssb_rpc_blob_store_callback, get);
|
tf_ssb_db_blob_store_async(ssb, get->buffer, get->received, _tf_ssb_rpc_blob_store_callback, get);
|
||||||
}
|
}
|
||||||
/* TODO: Should we send the response in the callback? */
|
/* TODO: Should we send the response in the callback? */
|
||||||
@ -455,7 +458,7 @@ static void _tf_ssb_rpc_connection_blobs_get_cleanup(tf_ssb_t* ssb, void* user_d
|
|||||||
static void _tf_ssb_rpc_connection_blobs_get(tf_ssb_connection_t* connection, const char* blob_id, size_t size)
|
static void _tf_ssb_rpc_connection_blobs_get(tf_ssb_connection_t* connection, const char* blob_id, size_t size)
|
||||||
{
|
{
|
||||||
blobs_get_t* get = tf_malloc(sizeof(blobs_get_t) + size);
|
blobs_get_t* get = tf_malloc(sizeof(blobs_get_t) + size);
|
||||||
*get = (blobs_get_t) { .ssb = tf_ssb_connection_get_ssb(connection), .expected_size = size };
|
*get = (blobs_get_t) { .ssb = tf_ssb_connection_get_ssb(connection), .connection = connection, .expected_size = size };
|
||||||
snprintf(get->id, sizeof(get->id), "%s", blob_id);
|
snprintf(get->id, sizeof(get->id), "%s", blob_id);
|
||||||
memset(get->buffer, 0, size);
|
memset(get->buffer, 0, size);
|
||||||
|
|
||||||
@ -1000,6 +1003,12 @@ static void _tf_ssb_rpc_ebt_replicate_send_messages(tf_ssb_connection_t* connect
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void _tf_ssb_rpc_ebt_replicate_store_callback(const char* id, bool verified, bool is_new, void* user_data)
|
||||||
|
{
|
||||||
|
tf_ssb_connection_t* connection = user_data;
|
||||||
|
tf_ssb_connection_adjust_read_backpressure(connection, -1);
|
||||||
|
}
|
||||||
|
|
||||||
static void _tf_ssb_rpc_ebt_replicate(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
|
static void _tf_ssb_rpc_ebt_replicate(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
|
||||||
{
|
{
|
||||||
tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
|
tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
|
||||||
@ -1022,7 +1031,8 @@ static void _tf_ssb_rpc_ebt_replicate(tf_ssb_connection_t* connection, uint8_t f
|
|||||||
if (!JS_IsUndefined(author))
|
if (!JS_IsUndefined(author))
|
||||||
{
|
{
|
||||||
/* Looks like a message. */
|
/* Looks like a message. */
|
||||||
tf_ssb_verify_strip_and_store_message(ssb, args, NULL, NULL);
|
tf_ssb_connection_adjust_read_backpressure(connection, 1);
|
||||||
|
tf_ssb_verify_strip_and_store_message(ssb, args, _tf_ssb_rpc_ebt_replicate_store_callback, connection);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
Loading…
Reference in New Issue
Block a user