First go at implementing rooms. A test passes that appears to exercise them.

git-svn-id: https://www.unprompted.com/svn/projects/tildefriends/trunk@4017 ed5197a5-7fde-0310-b194-c3ffbd925b24
This commit is contained in:
Cory McWilliams 2022-11-02 23:34:44 +00:00
parent 1abc611e54
commit b12f8f9da8
8 changed files with 692 additions and 83 deletions

View File

@ -1,6 +1,7 @@
"use strict";
var g_wants_requests = {};
var g_database = new Database('core');
let g_attendants = {};
const k_use_create_history_stream = false;
const k_blobs_concurrent_target = 8;
@ -72,7 +73,28 @@ function storeMessage(message) {
}
}
ssb.addEventListener('connections', function(change, connection) {
function tunnel_attendants(request) {
if (request.message.type !== 'state') {
throw Error('Unexpected type: ' + request.message.type);
}
let state = new Set(request.message.ids);
for (let id of state) {
request.add_room_attendant(id);
}
request.more(function attendants(message) {
if (message.message.type === 'joined') {
request.add_room_attendant(message.message.id);
state.add(message.message.id);
} else if (message.message.type === 'left') {
request.remove_room_attendant(message.message.id);
state.delete(message.message.id);
} else {
throw Error('Unexpected type: ' + message.type);
}
});
}
ssb.addEventListener('connections', function on_connections_changed(change, connection) {
if (change == 'add') {
var sequence = get_latest_sequence_for_author(connection.id);
if (k_use_create_history_stream) {
@ -89,12 +111,17 @@ ssb.addEventListener('connections', function(change, connection) {
});
} else {
if (connection.is_client) {
connection.send_json({'name': ['tunnel', 'isRoom'], 'args': [], 'type': 'source'}, function tunnel_is_room(request) {
if (request.message) {
connection.send_json({'name': ['room', 'attendants'], 'args': [], 'type': 'source'}, tunnel_attendants);
}
});
connection.send_json({"name": ["ebt", "replicate"], "args": [{"version": 3, "format": "classic"}], "type": "duplex"}, ebtReplicateClient);
}
}
connection.active_blob_wants = {};
connection.send_json({'name': ['blobs', 'createWants'], 'type': 'source', 'args': []}, function(message) {
connection.send_json({'name': ['blobs', 'createWants'], 'type': 'source', 'args': []}, function on_blob_create_wants(message) {
Object.keys(message.message).forEach(function(id) {
if (message.message[id] < 0) {
if (g_wants_requests[connection.id]) {
@ -129,6 +156,8 @@ ssb.addEventListener('connections', function(change, connection) {
});
} else if (change == 'remove') {
print('REMOVE', connection.id);
notify_attendant_changed(connection.id, 'left');
delete g_attendants[connection.id];
delete g_wants_requests[connection.id];
} else {
print('CHANGE', change);
@ -191,13 +220,59 @@ ssb.addRpc(['blobs', 'get'], function(request) {
});
ssb.addRpc(['gossip', 'ping'], function(request) {
request.more(function(message) {
request.more(function ping(message) {
message.send_json(Date.now());
});
});
ssb.addRpc(['tunnel', 'isRoom'], function(request) {
request.send_json(false);
request.send_json(true);
});
function notify_attendant_changed(id, type) {
for (let r of Object.values(g_attendants)) {
try {
r.send_json({
type: type,
id: id,
});
} catch (e) {
print(`Removing ${r.connection.id} from g_attendants in ${type}.`, e);
delete g_attendants[r.connection.id];
}
}
}
ssb.addRpc(['room', 'attendants'], function(request) {
let ids = Object.keys(g_attendants).sort();
request.send_json({
type: 'state',
ids: ids,
});
notify_attendant_changed(request.connection.id, 'joined');
g_attendants[request.connection.id] = request;
});
ssb.addRpc(['tunnel', 'connect'], function(request) {
if (!request.args[0].origin &&
request.args[0].portal &&
request.args[0].target) {
let target_connection = ssb.getConnection(request.args[0].target);
let target_request_number = target_connection.send_json({
'name': ['tunnel', 'connect'],
'args': [{
'origin': request.connection.id,
'portal': request.args[0].portal,
'target': request.args[0].target,
}],
'type': 'duplex',
});
ssb.tunnel(request.connection, -request.request_number, target_connection, target_request_number);
} else if (request.args[0].origin &&
request.args[0].portal &&
request.args[0].target) {
ssb.createTunnel(request.connection, -request.request_number, request.args[0].origin);
}
});
function ebtReplicateSendClock(request, have) {

347
src/ssb.c
View File

@ -28,6 +28,11 @@
#define _countof(a) ((int)(sizeof((a)) / sizeof(*(a))))
#endif
#define GREEN "\e[1;32m"
#define MAGENTA "\e[1;35m"
#define CYAN "\e[1;36m"
#define RESET "\e[0m"
static_assert(k_id_base64_len == sodium_base64_ENCODED_LEN(9 + crypto_box_PUBLICKEYBYTES, sodium_base64_VARIANT_ORIGINAL), "k_id_base64_len");
static_assert(k_id_bin_len == crypto_box_PUBLICKEYBYTES, "k_id_bin_len");
static_assert(k_blob_id_len == (sodium_base64_ENCODED_LEN(crypto_hash_sha256_BYTES, sodium_base64_VARIANT_ORIGINAL) + 8), "k_blob_id_len");
@ -66,6 +71,7 @@ typedef struct _tf_ssb_request_t
{
int32_t request_number;
tf_ssb_rpc_callback_t* callback;
tf_ssb_callback_cleanup_t* cleanup;
void* user_data;
} tf_ssb_request_t;
@ -76,6 +82,7 @@ typedef struct _tf_ssb_broadcast_t
time_t mtime;
char host[256];
struct sockaddr_in addr;
tf_ssb_connection_t* tunnel_connection;
uint8_t pub[crypto_sign_PUBLICKEYBYTES];
} tf_ssb_broadcast_t;
@ -184,8 +191,13 @@ typedef struct _tf_ssb_connection_t
uv_connect_t connect;
uv_async_t async;
tf_ssb_connection_t* tunnel_connection;
int32_t tunnel_request_number;
JSValue object;
char name[32];
char host[256];
int port;
@ -224,14 +236,17 @@ typedef struct _tf_ssb_connection_t
} tf_ssb_connection_t;
static JSClassID _connection_class_id;
static int s_connection_index;
static int s_tunnel_index;
static void _tf_ssb_connection_destroy(tf_ssb_connection_t* connection, const char* reason);
static void _tf_ssb_connection_client_send_hello(uv_stream_t* stream);
static void _tf_ssb_connection_client_send_hello(tf_ssb_connection_t* connection);
static void _tf_ssb_connection_on_close(uv_handle_t* handle);
static void _tf_ssb_connection_close(tf_ssb_connection_t* connection, const char* reason);
static void _tf_ssb_nonce_inc(uint8_t* nonce);
static void _tf_ssb_write(tf_ssb_connection_t* connection, void* data, size_t size);
static void _tf_ssb_connection_finalizer(JSRuntime* runtime, JSValue value);
static void _tf_ssb_connection_remove_request(tf_ssb_connection_t* connection, int32_t request_number);
static void _tf_ssb_connection_send_close(tf_ssb_connection_t* connection)
{
@ -285,14 +300,29 @@ static void _tf_ssb_connection_on_write(uv_write_t* req, int status)
static void _tf_ssb_write(tf_ssb_connection_t* connection, void* data, size_t size)
{
uv_write_t* write = tf_malloc(sizeof(uv_write_t) + size);
*write = (uv_write_t) { .data = connection };
memcpy(write + 1, data, size);
int result = uv_write(write, (uv_stream_t*)&connection->tcp, &(uv_buf_t) { .base = (char*)(write + 1), .len = size }, 1, _tf_ssb_connection_on_write);
if (result)
if (connection->tcp.data)
{
_tf_ssb_connection_close(connection, "write failed");
tf_free(write);
uv_write_t* write = tf_malloc(sizeof(uv_write_t) + size);
*write = (uv_write_t) { .data = connection };
memcpy(write + 1, data, size);
int result = uv_write(write, (uv_stream_t*)&connection->tcp, &(uv_buf_t) { .base = (char*)(write + 1), .len = size }, 1, _tf_ssb_connection_on_write);
if (result)
{
_tf_ssb_connection_close(connection, "write failed");
tf_free(write);
}
}
else if (connection->tunnel_connection)
{
tf_ssb_connection_rpc_send(
connection->tunnel_connection,
k_ssb_rpc_flag_binary | k_ssb_rpc_flag_stream,
-connection->tunnel_request_number,
data,
size,
NULL,
NULL,
NULL);
}
}
@ -436,18 +466,14 @@ static bool _tf_ssb_connection_get_request_callback(tf_ssb_connection_t* connect
return false;
}
void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t request_number, tf_ssb_rpc_callback_t* callback, void* user_data)
void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t request_number, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data)
{
if (_tf_ssb_connection_get_request_callback(connection, request_number, NULL, NULL))
{
/* TODO: This leaks the callback. */
printf("Adding a request %d that is already registered.\n", request_number);
return;
}
_tf_ssb_connection_remove_request(connection, request_number);
tf_ssb_request_t request =
{
.request_number = request_number,
.callback = callback,
.cleanup = cleanup,
.user_data = user_data,
};
int index = tf_util_insert_index(&request_number, connection->requests, connection->requests_count, sizeof(tf_ssb_request_t), _request_compare);
@ -467,9 +493,9 @@ static void _tf_ssb_connection_remove_request(tf_ssb_connection_t* connection, i
tf_ssb_request_t* request = bsearch(&request_number, connection->requests, connection->requests_count, sizeof(tf_ssb_request_t), _request_compare);
if (request)
{
if (request->user_data)
if (request->cleanup)
{
JS_FreeValue(tf_ssb_connection_get_context(connection), JS_MKPTR(JS_TAG_OBJECT, request->user_data));
request->cleanup(connection->ssb, request->user_data);
}
int index = request - connection->requests;
memmove(request, request + 1, sizeof(tf_ssb_request_t) * (connection->requests_count - index - 1));
@ -479,7 +505,7 @@ static void _tf_ssb_connection_remove_request(tf_ssb_connection_t* connection, i
}
}
void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, const uint8_t* message, size_t size, tf_ssb_rpc_callback_t* callback, void* user_data)
void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, const uint8_t* message, size_t size, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data)
{
if (!connection)
{
@ -487,7 +513,7 @@ void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags,
}
if (request_number > 0 && callback)
{
tf_ssb_connection_add_request(connection, request_number, callback, user_data);
tf_ssb_connection_add_request(connection, request_number, callback, cleanup, user_data);
}
uint8_t* combined = tf_malloc(9 + size);
*combined = flags;
@ -496,9 +522,9 @@ void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags,
uint32_t rn = htonl((uint32_t)request_number);
memcpy(combined + 1 + sizeof(uint32_t), &rn, sizeof(rn));
memcpy(combined + 1 + 2 * sizeof(uint32_t), message, size);
printf(MAGENTA "%s RPC SEND" RESET " flags=%x RN=%d: %.*s\n", connection->name, flags, request_number, (flags & k_ssb_rpc_mask_type) == k_ssb_rpc_flag_binary? 0 : (int)size, message);
_tf_ssb_connection_box_stream_send(connection, combined, 1 + 2 * sizeof(uint32_t) + size);
tf_free(combined);
printf("RPC SEND flags=%x RN=%d: %.*s\n", flags, request_number, (int)size, message);
connection->ssb->rpc_out++;
}
@ -852,17 +878,20 @@ bool tf_ssb_connection_get_id(tf_ssb_connection_t* connection, char* out_id, siz
return tf_ssb_id_bin_to_str(out_id, out_id_size, connection->serverpub);
}
static bool _tf_ssb_is_already_connected(tf_ssb_t* ssb, uint8_t* id)
static bool _tf_ssb_is_already_connected(tf_ssb_t* ssb, uint8_t* id, tf_ssb_connection_t* ignore_connection)
{
for (tf_ssb_connection_t* connection = ssb->connections; connection; connection = connection->next)
{
if (memcmp(connection->serverpub, id, k_id_bin_len) == 0)
if (!ignore_connection || connection != ignore_connection)
{
return true;
}
else if (memcmp(ssb->pub, id, k_id_bin_len) == 0)
{
return true;
if (memcmp(connection->serverpub, id, k_id_bin_len) == 0)
{
return true;
}
else if (memcmp(ssb->pub, id, k_id_bin_len) == 0)
{
return true;
}
}
}
return false;
@ -937,9 +966,13 @@ static void _tf_ssb_connection_verify_client_identity(tf_ssb_connection_t* conne
}
uint8_t* detached_signature_A = m;
if (_tf_ssb_is_already_connected(connection->ssb, m + 64))
if (_tf_ssb_is_already_connected(connection->ssb, m + 64, connection))
{
_tf_ssb_connection_close(connection, "already connected");
char id_base64[k_id_base64_len] = { 0 };
tf_ssb_id_bin_to_str(id_base64, sizeof(id_base64), m + 64);
char reason[256];
snprintf(reason, sizeof(reason), "already connected: %s\n", id_base64);
_tf_ssb_connection_close(connection, reason);
return;
}
@ -1101,11 +1134,16 @@ static bool _tf_ssb_name_equals(JSContext* context, JSValue object, const char**
static void _tf_ssb_connection_rpc_recv(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, const uint8_t* message, size_t size)
{
connection->ssb->rpc_in++;
if (flags & k_ssb_rpc_flag_json)
if (size == 0)
{
_tf_ssb_connection_close(connection, "read zero");
return;
}
else if (flags & k_ssb_rpc_flag_json)
{
char id[k_id_base64_len] = "";
tf_ssb_id_bin_to_str(id, sizeof(id), connection->serverpub);
printf("RPC RECV from %s flags=%x RN=%d: %.*s\n", id, flags, request_number, (int)size, message);
printf(CYAN "%s RPC RECV" RESET " from %s flags=%x RN=%d: %.*s\n", connection->name, id, flags, request_number, (int)size, message);
JSContext* context = connection->ssb->context;
JSValue val = JS_ParseJSON(context, (const char*)message, size, NULL);
@ -1139,7 +1177,7 @@ static void _tf_ssb_connection_rpc_recv(tf_ssb_connection_t* connection, uint8_t
{
const char* k_unsupported = "{\"message\": \"method: is not in list of allowed methods\", \"name\": \"Error\", \"stack\": \"none\"}";
tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json | k_ssb_rpc_flag_end_error | (flags & k_ssb_rpc_flag_stream), -request_number,
(const uint8_t*)k_unsupported, strlen(k_unsupported), NULL, NULL);
(const uint8_t*)k_unsupported, strlen(k_unsupported), NULL, NULL, NULL);
}
}
}
@ -1152,7 +1190,7 @@ static void _tf_ssb_connection_rpc_recv(tf_ssb_connection_t* connection, uint8_t
}
else if ((flags & k_ssb_rpc_mask_type) == k_ssb_rpc_flag_binary)
{
printf("RPC RECV flags=%x RN=%d: %zd bytes\n", flags, request_number, size);
printf(CYAN "%s RPC RECV" RESET " flags=%x RN=%d: %zd bytes\n", connection->name, flags, request_number, size);
tf_ssb_rpc_callback_t* callback = NULL;
void* user_data = NULL;
if (_tf_ssb_connection_get_request_callback(connection, -request_number, &callback, &user_data))
@ -1162,6 +1200,10 @@ static void _tf_ssb_connection_rpc_recv(tf_ssb_connection_t* connection, uint8_t
callback(connection, flags, request_number, JS_UNDEFINED, message, size, user_data);
}
}
else
{
printf("No request callback for %p %d\n", connection, -request_number);
}
}
if (request_number < 0 &&
@ -1348,7 +1390,11 @@ void _tf_ssb_connection_destroy(tf_ssb_connection_t* connection, const char* rea
if (!connection->destroy_reason)
{
connection->destroy_reason = reason;
printf("destroying connection: %s\n", reason);
printf("destroying connection %p obj=%p: %s\n", connection, JS_VALUE_GET_PTR(connection->object), reason);
}
while (connection->requests)
{
_tf_ssb_connection_remove_request(connection, connection->requests->request_number);
}
for (tf_ssb_connection_t** it = &connection->ssb->connections; *it; it = &(*it)->next)
{
@ -1361,9 +1407,27 @@ void _tf_ssb_connection_destroy(tf_ssb_connection_t* connection, const char* rea
break;
}
}
while (connection->requests)
bool again = true;
while (again)
{
_tf_ssb_connection_remove_request(connection, connection->requests->request_number);
again = false;
for (tf_ssb_connection_t* it = connection->ssb->connections; it; it = it->next)
{
if (it->tunnel_connection == connection)
{
it->tunnel_connection = NULL;
_tf_ssb_connection_close(it, "tunnel closed");
again = true;
break;
}
else if (connection->tunnel_connection == it)
{
connection->tunnel_connection = NULL;
_tf_ssb_connection_close(it, "tunnel closed");
again = true;
break;
}
}
}
if (!JS_IsUndefined(connection->object))
{
@ -1404,18 +1468,16 @@ static void _tf_ssb_connection_on_close(uv_handle_t* handle)
}
}
static void _tf_ssb_connection_on_tcp_recv(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf)
static void _tf_ssb_connection_on_tcp_recv_internal(tf_ssb_connection_t* connection, const void* data, ssize_t nread)
{
tf_ssb_connection_t* connection = stream->data;
if (nread >= 0)
{
if (connection->recv_size + nread > sizeof(connection->recv_buffer))
{
_tf_ssb_connection_close(connection, "recv buffer overflow");
tf_free(buf->base);
return;
}
memcpy(connection->recv_buffer + connection->recv_size, buf->base, nread);
memcpy(connection->recv_buffer + connection->recv_size, data, nread);
connection->recv_size += nread;
switch (connection->state)
@ -1461,7 +1523,7 @@ static void _tf_ssb_connection_on_tcp_recv(uv_stream_t* stream, ssize_t nread, c
}
else
{
_tf_ssb_connection_client_send_hello((uv_stream_t*)&connection->tcp);
_tf_ssb_connection_client_send_hello(connection);
connection->state = k_tf_ssb_state_server_wait_client_identity;
}
}
@ -1492,12 +1554,17 @@ static void _tf_ssb_connection_on_tcp_recv(uv_stream_t* stream, ssize_t nread, c
{
_tf_ssb_connection_close(connection, "read zero");
}
}
static void _tf_ssb_connection_on_tcp_recv(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf)
{
tf_ssb_connection_t* connection = stream->data;
_tf_ssb_connection_on_tcp_recv_internal(connection, buf->base, nread);
tf_free(buf->base);
}
static void _tf_ssb_connection_client_send_hello(uv_stream_t* stream)
static void _tf_ssb_connection_client_send_hello(tf_ssb_connection_t* connection)
{
tf_ssb_connection_t* connection = stream->data;
char write[crypto_auth_BYTES + crypto_box_PUBLICKEYBYTES];
if (crypto_box_keypair(connection->epub, connection->epriv) != 0)
@ -1536,7 +1603,7 @@ static void _tf_ssb_connection_on_connect(uv_connect_t* connect, int status)
}
else
{
_tf_ssb_connection_client_send_hello(connect->handle);
_tf_ssb_connection_client_send_hello(connection);
}
}
else
@ -1788,10 +1855,6 @@ void tf_ssb_destroy(tf_ssb_t* ssb)
uv_run(ssb->loop, UV_RUN_ONCE);
}
if (ssb->loop == &ssb->own_loop)
{
uv_loop_close(ssb->loop);
}
while (ssb->rpc)
{
tf_ssb_rpc_callback_node_t* node = ssb->rpc;
@ -1848,6 +1911,10 @@ void tf_ssb_destroy(tf_ssb_t* ssb)
}
tf_free(node);
}
if (ssb->loop == &ssb->own_loop)
{
uv_loop_close(ssb->loop);
}
if (ssb->own_context)
{
JS_FreeContext(ssb->context);
@ -1893,6 +1960,15 @@ static void _tf_ssb_connection_send_json_response(tf_ssb_connection_t* connectio
_tf_ssb_on_rpc(connection, flags, request_number, args, message, size, user_data);
}
static void _tf_ssb_connection_cleanup_value(tf_ssb_t* ssb, void* user_data)
{
if (user_data)
{
JSValue callback = JS_MKPTR(JS_TAG_OBJECT, user_data);
JS_FreeValue(tf_ssb_get_context(ssb), callback);
}
}
static JSValue _tf_ssb_connection_send_json(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv)
{
tf_ssb_connection_t* connection = JS_GetOpaque(this_val, _connection_class_id);
@ -1915,9 +1991,10 @@ static JSValue _tf_ssb_connection_send_json(JSContext* context, JSValueConst thi
(const uint8_t*)message,
size,
_tf_ssb_connection_send_json_response,
_tf_ssb_connection_cleanup_value,
JS_IsFunction(context, argv[1]) ? JS_VALUE_GET_PTR(JS_DupValue(context, argv[1])) : NULL);
JS_FreeCString(context, message);
return JS_UNDEFINED;
return JS_NewInt32(context, request_number);
}
static void _tf_ssb_connection_process_message_async(uv_async_t* async)
@ -1952,6 +2029,7 @@ tf_ssb_connection_t* tf_ssb_connection_create(tf_ssb_t* ssb, const char* host, c
JSContext* context = ssb->context;
tf_ssb_connection_t* connection = tf_malloc(sizeof(tf_ssb_connection_t));
memset(connection, 0, sizeof(*connection));
snprintf(connection->name, sizeof(connection->name), "cli%d", s_connection_index++);
connection->ssb = ssb;
connection->tcp.data = connection;
connection->connect.data = connection;
@ -1962,6 +2040,7 @@ tf_ssb_connection_t* tf_ssb_connection_create(tf_ssb_t* ssb, const char* host, c
uv_async_init(ssb->loop, &connection->async, _tf_ssb_connection_process_message_async);
connection->object = JS_NewObjectClass(ssb->context, _connection_class_id);
printf("%s = %p\n", connection->name, JS_VALUE_GET_PTR(connection->object));
JS_SetOpaque(connection->object, connection);
JS_SetPropertyStr(context, connection->object, "send_json", JS_NewCFunction(context, _tf_ssb_connection_send_json, "send_json", 2));
char public_key_str[k_id_base64_len] = { 0 };
@ -1992,6 +2071,67 @@ tf_ssb_connection_t* tf_ssb_connection_create(tf_ssb_t* ssb, const char* host, c
return connection;
}
static void _tf_ssb_connection_tunnel_callback(
tf_ssb_connection_t* connection,
uint8_t flags,
int32_t request_number,
JSValue args,
const uint8_t* message,
size_t size,
void* user_data)
{
tf_ssb_connection_t* tunnel = user_data;
_tf_ssb_connection_on_tcp_recv_internal(tunnel, message, size);
}
tf_ssb_connection_t* tf_ssb_connection_tunnel_create(tf_ssb_connection_t* connection, int32_t request_number, const char* target_id)
{
tf_ssb_t* ssb = connection->ssb;
JSContext* context = ssb->context;
tf_ssb_connection_t* tunnel = tf_malloc(sizeof(tf_ssb_connection_t));
memset(tunnel, 0, sizeof(*tunnel));
snprintf(tunnel->name, sizeof(tunnel->name), "tun%d", s_tunnel_index++);
tunnel->ssb = ssb;
tunnel->tunnel_connection = connection;
tunnel->tunnel_request_number = -request_number;
tunnel->send_request_number = 1;
tunnel->async.data = tunnel;
uv_async_init(ssb->loop, &tunnel->async, _tf_ssb_connection_process_message_async);
tunnel->object = JS_NewObjectClass(ssb->context, _connection_class_id);
printf("%s = %p\n", tunnel->name, JS_VALUE_GET_PTR(connection->object));
JS_SetOpaque(tunnel->object, tunnel);
JS_SetPropertyStr(context, tunnel->object, "send_json", JS_NewCFunction(context, _tf_ssb_connection_send_json, "send_json", 2));
JS_SetPropertyStr(context, tunnel->object, "id", JS_NewString(context, target_id));
JS_SetPropertyStr(context, tunnel->object, "is_client", JS_TRUE);
tf_ssb_id_str_to_bin(tunnel->serverpub, target_id);
tunnel->next = ssb->connections;
ssb->connections = tunnel;
ssb->connections_count++;
_tf_ssb_notify_connections_changed(ssb, k_tf_ssb_change_create, tunnel);
tf_ssb_connection_add_request(
connection,
request_number,
_tf_ssb_connection_tunnel_callback,
NULL,
tunnel);
if (request_number < 0)
{
tunnel->state = k_tf_ssb_state_connected;
_tf_ssb_connection_client_send_hello(tunnel);
}
else
{
tunnel->state = k_tf_ssb_state_server_wait_hello;
}
return tunnel;
}
typedef struct _connect_t {
tf_ssb_t* ssb;
uv_getaddrinfo_t req;
@ -2047,6 +2187,7 @@ static void _tf_ssb_on_connection(uv_stream_t* stream, int status)
tf_ssb_connection_t* connection = tf_malloc(sizeof(tf_ssb_connection_t));
memset(connection, 0, sizeof(*connection));
snprintf(connection->name, sizeof(connection->name), "srv%d", s_connection_index++);
connection->ssb = ssb;
connection->tcp.data = connection;
connection->send_request_number = 1;
@ -2054,6 +2195,7 @@ static void _tf_ssb_on_connection(uv_stream_t* stream, int status)
uv_async_init(ssb->loop, &connection->async, _tf_ssb_connection_process_message_async);
connection->object = JS_NewObjectClass(ssb->context, _connection_class_id);
printf("%s = %p\n", connection->name, JS_VALUE_GET_PTR(connection->object));
JS_SetPropertyStr(ssb->context, connection->object, "send_json", JS_NewCFunction(ssb->context, _tf_ssb_connection_send_json, "send_json", 2));
JS_SetOpaque(connection->object, connection);
@ -2204,7 +2346,7 @@ static bool _tf_ssb_parse_broadcast(const char* in_broadcast, tf_ssb_broadcast_t
printf("pton failed\n");
}
}
else if (strncmp(in_broadcast, "ws:", 3))
else if (strncmp(in_broadcast, "ws:", 3) == 0)
{
printf("Unsupported broadcast: %s\n", in_broadcast);
}
@ -2250,25 +2392,40 @@ static void _tf_ssb_add_broadcast(tf_ssb_t* ssb, const tf_ssb_broadcast_t* broad
return;
}
for (tf_ssb_broadcast_t* node = ssb->broadcasts; node; node = node->next)
if (broadcast->tunnel_connection)
{
if (node->addr.sin_family == broadcast->addr.sin_family &&
node->addr.sin_port == broadcast->addr.sin_port &&
node->addr.sin_addr.s_addr == broadcast->addr.sin_addr.s_addr &&
memcmp(node->pub, broadcast->pub, sizeof(node->pub)) == 0)
for (tf_ssb_broadcast_t* node = ssb->broadcasts; node; node = node->next)
{
node->mtime = time(NULL);
return;
if (node->tunnel_connection == broadcast->tunnel_connection &&
memcmp(node->pub, broadcast->pub, sizeof(node->pub)) == 0)
{
node->mtime = time(NULL);
return;
}
}
}
char key[k_id_base64_len];
if (tf_ssb_id_bin_to_str(key, sizeof(key), broadcast->pub))
else
{
tf_ssb_connections_store(ssb->connections_tracker, broadcast->host, ntohs(broadcast->addr.sin_port), key);
for (tf_ssb_broadcast_t* node = ssb->broadcasts; node; node = node->next)
{
if (node->addr.sin_family == broadcast->addr.sin_family &&
node->addr.sin_port == broadcast->addr.sin_port &&
node->addr.sin_addr.s_addr == broadcast->addr.sin_addr.s_addr &&
memcmp(node->pub, broadcast->pub, sizeof(node->pub)) == 0)
{
node->mtime = time(NULL);
return;
}
}
char key[k_id_base64_len];
if (tf_ssb_id_bin_to_str(key, sizeof(key), broadcast->pub))
{
tf_ssb_connections_store(ssb->connections_tracker, broadcast->host, ntohs(broadcast->addr.sin_port), key);
}
printf("Received new broadcast: host=%s, pub=%s.\n", broadcast->host, key);
}
printf("Received new broadcast: host=%s, pub=%s.\n", broadcast->host, key);
tf_ssb_broadcast_t* node = tf_malloc(sizeof(tf_ssb_broadcast_t));
*node = *broadcast;
node->next = ssb->broadcasts;
@ -2306,7 +2463,7 @@ static void _tf_ssb_on_broadcast_listener_recv(uv_udp_t* handle, ssize_t nread,
tf_free(buf->base);
}
void tf_ssb_visit_broadcasts(tf_ssb_t* ssb, void (*callback)(const struct sockaddr_in* addr, const uint8_t* pub, void* user_data), void* user_data)
void tf_ssb_visit_broadcasts(tf_ssb_t* ssb, void (*callback)(const struct sockaddr_in* addr, tf_ssb_connection_t* tunnel, const uint8_t* pub, void* user_data), void* user_data)
{
time_t now = time(NULL);
tf_ssb_broadcast_t* next = NULL;
@ -2315,7 +2472,7 @@ void tf_ssb_visit_broadcasts(tf_ssb_t* ssb, void (*callback)(const struct sockad
next = node->next;
if (node->mtime - now < 60)
{
callback(&node->addr, node->pub, user_data);
callback(&node->addr, node->tunnel_connection, node->pub, user_data);
}
}
}
@ -2392,6 +2549,24 @@ void tf_ssb_append_post(tf_ssb_t* ssb, const char* text)
JS_FreeValue(ssb->context, obj);
}
tf_ssb_connection_t* tf_ssb_connection_get(tf_ssb_t* ssb, const char* id)
{
uint8_t pub[k_id_bin_len] = { 0 };
tf_ssb_id_str_to_bin(pub, id);
for (tf_ssb_connection_t* connection = ssb->connections; connection; connection = connection->next)
{
if (memcmp(connection->serverpub, pub, k_id_bin_len) == 0)
{
return connection;
}
else if (memcmp(ssb->pub, pub, k_id_bin_len) == 0)
{
return connection;
}
}
return NULL;
}
const char** tf_ssb_get_connection_ids(tf_ssb_t* ssb)
{
int count = 0;
@ -2561,6 +2736,11 @@ JSClassID tf_ssb_get_connection_class_id()
JSValue tf_ssb_connection_get_object(tf_ssb_connection_t* connection)
{
if (connection && !JS_IsUndefined(connection->object))
{
JSRefCountHeader *p = (JSRefCountHeader *)JS_VALUE_GET_PTR(connection->object);
printf("%p _get_object count=%d\nn", JS_VALUE_GET_PTR(connection->object), p->ref_count);
}
return connection ? connection->object : JS_UNDEFINED;
}
@ -2660,3 +2840,42 @@ void tf_ssb_notify_blob_want_added(tf_ssb_t* ssb, const char* id)
node->callback(ssb, id, node->user_data);
}
}
void tf_ssb_connection_add_room_attendant(tf_ssb_connection_t* connection, const char* id)
{
tf_ssb_broadcast_t broadcast =
{
.tunnel_connection = connection,
};
tf_ssb_id_str_to_bin(broadcast.pub, id);
_tf_ssb_add_broadcast(connection->ssb, &broadcast);
}
void tf_ssb_connection_remove_room_attendant(tf_ssb_connection_t* connection, const char* id)
{
uint8_t pub[k_id_bin_len] = { 0 };
tf_ssb_id_str_to_bin(pub, id);
int modified = 0;
for (tf_ssb_broadcast_t** it = &connection->ssb->broadcasts; *it;)
{
if ((*it)->tunnel_connection == connection &&
memcmp((*it)->pub, pub, k_id_bin_len) == 0)
{
tf_ssb_broadcast_t* node = *it;
*it = node->next;
tf_free(node);
connection->ssb->broadcasts_count--;
modified++;
}
else
{
it = &(*it)->next;
}
}
if (modified)
{
_tf_ssb_notify_broadcasts_changed(connection->ssb);
}
}

View File

@ -114,7 +114,7 @@ void tf_ssb_db_init(tf_ssb_t* ssb)
populate_fts = true;
}
if (!populate_fts)
if (!populate_fts && /* HACK */ false)
{
printf("Checking FTS5 integrity...\n");
if (sqlite3_exec(db, "INSERT INTO messages_fts(messages_fts, rank) VALUES ('integrity-check', 0)", NULL, NULL, NULL) == SQLITE_CORRUPT_VTAB)
@ -279,6 +279,7 @@ bool tf_ssb_db_store_message(tf_ssb_t* ssb, JSContext* context, const char* id,
{
printf("%s\n", sqlite3_errmsg(db));
}
printf("changes = %d\n", sqlite3_changes(db));
stored = r == SQLITE_DONE && sqlite3_changes(db) != 0;
if (stored)
{

View File

@ -80,8 +80,9 @@ void tf_ssb_run(tf_ssb_t* ssb);
void tf_ssb_append_message_with_keys(tf_ssb_t* ssb, const char* author, const uint8_t* private_key, JSValue message);
bool tf_ssb_whoami(tf_ssb_t* ssb, char* out_id, size_t out_id_size);
void tf_ssb_visit_broadcasts(tf_ssb_t* ssb, void (*callback)(const struct sockaddr_in* addr, const uint8_t* pub, void* user_data), void* user_data);
void tf_ssb_visit_broadcasts(tf_ssb_t* ssb, void (*callback)(const struct sockaddr_in* addr, tf_ssb_connection_t* tunnel, const uint8_t* pub, void* user_data), void* user_data);
tf_ssb_connection_t* tf_ssb_get_connection(tf_ssb_t* ssb, const char* id);
const char** tf_ssb_get_connection_ids(tf_ssb_t* ssb);
int tf_ssb_get_connections(tf_ssb_t* ssb, tf_ssb_connection_t** out_connections, int out_connections_count);
void tf_ssb_connect(tf_ssb_t* ssb, const char* host, int port, const uint8_t* key);
@ -105,6 +106,7 @@ sqlite3* tf_ssb_connection_get_db(tf_ssb_connection_t* connection);
int32_t tf_ssb_connection_next_request_number(tf_ssb_connection_t* connection);
tf_ssb_connection_t* tf_ssb_connection_get(tf_ssb_t* ssb, const char* id);
bool tf_ssb_connection_get_id(tf_ssb_connection_t* connection, char* out_id, size_t out_id_size);
JSValue tf_ssb_connection_get_object(tf_ssb_connection_t* connection);
@ -132,8 +134,13 @@ typedef void (tf_ssb_rpc_callback_t)(tf_ssb_connection_t* connection, uint8_t fl
void tf_ssb_add_rpc_callback(tf_ssb_t* ssb, const char** name, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data);
void tf_ssb_remove_rpc_callback(tf_ssb_t* ssb, const char** name, tf_ssb_rpc_callback_t* callback, void* user_data);
void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, const uint8_t* message, size_t size, tf_ssb_rpc_callback_t* callback, void* user_data);
void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t request_number, tf_ssb_rpc_callback_t* callback, void* user_data);
void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, const uint8_t* message, size_t size, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data);
void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t request_number, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data);
void tf_ssb_connection_add_room_attendant(tf_ssb_connection_t* connection, const char* id);
void tf_ssb_connection_remove_room_attendant(tf_ssb_connection_t* connection, const char* id);
tf_ssb_connection_t* tf_ssb_connection_tunnel_create(tf_ssb_connection_t* connection, int32_t request_number, const char* target_id);
JSClassID tf_ssb_get_connection_class_id();

View File

@ -253,6 +253,15 @@ static JSValue _tf_ssb_connections(JSContext* context, JSValueConst this_val, in
return result;
}
static JSValue _tf_ssb_getConnection(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv)
{
tf_ssb_t* ssb = JS_GetOpaque(this_val, _tf_ssb_classId);
const char* id = JS_ToCString(context, argv[0]);
tf_ssb_connection_t* connection = tf_ssb_connection_get(ssb, id);
JS_FreeCString(context, id);
return JS_DupValue(context, tf_ssb_connection_get_object(connection));
}
typedef struct _sqlStream_callback_t
{
JSContext* context;
@ -319,7 +328,7 @@ typedef struct _broadcasts_t
int length;
} broadcasts_t;
static void _tf_ssb_broadcasts_visit(const struct sockaddr_in* addr, const uint8_t* pub, void* user_data)
static void _tf_ssb_broadcasts_visit(const struct sockaddr_in* addr, tf_ssb_connection_t* tunnel, const uint8_t* pub, void* user_data)
{
broadcasts_t* broadcasts = user_data;
JSValue entry = JS_NewObject(broadcasts->context);
@ -327,8 +336,15 @@ static void _tf_ssb_broadcasts_visit(const struct sockaddr_in* addr, const uint8
char pubkey[k_id_base64_len];
uv_ip4_name(addr, address, sizeof(address));
tf_ssb_id_bin_to_str(pubkey, sizeof(pubkey), pub);
JS_SetPropertyStr(broadcasts->context, entry, "address", JS_NewString(broadcasts->context, address));
JS_SetPropertyStr(broadcasts->context, entry, "port", JS_NewInt32(broadcasts->context, ntohs(addr->sin_port)));
if (tunnel)
{
JS_SetPropertyStr(broadcasts->context, entry, "tunnel", JS_DupValue(broadcasts->context, tf_ssb_connection_get_object(tunnel)));
}
else
{
JS_SetPropertyStr(broadcasts->context, entry, "address", JS_NewString(broadcasts->context, address));
JS_SetPropertyStr(broadcasts->context, entry, "port", JS_NewInt32(broadcasts->context, ntohs(addr->sin_port)));
}
JS_SetPropertyStr(broadcasts->context, entry, "pubkey", JS_NewString(broadcasts->context, pubkey));
JS_SetPropertyUint32(broadcasts->context, broadcasts->array, broadcasts->length++, entry);
}
@ -411,6 +427,7 @@ static JSValue _tf_ssb_rpc_send_json(JSContext* context, JSValueConst this_val,
(const uint8_t*)message,
size,
NULL,
NULL,
NULL);
JS_FreeValue(context, connection_val);
JS_FreeCString(context, message);
@ -443,6 +460,7 @@ static JSValue _tf_ssb_rpc_send_json_end(JSContext* context, JSValueConst this_v
(const uint8_t*)message,
size,
NULL,
NULL,
NULL);
JS_FreeValue(context, connection_val);
JS_FreeCString(context, message);
@ -450,6 +468,12 @@ static JSValue _tf_ssb_rpc_send_json_end(JSContext* context, JSValueConst this_v
return JS_UNDEFINED;
}
static void _tf_ssb_cleanup_value(tf_ssb_t* ssb, void* user_data)
{
JSValue callback = JS_MKPTR(JS_TAG_OBJECT, user_data);
JS_FreeValue(tf_ssb_get_context(ssb), callback);
}
static JSValue _tf_ssb_rpc_more(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv)
{
JSValue connection_val = JS_GetPropertyStr(context, this_val, "connection");
@ -459,7 +483,7 @@ static JSValue _tf_ssb_rpc_more(JSContext* context, JSValueConst this_val, int a
JS_ToInt32(context, &request_number, request_val);
JS_FreeValue(context, request_val);
tf_ssb_connection_add_request(connection, -request_number, _tf_ssb_on_rpc, JS_VALUE_GET_PTR(JS_DupValue(context, argv[0])));
tf_ssb_connection_add_request(connection, -request_number, _tf_ssb_on_rpc, _tf_ssb_cleanup_value, JS_VALUE_GET_PTR(JS_DupValue(context, argv[0])));
JS_FreeValue(context, connection_val);
return JS_UNDEFINED;
@ -485,6 +509,7 @@ static JSValue _tf_ssb_rpc_send_binary(JSContext* context, JSValueConst this_val
(const uint8_t*)message,
size,
NULL,
NULL,
NULL);
}
else
@ -505,6 +530,7 @@ static JSValue _tf_ssb_rpc_send_binary(JSContext* context, JSValueConst this_val
(const uint8_t*)message + offset,
size,
NULL,
NULL,
NULL);
}
}
@ -514,6 +540,28 @@ static JSValue _tf_ssb_rpc_send_binary(JSContext* context, JSValueConst this_val
return JS_UNDEFINED;
}
static JSValue _tf_ssb_rpc_add_room_attendant(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv)
{
JSValue connection_val = JS_GetPropertyStr(context, this_val, "connection");
tf_ssb_connection_t* connection = JS_GetOpaque(connection_val, tf_ssb_get_connection_class_id());
const char* id = JS_ToCString(context, argv[0]);
tf_ssb_connection_add_room_attendant(connection, id);
JS_FreeCString(context, id);
JS_FreeValue(context, connection_val);
return JS_UNDEFINED;
}
static JSValue _tf_ssb_rpc_remove_room_attendant(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv)
{
JSValue connection_val = JS_GetPropertyStr(context, this_val, "connection");
tf_ssb_connection_t* connection = JS_GetOpaque(connection_val, tf_ssb_get_connection_class_id());
const char* id = JS_ToCString(context, argv[0]);
tf_ssb_connection_remove_room_attendant(connection, id);
JS_FreeCString(context, id);
JS_FreeValue(context, connection_val);
return JS_UNDEFINED;
}
void _tf_ssb_on_rpc(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
@ -530,6 +578,8 @@ void _tf_ssb_on_rpc(tf_ssb_connection_t* connection, uint8_t flags, int32_t requ
JS_SetPropertyStr(context, object, "send_binary", JS_NewCFunction(context, _tf_ssb_rpc_send_binary, "send_binary", 1));
JS_SetPropertyStr(context, object, "send_json_end", JS_NewCFunction(context, _tf_ssb_rpc_send_json_end, "send_json_end", 1));
JS_SetPropertyStr(context, object, "more", JS_NewCFunction(context, _tf_ssb_rpc_more, "more", 1));
JS_SetPropertyStr(context, object, "add_room_attendant", JS_NewCFunction(context, _tf_ssb_rpc_add_room_attendant, "add_room_attendant", 1));
JS_SetPropertyStr(context, object, "remove_room_attendant", JS_NewCFunction(context, _tf_ssb_rpc_remove_room_attendant, "remove_room_attendant", 1));
JSValue result = JS_Call(context, callback, JS_UNDEFINED, 1, &object);
tf_util_report_error(context, result);
@ -537,12 +587,6 @@ void _tf_ssb_on_rpc(tf_ssb_connection_t* connection, uint8_t flags, int32_t requ
JS_FreeValue(context, object);
}
static void _tf_ssb_cleanup_value(tf_ssb_t* ssb, void* user_data)
{
JSValue callback = JS_MKPTR(JS_TAG_OBJECT, user_data);
JS_FreeValue(tf_ssb_get_context(ssb), callback);
}
static JSValue _tf_ssb_add_rpc(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv)
{
tf_ssb_t* ssb = JS_GetOpaque(this_val, _tf_ssb_classId);
@ -888,6 +932,67 @@ static JSValue _tf_ssb_hmacsha256_verify(JSContext* context, JSValueConst this_v
return result;
}
static JSValue _tf_ssb_createTunnel(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv)
{
tf_ssb_connection_t* connection = JS_GetOpaque(argv[0], tf_ssb_get_connection_class_id());
int32_t request_number = 0;
JS_ToInt32(context, &request_number, argv[1]);
const char* target_id = JS_ToCString(context, argv[2]);
tf_ssb_connection_tunnel_create(connection, request_number, target_id);
JS_FreeCString(context, target_id);
return JS_UNDEFINED;
}
typedef struct tunnel_t
{
tf_ssb_connection_t* connection;
int32_t request_number;
} tunnel_t;
void _tf_ssb_tunnel_rpc_callback(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
tunnel_t* tun = user_data;
tf_ssb_connection_rpc_send(tun->connection, flags, tun->request_number, message, size, NULL, NULL, NULL);
}
void _tf_ssb_tunnel_cleanup(tf_ssb_t* ssb, void* user_data)
{
tf_free(user_data);
}
static JSValue _tf_ssb_tunnel(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv)
{
tf_ssb_connection_t* connection0 = JS_GetOpaque(argv[0], tf_ssb_get_connection_class_id());
int32_t request_number0 = 0;
JS_ToInt32(context, &request_number0, argv[1]);
tf_ssb_connection_t* connection1 = JS_GetOpaque(argv[2], tf_ssb_get_connection_class_id());
int32_t request_number1 = 0;
JS_ToInt32(context, &request_number1, argv[3]);
printf("TUNNEL %p %d <=> %p %d\n", connection0, request_number0, connection1, request_number1);
tunnel_t* data0 = tf_malloc(sizeof(tunnel_t));
*data0 = (tunnel_t)
{
.connection = connection1,
.request_number = request_number1,
};
tunnel_t* data1 = tf_malloc(sizeof(tunnel_t));
*data1 = (tunnel_t)
{
.connection = connection0,
.request_number = request_number0,
};
printf("ADD REQUEST %p %d\n", connection0, request_number0);
printf("ADD REQUEST %p %d\n", connection1, request_number1);
tf_ssb_connection_add_request(connection0, request_number0, _tf_ssb_tunnel_rpc_callback, _tf_ssb_tunnel_cleanup, data0);
tf_ssb_connection_add_request(connection1, request_number1, _tf_ssb_tunnel_rpc_callback, _tf_ssb_tunnel_cleanup, data1);
return JS_UNDEFINED;
}
void tf_ssb_register(JSContext* context, tf_ssb_t* ssb)
{
JS_NewClassID(&_tf_ssb_classId);
@ -919,10 +1024,13 @@ void tf_ssb_register(JSContext* context, tf_ssb_t* ssb)
JS_SetPropertyStr(context, object, "blobStore", JS_NewCFunction(context, _tf_ssb_blobStore, "blobStore", 2));
JS_SetPropertyStr(context, object, "messageContentGet", JS_NewCFunction(context, _tf_ssb_messageContentGet, "messageContentGet", 1));
JS_SetPropertyStr(context, object, "connections", JS_NewCFunction(context, _tf_ssb_connections, "connections", 0));
JS_SetPropertyStr(context, object, "getConnection", JS_NewCFunction(context, _tf_ssb_getConnection, "getConnection", 1));
JS_SetPropertyStr(context, object, "sqlStream", JS_NewCFunction(context, _tf_ssb_sqlStream, "sqlStream", 3));
JS_SetPropertyStr(context, object, "storeMessage", JS_NewCFunction(context, _tf_ssb_storeMessage, "storeMessage", 1));
JS_SetPropertyStr(context, object, "getBroadcasts", JS_NewCFunction(context, _tf_ssb_getBroadcasts, "getBroadcasts", 0));
JS_SetPropertyStr(context, object, "connect", JS_NewCFunction(context, _tf_ssb_connect, "connect", 1));
JS_SetPropertyStr(context, object, "createTunnel", JS_NewCFunction(context, _tf_ssb_createTunnel, "createTunnel", 3));
JS_SetPropertyStr(context, object, "tunnel", JS_NewCFunction(context, _tf_ssb_tunnel, "tunnel", 4));
/* Should be trusted only. */
JS_SetPropertyStr(context, object, "addRpc", JS_NewCFunction(context, _tf_ssb_add_rpc, "addRpc", 2));

View File

@ -35,8 +35,13 @@ void tf_ssb_test_id_conversion(const tf_test_options_t* options)
typedef struct _test_t {
tf_ssb_t* ssb0;
tf_ssb_t* ssb1;
tf_ssb_t* ssb2;
int connection_count0;
int connection_count1;
int connection_count2;
int broadcast_count0;
int broadcast_count1;
int broadcast_count2;
} test_t;
static void _ssb_test_connections_changed(tf_ssb_t* ssb, tf_ssb_change_t change, tf_ssb_connection_t* connection, void* user_data)
@ -61,7 +66,12 @@ static void _ssb_test_connections_changed(tf_ssb_t* ssb, tf_ssb_change_t change,
printf("callback1 change=%d connection=%p\n", change, connection);
test->connection_count1 = count;
}
printf("conns = %d %d\n", test->connection_count0, test->connection_count1);
else if (ssb == test->ssb2)
{
printf("callback2 change=%d connection=%p\n", change, connection);
test->connection_count2 = count;
}
printf("conns = %d %d %d\n", test->connection_count0, test->connection_count1, test->connection_count2);
}
static void _count_messages_callback(JSValue row, void* user_data)
@ -240,6 +250,193 @@ void tf_ssb_test_ssb(const tf_test_options_t* options)
sqlite3_close(db1);
}
static void _broadcasts_visit(const struct sockaddr_in* addr, tf_ssb_connection_t* tunnel, const uint8_t* pub, void* user_data)
{
int* count = user_data;
(*count)++;
}
static void _broadcasts_changed(tf_ssb_t* ssb, void* user_data)
{
int* count = NULL;
test_t* test = user_data;
if (ssb == test->ssb0)
{
count = &test->broadcast_count0;
}
else if (ssb == test->ssb1)
{
count = &test->broadcast_count1;
}
else if (ssb == test->ssb2)
{
count = &test->broadcast_count2;
}
*count = 0;
tf_ssb_visit_broadcasts(ssb, _broadcasts_visit, count);
printf("BROADCASTS %d %d %d\n", test->broadcast_count0, test->broadcast_count1, test->broadcast_count2);
}
void tf_ssb_test_rooms(const tf_test_options_t* options)
{
printf("Testing Rooms.\n");
sqlite3* db0 = NULL;
sqlite3* db1 = NULL;
sqlite3* db2 = NULL;
int r = sqlite3_open(":memory:", &db0);
(void)r;
assert(r == SQLITE_OK);
r = sqlite3_open(":memory:", &db1);
assert(r == SQLITE_OK);
r = sqlite3_open(":memory:", &db2);
assert(r == SQLITE_OK);
uv_loop_t loop = { 0 };
uv_loop_init(&loop);
tf_ssb_t* ssb0 = tf_ssb_create(&loop, NULL, db0);
tf_ssb_register(tf_ssb_get_context(ssb0), ssb0);
tf_ssb_t* ssb1 = tf_ssb_create(&loop, NULL, db1);
tf_ssb_register(tf_ssb_get_context(ssb1), ssb1);
tf_ssb_t* ssb2 = tf_ssb_create(&loop, NULL, db2);
tf_ssb_register(tf_ssb_get_context(ssb2), ssb2);
uv_idle_t idle0 = { .data = ssb0 };
uv_idle_init(&loop, &idle0);
uv_idle_start(&idle0, _ssb_test_idle);
uv_idle_t idle1 = { .data = ssb1 };
uv_idle_init(&loop, &idle1);
uv_idle_start(&idle1, _ssb_test_idle);
uv_idle_t idle2 = { .data = ssb2 };
uv_idle_init(&loop, &idle2);
uv_idle_start(&idle2, _ssb_test_idle);
test_t test =
{
.ssb0 = ssb0,
.ssb1 = ssb1,
.ssb2 = ssb2,
};
tf_ssb_add_connections_changed_callback(ssb0, _ssb_test_connections_changed, NULL, &test);
tf_ssb_add_connections_changed_callback(ssb1, _ssb_test_connections_changed, NULL, &test);
tf_ssb_add_connections_changed_callback(ssb2, _ssb_test_connections_changed, NULL, &test);
tf_ssb_add_broadcasts_changed_callback(ssb0, _broadcasts_changed, NULL, &test);
tf_ssb_add_broadcasts_changed_callback(ssb1, _broadcasts_changed, NULL, &test);
tf_ssb_add_broadcasts_changed_callback(ssb2, _broadcasts_changed, NULL, &test);
tf_ssb_generate_keys(ssb0);
tf_ssb_generate_keys(ssb1);
tf_ssb_generate_keys(ssb2);
char id0[k_id_base64_len] = { 0 };
char id1[k_id_base64_len] = { 0 };
char id2[k_id_base64_len] = { 0 };
bool b = tf_ssb_whoami(ssb0, id0, sizeof(id0));
(void)b;
assert(b);
b = tf_ssb_whoami(ssb1, id1, sizeof(id1));
assert(b);
b = tf_ssb_whoami(ssb2, id2, sizeof(id2));
assert(b);
printf("ID %s, %s, %s\n", id0, id1, id2);
tf_ssb_server_open(ssb0, 12347);
uint8_t id0bin[k_id_bin_len];
tf_ssb_id_str_to_bin(id0bin, id0);
tf_ssb_connect(ssb1, "127.0.0.1", 12347, id0bin);
tf_ssb_connect(ssb2, "127.0.0.1", 12347, id0bin);
printf("Waiting for connection.\n");
while (test.connection_count0 != 2 ||
test.connection_count1 != 1 ||
test.connection_count2 != 1)
{
uv_run(&loop, UV_RUN_ONCE);
}
tf_ssb_server_close(ssb0);
while (test.broadcast_count1 != 1 ||
test.broadcast_count2 != 1)
{
uv_run(&loop, UV_RUN_ONCE);
}
tf_ssb_connection_t* connections[4];
int count = tf_ssb_get_connections(ssb1, connections, 4);
assert(count == 1);
int32_t tunnel_request_number = tf_ssb_connection_next_request_number(connections[0]);
JSContext* context = tf_ssb_get_context(ssb1);
JSValue message = JS_NewObject(context);
JSValue name = JS_NewArray(context);
JS_SetPropertyUint32(context, name, 0, JS_NewString(context, "tunnel"));
JS_SetPropertyUint32(context, name, 1, JS_NewString(context, "connect"));
JS_SetPropertyStr(context, message, "name", name);
JSValue args = JS_NewArray(context);
JSValue arg = JS_NewObject(context);
JS_SetPropertyStr(context, arg, "portal", JS_NewString(context, id0));
JS_SetPropertyStr(context, arg, "target", JS_NewString(context, id2));
JS_SetPropertyUint32(context, args, 0, arg);
JS_SetPropertyStr(context, message, "args", args);
JS_SetPropertyStr(context, message, "type", JS_NewString(context, "duplex"));
JSValue message_json = JS_JSONStringify(context, message, JS_NULL, JS_NULL);
size_t size;
const char* raw = JS_ToCStringLen(context, &size, message_json);
tf_ssb_connection_rpc_send(
connections[0],
k_ssb_rpc_flag_json | k_ssb_rpc_flag_stream,
tunnel_request_number,
(const uint8_t*)raw,
size,
NULL,
NULL,
NULL);
JS_FreeCString(context, raw);
JS_FreeValue(context, message_json);
JS_FreeValue(context, message);
tf_ssb_connection_t* tun0 = tf_ssb_connection_tunnel_create(connections[0], tunnel_request_number, id2);
printf("tun0 = %p\n", tun0);
printf("Done.\n");
while (test.connection_count0 != 2 ||
test.connection_count1 != 2 ||
test.connection_count2 != 2)
{
uv_run(&loop, UV_RUN_ONCE);
}
printf("Done.\n");
tf_ssb_send_close(ssb1);
tf_ssb_send_close(ssb2);
uv_close((uv_handle_t*)&idle0, NULL);
uv_close((uv_handle_t*)&idle1, NULL);
uv_close((uv_handle_t*)&idle2, NULL);
uv_run(&loop, UV_RUN_DEFAULT);
tf_ssb_destroy(ssb0);
tf_ssb_destroy(ssb1);
tf_ssb_destroy(ssb2);
uv_loop_close(&loop);
sqlite3_close(db0);
sqlite3_close(db1);
sqlite3_close(db2);
}
void tf_ssb_test_following(const tf_test_options_t* options)
{
printf("Testing following.\n");

View File

@ -5,3 +5,4 @@ typedef struct _tf_test_options_t tf_test_options_t;
void tf_ssb_test_id_conversion(const tf_test_options_t* options);
void tf_ssb_test_ssb(const tf_test_options_t* options);
void tf_ssb_test_following(const tf_test_options_t* options);
void tf_ssb_test_rooms(const tf_test_options_t* options);

View File

@ -696,5 +696,6 @@ void tf_tests(const tf_test_options_t* options)
_tf_test_run(options, "file", _test_file);
_tf_test_run(options, "sign", _test_sign);
_tf_test_run(options, "b64", _test_b64);
_tf_test_run(options, "rooms", tf_ssb_test_rooms);
printf("Tests completed.\n");
}