Work-in-progress, untested, naive peer exchange. Intended to be disabled by default by a setting.

This commit is contained in:
Cory McWilliams 2024-08-14 21:07:16 -04:00
parent 107666cc60
commit 657bcadc7e
4 changed files with 180 additions and 2 deletions

View File

@ -71,9 +71,14 @@ const k_global_settings = {
},
seeds_host: {
type: 'string',
default_value: '',
default_value: 'seeds.tildefriends.net',
description: 'Hostname for seed connections.',
},
peer_exchange: {
type: 'boolean',
default_value: false,
description: 'Enable discovery of, sharing of, and connecting to internet peer strangers, including announcing this instance.',
},
};
let gGlobalSettings = {

View File

@ -266,6 +266,7 @@ typedef struct _tf_ssb_t
uv_thread_t thread_self;
bool is_room;
bool is_replicator;
bool is_peer_exchange;
char* room_name;
char seeds_host[256];
time_t last_seed_check;
@ -2980,7 +2981,7 @@ static void _tf_ssb_broadcast_timer(uv_timer_t* timer)
}
time_t now = time(NULL);
if (*ssb->seeds_host && now - ssb->last_seed_check > k_seed_check_interval_seconds)
if (ssb->is_peer_exchange && *ssb->seeds_host && now - ssb->last_seed_check > k_seed_check_interval_seconds)
{
seeds_t* seeds = tf_malloc(sizeof(seeds_t));
*seeds = (seeds_t) { 0 };
@ -3150,6 +3151,15 @@ static void _tf_ssb_add_broadcast(tf_ssb_t* ssb, const tf_ssb_broadcast_t* broad
_tf_ssb_notify_broadcasts_changed(ssb);
}
void tf_ssb_add_broadcast(tf_ssb_t* ssb, const char* connection, tf_ssb_broadcast_origin_t origin, int64_t expires_seconds)
{
tf_ssb_broadcast_t broadcast = { .origin = origin };
if (_tf_ssb_parse_broadcast(connection, &broadcast))
{
_tf_ssb_add_broadcast(ssb, &broadcast, expires_seconds);
}
}
static void _tf_ssb_on_broadcast_listener_recv(uv_udp_t* handle, ssize_t nread, const uv_buf_t* buf, const struct sockaddr* addr, unsigned flags)
{
if (nread <= 0)
@ -4009,6 +4019,16 @@ void tf_ssb_set_is_replicator(tf_ssb_t* ssb, bool is_replicator)
ssb->is_replicator = is_replicator;
}
bool tf_ssb_is_peer_exchange(tf_ssb_t* ssb)
{
return ssb->is_peer_exchange;
}
void tf_ssb_set_is_peer_exchange(tf_ssb_t* ssb, bool is_peer_exchange)
{
ssb->is_peer_exchange = is_peer_exchange;
}
void tf_ssb_set_room_name(tf_ssb_t* ssb, const char* room_name)
{
tf_free(ssb->room_name);
@ -4018,6 +4038,8 @@ void tf_ssb_set_room_name(tf_ssb_t* ssb, const char* room_name)
typedef struct _update_settings_t
{
bool is_room;
bool is_replicator;
bool is_peer_exchange;
char seeds_host[256];
char room_name[1024];
} update_settings_t;
@ -4075,6 +4097,8 @@ static void _tf_ssb_update_settings_work(tf_ssb_t* ssb, void* user_data)
{
update_settings_t* update = user_data;
update->is_room = _get_global_setting_bool(ssb, "room", true);
update->is_replicator = _get_global_setting_bool(ssb, "replicator", true);
update->is_peer_exchange = _get_global_setting_bool(ssb, "peer_exchange", true);
_get_global_setting_string(ssb, "room_name", update->room_name, sizeof(update->room_name));
_get_global_setting_string(ssb, "seeds_host", update->seeds_host, sizeof(update->seeds_host));
}
@ -4084,6 +4108,8 @@ static void _tf_ssb_update_settings_after_work(tf_ssb_t* ssb, int result, void*
update_settings_t* update = user_data;
tf_ssb_set_is_room(ssb, update->is_room);
tf_ssb_set_room_name(ssb, update->room_name);
tf_ssb_set_is_peer_exchange(ssb, update->is_peer_exchange);
tf_ssb_set_is_replicator(ssb, update->is_replicator);
snprintf(ssb->seeds_host, sizeof(ssb->seeds_host), "%s", update->seeds_host);
_tf_ssb_start_update_settings(ssb);
tf_free(update);

View File

@ -28,6 +28,8 @@ enum
k_ssb_rpc_flag_new_request = 0x10,
k_ssb_blob_bytes_max = 5 * 1024 * 1024,
k_ssb_peer_exchange_expires_seconds = 60 * 60,
};
/**
@ -313,6 +315,15 @@ void tf_ssb_visit_broadcasts(tf_ssb_t* ssb,
void (*callback)(const char* host, const struct sockaddr_in* addr, tf_ssb_broadcast_origin_t origin, tf_ssb_connection_t* tunnel, const uint8_t* pub, void* user_data),
void* user_data);
/**
** Add a broadcast entry.
** @param ssb The SSB instance.
** @param connection The connection string to add.
** @param origin The origin of the broadcast entry.
** @param expires_seconds How long the broadcast entry should last.
*/
void tf_ssb_add_broadcast(tf_ssb_t* ssb, const char* connection, tf_ssb_broadcast_origin_t origin, int64_t expires_seconds);
/**
** Get the identities of all active connections.
** @param ssb The SSB instance.
@ -989,6 +1000,20 @@ bool tf_ssb_is_replicator(tf_ssb_t* ssb);
*/
void tf_ssb_set_is_replicator(tf_ssb_t* ssb, bool is_replicator);
/**
** Get whether the running server participates in peer exchange.
** @param ssb The SSB instance.
** @return True if the server participates in peer exchange.
*/
bool tf_ssb_is_peer_exchange(tf_ssb_t* ssb);
/**
** Set whether the running server participates in peer exchange.
** @param ssb The SSB instance.
** @param is_peer_exchange Whether to participate in peer exchange.
*/
void tf_ssb_set_is_peer_exchange(tf_ssb_t* ssb, bool is_peer_exchange);
/**
** Get the name of the room hosted by the running server.
** @param ssb The SSB instance.

View File

@ -19,6 +19,7 @@
#endif
static void _tf_ssb_connection_send_history_stream(tf_ssb_connection_t* connection, int32_t request_number, const char* author, int64_t sequence, bool keys, bool live);
static void _tf_ssb_rpc_send_peers_exchange(tf_ssb_connection_t* connection);
static void _tf_ssb_rpc_start_delete_blobs(tf_ssb_t* ssb, int delay_ms);
static int64_t _get_global_setting_int64(tf_ssb_t* ssb, const char* name, int64_t default_value)
@ -1216,6 +1217,11 @@ static void _tf_ssb_rpc_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_chang
_tf_ssb_rpc_connection_tunnel_isRoom_callback, NULL, NULL);
JS_FreeValue(context, message);
if (tf_ssb_is_peer_exchange(ssb))
{
_tf_ssb_rpc_send_peers_exchange(connection);
}
if (tf_ssb_is_replicator(ssb))
{
_tf_ssb_rpc_send_ebt_replicate(connection);
@ -1333,6 +1339,121 @@ void tf_ssb_rpc_start_periodic(tf_ssb_t* ssb)
_tf_ssb_rpc_start_delete_blobs(ssb, 30 * 1000);
}
typedef struct _peers_exchange_t
{
tf_ssb_t* ssb;
JSValue peers;
} peers_exchange_t;
static void _tf_ssb_get_peers_exhange_callback(
const char* host, const struct sockaddr_in* addr, tf_ssb_broadcast_origin_t origin, tf_ssb_connection_t* tunnel, const uint8_t* pub, void* user_data)
{
peers_exchange_t* data = user_data;
if (origin == k_tf_ssb_broadcast_origin_peer_exchange)
{
char fullid[k_id_base64_len] = { 0 };
tf_base64_encode(pub, sizeof(pub), fullid, sizeof(fullid));
char connection[1024] = { 0 };
snprintf(connection, sizeof(connection), "net:%s:%d~shs:%s", host, ntohs(addr->sin_port), fullid);
JSContext* context = tf_ssb_get_context(data->ssb);
JS_SetPropertyStr(context, data->peers, connection, JS_NewInt32(context, 0));
}
}
static JSValue _tf_ssb_get_peers_exchange(tf_ssb_t* ssb)
{
JSContext* context = tf_ssb_get_context(ssb);
JSValue peers = JS_NewObject(context);
tf_ssb_visit_broadcasts(ssb, _tf_ssb_get_peers_exhange_callback, &(peers_exchange_t) { .ssb = ssb, .peers = peers });
return peers;
}
static void _tf_ssb_rpc_peers_exchange_internal(
tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
JSContext* context = tf_ssb_connection_get_context(connection);
if (_is_error(context, args))
{
return;
}
/* The peer that participated in the exchange is now a peer exchange entry, too. */
tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
if (*tf_ssb_connection_get_host(connection))
{
char fullid[k_id_base64_len] = { 0 };
tf_ssb_connection_get_id(connection, fullid, sizeof(fullid));
char connection_string[1024] = { 0 };
snprintf(connection_string, sizeof(connection_string), "net:%s:%d~shs:%s", tf_ssb_connection_get_host(connection), tf_ssb_connection_get_port(connection), fullid);
tf_ssb_add_broadcast(ssb, connection_string, k_tf_ssb_broadcast_origin_peer_exchange, k_ssb_peer_exchange_expires_seconds);
}
JSValue in_peers = JS_GetPropertyStr(context, args, "peers");
JSPropertyEnum* ptab = NULL;
uint32_t plen = 0;
if (JS_GetOwnPropertyNames(context, &ptab, &plen, in_peers, JS_GPN_STRING_MASK) == 0)
{
for (uint32_t i = 0; i < plen; ++i)
{
JSValue key = JS_AtomToString(context, ptab[i].atom);
JSPropertyDescriptor desc;
JSValue key_value = JS_NULL;
if (JS_GetOwnProperty(context, &desc, args, ptab[i].atom) == 1)
{
key_value = desc.value;
JS_FreeValue(context, desc.setter);
JS_FreeValue(context, desc.getter);
}
const char* connection = JS_ToCString(context, key);
int64_t timestamp = 0;
JS_ToInt64(context, &timestamp, key_value);
/* ADD BROADCAST connection: timestamp */
JS_FreeCString(context, connection);
JS_FreeValue(context, key);
JS_FreeValue(context, key_value);
}
for (uint32_t i = 0; i < plen; ++i)
{
JS_FreeAtom(context, ptab[i].atom);
}
js_free(context, ptab);
}
JS_FreeValue(context, in_peers);
}
static void _tf_ssb_rpc_send_peers_exchange(tf_ssb_connection_t* connection)
{
int32_t request_number = tf_ssb_connection_next_request_number(connection);
JSContext* context = tf_ssb_connection_get_context(connection);
JSValue message = JS_NewObject(context);
tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
JS_SetPropertyStr(context, message, "peers", _tf_ssb_get_peers_exchange(ssb));
tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_new_request, request_number, NULL, message, _tf_ssb_rpc_peers_exchange_internal, NULL, NULL);
JS_FreeValue(context, message);
}
static void _tf_ssb_rpc_peers_exchange(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
if (!tf_ssb_is_peer_exchange(ssb))
{
tf_ssb_connection_rpc_send_error_method_not_allowed(connection, flags, -request_number, "peers.exchange");
return;
}
_tf_ssb_rpc_peers_exchange_internal(connection, flags, request_number, args, message, size, user_data);
JSContext* context = tf_ssb_connection_get_context(connection);
JSValue out_message = JS_NewObject(context);
JS_SetPropertyStr(context, out_message, "peers", _tf_ssb_get_peers_exchange(ssb));
tf_ssb_connection_rpc_send_json(connection, flags, -request_number, NULL, out_message, NULL, NULL, NULL);
JS_FreeValue(context, out_message);
}
void tf_ssb_rpc_register(tf_ssb_t* ssb)
{
tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_rpc_connections_changed_callback, NULL, NULL);
@ -1346,4 +1467,5 @@ void tf_ssb_rpc_register(tf_ssb_t* ssb)
tf_ssb_add_rpc_callback(ssb, (const char*[]) { "room", "attendants", NULL }, _tf_ssb_rpc_room_attendants, NULL, NULL); /* SOURCE */
tf_ssb_add_rpc_callback(ssb, (const char*[]) { "createHistoryStream", NULL }, _tf_ssb_rpc_createHistoryStream, NULL, NULL); /* SOURCE */
tf_ssb_add_rpc_callback(ssb, (const char*[]) { "ebt", "replicate", NULL }, _tf_ssb_rpc_ebt_replicate_server, NULL, NULL); /* DUPLEX */
tf_ssb_add_rpc_callback(ssb, (const char*[]) { "peers", "exchange", NULL }, _tf_ssb_rpc_peers_exchange, NULL, NULL); /* ASYNC */
}