ssb: Consolidate some redundant connection code.
All checks were successful
Build Tilde Friends / Build-All (push) Successful in 22m32s

This commit is contained in:
Cory McWilliams 2024-12-27 12:27:54 -05:00
parent d202f4e00d
commit ba8941046e

119
src/ssb.c
View File

@ -2688,6 +2688,36 @@ static void _tf_ssb_connection_handshake_timer_callback(uv_timer_t* timer)
} }
} }
static tf_ssb_connection_t* _tf_ssb_connection_create_internal(tf_ssb_t* ssb, const char* name, int index)
{
ssb->connection_ref_count++;
tf_ssb_connection_t* connection = tf_malloc(sizeof(tf_ssb_connection_t));
memset(connection, 0, sizeof(*connection));
snprintf(connection->name, sizeof(connection->name), "%s%d", name, index);
connection->ssb = ssb;
connection->send_request_number = 1;
connection->async.data = connection;
uv_async_init(ssb->loop, &connection->async, _tf_ssb_connection_process_message_async);
connection->scheduled_async.data = connection;
uv_async_init(ssb->loop, &connection->scheduled_async, _tf_ssb_connection_scheduled_async);
connection->handshake_timer.data = connection;
uv_timer_init(ssb->loop, &connection->handshake_timer);
uv_timer_start(&connection->handshake_timer, _tf_ssb_connection_handshake_timer_callback, k_handshake_timeout_ms, 0);
connection->linger_timer.data = connection;
uv_timer_init(ssb->loop, &connection->linger_timer);
connection->object = JS_NewObjectClass(ssb->context, _connection_class_id);
JS_SetOpaque(connection->object, connection);
connection->next = ssb->connections;
ssb->connections = connection;
ssb->connections_count++;
_tf_ssb_notify_connections_changed(ssb, k_tf_ssb_change_create, connection);
return connection;
}
static tf_ssb_connection_t* _tf_ssb_connection_create( static tf_ssb_connection_t* _tf_ssb_connection_create(
tf_ssb_t* ssb, const char* host, const struct sockaddr_in* addr, const uint8_t* public_key, tf_ssb_connect_callback_t* callback, void* user_data) tf_ssb_t* ssb, const char* host, const struct sockaddr_in* addr, const uint8_t* public_key, tf_ssb_connect_callback_t* callback, void* user_data)
{ {
@ -2719,42 +2749,24 @@ static tf_ssb_connection_t* _tf_ssb_connection_create(
} }
} }
JSContext* context = ssb->context; tf_ssb_connection_t* connection = _tf_ssb_connection_create_internal(ssb, "cli", s_connection_index++);
tf_ssb_connection_t* connection = tf_malloc(sizeof(tf_ssb_connection_t));
ssb->connection_ref_count++;
memset(connection, 0, sizeof(*connection));
snprintf(connection->name, sizeof(connection->name), "cli%d", s_connection_index++);
connection->ssb = ssb;
connection->tcp.data = connection;
connection->connect.data = connection; connection->connect.data = connection;
connection->send_request_number = 1;
snprintf(connection->host, sizeof(connection->host), "%s", host); snprintf(connection->host, sizeof(connection->host), "%s", host);
connection->port = ntohs(addr->sin_port); connection->port = ntohs(addr->sin_port);
connection->async.data = connection;
uv_async_init(ssb->loop, &connection->async, _tf_ssb_connection_process_message_async);
connection->scheduled_async.data = connection;
uv_async_init(ssb->loop, &connection->scheduled_async, _tf_ssb_connection_scheduled_async);
connection->connect_callback = callback; connection->connect_callback = callback;
connection->connect_callback_user_data = user_data; connection->connect_callback_user_data = user_data;
connection->handshake_timer.data = connection;
uv_timer_init(ssb->loop, &connection->handshake_timer);
uv_timer_start(&connection->handshake_timer, _tf_ssb_connection_handshake_timer_callback, k_handshake_timeout_ms, 0);
connection->linger_timer.data = connection;
uv_timer_init(ssb->loop, &connection->linger_timer);
connection->object = JS_NewObjectClass(ssb->context, _connection_class_id);
JS_SetOpaque(connection->object, connection);
char public_key_str[k_id_base64_len] = { 0 }; char public_key_str[k_id_base64_len] = { 0 };
if (tf_ssb_id_bin_to_str(public_key_str, sizeof(public_key_str), public_key)) if (tf_ssb_id_bin_to_str(public_key_str, sizeof(public_key_str), public_key))
{ {
JS_SetPropertyStr(context, connection->object, "id", JS_NewString(context, public_key_str)); JS_SetPropertyStr(ssb->context, connection->object, "id", JS_NewString(ssb->context, public_key_str));
JS_SetPropertyStr(context, connection->object, "is_client", JS_TRUE); JS_SetPropertyStr(ssb->context, connection->object, "is_client", JS_TRUE);
} }
memcpy(connection->serverpub, public_key, sizeof(connection->serverpub)); memcpy(connection->serverpub, public_key, sizeof(connection->serverpub));
connection->tcp.data = connection;
uv_tcp_init(ssb->loop, &connection->tcp); uv_tcp_init(ssb->loop, &connection->tcp);
int result = uv_tcp_connect(&connection->connect, &connection->tcp, (const struct sockaddr*)addr, _tf_ssb_connection_on_connect); int result = uv_tcp_connect(&connection->connect, &connection->tcp, (const struct sockaddr*)addr, _tf_ssb_connection_on_connect);
if (result) if (result)
@ -2764,13 +2776,6 @@ static tf_ssb_connection_t* _tf_ssb_connection_create(
connection->connect.data = NULL; connection->connect.data = NULL;
_tf_ssb_connection_destroy(connection, reason); _tf_ssb_connection_destroy(connection, reason);
} }
else
{
connection->next = ssb->connections;
ssb->connections = connection;
ssb->connections_count++;
_tf_ssb_notify_connections_changed(ssb, k_tf_ssb_change_create, connection);
}
return connection; return connection;
} }
@ -2802,39 +2807,16 @@ tf_ssb_connection_t* tf_ssb_connection_tunnel_create(tf_ssb_t* ssb, const char*
} }
JSContext* context = ssb->context; JSContext* context = ssb->context;
tf_ssb_connection_t* tunnel = tf_malloc(sizeof(tf_ssb_connection_t)); tf_ssb_connection_t* tunnel = _tf_ssb_connection_create_internal(ssb, "tun", s_tunnel_index++);
ssb->connection_ref_count++;
memset(tunnel, 0, sizeof(*tunnel));
snprintf(tunnel->name, sizeof(tunnel->name), "tun%d", s_tunnel_index++);
tunnel->ssb = ssb;
tunnel->flags = connect_flags; tunnel->flags = connect_flags;
tunnel->tunnel_connection = connection; tunnel->tunnel_connection = connection;
tunnel->tunnel_request_number = -request_number; tunnel->tunnel_request_number = -request_number;
tunnel->send_request_number = 1;
tunnel->async.data = tunnel;
uv_async_init(ssb->loop, &tunnel->async, _tf_ssb_connection_process_message_async);
tunnel->scheduled_async.data = tunnel;
uv_async_init(ssb->loop, &tunnel->scheduled_async, _tf_ssb_connection_scheduled_async);
tunnel->handshake_timer.data = tunnel;
uv_timer_init(ssb->loop, &tunnel->handshake_timer);
uv_timer_start(&tunnel->handshake_timer, _tf_ssb_connection_handshake_timer_callback, k_handshake_timeout_ms, 0);
tunnel->linger_timer.data = tunnel;
uv_timer_init(ssb->loop, &tunnel->linger_timer);
tunnel->object = JS_NewObjectClass(ssb->context, _connection_class_id);
JS_SetOpaque(tunnel->object, tunnel);
JS_SetPropertyStr(context, tunnel->object, "id", JS_NewString(context, target_id)); JS_SetPropertyStr(context, tunnel->object, "id", JS_NewString(context, target_id));
JS_SetPropertyStr(context, tunnel->object, "is_client", JS_TRUE); JS_SetPropertyStr(context, tunnel->object, "is_client", JS_TRUE);
tf_ssb_id_str_to_bin(tunnel->serverpub, target_id); tf_ssb_id_str_to_bin(tunnel->serverpub, target_id);
tunnel->next = ssb->connections;
ssb->connections = tunnel;
ssb->connections_count++;
_tf_ssb_notify_connections_changed(ssb, k_tf_ssb_change_create, tunnel);
tf_ssb_connection_add_request(connection, request_number, "tunnel.connect", _tf_ssb_connection_tunnel_callback, NULL, tunnel, tunnel); tf_ssb_connection_add_request(connection, request_number, "tunnel.connect", _tf_ssb_connection_tunnel_callback, NULL, tunnel, tunnel);
if (request_number > 0) if (request_number > 0)
{ {
@ -2941,24 +2923,13 @@ static void _tf_ssb_on_connection(uv_stream_t* stream, int status)
return; return;
} }
tf_ssb_connection_t* connection = tf_malloc(sizeof(tf_ssb_connection_t)); tf_ssb_connection_t* connection = _tf_ssb_connection_create_internal(ssb, "srv", s_connection_index++);
ssb->connection_ref_count++;
memset(connection, 0, sizeof(*connection));
snprintf(connection->name, sizeof(connection->name), "srv%d", s_connection_index++);
connection->ssb = ssb;
connection->tcp.data = connection; connection->tcp.data = connection;
connection->send_request_number = 1;
connection->async.data = connection;
uv_async_init(ssb->loop, &connection->async, _tf_ssb_connection_process_message_async);
connection->scheduled_async.data = connection;
uv_async_init(ssb->loop, &connection->scheduled_async, _tf_ssb_connection_scheduled_async);
connection->object = JS_NewObjectClass(ssb->context, _connection_class_id);
JS_SetOpaque(connection->object, connection);
int result = uv_tcp_init(ssb->loop, &connection->tcp); int result = uv_tcp_init(ssb->loop, &connection->tcp);
if (result != 0) if (result != 0)
{ {
connection->tcp.data = NULL;
char reason[1024]; char reason[1024];
snprintf(reason, sizeof(reason), "uv_tcp_init() => %s", uv_strerror(result)); snprintf(reason, sizeof(reason), "uv_tcp_init() => %s", uv_strerror(result));
_tf_ssb_connection_destroy(connection, reason); _tf_ssb_connection_destroy(connection, reason);
@ -2974,13 +2945,6 @@ static void _tf_ssb_on_connection(uv_stream_t* stream, int status)
return; return;
} }
connection->handshake_timer.data = connection;
uv_timer_init(ssb->loop, &connection->handshake_timer);
uv_timer_start(&connection->handshake_timer, _tf_ssb_connection_handshake_timer_callback, k_handshake_timeout_ms, 0);
connection->linger_timer.data = connection;
uv_timer_init(ssb->loop, &connection->linger_timer);
struct sockaddr_storage addr = { 0 }; struct sockaddr_storage addr = { 0 };
int size = sizeof(addr); int size = sizeof(addr);
if (uv_tcp_getpeername(&connection->tcp, (struct sockaddr*)&addr, &size) == 0) if (uv_tcp_getpeername(&connection->tcp, (struct sockaddr*)&addr, &size) == 0)
@ -2989,13 +2953,8 @@ static void _tf_ssb_on_connection(uv_stream_t* stream, int status)
connection->port = ntohs(((struct sockaddr_in*)&addr)->sin_port); connection->port = ntohs(((struct sockaddr_in*)&addr)->sin_port);
} }
connection->next = ssb->connections;
ssb->connections = connection;
ssb->connections_count++;
connection->state = k_tf_ssb_state_server_wait_hello; connection->state = k_tf_ssb_state_server_wait_hello;
_tf_ssb_connection_read_start(connection); _tf_ssb_connection_read_start(connection);
_tf_ssb_notify_connections_changed(ssb, k_tf_ssb_change_create, connection);
} }
static void _tf_ssb_send_broadcast(tf_ssb_t* ssb, struct sockaddr_in* address, struct sockaddr_in* netmask) static void _tf_ssb_send_broadcast(tf_ssb_t* ssb, struct sockaddr_in* address, struct sockaddr_in* netmask)