Compare commits

..

2 Commits

7 changed files with 131 additions and 34 deletions

View File

@ -1,5 +1,5 @@
{
"type": "tildefriends-app",
"emoji": "🐌",
"previous": "&0ZjcRFTyAmJMm3qhqKy5Ffrue23G1CAUfc1i2WYZrzQ=.sha256"
"previous": "&+UZSxjoucLAl5r/nfRNu5KXx3K/PdutGnizL/Cn2eCU=.sha256"
}

View File

@ -12,6 +12,7 @@ class TfTabConnectionsElement extends LitElement {
stored_connections: {type: Array},
users: {type: Object},
server_identity: {type: String},
connect_error: {type: String},
};
}
@ -163,6 +164,15 @@ class TfTabConnectionsElement extends LitElement {
tfrpc.rpc.sync();
}
connect(address) {
let self = this;
tfrpc.rpc.connect(address).then(function() {
self.connect_error = 'connected!';
}).catch(function(error) {
self.connect_error = error;
});
}
render() {
let self = this;
return html`
@ -178,10 +188,11 @@ class TfTabConnectionsElement extends LitElement {
<button
class="w3-button w3-theme-d1"
@click=${() =>
tfrpc.rpc.connect(self.renderRoot.getElementById('code').value)}
self.connect(self.renderRoot.getElementById('code').value)}
>
Connect
</button>
<div ?hidden=${this.connect_error === undefined}>${this.connect_error}</div>
<h2>Broadcasts</h2>
<ul class="w3-ul w3-border">
${this.broadcasts

View File

@ -367,6 +367,9 @@ typedef struct _tf_ssb_connection_t
uint64_t last_notified_active;
int flags;
tf_ssb_connect_callback_t* connect_callback;
void* connect_callback_user_data;
} tf_ssb_connection_t;
static JSClassID _connection_class_id;
@ -1321,6 +1324,12 @@ static void _tf_ssb_connection_verify_identity(tf_ssb_connection_t* connection,
JS_SetPropertyStr(context, connection->object, "is_client", JS_TRUE);
connection->state = k_tf_ssb_state_verified;
if (connection->connect_callback)
{
connection->connect_callback(connection, NULL, connection->connect_callback_user_data);
connection->connect_callback = NULL;
connection->connect_callback_user_data = NULL;
}
if (connection->handshake_timer.data)
{
uv_timer_stop(&connection->handshake_timer);
@ -1878,6 +1887,12 @@ static void _tf_ssb_connection_destroy(tf_ssb_connection_t* connection, const ch
{
tf_ssb_t* ssb = connection->ssb;
connection->closing = true;
if (connection->connect_callback)
{
connection->connect_callback(NULL, reason, connection->connect_callback_user_data);
connection->connect_callback = NULL;
connection->connect_callback_user_data = NULL;
}
if (!connection->destroy_reason)
{
connection->destroy_reason = reason;
@ -2698,22 +2713,33 @@ static void _tf_ssb_connection_handshake_timer_callback(uv_timer_t* timer)
}
}
tf_ssb_connection_t* tf_ssb_connection_create(tf_ssb_t* ssb, const char* host, const struct sockaddr_in* addr, const uint8_t* public_key)
static tf_ssb_connection_t* _tf_ssb_connection_create(
tf_ssb_t* ssb, const char* host, const struct sockaddr_in* addr, const uint8_t* public_key, tf_ssb_connect_callback_t* callback, void* user_data)
{
for (tf_ssb_connection_t* connection = ssb->connections; connection; connection = connection->next)
{
if (memcmp(connection->serverpub, public_key, k_id_bin_len) == 0 && connection->state != k_tf_ssb_state_invalid)
{
char id[k_id_base64_len];
tf_ssb_id_bin_to_str(id, sizeof(id), public_key);
tf_printf("Not connecting to %s:%d, because we are already connected to %s (state = %d).\n", host, ntohs(addr->sin_port), id, connection->state);
if (callback)
{
char id[k_id_base64_len];
tf_ssb_id_bin_to_str(id, sizeof(id), public_key);
char reason[1024];
snprintf(reason, sizeof(reason), "Already connected to %s (state = %d).", id, connection->state);
callback(NULL, reason, user_data);
}
return NULL;
}
else if (memcmp(ssb->pub, public_key, k_id_bin_len) == 0)
{
char id[k_id_base64_len];
tf_ssb_id_bin_to_str(id, sizeof(id), public_key);
tf_printf("Not connecting to %s:%d, because they appear to be ourselves %s.\n", host, ntohs(addr->sin_port), id);
if (callback)
{
char id[k_id_base64_len];
tf_ssb_id_bin_to_str(id, sizeof(id), public_key);
char reason[1024];
snprintf(reason, sizeof(reason), "Not connecting to ourself: %s.", id);
callback(NULL, reason, user_data);
}
return NULL;
}
}
@ -2731,6 +2757,8 @@ tf_ssb_connection_t* tf_ssb_connection_create(tf_ssb_t* ssb, const char* host, c
connection->port = ntohs(addr->sin_port);
connection->async.data = connection;
uv_async_init(ssb->loop, &connection->async, _tf_ssb_connection_process_message_async);
connection->connect_callback = callback;
connection->connect_callback_user_data = user_data;
connection->handshake_timer.data = connection;
uv_timer_init(ssb->loop, &connection->handshake_timer);
@ -2836,6 +2864,8 @@ typedef struct _connect_t
int port;
int flags;
uint8_t key[k_id_bin_len];
tf_ssb_connect_callback_t* callback;
void* user_data;
} connect_t;
static void _tf_on_connect_getaddrinfo(uv_getaddrinfo_t* addrinfo, int result, struct addrinfo* info)
@ -2847,26 +2877,36 @@ static void _tf_on_connect_getaddrinfo(uv_getaddrinfo_t* addrinfo, int result, s
{
struct sockaddr_in addr = *(struct sockaddr_in*)info->ai_addr;
addr.sin_port = htons(connect->port);
tf_ssb_connection_t* connection = tf_ssb_connection_create(connect->ssb, connect->host, &addr, connect->key);
tf_ssb_connection_t* connection = _tf_ssb_connection_create(connect->ssb, connect->host, &addr, connect->key, connect->callback, connect->user_data);
if (connection)
{
connection->flags = connect->flags;
}
}
else
else if (connect->callback)
{
tf_printf("getaddrinfo(%s) => %s\n", connect->host, uv_strerror(result));
char reason[1024];
snprintf(reason, sizeof(reason), "uv_getaddrinfo(%s) => %s", connect->host, uv_strerror(result));
connect->callback(NULL, reason, connect->user_data);
}
}
else if (connect->callback)
{
connect->callback(NULL, "Shutting down.", connect->user_data);
}
uv_freeaddrinfo(info);
tf_ssb_unref(connect->ssb);
tf_free(connect);
}
void tf_ssb_connect(tf_ssb_t* ssb, const char* host, int port, const uint8_t* key, int connect_flags)
void tf_ssb_connect(tf_ssb_t* ssb, const char* host, int port, const uint8_t* key, int connect_flags, tf_ssb_connect_callback_t* callback, void* user_data)
{
if (ssb->shutting_down)
{
if (callback)
{
callback(NULL, "Shutting down.", user_data);
}
return;
}
connect_t* connect = tf_malloc(sizeof(connect_t));
@ -2875,6 +2915,8 @@ void tf_ssb_connect(tf_ssb_t* ssb, const char* host, int port, const uint8_t* ke
.port = port,
.flags = connect_flags,
.req.data = connect,
.callback = callback,
.user_data = user_data,
};
char id[k_id_base64_len] = { 0 };
tf_ssb_id_bin_to_str(id, sizeof(id), key);
@ -2885,6 +2927,12 @@ void tf_ssb_connect(tf_ssb_t* ssb, const char* host, int port, const uint8_t* ke
int r = uv_getaddrinfo(ssb->loop, &connect->req, _tf_on_connect_getaddrinfo, host, NULL, &(struct addrinfo) { .ai_family = AF_INET });
if (r < 0)
{
if (callback)
{
char reason[1024];
snprintf(reason, sizeof(reason), "uv_getaddr_info(%s): %s", host, uv_strerror(r));
callback(NULL, reason, user_data);
}
tf_printf("uv_getaddrinfo(%s): %s\n", host, uv_strerror(r));
tf_free(connect);
tf_ssb_unref(ssb);
@ -3164,16 +3212,18 @@ static bool _tf_ssb_parse_broadcast(const char* in_broadcast, tf_ssb_broadcast_t
return false;
}
void tf_ssb_connect_str(tf_ssb_t* ssb, const char* address, int connect_flags)
void tf_ssb_connect_str(tf_ssb_t* ssb, const char* address, int connect_flags, tf_ssb_connect_callback_t* callback, void* user_data)
{
tf_ssb_broadcast_t broadcast = { 0 };
if (_tf_ssb_parse_broadcast(address, &broadcast))
{
tf_ssb_connect(ssb, broadcast.host, ntohs(broadcast.addr.sin_port), broadcast.pub, connect_flags);
tf_ssb_connect(ssb, broadcast.host, ntohs(broadcast.addr.sin_port), broadcast.pub, connect_flags, callback, user_data);
}
else
else if (callback)
{
tf_printf("Unable to parse: %s\n", address);
char reason[1024] = "";
snprintf(reason, sizeof(reason), "Unable to parse: '%s'.", address);
callback(NULL, reason, user_data);
}
}

View File

@ -106,7 +106,7 @@ static void _tf_ssb_connections_get_next_after_work(tf_ssb_t* ssb, int status, v
uint8_t key_bin[k_id_bin_len];
if (tf_ssb_id_str_to_bin(key_bin, next->key))
{
tf_ssb_connect(ssb, next->host, next->port, key_bin, 0);
tf_ssb_connect(ssb, next->host, next->port, key_bin, 0, NULL, NULL);
}
}
tf_free(next);
@ -286,7 +286,7 @@ static void _tf_ssb_connections_sync_broadcast_visit(
}
else
{
tf_ssb_connect(ssb, host, ntohs(addr->sin_port), pub, k_tf_ssb_connect_flag_one_shot);
tf_ssb_connect(ssb, host, ntohs(addr->sin_port), pub, k_tf_ssb_connect_flag_one_shot, NULL, NULL);
}
}
@ -332,7 +332,7 @@ static void _tf_ssb_connections_get_all_after_work(tf_ssb_t* ssb, int status, vo
tf_ssb_connections_get_all_work_t* work = user_data;
for (int i = 0; i < work->connections_count; i++)
{
tf_ssb_connect_str(ssb, work->connections[i], k_tf_ssb_connect_flag_one_shot);
tf_ssb_connect_str(ssb, work->connections[i], k_tf_ssb_connect_flag_one_shot, NULL, NULL);
tf_free(work->connections[i]);
}
tf_free(work->connections);

View File

@ -348,6 +348,14 @@ const char** tf_ssb_get_connection_ids(tf_ssb_t* ssb);
*/
int tf_ssb_get_connections(tf_ssb_t* ssb, tf_ssb_connection_t** out_connections, int out_connections_count);
/**
** Callback for completing establishing a connection.
** @param connection The established connection if successful or null.
** @param reason The reason for failure if the connection failed.
** @param user_data User data.
*/
typedef void(tf_ssb_connect_callback_t)(tf_ssb_connection_t* connection, const char* reason, void* user_data);
/**
** Establish an SHS connection with a host.
** @param ssb The SSB instance.
@ -355,16 +363,20 @@ int tf_ssb_get_connections(tf_ssb_t* ssb, tf_ssb_connection_t** out_connections,
** @param port The host's SHS port.
** @param key The host's SSB identity.
** @param connect_flags Flags affecting the connection.
** @param callback Completion callback.
** @param user_data User data to be passed to the callback.
*/
void tf_ssb_connect(tf_ssb_t* ssb, const char* host, int port, const uint8_t* key, int connect_flags);
void tf_ssb_connect(tf_ssb_t* ssb, const char* host, int port, const uint8_t* key, int connect_flags, tf_ssb_connect_callback_t* callback, void* user_data);
/**
** Establish an SHS connection with a host by string address.
** @param ssb The SSB instance.
** @param address The address.
** @param connect_flags Flags affecting the connection.
** @param callback Completion callback.
** @param user_data User data to be passed to the callback.
*/
void tf_ssb_connect_str(tf_ssb_t* ssb, const char* address, int connect_flags);
void tf_ssb_connect_str(tf_ssb_t* ssb, const char* address, int connect_flags, tf_ssb_connect_callback_t* callback, void* user_data);
/**
** Begin listening for SHS connections on the given port.

View File

@ -1633,17 +1633,41 @@ static JSValue _tf_ssb_getBroadcasts(JSContext* context, JSValueConst this_val,
return result;
}
typedef struct _connect_t
{
JSContext* context;
JSValue promise[2];
} connect_t;
static void _tf_ssb_connect_callback(tf_ssb_connection_t* connection, const char* reason, void* user_data)
{
connect_t* connect = user_data;
JSContext* context = connect->context;
JSValue arg = connection ? JS_UNDEFINED : JS_NewString(context, reason);
JSValue result = JS_Call(context, connection ? connect->promise[0] : connect->promise[1], JS_UNDEFINED, connection ? 0 : 1, &arg);
tf_util_report_error(context, result);
JS_FreeValue(context, result);
JS_FreeValue(context, connect->promise[0]);
JS_FreeValue(context, connect->promise[1]);
JS_FreeValue(context, arg);
tf_free(connect);
}
static JSValue _tf_ssb_connect(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv)
{
JSValue result = JS_UNDEFINED;
JSValue args = argv[0];
tf_ssb_t* ssb = JS_GetOpaque(this_val, _tf_ssb_classId);
if (ssb)
{
connect_t* connect = tf_malloc(sizeof(connect_t));
*connect = (connect_t) { .context = context };
result = JS_NewPromiseCapability(context, connect->promise);
if (JS_IsString(args))
{
const char* address_str = JS_ToCString(context, args);
tf_printf("Connecting to %s\n", address_str);
tf_ssb_connect_str(ssb, address_str, 0);
tf_ssb_connect_str(ssb, address_str, 0, _tf_ssb_connect_callback, connect);
JS_FreeCString(context, address_str);
}
else
@ -1660,11 +1684,11 @@ static JSValue _tf_ssb_connect(JSContext* context, JSValueConst this_val, int ar
tf_printf("Connecting to %s:%d\n", address_str, port_int);
uint8_t pubkey_bin[k_id_bin_len];
tf_ssb_id_str_to_bin(pubkey_bin, pubkey_str);
tf_ssb_connect(ssb, address_str, port_int, pubkey_bin, 0);
tf_ssb_connect(ssb, address_str, port_int, pubkey_bin, 0, _tf_ssb_connect_callback, connect);
}
else
{
tf_printf("Not connecting to null.\n");
_tf_ssb_connect_callback(NULL, "Not connecting to null.", connect);
}
JS_FreeCString(context, pubkey_str);
JS_FreeCString(context, address_str);
@ -1673,7 +1697,7 @@ static JSValue _tf_ssb_connect(JSContext* context, JSValueConst this_val, int ar
JS_FreeValue(context, pubkey);
}
}
return JS_UNDEFINED;
return result;
}
typedef struct _forget_stored_connection_t

View File

@ -268,7 +268,7 @@ void tf_ssb_test_ssb(const tf_test_options_t* options)
uint8_t id0bin[k_id_bin_len];
tf_ssb_id_str_to_bin(id0bin, id0);
tf_ssb_connect(ssb1, "127.0.0.1", 12347, id0bin, 0);
tf_ssb_connect(ssb1, "127.0.0.1", 12347, id0bin, 0, NULL, NULL);
tf_printf("Waiting for connection.\n");
while (test.connection_count0 != 1 || test.connection_count1 != 1)
@ -480,8 +480,8 @@ void tf_ssb_test_rooms(const tf_test_options_t* options)
uint8_t id0bin[k_id_bin_len];
tf_ssb_id_str_to_bin(id0bin, id0);
tf_ssb_connect(ssb1, "127.0.0.1", 12347, id0bin, 0);
tf_ssb_connect(ssb2, "127.0.0.1", 12347, id0bin, 0);
tf_ssb_connect(ssb1, "127.0.0.1", 12347, id0bin, 0, NULL, NULL);
tf_ssb_connect(ssb2, "127.0.0.1", 12347, id0bin, 0, NULL, NULL);
tf_printf("Waiting for connection.\n");
while (test.connection_count0 != 2 || test.connection_count1 != 1 || test.connection_count2 != 1)
@ -712,7 +712,7 @@ void tf_ssb_test_bench(const tf_test_options_t* options)
tf_ssb_register(tf_ssb_get_context(ssb1), ssb1);
tf_ssb_server_open(ssb0, 12347);
tf_ssb_connect(ssb1, "127.0.0.1", 12347, id0bin, 0);
tf_ssb_connect(ssb1, "127.0.0.1", 12347, id0bin, 0, NULL, NULL);
tf_printf("Waiting for messages.\n");
clock_gettime(CLOCK_REALTIME, &start_time);
@ -881,8 +881,8 @@ void tf_ssb_test_go_ssb_room(const tf_test_options_t* options)
tf_ssb_add_broadcasts_changed_callback(ssb0, _ssb_test_room_broadcasts_changed, NULL, NULL);
tf_ssb_connect_str(ssb0, "net:linode.unprompted.com:8008~shs:Q0pc/7kXQJGIlqJxuwayL2huayzddgkVDoGkYVWQS1Y=:SSB+Room+PSK3TLYC2T86EHQCUHBUHASCASE18JBV24=", 0);
tf_ssb_connect_str(ssb1, "net:linode.unprompted.com:8008~shs:Q0pc/7kXQJGIlqJxuwayL2huayzddgkVDoGkYVWQS1Y=:SSB+Room+PSK3TLYC2T86EHQCUHBUHASCASE18JBV24=", 0);
tf_ssb_connect_str(ssb0, "net:linode.unprompted.com:8008~shs:Q0pc/7kXQJGIlqJxuwayL2huayzddgkVDoGkYVWQS1Y=:SSB+Room+PSK3TLYC2T86EHQCUHBUHASCASE18JBV24=", 0, NULL, NULL);
tf_ssb_connect_str(ssb1, "net:linode.unprompted.com:8008~shs:Q0pc/7kXQJGIlqJxuwayL2huayzddgkVDoGkYVWQS1Y=:SSB+Room+PSK3TLYC2T86EHQCUHBUHASCASE18JBV24=", 0, NULL, NULL);
uv_run(&loop, UV_RUN_DEFAULT);
@ -982,8 +982,8 @@ void tf_ssb_test_peer_exchange(const tf_test_options_t* options)
tf_ssb_whoami(ssb0, id0, sizeof(id0));
uint8_t id0bin[k_id_bin_len];
tf_ssb_id_str_to_bin(id0bin, id0);
tf_ssb_connect(ssb1, "127.0.0.1", 12347, id0bin, 0);
tf_ssb_connect(ssb2, "127.0.0.1", 12347, id0bin, 0);
tf_ssb_connect(ssb1, "127.0.0.1", 12347, id0bin, 0, NULL, NULL);
tf_ssb_connect(ssb2, "127.0.0.1", 12347, id0bin, 0, NULL, NULL);
while (_count_broadcasts(ssb0) != 2 || _count_broadcasts(ssb1) != 1 || _count_broadcasts(ssb2) != 1)
{