ssb: Fix a crash on Windows when we would call uv_async_send on a handle that had already been closed. Various other cleanup and improvements along the journey. #96

This commit is contained in:
Cory McWilliams 2025-01-02 12:35:58 -05:00
parent fd40596ce7
commit cd2fe9f8d9
5 changed files with 38 additions and 37 deletions

View File

@ -1011,10 +1011,8 @@ static void _error_handler(int sig)
#if defined(_WIN32) #if defined(_WIN32)
static LONG WINAPI _win32_exception_handler(EXCEPTION_POINTERS* info) static LONG WINAPI _win32_exception_handler(EXCEPTION_POINTERS* info)
{ {
if (info->ExceptionRecord->ExceptionCode == STATUS_ACCESS_VIOLATION || if (info->ExceptionRecord->ExceptionCode == STATUS_ACCESS_VIOLATION || info->ExceptionRecord->ExceptionCode == STATUS_ILLEGAL_INSTRUCTION ||
info->ExceptionRecord->ExceptionCode == STATUS_ILLEGAL_INSTRUCTION || info->ExceptionRecord->ExceptionCode == STATUS_STACK_OVERFLOW || info->ExceptionRecord->ExceptionCode == STATUS_HEAP_CORRUPTION)
info->ExceptionRecord->ExceptionCode == STATUS_STACK_OVERFLOW ||
info->ExceptionRecord->ExceptionCode == STATUS_HEAP_CORRUPTION)
{ {
const char* stack = tf_util_backtrace_string(); const char* stack = tf_util_backtrace_string();
tf_printf("ERROR:\n%s\n", stack); tf_printf("ERROR:\n%s\n", stack);
@ -1058,16 +1056,16 @@ static void _startup(int argc, char* argv[])
#endif #endif
bool use_error_handler = false; bool use_error_handler = false;
#if defined(__ANDROID__) #if defined(__ANDROID__) || defined(_WIN32)
use_error_handler = true; use_error_handler = true;
#endif #endif
if (use_error_handler) if (use_error_handler)
{ {
if ( if (
#if !defined(_WIN32) #if !defined(_WIN32)
signal(SIGSYS, _error_handler) == SIG_ERR || signal(SIGABRT, _error_handler) == SIG_ERR || signal(SIGSYS, _error_handler) == SIG_ERR ||
#endif #endif
signal(SIGSEGV, _error_handler) == SIG_ERR) signal(SIGABRT, _error_handler) == SIG_ERR || signal(SIGSEGV, _error_handler) == SIG_ERR)
{ {
perror("signal"); perror("signal");
} }

View File

@ -272,7 +272,7 @@ typedef struct _tf_ssb_connection_t
uv_async_t scheduled_async; uv_async_t scheduled_async;
uv_timer_t handshake_timer; uv_timer_t handshake_timer;
uv_timer_t linger_timer; uv_timer_t linger_timer;
bool closing; bool is_closing;
tf_ssb_connection_t* tunnel_connection; tf_ssb_connection_t* tunnel_connection;
int32_t tunnel_request_number; int32_t tunnel_request_number;
@ -600,7 +600,7 @@ static uint32_t _tf_ssb_connection_prng(tf_ssb_connection_t* connection)
static void _tf_ssb_connection_dispatch_scheduled(tf_ssb_connection_t* connection) static void _tf_ssb_connection_dispatch_scheduled(tf_ssb_connection_t* connection)
{ {
while (((connection->active_write_count == 0 && connection->read_back_pressure == 0) || connection->closing) && connection->scheduled_count && connection->scheduled) while (((connection->active_write_count == 0 && connection->read_back_pressure == 0) || connection->is_closing) && connection->scheduled_count && connection->scheduled)
{ {
int index = _tf_ssb_connection_prng(connection) % connection->scheduled_count; int index = _tf_ssb_connection_prng(connection) % connection->scheduled_count;
tf_ssb_connection_scheduled_t scheduled = connection->scheduled[index]; tf_ssb_connection_scheduled_t scheduled = connection->scheduled[index];
@ -628,9 +628,9 @@ static int _tf_ssb_connection_scheduled_compare(const void* a, const void* b)
void tf_ssb_connection_schedule_idle(tf_ssb_connection_t* connection, const char* key, tf_ssb_scheduled_callback_t* callback, void* user_data) void tf_ssb_connection_schedule_idle(tf_ssb_connection_t* connection, const char* key, tf_ssb_scheduled_callback_t* callback, void* user_data)
{ {
int index = tf_util_insert_index(key, connection->scheduled, connection->scheduled_count, sizeof(tf_ssb_connection_scheduled_t), _tf_ssb_connection_scheduled_compare); int index = tf_util_insert_index(key, connection->scheduled, connection->scheduled_count, sizeof(tf_ssb_connection_scheduled_t), _tf_ssb_connection_scheduled_compare);
if (index != connection->scheduled_count && strcmp(key, connection->scheduled[index].key) == 0) if (connection->is_closing || (index != connection->scheduled_count && strcmp(key, connection->scheduled[index].key) == 0))
{ {
/* Keep the old request. Skip the new request. */ /* Skip the new request. */
tf_trace_begin(connection->ssb->trace, "scheduled callback (skip)"); tf_trace_begin(connection->ssb->trace, "scheduled callback (skip)");
PRE_CALLBACK(connection->ssb, callback); PRE_CALLBACK(connection->ssb, callback);
callback(connection, true, user_data); callback(connection, true, user_data);
@ -647,9 +647,9 @@ void tf_ssb_connection_schedule_idle(tf_ssb_connection_t* connection, const char
}; };
snprintf(connection->scheduled[index].key, sizeof(connection->scheduled[index].key), "%s", key); snprintf(connection->scheduled[index].key, sizeof(connection->scheduled[index].key), "%s", key);
connection->scheduled_count++; connection->scheduled_count++;
}
uv_async_send(&connection->scheduled_async); uv_async_send(&connection->scheduled_async);
}
} }
static int _request_compare(const void* a, const void* b) static int _request_compare(const void* a, const void* b)
@ -1340,6 +1340,11 @@ bool tf_ssb_connection_is_connected(tf_ssb_connection_t* connection)
return connection->state == k_tf_ssb_state_verified || connection->state == k_tf_ssb_state_server_verified; return connection->state == k_tf_ssb_state_verified || connection->state == k_tf_ssb_state_server_verified;
} }
bool tf_ssb_connection_is_closing(tf_ssb_connection_t* connection)
{
return connection && connection->is_closing;
}
const char* tf_ssb_connection_get_host(tf_ssb_connection_t* connection) const char* tf_ssb_connection_get_host(tf_ssb_connection_t* connection)
{ {
return connection->host; return connection->host;
@ -1886,9 +1891,9 @@ static void _tf_ssb_connection_linger_timer(uv_timer_t* timer)
static void _tf_ssb_connection_destroy(tf_ssb_connection_t* connection, const char* reason) static void _tf_ssb_connection_destroy(tf_ssb_connection_t* connection, const char* reason)
{ {
tf_ssb_t* ssb = connection->ssb; tf_ssb_t* ssb = connection->ssb;
if (!connection->closing) if (!connection->is_closing)
{ {
connection->closing = true; connection->is_closing = true;
uv_timer_start(&connection->linger_timer, _tf_ssb_connection_linger_timer, 5000, 0); uv_timer_start(&connection->linger_timer, _tf_ssb_connection_linger_timer, 5000, 0);
_tf_ssb_notify_connections_changed(ssb, k_tf_ssb_change_update, connection); _tf_ssb_notify_connections_changed(ssb, k_tf_ssb_change_update, connection);
} }
@ -2006,7 +2011,7 @@ static void _tf_ssb_connection_on_close(uv_handle_t* handle)
{ {
tf_ssb_connection_t* connection = handle->data; tf_ssb_connection_t* connection = handle->data;
handle->data = NULL; handle->data = NULL;
if (connection && connection->closing) if (connection && connection->is_closing)
{ {
_tf_ssb_connection_destroy(connection, "handle closed"); _tf_ssb_connection_destroy(connection, "handle closed");
} }
@ -3978,7 +3983,7 @@ static void _tf_ssb_connection_after_work_callback(uv_work_t* work, int status)
tf_trace_end(data->connection->ssb->trace); tf_trace_end(data->connection->ssb->trace);
} }
data->connection->ref_count--; data->connection->ref_count--;
if (data->connection->ref_count == 0 && data->connection->closing) if (data->connection->ref_count == 0 && data->connection->is_closing)
{ {
_tf_ssb_connection_destroy(data->connection, "work completed"); _tf_ssb_connection_destroy(data->connection, "work completed");
} }
@ -4261,9 +4266,9 @@ void tf_ssb_connection_adjust_read_backpressure(tf_ssb_connection_t* connection,
const int k_threshold = 256; const int k_threshold = 256;
int old_pressure = connection->read_back_pressure; int old_pressure = connection->read_back_pressure;
connection->read_back_pressure += delta; connection->read_back_pressure += delta;
uv_async_send(&connection->scheduled_async); if (!connection->is_closing)
if (!connection->closing)
{ {
uv_async_send(&connection->scheduled_async);
if (old_pressure < k_threshold && connection->read_back_pressure >= k_threshold) if (old_pressure < k_threshold && connection->read_back_pressure >= k_threshold)
{ {
_tf_ssb_connection_read_stop(connection); _tf_ssb_connection_read_stop(connection);
@ -4274,7 +4279,7 @@ void tf_ssb_connection_adjust_read_backpressure(tf_ssb_connection_t* connection,
} }
} }
connection->ref_count += delta; connection->ref_count += delta;
if (connection->ref_count == 0 && connection->closing) if (connection->ref_count == 0 && connection->is_closing)
{ {
_tf_ssb_connection_destroy(connection, "backpressure released"); _tf_ssb_connection_destroy(connection, "backpressure released");
} }
@ -4283,7 +4288,10 @@ void tf_ssb_connection_adjust_read_backpressure(tf_ssb_connection_t* connection,
void tf_ssb_connection_adjust_write_count(tf_ssb_connection_t* connection, int delta) void tf_ssb_connection_adjust_write_count(tf_ssb_connection_t* connection, int delta)
{ {
connection->active_write_count += delta; connection->active_write_count += delta;
uv_async_send(&connection->scheduled_async); if (!connection->is_closing)
{
uv_async_send(&connection->scheduled_async);
}
} }
const char* tf_ssb_connection_get_destroy_reason(tf_ssb_connection_t* connection) const char* tf_ssb_connection_get_destroy_reason(tf_ssb_connection_t* connection)

View File

@ -525,6 +525,13 @@ void tf_ssb_connection_close(tf_ssb_connection_t* connection, const char* reason
*/ */
bool tf_ssb_connection_is_connected(tf_ssb_connection_t* connection); bool tf_ssb_connection_is_connected(tf_ssb_connection_t* connection);
/**
** Check whether a connection is in the process of closing.
** @param connection The connection.
** @return True if the connection is closing.
*/
bool tf_ssb_connection_is_closing(tf_ssb_connection_t* connection);
/** /**
** Get the next outgoing request number for a connection. ** Get the next outgoing request number for a connection.
** @param connection The connection. ** @param connection The connection.

View File

@ -1495,10 +1495,10 @@ static JSValue _tf_ssb_sqlAsync(JSContext* context, JSValueConst this_val, int a
uv_mutex_init(&work->lock); uv_mutex_init(&work->lock);
uv_async_init(tf_ssb_get_loop(ssb), &work->async, _tf_ssb_sqlAsync_start_timer); uv_async_init(tf_ssb_get_loop(ssb), &work->async, _tf_ssb_sqlAsync_start_timer);
uv_timer_init(tf_ssb_get_loop(ssb), &work->timeout); uv_timer_init(tf_ssb_get_loop(ssb), &work->timeout);
JSValue result = JS_NewPromiseCapability(context, work->promise); JSValue result = JS_UNDEFINED;
JSValue error_value = JS_UNDEFINED;
if (ssb) if (ssb)
{ {
result = JS_NewPromiseCapability(context, work->promise);
int32_t length = tf_util_get_length(context, argv[1]); int32_t length = tf_util_get_length(context, argv[1]);
for (int i = 0; i < length; i++) for (int i = 0; i < length; i++)
{ {
@ -1541,18 +1541,6 @@ static JSValue _tf_ssb_sqlAsync(JSContext* context, JSValueConst this_val, int a
} }
tf_ssb_run_work(ssb, _tf_ssb_sqlAsync_work, _tf_ssb_sqlAsync_after_work, work); tf_ssb_run_work(ssb, _tf_ssb_sqlAsync_work, _tf_ssb_sqlAsync_after_work, work);
} }
if (!JS_IsUndefined(error_value))
{
JSValue call_result = JS_Call(context, work->promise[1], JS_UNDEFINED, 1, &error_value);
tf_util_report_error(context, call_result);
JS_FreeValue(context, call_result);
JS_FreeValue(context, error_value);
JS_FreeCString(context, query);
JS_FreeValue(context, work->promise[0]);
JS_FreeValue(context, work->promise[1]);
JS_FreeValue(context, work->callback);
_tf_ssb_sqlAsync_destroy(work);
}
return result; return result;
} }

View File

@ -908,7 +908,7 @@ static void _tf_ssb_connection_send_history_stream_callback(tf_ssb_connection_t*
static void _tf_ssb_connection_send_history_stream( static void _tf_ssb_connection_send_history_stream(
tf_ssb_connection_t* connection, int32_t request_number, const char* author, int64_t sequence, bool keys, bool live, bool end_request) tf_ssb_connection_t* connection, int32_t request_number, const char* author, int64_t sequence, bool keys, bool live, bool end_request)
{ {
if (tf_ssb_connection_is_connected(connection) && !tf_ssb_is_shutting_down(tf_ssb_connection_get_ssb(connection))) if (tf_ssb_connection_is_connected(connection) && !tf_ssb_is_shutting_down(tf_ssb_connection_get_ssb(connection)) && !tf_ssb_connection_is_closing(connection))
{ {
tf_ssb_connection_send_history_stream_t* async = tf_malloc(sizeof(tf_ssb_connection_send_history_stream_t)); tf_ssb_connection_send_history_stream_t* async = tf_malloc(sizeof(tf_ssb_connection_send_history_stream_t));
*async = (tf_ssb_connection_send_history_stream_t) { *async = (tf_ssb_connection_send_history_stream_t) {
@ -1210,7 +1210,7 @@ static void _tf_ssb_rpc_ebt_replicate(tf_ssb_connection_t* connection, uint8_t f
tf_ssb_connection_adjust_read_backpressure(connection, 1); tf_ssb_connection_adjust_read_backpressure(connection, 1);
tf_ssb_verify_strip_and_store_message(ssb, args, _tf_ssb_rpc_ebt_replicate_store_callback, connection); tf_ssb_verify_strip_and_store_message(ssb, args, _tf_ssb_rpc_ebt_replicate_store_callback, connection);
if (tf_ssb_connection_get_sent_clock(connection) && !tf_ssb_is_shutting_down(ssb)) if (tf_ssb_connection_get_sent_clock(connection) && !tf_ssb_is_shutting_down(ssb) && !tf_ssb_connection_is_closing(connection))
{ {
tf_ssb_connection_set_sent_clock(connection, false); tf_ssb_connection_set_sent_clock(connection, false);
resend_clock_t* resend = tf_malloc(sizeof(resend_clock_t)); resend_clock_t* resend = tf_malloc(sizeof(resend_clock_t));