From 657bcadc7e3d80de46a3bad0b7cd9b1cb7b4d351 Mon Sep 17 00:00:00 2001 From: Cory McWilliams Date: Wed, 14 Aug 2024 21:07:16 -0400 Subject: [PATCH] Work-in-progress, untested, naive peer exchange. Intended to be disabled by default by a setting. --- core/core.js | 7 ++- src/ssb.c | 28 +++++++++++- src/ssb.h | 25 +++++++++++ src/ssb.rpc.c | 122 ++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 180 insertions(+), 2 deletions(-) diff --git a/core/core.js b/core/core.js index db9031bd..8bcd9773 100644 --- a/core/core.js +++ b/core/core.js @@ -71,9 +71,14 @@ const k_global_settings = { }, seeds_host: { type: 'string', - default_value: '', + default_value: 'seeds.tildefriends.net', description: 'Hostname for seed connections.', }, + peer_exchange: { + type: 'boolean', + default_value: false, + description: 'Enable discovery of, sharing of, and connecting to internet peer strangers, including announcing this instance.', + }, }; let gGlobalSettings = { diff --git a/src/ssb.c b/src/ssb.c index 12087c25..1c7cd8fd 100644 --- a/src/ssb.c +++ b/src/ssb.c @@ -266,6 +266,7 @@ typedef struct _tf_ssb_t uv_thread_t thread_self; bool is_room; bool is_replicator; + bool is_peer_exchange; char* room_name; char seeds_host[256]; time_t last_seed_check; @@ -2980,7 +2981,7 @@ static void _tf_ssb_broadcast_timer(uv_timer_t* timer) } time_t now = time(NULL); - if (*ssb->seeds_host && now - ssb->last_seed_check > k_seed_check_interval_seconds) + if (ssb->is_peer_exchange && *ssb->seeds_host && now - ssb->last_seed_check > k_seed_check_interval_seconds) { seeds_t* seeds = tf_malloc(sizeof(seeds_t)); *seeds = (seeds_t) { 0 }; @@ -3150,6 +3151,15 @@ static void _tf_ssb_add_broadcast(tf_ssb_t* ssb, const tf_ssb_broadcast_t* broad _tf_ssb_notify_broadcasts_changed(ssb); } +void tf_ssb_add_broadcast(tf_ssb_t* ssb, const char* connection, tf_ssb_broadcast_origin_t origin, int64_t expires_seconds) +{ + tf_ssb_broadcast_t broadcast = { .origin = origin }; + if (_tf_ssb_parse_broadcast(connection, &broadcast)) + { + _tf_ssb_add_broadcast(ssb, &broadcast, expires_seconds); + } +} + static void _tf_ssb_on_broadcast_listener_recv(uv_udp_t* handle, ssize_t nread, const uv_buf_t* buf, const struct sockaddr* addr, unsigned flags) { if (nread <= 0) @@ -4009,6 +4019,16 @@ void tf_ssb_set_is_replicator(tf_ssb_t* ssb, bool is_replicator) ssb->is_replicator = is_replicator; } +bool tf_ssb_is_peer_exchange(tf_ssb_t* ssb) +{ + return ssb->is_peer_exchange; +} + +void tf_ssb_set_is_peer_exchange(tf_ssb_t* ssb, bool is_peer_exchange) +{ + ssb->is_peer_exchange = is_peer_exchange; +} + void tf_ssb_set_room_name(tf_ssb_t* ssb, const char* room_name) { tf_free(ssb->room_name); @@ -4018,6 +4038,8 @@ void tf_ssb_set_room_name(tf_ssb_t* ssb, const char* room_name) typedef struct _update_settings_t { bool is_room; + bool is_replicator; + bool is_peer_exchange; char seeds_host[256]; char room_name[1024]; } update_settings_t; @@ -4075,6 +4097,8 @@ static void _tf_ssb_update_settings_work(tf_ssb_t* ssb, void* user_data) { update_settings_t* update = user_data; update->is_room = _get_global_setting_bool(ssb, "room", true); + update->is_replicator = _get_global_setting_bool(ssb, "replicator", true); + update->is_peer_exchange = _get_global_setting_bool(ssb, "peer_exchange", true); _get_global_setting_string(ssb, "room_name", update->room_name, sizeof(update->room_name)); _get_global_setting_string(ssb, "seeds_host", update->seeds_host, sizeof(update->seeds_host)); } @@ -4084,6 +4108,8 @@ static void _tf_ssb_update_settings_after_work(tf_ssb_t* ssb, int result, void* update_settings_t* update = user_data; tf_ssb_set_is_room(ssb, update->is_room); tf_ssb_set_room_name(ssb, update->room_name); + tf_ssb_set_is_peer_exchange(ssb, update->is_peer_exchange); + tf_ssb_set_is_replicator(ssb, update->is_replicator); snprintf(ssb->seeds_host, sizeof(ssb->seeds_host), "%s", update->seeds_host); _tf_ssb_start_update_settings(ssb); tf_free(update); diff --git a/src/ssb.h b/src/ssb.h index ab2f4cf7..883ac616 100644 --- a/src/ssb.h +++ b/src/ssb.h @@ -28,6 +28,8 @@ enum k_ssb_rpc_flag_new_request = 0x10, k_ssb_blob_bytes_max = 5 * 1024 * 1024, + + k_ssb_peer_exchange_expires_seconds = 60 * 60, }; /** @@ -313,6 +315,15 @@ void tf_ssb_visit_broadcasts(tf_ssb_t* ssb, void (*callback)(const char* host, const struct sockaddr_in* addr, tf_ssb_broadcast_origin_t origin, tf_ssb_connection_t* tunnel, const uint8_t* pub, void* user_data), void* user_data); +/** +** Add a broadcast entry. +** @param ssb The SSB instance. +** @param connection The connection string to add. +** @param origin The origin of the broadcast entry. +** @param expires_seconds How long the broadcast entry should last. +*/ +void tf_ssb_add_broadcast(tf_ssb_t* ssb, const char* connection, tf_ssb_broadcast_origin_t origin, int64_t expires_seconds); + /** ** Get the identities of all active connections. ** @param ssb The SSB instance. @@ -989,6 +1000,20 @@ bool tf_ssb_is_replicator(tf_ssb_t* ssb); */ void tf_ssb_set_is_replicator(tf_ssb_t* ssb, bool is_replicator); +/** +** Get whether the running server participates in peer exchange. +** @param ssb The SSB instance. +** @return True if the server participates in peer exchange. +*/ +bool tf_ssb_is_peer_exchange(tf_ssb_t* ssb); + +/** +** Set whether the running server participates in peer exchange. +** @param ssb The SSB instance. +** @param is_peer_exchange Whether to participate in peer exchange. +*/ +void tf_ssb_set_is_peer_exchange(tf_ssb_t* ssb, bool is_peer_exchange); + /** ** Get the name of the room hosted by the running server. ** @param ssb The SSB instance. diff --git a/src/ssb.rpc.c b/src/ssb.rpc.c index 9f38b8cf..2143086b 100644 --- a/src/ssb.rpc.c +++ b/src/ssb.rpc.c @@ -19,6 +19,7 @@ #endif static void _tf_ssb_connection_send_history_stream(tf_ssb_connection_t* connection, int32_t request_number, const char* author, int64_t sequence, bool keys, bool live); +static void _tf_ssb_rpc_send_peers_exchange(tf_ssb_connection_t* connection); static void _tf_ssb_rpc_start_delete_blobs(tf_ssb_t* ssb, int delay_ms); static int64_t _get_global_setting_int64(tf_ssb_t* ssb, const char* name, int64_t default_value) @@ -1216,6 +1217,11 @@ static void _tf_ssb_rpc_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_chang _tf_ssb_rpc_connection_tunnel_isRoom_callback, NULL, NULL); JS_FreeValue(context, message); + if (tf_ssb_is_peer_exchange(ssb)) + { + _tf_ssb_rpc_send_peers_exchange(connection); + } + if (tf_ssb_is_replicator(ssb)) { _tf_ssb_rpc_send_ebt_replicate(connection); @@ -1333,6 +1339,121 @@ void tf_ssb_rpc_start_periodic(tf_ssb_t* ssb) _tf_ssb_rpc_start_delete_blobs(ssb, 30 * 1000); } +typedef struct _peers_exchange_t +{ + tf_ssb_t* ssb; + JSValue peers; +} peers_exchange_t; + +static void _tf_ssb_get_peers_exhange_callback( + const char* host, const struct sockaddr_in* addr, tf_ssb_broadcast_origin_t origin, tf_ssb_connection_t* tunnel, const uint8_t* pub, void* user_data) +{ + peers_exchange_t* data = user_data; + if (origin == k_tf_ssb_broadcast_origin_peer_exchange) + { + char fullid[k_id_base64_len] = { 0 }; + tf_base64_encode(pub, sizeof(pub), fullid, sizeof(fullid)); + + char connection[1024] = { 0 }; + snprintf(connection, sizeof(connection), "net:%s:%d~shs:%s", host, ntohs(addr->sin_port), fullid); + + JSContext* context = tf_ssb_get_context(data->ssb); + JS_SetPropertyStr(context, data->peers, connection, JS_NewInt32(context, 0)); + } +} + +static JSValue _tf_ssb_get_peers_exchange(tf_ssb_t* ssb) +{ + JSContext* context = tf_ssb_get_context(ssb); + JSValue peers = JS_NewObject(context); + tf_ssb_visit_broadcasts(ssb, _tf_ssb_get_peers_exhange_callback, &(peers_exchange_t) { .ssb = ssb, .peers = peers }); + return peers; +} + +static void _tf_ssb_rpc_peers_exchange_internal( + tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) +{ + JSContext* context = tf_ssb_connection_get_context(connection); + if (_is_error(context, args)) + { + return; + } + + /* The peer that participated in the exchange is now a peer exchange entry, too. */ + tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); + if (*tf_ssb_connection_get_host(connection)) + { + char fullid[k_id_base64_len] = { 0 }; + tf_ssb_connection_get_id(connection, fullid, sizeof(fullid)); + + char connection_string[1024] = { 0 }; + snprintf(connection_string, sizeof(connection_string), "net:%s:%d~shs:%s", tf_ssb_connection_get_host(connection), tf_ssb_connection_get_port(connection), fullid); + tf_ssb_add_broadcast(ssb, connection_string, k_tf_ssb_broadcast_origin_peer_exchange, k_ssb_peer_exchange_expires_seconds); + } + + JSValue in_peers = JS_GetPropertyStr(context, args, "peers"); + + JSPropertyEnum* ptab = NULL; + uint32_t plen = 0; + if (JS_GetOwnPropertyNames(context, &ptab, &plen, in_peers, JS_GPN_STRING_MASK) == 0) + { + for (uint32_t i = 0; i < plen; ++i) + { + JSValue key = JS_AtomToString(context, ptab[i].atom); + JSPropertyDescriptor desc; + JSValue key_value = JS_NULL; + if (JS_GetOwnProperty(context, &desc, args, ptab[i].atom) == 1) + { + key_value = desc.value; + JS_FreeValue(context, desc.setter); + JS_FreeValue(context, desc.getter); + } + const char* connection = JS_ToCString(context, key); + int64_t timestamp = 0; + JS_ToInt64(context, ×tamp, key_value); + /* ADD BROADCAST connection: timestamp */ + JS_FreeCString(context, connection); + JS_FreeValue(context, key); + JS_FreeValue(context, key_value); + } + for (uint32_t i = 0; i < plen; ++i) + { + JS_FreeAtom(context, ptab[i].atom); + } + js_free(context, ptab); + } + JS_FreeValue(context, in_peers); +} + +static void _tf_ssb_rpc_send_peers_exchange(tf_ssb_connection_t* connection) +{ + int32_t request_number = tf_ssb_connection_next_request_number(connection); + JSContext* context = tf_ssb_connection_get_context(connection); + JSValue message = JS_NewObject(context); + tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); + JS_SetPropertyStr(context, message, "peers", _tf_ssb_get_peers_exchange(ssb)); + tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_new_request, request_number, NULL, message, _tf_ssb_rpc_peers_exchange_internal, NULL, NULL); + JS_FreeValue(context, message); +} + +static void _tf_ssb_rpc_peers_exchange(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) +{ + tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); + if (!tf_ssb_is_peer_exchange(ssb)) + { + tf_ssb_connection_rpc_send_error_method_not_allowed(connection, flags, -request_number, "peers.exchange"); + return; + } + + _tf_ssb_rpc_peers_exchange_internal(connection, flags, request_number, args, message, size, user_data); + + JSContext* context = tf_ssb_connection_get_context(connection); + JSValue out_message = JS_NewObject(context); + JS_SetPropertyStr(context, out_message, "peers", _tf_ssb_get_peers_exchange(ssb)); + tf_ssb_connection_rpc_send_json(connection, flags, -request_number, NULL, out_message, NULL, NULL, NULL); + JS_FreeValue(context, out_message); +} + void tf_ssb_rpc_register(tf_ssb_t* ssb) { tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_rpc_connections_changed_callback, NULL, NULL); @@ -1346,4 +1467,5 @@ void tf_ssb_rpc_register(tf_ssb_t* ssb) tf_ssb_add_rpc_callback(ssb, (const char*[]) { "room", "attendants", NULL }, _tf_ssb_rpc_room_attendants, NULL, NULL); /* SOURCE */ tf_ssb_add_rpc_callback(ssb, (const char*[]) { "createHistoryStream", NULL }, _tf_ssb_rpc_createHistoryStream, NULL, NULL); /* SOURCE */ tf_ssb_add_rpc_callback(ssb, (const char*[]) { "ebt", "replicate", NULL }, _tf_ssb_rpc_ebt_replicate_server, NULL, NULL); /* DUPLEX */ + tf_ssb_add_rpc_callback(ssb, (const char*[]) { "peers", "exchange", NULL }, _tf_ssb_rpc_peers_exchange, NULL, NULL); /* ASYNC */ }