Clean up connections that don't handshake in time.

This commit is contained in:
Cory McWilliams 2024-09-09 15:25:10 -04:00
parent a5814074fe
commit 6e06ec0904

@ -80,6 +80,7 @@ enum
k_seed_expire_seconds = 10 * 60, k_seed_expire_seconds = 10 * 60,
k_seed_check_interval_seconds = 5 * 60, k_seed_check_interval_seconds = 5 * 60,
k_udp_discovery_expires_seconds = 10, k_udp_discovery_expires_seconds = 10,
k_handshake_timeout_ms = 15000,
}; };
typedef struct _tf_ssb_broadcast_t tf_ssb_broadcast_t; typedef struct _tf_ssb_broadcast_t tf_ssb_broadcast_t;
@ -294,6 +295,7 @@ typedef struct _tf_ssb_connection_t
uv_tcp_t tcp; uv_tcp_t tcp;
uv_connect_t connect; uv_connect_t connect;
uv_async_t async; uv_async_t async;
uv_timer_t handshake_timer;
bool closing; bool closing;
tf_ssb_connection_t* tunnel_connection; tf_ssb_connection_t* tunnel_connection;
@ -1258,6 +1260,10 @@ static void _tf_ssb_connection_verify_identity(tf_ssb_connection_t* connection,
JS_SetPropertyStr(context, connection->object, "is_client", JS_TRUE); JS_SetPropertyStr(context, connection->object, "is_client", JS_TRUE);
connection->state = k_tf_ssb_state_verified; connection->state = k_tf_ssb_state_verified;
if (connection->handshake_timer.data)
{
uv_timer_stop(&connection->handshake_timer);
}
_tf_ssb_notify_connections_changed(connection->ssb, k_tf_ssb_change_connect, connection); _tf_ssb_notify_connections_changed(connection->ssb, k_tf_ssb_change_connect, connection);
} }
@ -1486,6 +1492,10 @@ static void _tf_ssb_connection_verify_client_identity(tf_ssb_connection_t* conne
JS_SetPropertyStr(context, connection->object, "is_client", JS_FALSE); JS_SetPropertyStr(context, connection->object, "is_client", JS_FALSE);
connection->state = k_tf_ssb_state_server_verified; connection->state = k_tf_ssb_state_server_verified;
if (connection->handshake_timer.data)
{
uv_timer_stop(&connection->handshake_timer);
}
_tf_ssb_notify_connections_changed(connection->ssb, k_tf_ssb_change_connect, connection); _tf_ssb_notify_connections_changed(connection->ssb, k_tf_ssb_change_connect, connection);
} }
@ -1931,8 +1941,13 @@ static void _tf_ssb_connection_destroy(tf_ssb_connection_t* connection, const ch
{ {
uv_close((uv_handle_t*)&connection->tcp, _tf_ssb_connection_on_close); uv_close((uv_handle_t*)&connection->tcp, _tf_ssb_connection_on_close);
} }
if (connection->handshake_timer.data && !uv_is_closing((uv_handle_t*)&connection->handshake_timer))
{
uv_close((uv_handle_t*)&connection->handshake_timer, _tf_ssb_connection_on_close);
}
if (JS_IsUndefined(connection->object) && !connection->async.data && !connection->tcp.data && !connection->connect.data && connection->ref_count == 0) if (JS_IsUndefined(connection->object) && !connection->async.data && !connection->tcp.data && !connection->connect.data && !connection->handshake_timer.data &&
connection->ref_count == 0)
{ {
tf_free(connection->message_requests); tf_free(connection->message_requests);
connection->message_requests = NULL; connection->message_requests = NULL;
@ -1956,7 +1971,7 @@ static void _tf_ssb_connection_on_close(uv_handle_t* handle)
{ {
tf_ssb_connection_t* connection = handle->data; tf_ssb_connection_t* connection = handle->data;
handle->data = NULL; handle->data = NULL;
if (connection) if (connection && connection->closing)
{ {
_tf_ssb_connection_destroy(connection, "handle closed"); _tf_ssb_connection_destroy(connection, "handle closed");
} }
@ -2643,6 +2658,15 @@ static void _tf_ssb_connection_process_message_async(uv_async_t* async)
} }
} }
static void _tf_ssb_connection_handshake_timer_callback(uv_timer_t* timer)
{
tf_ssb_connection_t* connection = timer->data;
if (connection && connection->state != k_tf_ssb_state_verified && connection->state != k_tf_ssb_state_server_verified)
{
_tf_ssb_connection_destroy(connection, "handshake timeout");
}
}
tf_ssb_connection_t* tf_ssb_connection_create(tf_ssb_t* ssb, const char* host, const struct sockaddr_in* addr, const uint8_t* public_key) tf_ssb_connection_t* tf_ssb_connection_create(tf_ssb_t* ssb, const char* host, const struct sockaddr_in* addr, const uint8_t* public_key)
{ {
for (tf_ssb_connection_t* connection = ssb->connections; connection; connection = connection->next) for (tf_ssb_connection_t* connection = ssb->connections; connection; connection = connection->next)
@ -2677,6 +2701,10 @@ tf_ssb_connection_t* tf_ssb_connection_create(tf_ssb_t* ssb, const char* host, c
connection->async.data = connection; connection->async.data = connection;
uv_async_init(ssb->loop, &connection->async, _tf_ssb_connection_process_message_async); uv_async_init(ssb->loop, &connection->async, _tf_ssb_connection_process_message_async);
connection->handshake_timer.data = connection;
uv_timer_init(ssb->loop, &connection->handshake_timer);
uv_timer_start(&connection->handshake_timer, _tf_ssb_connection_handshake_timer_callback, k_handshake_timeout_ms, 0);
connection->object = JS_NewObjectClass(ssb->context, _connection_class_id); connection->object = JS_NewObjectClass(ssb->context, _connection_class_id);
JS_SetOpaque(connection->object, connection); JS_SetOpaque(connection->object, connection);
char public_key_str[k_id_base64_len] = { 0 }; char public_key_str[k_id_base64_len] = { 0 };
@ -2737,6 +2765,10 @@ tf_ssb_connection_t* tf_ssb_connection_tunnel_create(tf_ssb_t* ssb, const char*
tunnel->async.data = tunnel; tunnel->async.data = tunnel;
uv_async_init(ssb->loop, &tunnel->async, _tf_ssb_connection_process_message_async); uv_async_init(ssb->loop, &tunnel->async, _tf_ssb_connection_process_message_async);
tunnel->handshake_timer.data = tunnel;
uv_timer_init(ssb->loop, &tunnel->handshake_timer);
uv_timer_start(&tunnel->handshake_timer, _tf_ssb_connection_handshake_timer_callback, k_handshake_timeout_ms, 0);
tunnel->object = JS_NewObjectClass(ssb->context, _connection_class_id); tunnel->object = JS_NewObjectClass(ssb->context, _connection_class_id);
JS_SetOpaque(tunnel->object, tunnel); JS_SetOpaque(tunnel->object, tunnel);
JS_SetPropertyStr(context, tunnel->object, "id", JS_NewString(context, target_id)); JS_SetPropertyStr(context, tunnel->object, "id", JS_NewString(context, target_id));
@ -2859,6 +2891,10 @@ static void _tf_ssb_on_connection(uv_stream_t* stream, int status)
return; return;
} }
connection->handshake_timer.data = connection;
uv_timer_init(ssb->loop, &connection->handshake_timer);
uv_timer_start(&connection->handshake_timer, _tf_ssb_connection_handshake_timer_callback, k_handshake_timeout_ms, 0);
struct sockaddr_storage addr = { 0 }; struct sockaddr_storage addr = { 0 };
int size = sizeof(addr); int size = sizeof(addr);
if (uv_tcp_getpeername(&connection->tcp, (struct sockaddr*)&addr, &size) == 0) if (uv_tcp_getpeername(&connection->tcp, (struct sockaddr*)&addr, &size) == 0)
@ -2870,10 +2906,10 @@ static void _tf_ssb_on_connection(uv_stream_t* stream, int status)
connection->next = ssb->connections; connection->next = ssb->connections;
ssb->connections = connection; ssb->connections = connection;
ssb->connections_count++; ssb->connections_count++;
_tf_ssb_notify_connections_changed(ssb, k_tf_ssb_change_create, connection);
connection->state = k_tf_ssb_state_server_wait_hello; connection->state = k_tf_ssb_state_server_wait_hello;
_tf_ssb_connection_read_start(connection); _tf_ssb_connection_read_start(connection);
_tf_ssb_notify_connections_changed(ssb, k_tf_ssb_change_create, connection);
} }
static void _tf_ssb_send_broadcast(tf_ssb_t* ssb, struct sockaddr_in* address, struct sockaddr_in* netmask) static void _tf_ssb_send_broadcast(tf_ssb_t* ssb, struct sockaddr_in* address, struct sockaddr_in* netmask)