Limit message sends in a continued attempt to fix intermittent runaway memory usage. #64

Cory McWilliams 2024-06-02 12:38:12 -04:00
parent 5d5567e94c
commit feb4bf9e87
3 changed files with 49 additions and 27 deletions
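
The diff below is easier to follow with the mechanism in mind: every buffered socket write now increments a per-connection counter, every completed write decrements it, and work queued for idle dispatch (such as serializing the next batch of history-stream messages) only runs while the counter is zero. Here is a distilled sketch of that pattern under simplified, illustrative names (queue_t, adjust_write_count, and friends are stand-ins, not the project's real types, and the sketch omits the real code's closing-connection case):

/* Sketch only: queued work runs solely while no writes are in flight, so
 * producers are paced by write completion instead of buffering without bound. */
#include <stdio.h>

typedef void(work_fn)(void* user_data);

typedef struct queue_t
{
	int active_write_count; /* writes handed to the socket, not yet completed */
	work_fn* pending[8]; /* queued idle work; fixed-size to keep the sketch short */
	void* pending_data[8];
	int pending_count;
} queue_t;

/* Drain queued work, but only while all in-flight writes have completed. */
static void dispatch_scheduled(queue_t* queue)
{
	while (queue->active_write_count == 0 && queue->pending_count)
	{
		work_fn* work = queue->pending[0];
		void* data = queue->pending_data[0];
		for (int i = 1; i < queue->pending_count; i++)
		{
			queue->pending[i - 1] = queue->pending[i];
			queue->pending_data[i - 1] = queue->pending_data[i];
		}
		queue->pending_count--;
		work(data);
	}
}

/* Starting a write bumps the count; completion drops it and re-checks the
 * queue, which is the backpressure that keeps memory usage bounded. */
static void adjust_write_count(queue_t* queue, int delta)
{
	queue->active_write_count += delta;
	dispatch_scheduled(queue);
}

static void send_next(void* user_data)
{
	printf("now safe to send: %s\n", (const char*)user_data);
}

int main(void)
{
	queue_t queue = { .active_write_count = 1 }; /* one write in flight */
	queue.pending[queue.pending_count] = send_next;
	queue.pending_data[queue.pending_count++] = "queued message";
	adjust_write_count(&queue, -1); /* write completes; queued send runs now */
	return 0;
}

Completion-driven dispatch means a peer that reads slowly simply stalls the queue, rather than forcing the sender to hold every pending message in memory at once.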

View File

@@ -344,6 +344,7 @@ typedef struct _tf_ssb_connection_t
 	int ref_count;
 	int read_back_pressure;
+	int active_write_count;
 } tf_ssb_connection_t;
 
 static JSClassID _connection_class_id;
@@ -460,9 +461,10 @@ static void _tf_ssb_connection_on_tcp_alloc(uv_handle_t* handle, size_t suggeste
 static void _tf_ssb_connection_on_write(uv_write_t* req, int status)
 {
+	tf_ssb_connection_t* connection = req->data;
+	tf_ssb_connection_adjust_write_count(connection, -1);
 	if (status)
 	{
-		tf_ssb_connection_t* connection = req->data;
 		char buffer[256];
 		snprintf(buffer, sizeof(buffer), "write failed asynchronously: %s", uv_strerror(status));
 		_tf_ssb_connection_close(connection, buffer);
@@ -477,9 +479,11 @@ static void _tf_ssb_write(tf_ssb_connection_t* connection, void* data, size_t si
 	uv_write_t* write = tf_malloc(sizeof(uv_write_t) + size);
 	*write = (uv_write_t) { .data = connection };
 	memcpy(write + 1, data, size);
+	tf_ssb_connection_adjust_write_count(connection, 1);
 	int result = uv_write(write, (uv_stream_t*)&connection->tcp, &(uv_buf_t) { .base = (char*)(write + 1), .len = size }, 1, _tf_ssb_connection_on_write);
 	if (result)
 	{
+		tf_ssb_connection_adjust_write_count(connection, -1);
 		_tf_ssb_connection_close(connection, "write failed");
 		tf_free(write);
 	}
@@ -606,6 +610,21 @@ static void _tf_ssb_connection_box_stream_send(tf_ssb_connection_t* connection,
 	}
 }
 
+static void _tf_ssb_connection_dispatch_scheduled(tf_ssb_connection_t* connection)
+{
+	while ((connection->active_write_count == 0 || connection->closing) &&
+		connection->scheduled_count &&
+		connection->scheduled)
+	{
+		tf_ssb_connection_scheduled_t scheduled = connection->scheduled[0];
+		memmove(connection->scheduled, connection->scheduled + 1, sizeof(tf_ssb_connection_scheduled_t) * (connection->scheduled_count - 1));
+		connection->scheduled_count--;
+		tf_trace_begin(connection->ssb->trace, "scheduled callback");
+		scheduled.callback(connection, scheduled.user_data);
+		tf_trace_end(connection->ssb->trace);
+	}
+}
+
 void tf_ssb_connection_schedule_idle(tf_ssb_connection_t* connection, tf_ssb_scheduled_callback_t* callback, void* user_data)
 {
 	connection->scheduled = tf_resize_vec(connection->scheduled, sizeof(tf_ssb_connection_scheduled_t) * (connection->scheduled_count + 1));
@@ -613,7 +632,7 @@ void tf_ssb_connection_schedule_idle(tf_ssb_connection_t* connection, tf_ssb_sch
 		.callback = callback,
 		.user_data = user_data,
 	};
-	uv_async_send(&connection->async);
+	_tf_ssb_connection_dispatch_scheduled(connection);
 }
 
 static int _request_compare(const void* a, const void* b)
@@ -1815,24 +1834,6 @@ JSValue tf_ssb_sign_message(tf_ssb_t* ssb, const char* author, const uint8_t* pr
 	return root;
 }
 
-static void _tf_ssb_connection_dispatch_scheduled(tf_ssb_connection_t* connection)
-{
-	const int k_scheduled_batch_count = 8;
-	for (int i = 0; i < k_scheduled_batch_count && connection->scheduled_count && connection->scheduled; i++)
-	{
-		tf_ssb_connection_scheduled_t scheduled = connection->scheduled[0];
-		memmove(connection->scheduled, connection->scheduled + 1, sizeof(tf_ssb_connection_scheduled_t) * (connection->scheduled_count - 1));
-		connection->scheduled_count--;
-		tf_trace_begin(connection->ssb->trace, "scheduled callback");
-		scheduled.callback(connection, scheduled.user_data);
-		tf_trace_end(connection->ssb->trace);
-	}
-	if (connection->scheduled_count)
-	{
-		uv_async_send(&connection->async);
-	}
-}
-
 static void _tf_ssb_connection_destroy(tf_ssb_connection_t* connection, const char* reason)
 {
 	tf_ssb_t* ssb = connection->ssb;
@@ -1841,10 +1842,7 @@ static void _tf_ssb_connection_destroy(tf_ssb_connection_t* connection, const ch
 	{
 		connection->destroy_reason = reason;
 	}
-	while (connection->scheduled_count)
-	{
-		_tf_ssb_connection_dispatch_scheduled(connection);
-	}
+	_tf_ssb_connection_dispatch_scheduled(connection);
 	tf_free(connection->scheduled);
 	connection->scheduled = NULL;
 	while (connection->requests)
@@ -2613,7 +2611,6 @@ static void _tf_ssb_connection_process_message_async(uv_async_t* async)
 	{
 		uv_async_send(&connection->async);
 	}
-	_tf_ssb_connection_dispatch_scheduled(connection);
 }
 
 tf_ssb_connection_t* tf_ssb_connection_create(tf_ssb_t* ssb, const char* host, const struct sockaddr_in* addr, const uint8_t* public_key)
@@ -4108,3 +4105,9 @@ void tf_ssb_connection_adjust_read_backpressure(tf_ssb_connection_t* connection,
 		_tf_ssb_connection_destroy(connection, "backpressure released");
 	}
 }
+
+void tf_ssb_connection_adjust_write_count(tf_ssb_connection_t* connection, int delta)
+{
+	connection->active_write_count += delta;
+	_tf_ssb_connection_dispatch_scheduled(connection);
+}

View File

@@ -745,7 +745,7 @@ JSValue tf_ssb_connection_requests_to_object(tf_ssb_connection_t* connection);
 typedef void(tf_ssb_scheduled_callback_t)(tf_ssb_connection_t* connection, void* user_data);
 
 /**
-** Schedule work to be run when the server is next idle.
+** Schedule work to be run when the connection is next idle.
 ** @param connection The owning connection.
 ** @param callback The callback to call.
 ** @param user_data User data to pass to the callback.
@@ -1005,4 +1005,13 @@ bool tf_ssb_hmacsha256_verify(const char* public_key, const void* payload, size_
 */
 void tf_ssb_connection_adjust_read_backpressure(tf_ssb_connection_t* connection, int delta);
 
+/**
+** Adjust the write count. Work scheduled by tf_ssb_connection_schedule_idle
+** will only start when this count reaches zero.
+** @param connection The connection on which to affect backpressure.
+** @param delta The change in write count. Higher pauses processing of the
+** scheduled idle work queue. Lower resumes it.
+*/
+void tf_ssb_connection_adjust_write_count(tf_ssb_connection_t* connection, int delta);
+
 /** @} */
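
Read together, these header additions imply a usage contract: every increment must eventually be matched by exactly one decrement, or the connection's idle queue stalls for good. A hypothetical caller might bracket a write like this (the _example_* names are illustrative only, not part of this commit, and the project's ssb header is assumed to be included):

/* Hypothetical usage sketch: pair each +1 with exactly one -1 so that
 * scheduled idle work can resume once the write drains. */
static void _example_on_write_complete(tf_ssb_connection_t* connection)
{
	/* The write finished; scheduled idle work may run again. */
	tf_ssb_connection_adjust_write_count(connection, -1);
}

static void _example_start_send(tf_ssb_connection_t* connection, void* user_data)
{
	/* Pause the idle work queue while this write is in flight. */
	tf_ssb_connection_adjust_write_count(connection, 1);
	/* ... start the write, arranging for _example_on_write_complete
	   to run when it completes ... */
}

Deferring the send itself through tf_ssb_connection_schedule_idle(connection, _example_start_send, NULL) then keeps at most one such write outstanding per connection, which is exactly how the createHistoryStream path below is restructured.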

View File

@@ -744,6 +744,7 @@ static void _tf_ssb_connection_send_history_stream_work(tf_ssb_connection_t* con
 static void _tf_ssb_connection_send_history_stream_after_work(tf_ssb_connection_t* connection, int result, void* user_data)
 {
 	tf_ssb_connection_send_history_stream_t* request = user_data;
+	tf_ssb_connection_adjust_write_count(connection, -1);
 	if (tf_ssb_connection_is_connected(connection))
 	{
 		for (int i = 0; i < request->out_messages_count; i++)
@@ -768,6 +769,15 @@ static void _tf_ssb_connection_send_history_stream_after_work(tf_ssb_connection_
 	tf_free(request);
 }
 
+static void _tf_ssb_connection_send_history_stream_callback(tf_ssb_connection_t* connection, void* user_data)
+{
+	tf_ssb_connection_adjust_write_count(connection, 1);
+	if (tf_ssb_connection_is_connected(connection))
+	{
+		tf_ssb_connection_run_work(connection, _tf_ssb_connection_send_history_stream_work, _tf_ssb_connection_send_history_stream_after_work, user_data);
+	}
+}
+
 static void _tf_ssb_connection_send_history_stream(tf_ssb_connection_t* connection, int32_t request_number, const char* author, int64_t sequence, bool keys, bool live)
 {
 	tf_ssb_connection_send_history_stream_t* async = tf_malloc(sizeof(tf_ssb_connection_send_history_stream_t));
@@ -778,7 +788,7 @@ static void _tf_ssb_connection_send_history_stream(tf_ssb_connection_t* connecti
 		.live = live,
 	};
 	snprintf(async->author, sizeof(async->author), "%s", author);
-	tf_ssb_connection_run_work(connection, _tf_ssb_connection_send_history_stream_work, _tf_ssb_connection_send_history_stream_after_work, async);
+	tf_ssb_connection_schedule_idle(connection, _tf_ssb_connection_send_history_stream_callback, async);
 }
 
 static void _tf_ssb_rpc_createHistoryStream(