#include "ssb.h" #include "log.h" #include "mem.h" #include "ssb.connections.h" #include "ssb.db.h" #include "ssb.rpc.h" #include "trace.h" #include "util.js.h" #include "ares.h" #include "quickjs.h" #include "sodium/crypto_auth.h" #include "sodium/crypto_box.h" #include "sodium/crypto_hash_sha256.h" #include "sodium/crypto_scalarmult.h" #include "sodium/crypto_scalarmult_curve25519.h" #include "sodium/crypto_secretbox.h" #include "sodium/crypto_sign.h" #include "sodium/utils.h" #include "sqlite3.h" #include "uv.h" #if !defined(_WIN32) #include #endif #include #include #include #include #include #if !defined(_countof) #define _countof(a) ((int)(sizeof((a)) / sizeof(*(a)))) #endif #define GREEN "\e[1;32m" #define MAGENTA "\e[1;35m" #define CYAN "\e[1;36m" #define RESET "\e[0m" #define PRE_CALLBACK(ssb, cb) uint64_t pre_callback_hrtime_ns = _tf_ssb_callback_pre(ssb) #define POST_CALLBACK(ssb, cb) _tf_ssb_callback_post(ssb, cb, pre_callback_hrtime_ns) static_assert(k_id_base64_len == sodium_base64_ENCODED_LEN(9 + crypto_box_PUBLICKEYBYTES, sodium_base64_VARIANT_ORIGINAL), "k_id_base64_len"); static_assert(k_id_bin_len == crypto_box_PUBLICKEYBYTES, "k_id_bin_len"); static_assert(k_blob_id_len == (sodium_base64_ENCODED_LEN(crypto_hash_sha256_BYTES, sodium_base64_VARIANT_ORIGINAL) + 8), "k_blob_id_len"); const char* k_ssb_network_string = "d4a1cb88a66f02f8db635ce26441cc5dac1b08420ceaac230839b755845a9ffb"; const char* k_ssb_type_names[] = { "binary", "utf8", "json", "unknown", }; typedef enum { k_tf_ssb_state_invalid, k_tf_ssb_state_connected, k_tf_ssb_state_sent_hello, k_tf_ssb_state_sent_identity, k_tf_ssb_state_verified, k_tf_ssb_state_server_wait_hello, k_tf_ssb_state_server_wait_client_identity, k_tf_ssb_state_server_verified, k_tf_ssb_state_closing, } tf_ssb_state_t; enum { k_connections_changed_callbacks_max = 8, k_tf_ssb_rpc_message_body_length_max = 1 * 1024 * 1024, k_debug_close_message_count = 256, k_debug_close_connection_count = 32, k_seed_expire_seconds = 10 * 60, k_seed_check_interval_seconds = 5 * 60, k_udp_discovery_expires_seconds = 10, k_handshake_timeout_ms = 15000, k_rpc_active_ms = 3000, }; typedef struct _tf_ssb_broadcast_t tf_ssb_broadcast_t; typedef struct _tf_ssb_connection_t tf_ssb_connection_t; typedef struct _tf_ssb_request_t tf_ssb_request_t; typedef struct _tf_ssb_debug_message_t { bool outgoing; int flags; int32_t request_number; time_t timestamp; size_t size; uint8_t data[]; } tf_ssb_debug_message_t; typedef struct _tf_ssb_debug_close_t { char id[k_id_base64_len]; char tunnel[k_id_base64_len]; char reason[128]; tf_ssb_debug_message_t* messages[k_debug_close_message_count]; } tf_ssb_debug_close_t; typedef struct _tf_ssb_request_t { char name[256]; tf_ssb_rpc_callback_t* callback; tf_ssb_callback_cleanup_t* cleanup; void* user_data; tf_ssb_connection_t* dependent_connection; uint64_t last_active; int32_t request_number; } tf_ssb_request_t; typedef struct _tf_ssb_broadcast_t { tf_ssb_broadcast_t* next; time_t ctime; time_t mtime; time_t expires_at; char host[256]; tf_ssb_broadcast_origin_t origin; struct sockaddr_in addr; tf_ssb_connection_t* tunnel_connection; uint8_t pub[crypto_sign_PUBLICKEYBYTES]; } tf_ssb_broadcast_t; typedef struct _tf_ssb_rpc_callback_node_t tf_ssb_rpc_callback_node_t; typedef struct _tf_ssb_rpc_callback_node_t { const char* name; tf_ssb_rpc_callback_t* callback; tf_ssb_callback_cleanup_t* cleanup; void* user_data; tf_ssb_rpc_callback_node_t* next; } tf_ssb_rpc_callback_node_t; typedef struct _tf_ssb_connections_changed_callback_node_t tf_ssb_connections_changed_callback_node_t; typedef struct _tf_ssb_connections_changed_callback_node_t { tf_ssb_connections_changed_callback_t* callback; tf_ssb_callback_cleanup_t* cleanup; void* user_data; tf_ssb_connections_changed_callback_node_t* next; } tf_ssb_connections_changed_callback_node_t; typedef struct _tf_ssb_message_added_callback_node_t tf_ssb_message_added_callback_node_t; typedef struct _tf_ssb_message_added_callback_node_t { tf_ssb_message_added_callback_t* callback; tf_ssb_callback_cleanup_t* cleanup; void* user_data; tf_ssb_message_added_callback_node_t* next; } tf_ssb_message_added_callback_node_t; typedef struct _tf_ssb_blob_want_added_callback_node_t tf_ssb_blob_want_added_callback_node_t; typedef struct _tf_ssb_blob_want_added_callback_node_t { tf_ssb_blob_want_added_callback_t* callback; tf_ssb_callback_cleanup_t* cleanup; void* user_data; tf_ssb_blob_want_added_callback_node_t* next; } tf_ssb_blob_want_added_callback_node_t; typedef struct _tf_ssb_broadcasts_changed_callback_node_t tf_ssb_broadcasts_changed_callback_node_t; typedef struct _tf_ssb_broadcasts_changed_callback_node_t { tf_ssb_broadcasts_changed_callback_t* callback; tf_ssb_callback_cleanup_t* cleanup; void* user_data; tf_ssb_broadcasts_changed_callback_node_t* next; } tf_ssb_broadcasts_changed_callback_node_t; typedef struct _tf_thread_work_time_t { int64_t thread_id; uint64_t hrtime; } tf_thread_work_time_t; typedef struct _tf_ssb_timer_t { tf_ssb_t* ssb; uv_timer_t timer; void (*callback)(tf_ssb_t* ssb, void* user_data); void* user_data; } tf_ssb_timer_t; typedef struct _tf_ssb_t { bool own_context; JSRuntime* runtime; JSContext* context; tf_trace_t* trace; const char* db_path; uv_mutex_t db_readers_lock; uv_mutex_t db_writer_lock; sqlite3* db_writer; sqlite3** db_readers; int db_readers_count; uv_loop_t own_loop; uv_loop_t* loop; uv_udp_t broadcast_listener; uv_udp_t broadcast_sender; uv_timer_t broadcast_cleanup_timer; uv_timer_t broadcast_timer; uv_timer_t trace_timer; uv_timer_t request_activity_timer; uv_tcp_t server; uint8_t network_key[32]; uint8_t pub[crypto_sign_PUBLICKEYBYTES]; uint8_t priv[crypto_sign_SECRETKEYBYTES]; bool verbose; bool store_debug_messages; bool shutting_down; int messages_stored; int blobs_stored; int rpc_in; int rpc_out; tf_ssb_connection_t* connections; int connections_count; int connection_ref_count; int request_count; tf_ssb_connections_t* connections_tracker; tf_ssb_broadcast_t* broadcasts; int broadcasts_count; tf_ssb_rpc_callback_node_t* rpc; int rpc_count; tf_ssb_connections_changed_callback_node_t* connections_changed; int connections_changed_count; tf_ssb_message_added_callback_node_t* message_added; int message_added_count; tf_ssb_blob_want_added_callback_node_t* blob_want_added; int blob_want_added_count; tf_ssb_broadcasts_changed_callback_node_t* broadcasts_changed; int broadcasts_changed_count; tf_ssb_debug_close_t debug_close[k_debug_close_connection_count]; int32_t thread_busy_count; int32_t thread_busy_max; void (*hitch_callback)(const char* name, uint64_t duration, void* user_data); void* hitch_user_data; tf_ssb_store_queue_t store_queue; int ref_count; uv_thread_t thread_self; bool is_room; bool is_replicator; bool is_peer_exchange; char* room_name; char seeds_host[256]; time_t last_seed_check; tf_ssb_timer_t** timers; int timers_count; } tf_ssb_t; typedef struct _tf_ssb_connection_message_request_t { int32_t request_number; char author[k_id_base64_len]; bool keys; } tf_ssb_connection_message_request_t; typedef struct _tf_ssb_connection_scheduled_t { tf_ssb_scheduled_callback_t* callback; void* user_data; } tf_ssb_connection_scheduled_t; typedef struct _tf_ssb_connection_t { tf_ssb_t* ssb; uv_tcp_t tcp; uv_connect_t connect; uv_async_t async; uv_timer_t handshake_timer; bool closing; tf_ssb_connection_t* tunnel_connection; int32_t tunnel_request_number; tf_ssb_blob_wants_t blob_wants; bool sent_clock; int32_t ebt_request_number; JSValue object; char name[32]; char host[256]; int port; tf_ssb_state_t state; bool is_attendant; int32_t attendant_request_number; uint8_t epub[crypto_box_PUBLICKEYBYTES]; uint8_t epriv[crypto_box_SECRETKEYBYTES]; uint8_t serverpub[crypto_box_PUBLICKEYBYTES]; uint8_t serverepub[crypto_box_PUBLICKEYBYTES]; uint8_t detached_signature_A[crypto_sign_BYTES]; uint8_t s_to_c_box_key[crypto_hash_sha256_BYTES]; uint8_t c_to_s_box_key[crypto_hash_sha256_BYTES]; uint8_t recv_buffer[128 * 1024]; size_t recv_size; uint8_t nonce[crypto_secretbox_NONCEBYTES]; uint8_t send_nonce[crypto_secretbox_NONCEBYTES]; uint16_t body_len; uint8_t body_auth_tag[16]; uint8_t rpc_recv_buffer[8 * 1024 * 1024]; uint8_t pad; size_t rpc_recv_size; uint8_t box_stream_buf[16 + k_tf_ssb_rpc_message_body_length_max]; uint8_t secretbox_buf[k_tf_ssb_rpc_message_body_length_max]; uint32_t send_request_number; tf_ssb_connection_t* next; tf_ssb_request_t* requests; int requests_count; const char* destroy_reason; tf_ssb_connection_message_request_t* message_requests; int message_requests_count; tf_ssb_connection_scheduled_t* scheduled; int scheduled_count; tf_ssb_debug_message_t* debug_messages[k_debug_close_message_count]; int ref_count; int read_back_pressure; int active_write_count; uint64_t last_notified_active; int flags; } tf_ssb_connection_t; static JSClassID _connection_class_id; static int s_connection_index; static int s_tunnel_index; static void _tf_ssb_add_broadcast(tf_ssb_t* ssb, const tf_ssb_broadcast_t* broadcast, int expires_seconds); static void _tf_ssb_connection_client_send_hello(tf_ssb_connection_t* connection); static void _tf_ssb_connection_close(tf_ssb_connection_t* connection, const char* reason); static void _tf_ssb_connection_destroy(tf_ssb_connection_t* connection, const char* reason); static void _tf_ssb_connection_finalizer(JSRuntime* runtime, JSValue value); static void _tf_ssb_connection_on_close(uv_handle_t* handle); static void _tf_ssb_nonce_inc(uint8_t* nonce); static void _tf_ssb_notify_connections_changed(tf_ssb_t* ssb, tf_ssb_change_t change, tf_ssb_connection_t* connection); static bool _tf_ssb_parse_broadcast(const char* in_broadcast, tf_ssb_broadcast_t* out_broadcast); static void _tf_ssb_start_update_settings(tf_ssb_t* ssb); static void _tf_ssb_update_settings(tf_ssb_t* ssb); static void _tf_ssb_write(tf_ssb_connection_t* connection, void* data, size_t size); static void _tf_ssb_add_debug_close(tf_ssb_t* ssb, tf_ssb_connection_t* connection, const char* reason) { if (!ssb->store_debug_messages) { return; } for (int i = 0; i < k_debug_close_message_count; i++) { tf_free(ssb->debug_close[k_debug_close_connection_count - 1].messages[i]); } for (int i = k_debug_close_connection_count - 1; i > 0; i--) { ssb->debug_close[i] = ssb->debug_close[i - 1]; } tf_ssb_id_bin_to_str(ssb->debug_close[0].id, sizeof(ssb->debug_close[0].id), connection->serverpub); if (connection->tunnel_connection) { tf_ssb_id_bin_to_str(ssb->debug_close[0].tunnel, sizeof(ssb->debug_close[0].tunnel), connection->tunnel_connection->serverpub); } else { memset(ssb->debug_close[0].tunnel, 0, sizeof(ssb->debug_close[0].tunnel)); } memcpy(ssb->debug_close[0].messages, connection->debug_messages, sizeof(connection->debug_messages)); memset(connection->debug_messages, 0, sizeof(connection->debug_messages)); snprintf(ssb->debug_close[0].reason, sizeof(ssb->debug_close[0].reason), "%s", reason); } static void _tf_ssb_connection_add_debug_message(tf_ssb_connection_t* connection, bool outgoing, int flags, int32_t request_number, const uint8_t* data, size_t size) { if (!connection->ssb->store_debug_messages) { return; } if (connection->debug_messages[k_debug_close_message_count - 1]) { tf_free(connection->debug_messages[k_debug_close_message_count - 1]); } for (int i = k_debug_close_message_count - 1; i > 0; i--) { connection->debug_messages[i] = connection->debug_messages[i - 1]; } tf_ssb_debug_message_t* message = tf_malloc(sizeof(tf_ssb_debug_message_t) + size); *message = (tf_ssb_debug_message_t) { .outgoing = outgoing, .flags = flags, .request_number = request_number, .timestamp = time(NULL), .size = size, }; memcpy(message + 1, data, size); connection->debug_messages[0] = message; } static void _tf_ssb_connection_send_close(tf_ssb_connection_t* connection) { uint8_t message_enc[34]; uint8_t nonce1[crypto_secretbox_NONCEBYTES]; memcpy(nonce1, connection->send_nonce, sizeof(nonce1)); _tf_ssb_nonce_inc(connection->send_nonce); uint8_t header[18] = { 0 }; if (crypto_secretbox_easy(message_enc, header, sizeof(header), nonce1, connection->c_to_s_box_key) == 0) { _tf_ssb_write(connection, message_enc, sizeof(message_enc)); } _tf_ssb_connection_close(connection, "crypto_secretbox_easy close message"); } static void _tf_ssb_connection_close(tf_ssb_connection_t* connection, const char* reason) { if (connection->state == k_tf_ssb_state_closing) { return; } else if (connection->state == k_tf_ssb_state_verified || connection->state == k_tf_ssb_state_server_verified) { tf_printf("Connection %s %p is closing: %s.\n", connection->name, connection, reason); _tf_ssb_add_debug_close(connection->ssb, connection, reason); connection->state = k_tf_ssb_state_closing; _tf_ssb_connection_send_close(connection); } _tf_ssb_connection_destroy(connection, reason); } static void _tf_ssb_connection_on_tcp_alloc(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { tf_ssb_connection_t* connection = handle->data; size_t malloc_size = sizeof(connection->recv_buffer) - connection->recv_size; buf->base = malloc_size ? tf_malloc(malloc_size) : NULL; buf->len = malloc_size; } static void _tf_ssb_connection_on_write(uv_write_t* req, int status) { tf_ssb_connection_t* connection = req->data; tf_ssb_connection_adjust_write_count(connection, -1); if (status) { char buffer[256]; snprintf(buffer, sizeof(buffer), "write failed asynchronously: %s", uv_strerror(status)); _tf_ssb_connection_close(connection, buffer); } tf_free(req); } static void _tf_ssb_write(tf_ssb_connection_t* connection, void* data, size_t size) { if (connection->tcp.data) { uv_write_t* write = tf_malloc(sizeof(uv_write_t) + size); *write = (uv_write_t) { .data = connection }; memcpy(write + 1, data, size); tf_ssb_connection_adjust_write_count(connection, 1); int result = uv_write(write, (uv_stream_t*)&connection->tcp, &(uv_buf_t) { .base = (char*)(write + 1), .len = size }, 1, _tf_ssb_connection_on_write); if (result) { tf_ssb_connection_adjust_write_count(connection, -1); _tf_ssb_connection_close(connection, "write failed"); tf_free(write); } } else if (connection->tunnel_connection) { tf_ssb_connection_rpc_send( connection->tunnel_connection, k_ssb_rpc_flag_binary | k_ssb_rpc_flag_stream, -connection->tunnel_request_number, NULL, data, size, NULL, NULL, NULL); } } static void _tf_ssb_connection_send_identity(tf_ssb_connection_t* connection, uint8_t* hmac, uint8_t* pubkey) { memcpy(connection->serverepub, pubkey, sizeof(connection->serverepub)); if (crypto_auth_hmacsha512256_verify(hmac, connection->serverepub, 32, connection->ssb->network_key) != 0) { _tf_ssb_connection_close(connection, "invalid server hello"); return; } uint8_t shared_secret_ab[crypto_scalarmult_curve25519_SCALARBYTES]; if (crypto_scalarmult_curve25519(shared_secret_ab, connection->epriv, connection->serverepub) != 0) { _tf_ssb_connection_close(connection, "unable to compute shared_secret_ab as client"); return; } uint8_t servercurvepub[crypto_scalarmult_curve25519_SCALARBYTES]; if (crypto_sign_ed25519_pk_to_curve25519(servercurvepub, connection->serverpub) != 0) { _tf_ssb_connection_close(connection, "unable to compute key to curve25519 as client"); return; } uint8_t shared_secret_aB[crypto_scalarmult_curve25519_SCALARBYTES]; if (crypto_scalarmult_curve25519(shared_secret_aB, connection->epriv, servercurvepub) != 0) { _tf_ssb_connection_close(connection, "unable to compute shared_secret_aB as client"); return; } uint8_t hash[crypto_hash_sha256_BYTES]; crypto_hash_sha256(hash, shared_secret_ab, sizeof(shared_secret_ab)); uint8_t msg[sizeof(connection->ssb->network_key) + sizeof(connection->serverpub) + crypto_hash_sha256_BYTES]; memcpy(msg, connection->ssb->network_key, sizeof(connection->ssb->network_key)); memcpy(msg + sizeof(connection->ssb->network_key), connection->serverpub, sizeof(connection->serverpub)); memcpy(msg + sizeof(connection->ssb->network_key) + sizeof(connection->serverpub), hash, sizeof(hash)); unsigned long long siglen; if (crypto_sign_detached(connection->detached_signature_A, &siglen, msg, sizeof(msg), connection->ssb->priv) != 0) { _tf_ssb_connection_close(connection, "unable to compute detached_signature_A as client"); return; } uint8_t tosend[crypto_sign_BYTES + sizeof(connection->ssb->pub)]; memcpy(tosend, connection->detached_signature_A, sizeof(connection->detached_signature_A)); memcpy(tosend + sizeof(connection->detached_signature_A), connection->ssb->pub, sizeof(connection->ssb->pub)); uint8_t nonce[crypto_secretbox_NONCEBYTES] = { 0 }; uint8_t tohash[sizeof(connection->ssb->network_key) + sizeof(shared_secret_ab) + sizeof(shared_secret_aB)]; memcpy(tohash, connection->ssb->network_key, sizeof(connection->ssb->network_key)); memcpy(tohash + sizeof(connection->ssb->network_key), shared_secret_ab, sizeof(shared_secret_ab)); memcpy(tohash + sizeof(connection->ssb->network_key) + sizeof(shared_secret_ab), shared_secret_aB, sizeof(shared_secret_aB)); uint8_t hash2[crypto_hash_sha256_BYTES]; crypto_hash_sha256(hash2, tohash, sizeof(tohash)); uint8_t c[crypto_secretbox_MACBYTES + sizeof(tosend)]; if (crypto_secretbox_easy(c, tosend, sizeof(tosend), nonce, hash2) != 0) { _tf_ssb_connection_close(connection, "unable to create initial secretbox as client"); return; } static_assert(sizeof(c) == 112, "client send size"); _tf_ssb_write(connection, c, sizeof(c)); connection->state = k_tf_ssb_state_sent_identity; } static void _tf_ssb_nonce_inc(uint8_t* nonce) { int i = 23; while (++nonce[i] == 0 && i > 0) { i--; } } static void _tf_ssb_connection_box_stream_send(tf_ssb_connection_t* connection, const uint8_t* message, size_t size) { const size_t k_send_max = 4096; for (size_t offset = 0; offset < size; offset += k_send_max) { size_t send_size = size - offset > k_send_max ? k_send_max : size - offset; uint8_t* message_enc = tf_malloc(send_size + 34); uint8_t nonce1[crypto_secretbox_NONCEBYTES]; memcpy(nonce1, connection->send_nonce, sizeof(nonce1)); _tf_ssb_nonce_inc(connection->send_nonce); uint8_t nonce2[crypto_secretbox_NONCEBYTES]; memcpy(nonce2, connection->send_nonce, sizeof(nonce2)); _tf_ssb_nonce_inc(connection->send_nonce); if (crypto_secretbox_easy(message_enc + 34 - 16, message + offset, send_size, nonce2, connection->c_to_s_box_key) != 0) { _tf_ssb_connection_close(connection, "unable to secretbox message"); tf_free(message_enc); return; } uint8_t header[18]; *(uint16_t*)header = htons((uint16_t)send_size); memcpy(header + sizeof(uint16_t), message_enc + 34 - 16, 16); if (crypto_secretbox_easy(message_enc, header, sizeof(header), nonce1, connection->c_to_s_box_key) != 0) { _tf_ssb_connection_close(connection, "unable to secretbox header"); tf_free(message_enc); return; } _tf_ssb_write(connection, message_enc, send_size + 34); tf_free(message_enc); } } static void _tf_ssb_connection_dispatch_scheduled(tf_ssb_connection_t* connection) { while ((connection->active_write_count == 0 || connection->closing) && connection->scheduled_count && connection->scheduled) { tf_ssb_connection_scheduled_t scheduled = connection->scheduled[0]; memmove(connection->scheduled, connection->scheduled + 1, sizeof(tf_ssb_connection_scheduled_t) * (connection->scheduled_count - 1)); connection->scheduled_count--; tf_trace_begin(connection->ssb->trace, "scheduled callback"); scheduled.callback(connection, scheduled.user_data); tf_trace_end(connection->ssb->trace); } } void tf_ssb_connection_schedule_idle(tf_ssb_connection_t* connection, tf_ssb_scheduled_callback_t* callback, void* user_data) { connection->scheduled = tf_resize_vec(connection->scheduled, sizeof(tf_ssb_connection_scheduled_t) * (connection->scheduled_count + 1)); connection->scheduled[connection->scheduled_count++] = (tf_ssb_connection_scheduled_t) { .callback = callback, .user_data = user_data, }; _tf_ssb_connection_dispatch_scheduled(connection); } static int _request_compare(const void* a, const void* b) { int32_t ai = *(const int32_t*)a; const tf_ssb_request_t* br = (const tf_ssb_request_t*)b; return ai < br->request_number ? -1 : br->request_number < ai ? 1 : 0; } static void _tf_ssb_request_activity_timer(uv_timer_t* timer) { tf_ssb_t* ssb = timer->data; uint64_t now_ms = uv_now(ssb->loop); bool any_still_active = false; for (tf_ssb_connection_t* connection = ssb->connections; connection; connection = connection->next) { bool any_changed = false; bool last_notified_active = (now_ms - connection->last_notified_active) < k_rpc_active_ms; for (int i = 0; i < connection->requests_count; i++) { bool last_active = (now_ms - connection->requests[i].last_active) < k_rpc_active_ms; if (last_active != last_notified_active) { any_changed = true; } if (last_active) { any_still_active = true; } } if (any_changed) { _tf_ssb_notify_connections_changed(ssb, k_tf_ssb_change_update, connection); connection->last_notified_active = now_ms; } } if (any_still_active && uv_timer_get_due_in(&ssb->request_activity_timer) == 0) { uv_timer_start(&ssb->request_activity_timer, _tf_ssb_request_activity_timer, k_rpc_active_ms, 0); } } static bool _tf_ssb_connection_get_request_callback(tf_ssb_connection_t* connection, int32_t request_number, tf_ssb_rpc_callback_t** out_callback, void** out_user_data) { if (!connection->requests) { return false; } tf_ssb_request_t* request = bsearch(&request_number, connection->requests, connection->requests_count, sizeof(tf_ssb_request_t), _request_compare); if (request) { if (out_callback) { *out_callback = request->callback; } if (out_user_data) { *out_user_data = request->user_data; } request->last_active = uv_now(connection->ssb->loop); if (uv_timer_get_due_in(&connection->ssb->request_activity_timer) == 0) { uv_timer_start(&connection->ssb->request_activity_timer, _tf_ssb_request_activity_timer, k_rpc_active_ms, 0); } return true; } return false; } void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t request_number, const char* name, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data, tf_ssb_connection_t* dependent_connection) { tf_ssb_request_t* existing = connection->requests_count ? bsearch(&request_number, connection->requests, connection->requests_count, sizeof(tf_ssb_request_t), _request_compare) : NULL; uint64_t now_ms = uv_now(connection->ssb->loop); if (existing) { assert(!existing->callback); assert(!existing->cleanup); assert(!existing->user_data); assert(!existing->dependent_connection); existing->last_active = now_ms; existing->callback = callback; existing->cleanup = cleanup; existing->user_data = user_data; existing->dependent_connection = dependent_connection; } else { tf_ssb_connection_remove_request(connection, request_number); tf_ssb_request_t request = { .request_number = request_number, .callback = callback, .cleanup = cleanup, .user_data = user_data, .dependent_connection = dependent_connection, .last_active = now_ms, }; snprintf(request.name, sizeof(request.name), "%s", name); int index = tf_util_insert_index(&request_number, connection->requests, connection->requests_count, sizeof(tf_ssb_request_t), _request_compare); connection->requests = tf_resize_vec(connection->requests, sizeof(tf_ssb_request_t) * (connection->requests_count + 1)); if (connection->requests_count - index) { memmove(connection->requests + index + 1, connection->requests + index, sizeof(tf_ssb_request_t) * (connection->requests_count - index)); } connection->requests[index] = request; connection->requests_count++; connection->ssb->request_count++; } if (uv_timer_get_due_in(&connection->ssb->request_activity_timer) == 0) { uv_timer_start(&connection->ssb->request_activity_timer, _tf_ssb_request_activity_timer, k_rpc_active_ms, 0); } _tf_ssb_notify_connections_changed(connection->ssb, k_tf_ssb_change_update, connection); connection->last_notified_active = now_ms; } static int _message_request_compare(const void* a, const void* b) { const char* author = a; const tf_ssb_connection_message_request_t* rb = b; return strcmp(author, rb->author); } void tf_ssb_connection_add_new_message_request(tf_ssb_connection_t* connection, const char* author, int32_t request_number, bool keys) { int index = tf_util_insert_index(author, connection->message_requests, connection->message_requests_count, sizeof(tf_ssb_connection_message_request_t), _message_request_compare); if (index < connection->message_requests_count && strcmp(author, connection->message_requests[index].author) == 0) { connection->message_requests[index].request_number = request_number; connection->message_requests[index].keys = keys; return; } connection->message_requests = tf_resize_vec(connection->message_requests, sizeof(tf_ssb_connection_message_request_t) * (connection->message_requests_count + 1)); if (connection->message_requests_count - index) { memmove(connection->message_requests + index + 1, connection->message_requests + index, sizeof(tf_ssb_connection_message_request_t) * (connection->message_requests_count - index)); } connection->message_requests[index] = (tf_ssb_connection_message_request_t) { .request_number = request_number, .keys = keys, }; snprintf(connection->message_requests[index].author, sizeof(connection->message_requests[index].author), "%s", author); connection->message_requests_count++; } void tf_ssb_connection_remove_new_message_request(tf_ssb_connection_t* connection, const char* author) { int index = tf_util_insert_index(author, connection->message_requests, connection->message_requests_count, sizeof(tf_ssb_connection_message_request_t), _message_request_compare); if (index < connection->message_requests_count && strcmp(author, connection->message_requests[index].author) == 0) { memmove(connection->message_requests + index, connection->message_requests + index + 1, sizeof(tf_ssb_connection_message_request_t) * (connection->message_requests_count - index - 1)); connection->message_requests_count--; } } void tf_ssb_connection_remove_request(tf_ssb_connection_t* connection, int32_t request_number) { tf_ssb_request_t* request = connection->requests_count ? bsearch(&request_number, connection->requests, connection->requests_count, sizeof(tf_ssb_request_t), _request_compare) : NULL; if (request) { if (request->cleanup) { request->cleanup(connection->ssb, request->user_data); } int index = request - connection->requests; memmove(request, request + 1, sizeof(tf_ssb_request_t) * (connection->requests_count - index - 1)); connection->requests_count--; connection->requests = tf_resize_vec(connection->requests, sizeof(tf_ssb_request_t) * connection->requests_count); connection->ssb->request_count--; _tf_ssb_notify_connections_changed(connection->ssb, k_tf_ssb_change_update, connection); } } void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, const char* new_request_name, const uint8_t* message, size_t size, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data) { if (!connection) { if (cleanup) { cleanup(NULL, user_data); } return; } if (flags & k_ssb_rpc_flag_new_request) { assert(request_number > 0); assert(!_tf_ssb_connection_get_request_callback(connection, request_number, NULL, NULL)); assert(new_request_name); } else if (!_tf_ssb_connection_get_request_callback(connection, request_number, NULL, NULL)) { if (flags & k_ssb_rpc_flag_binary) { tf_printf("Dropping message with no active request (%d): (%zd bytes).\n", request_number, size); } else { tf_printf("Dropping message with no active request (%d): %.*s\n", request_number, (int)size, message); } return; } uint8_t* combined = tf_malloc(9 + size); *combined = flags & k_ssb_rpc_mask_send; uint32_t u32size = htonl((uint32_t)size); memcpy(combined + 1, &u32size, sizeof(u32size)); uint32_t rn = htonl((uint32_t)request_number); memcpy(combined + 1 + sizeof(uint32_t), &rn, sizeof(rn)); memcpy(combined + 1 + 2 * sizeof(uint32_t), message, size); if (connection->ssb->verbose) { tf_printf(MAGENTA "%s RPC SEND" RESET " end/error=%s stream=%s type=%s RN=%d: [%zd B] %.*s\n", connection->name, (flags & k_ssb_rpc_flag_end_error) ? "true" : "false", (flags & k_ssb_rpc_flag_stream) ? "true" : "false", k_ssb_type_names[flags & k_ssb_rpc_mask_type], request_number, size, (flags & k_ssb_rpc_mask_type) == k_ssb_rpc_flag_binary ? 0 : (int)size, message); } _tf_ssb_connection_add_debug_message(connection, true, flags & k_ssb_rpc_mask_send, request_number, message, size); _tf_ssb_connection_box_stream_send(connection, combined, 1 + 2 * sizeof(uint32_t) + size); tf_free(combined); connection->ssb->rpc_out++; if ((flags & k_ssb_rpc_flag_end_error) || (request_number < 0 && !(flags & k_ssb_rpc_flag_stream))) { tf_ssb_connection_remove_request(connection, request_number); } else if (flags & k_ssb_rpc_flag_new_request) { tf_ssb_connection_add_request(connection, request_number, new_request_name, callback, cleanup, user_data, NULL); } } void tf_ssb_connection_rpc_send_json(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, const char* new_request_name, JSValue message, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data) { JSContext* context = connection->ssb->context; JSValue json = JS_JSONStringify(context, message, JS_NULL, JS_NULL); size_t size = 0; const char* json_string = JS_ToCStringLen(context, &size, json); tf_ssb_connection_rpc_send( connection, k_ssb_rpc_flag_json | (flags & ~k_ssb_rpc_mask_type), request_number, new_request_name, (const uint8_t*)json_string, size, callback, cleanup, user_data); JS_FreeCString(context, json_string); JS_FreeValue(context, json); } void tf_ssb_connection_rpc_send_error(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, const char* error) { JSContext* context = connection->ssb->context; JSValue message = JS_NewObject(context); const char* stack = tf_util_backtrace_string(); JS_SetPropertyStr(context, message, "name", JS_NewString(context, "Error")); JS_SetPropertyStr(context, message, "stack", JS_NewString(context, stack ? stack : "stack unavailable")); JS_SetPropertyStr(context, message, "message", JS_NewString(context, error)); tf_ssb_connection_rpc_send_json( connection, ((flags & k_ssb_rpc_flag_stream) ? (k_ssb_rpc_flag_stream) : 0) | k_ssb_rpc_flag_end_error, request_number, NULL, message, NULL, NULL, NULL); JS_FreeValue(context, message); tf_free((void*)stack); } void tf_ssb_connection_rpc_send_error_method_not_allowed(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, const char* name) { char buffer[1024]; snprintf(buffer, sizeof(buffer), "method '%s' is not in list of allowed methods", name); tf_ssb_connection_rpc_send_error(connection, flags, request_number, buffer); } static int _utf8_len(uint8_t ch) { static const uint8_t k_length[] = { 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 2, 2, 3, 4 }; return k_length[(ch & 0xf0) >> 4]; } static uint32_t _utf8_decode(uint32_t c) { if (c > 0x7f) { uint32_t mask = (c <= 0x00efbfbf) ? 0x000f0000 : 0x003f0000; c = ((c & 0x07000000) >> 6) | ((c & mask) >> 4) | ((c & 0x00003f00) >> 2) | (c & 0x0000003f); } return c; } static const uint8_t* _utf8_to_cp(const uint8_t* ch, uint32_t* out_cp) { int len = _utf8_len(*ch); int actual_len = 0; uint32_t encoding = 0; for (int i = 0; i < len && ch[i]; i++, actual_len++) { encoding = (encoding << 8) | ch[i]; } *out_cp = _utf8_decode(encoding); return ch + actual_len; } static uint32_t _cp_to_utf16(uint32_t cp, uint16_t* out_h, uint16_t* out_l) { if (cp < 0x10000) { *out_h = 0; *out_l = cp & 0xffff; return cp; } else { uint32_t t = cp - 0x10000; uint32_t h = ((t << 12) >> 22) + 0xd800; uint32_t l = ((t << 22) >> 22) + 0xdc00; *out_h = h & 0xffff; *out_l = l & 0xffff; return (h << 16) | (l & 0xffff); } } void tf_ssb_calculate_message_id(JSContext* context, JSValue message, char* out_id, size_t out_id_size) { JSValue idval = JS_JSONStringify(context, message, JS_NULL, JS_NewInt32(context, 2)); size_t len = 0; const char* messagestr = JS_ToCStringLen(context, &len, idval); if (!messagestr) { memset(out_id, 0, out_id_size); JS_FreeValue(context, idval); return; } char* latin1 = tf_strdup(messagestr); uint8_t* write_pos = (uint8_t*)latin1; const uint8_t* p = (const uint8_t*)messagestr; while (p && *p) { uint32_t cp = 0; p = _utf8_to_cp(p, &cp); uint16_t h = 0; uint16_t l = 0; _cp_to_utf16(cp, &h, &l); if (h) { *write_pos++ = h & 0xff; } if (l) { *write_pos++ = l & 0xff; } } size_t latin1_len = write_pos - (uint8_t*)latin1; *write_pos++ = '\0'; uint8_t id[crypto_hash_sha256_BYTES]; crypto_hash_sha256(id, (uint8_t*)latin1, latin1_len); char id_base64[k_id_base64_len]; tf_base64_encode(id, sizeof(id), id_base64, sizeof(id_base64)); snprintf(out_id, out_id_size, "%%%s.sha256", id_base64); tf_free(latin1); JS_FreeCString(context, messagestr); JS_FreeValue(context, idval); } static bool _tf_ssb_verify_and_strip_signature_internal(JSContext* context, JSValue val, char* out_id, size_t out_id_size, char* out_signature, size_t out_signature_size) { JSValue signature = JS_GetPropertyStr(context, val, "signature"); if (JS_IsUndefined(signature)) { memset(out_signature, 0, out_signature_size); return false; } const char* str = JS_ToCString(context, signature); if (!str) { JS_FreeValue(context, signature); memset(out_signature, 0, out_signature_size); return false; } bool verified = false; tf_ssb_calculate_message_id(context, val, out_id, out_id_size); JSAtom sigatom = JS_NewAtom(context, "signature"); JS_DeleteProperty(context, val, sigatom, 0); JS_FreeAtom(context, sigatom); if (out_signature) { memset(out_signature, 0, out_signature_size); strncpy(out_signature, str, out_signature_size - 1); } JSValue sigval = JS_JSONStringify(context, val, JS_NULL, JS_NewInt32(context, 2)); const char* sigstr = JS_ToCString(context, sigval); const char* sigkind = strstr(str, ".sig.ed25519"); JSValue authorval = JS_GetPropertyStr(context, val, "author"); const char* author = JS_ToCString(context, authorval); const char* author_id = author && *author == '@' ? author + 1 : author; const char* type = strstr(author_id, ".ed25519"); uint8_t publickey[crypto_box_PUBLICKEYBYTES]; int r = tf_base64_decode(author_id, type - author_id, publickey, sizeof(publickey)); if (r != -1) { uint8_t binsig[crypto_sign_BYTES]; r = tf_base64_decode(str, sigkind - str, binsig, sizeof(binsig)); if (r != -1) { r = crypto_sign_verify_detached(binsig, (const uint8_t*)sigstr, strlen(sigstr), publickey); verified = r == 0; if (!verified) { // tf_printf("crypto_sign_verify_detached fail (r=%d)\n", r); if (false) { tf_printf("val=[%.*s]\n", (int)strlen(sigstr), sigstr); tf_printf("sig=%.*s\n", (int)(sigkind - str), str); tf_printf("public key=%.*s\n", (int)(type - author_id), author_id); } } } else { tf_printf("base64 decode sig fail [%.*s]\n", (int)(sigkind - str), str); } } else { tf_printf("base64 decode author[%.*s] fail (%d)\n", (int)(type - author_id), author_id, r); } JS_FreeCString(context, author); JS_FreeCString(context, sigstr); JS_FreeCString(context, str); JS_FreeValue(context, sigval); JS_FreeValue(context, authorval); if (verified) { JS_FreeValue(context, signature); } else { JS_SetPropertyStr(context, val, "signature", signature); } return verified; } bool tf_ssb_verify_and_strip_signature(JSContext* context, JSValue val, char* out_id, size_t out_id_size, char* out_signature, size_t out_signature_size, int* out_flags) { JSValue reordered = JS_NewObject(context); JS_SetPropertyStr(context, reordered, "previous", JS_GetPropertyStr(context, val, "previous")); JS_SetPropertyStr(context, reordered, "author", JS_GetPropertyStr(context, val, "author")); JS_SetPropertyStr(context, reordered, "sequence", JS_GetPropertyStr(context, val, "sequence")); JS_SetPropertyStr(context, reordered, "timestamp", JS_GetPropertyStr(context, val, "timestamp")); JS_SetPropertyStr(context, reordered, "hash", JS_GetPropertyStr(context, val, "hash")); JS_SetPropertyStr(context, reordered, "content", JS_GetPropertyStr(context, val, "content")); JS_SetPropertyStr(context, reordered, "signature", JS_GetPropertyStr(context, val, "signature")); bool result = _tf_ssb_verify_and_strip_signature_internal(context, reordered, out_id, out_id_size, out_signature, out_signature_size); JS_FreeValue(context, reordered); if (result) { if (out_flags) { *out_flags = 0; } return true; } reordered = JS_NewObject(context); JS_SetPropertyStr(context, reordered, "previous", JS_GetPropertyStr(context, val, "previous")); JS_SetPropertyStr(context, reordered, "sequence", JS_GetPropertyStr(context, val, "sequence")); JS_SetPropertyStr(context, reordered, "author", JS_GetPropertyStr(context, val, "author")); JS_SetPropertyStr(context, reordered, "timestamp", JS_GetPropertyStr(context, val, "timestamp")); JS_SetPropertyStr(context, reordered, "hash", JS_GetPropertyStr(context, val, "hash")); JS_SetPropertyStr(context, reordered, "content", JS_GetPropertyStr(context, val, "content")); JS_SetPropertyStr(context, reordered, "signature", JS_GetPropertyStr(context, val, "signature")); result = _tf_ssb_verify_and_strip_signature_internal(context, reordered, out_id, out_id_size, out_signature, out_signature_size); JS_FreeValue(context, reordered); if (result) { if (out_flags) { *out_flags = k_tf_ssb_message_flag_sequence_before_author; } return true; } return false; } void tf_ssb_close_all(tf_ssb_t* ssb, const char* reason) { for (tf_ssb_connection_t* connection = ssb->connections; connection; connection = connection->next) { _tf_ssb_connection_close(connection, reason); } } void tf_ssb_send_close(tf_ssb_t* ssb) { for (tf_ssb_connection_t* connection = ssb->connections; connection; connection = connection->next) { _tf_ssb_connection_send_close(connection); } } bool tf_ssb_id_bin_to_str(char* str, size_t str_size, const uint8_t* bin) { char buffer[k_id_base64_len - 9]; tf_base64_encode(bin, crypto_sign_PUBLICKEYBYTES, buffer, sizeof(buffer)); return snprintf(str, str_size, "@%s.ed25519", buffer) < (int)str_size; } bool tf_ssb_id_str_to_bin(uint8_t* bin, const char* str) { const char* author_id = str && *str == '@' ? str + 1 : str; const char* type = str ? strstr(str, ".ed25519") : NULL; return author_id && type ? tf_base64_decode(author_id, type - author_id, bin, crypto_box_PUBLICKEYBYTES) != 0 : false; } static uint64_t _tf_ssb_callback_pre(tf_ssb_t* ssb) { return uv_hrtime(); } static void _tf_ssb_callback_post(tf_ssb_t* ssb, void* callback, uint64_t pre) { if (ssb->hitch_callback) { uint64_t post = uv_hrtime(); const char* name = tf_util_function_to_string(callback); ssb->hitch_callback(name, post - pre, ssb->hitch_user_data); tf_free((void*)name); } } static void _tf_ssb_notify_connections_changed(tf_ssb_t* ssb, tf_ssb_change_t change, tf_ssb_connection_t* connection) { tf_ssb_connections_changed_callback_node_t* next = NULL; for (tf_ssb_connections_changed_callback_node_t* node = ssb->connections_changed; node; node = next) { next = node->next; tf_trace_begin(ssb->trace, "connections_changed"); PRE_CALLBACK(ssb, node->callback); node->callback(ssb, change, connection, node->user_data); POST_CALLBACK(ssb, node->callback); tf_trace_end(ssb->trace); } } static void _tf_ssb_connection_verify_identity(tf_ssb_connection_t* connection, const uint8_t* message, size_t len) { uint8_t nonce[crypto_secretbox_NONCEBYTES] = { 0 }; uint8_t shared_secret_ab[crypto_scalarmult_curve25519_SCALARBYTES]; if (crypto_scalarmult_curve25519(shared_secret_ab, connection->epriv, connection->serverepub) != 0) { _tf_ssb_connection_close(connection, "unable to compute shared_secret_ab"); return; } uint8_t servercurvepub[crypto_scalarmult_curve25519_SCALARBYTES]; if (crypto_sign_ed25519_pk_to_curve25519(servercurvepub, connection->serverpub) != 0) { _tf_ssb_connection_close(connection, "unable to convert key to curve25519"); return; } uint8_t shared_secret_aB[crypto_scalarmult_curve25519_SCALARBYTES]; if (crypto_scalarmult_curve25519(shared_secret_aB, connection->epriv, servercurvepub) != 0) { _tf_ssb_connection_close(connection, "unable to compute shared_secret_aB"); return; } uint8_t clientcurvepriv[crypto_scalarmult_curve25519_SCALARBYTES]; if (crypto_sign_ed25519_sk_to_curve25519(clientcurvepriv, connection->ssb->priv) != 0) { _tf_ssb_connection_close(connection, "unable to convert key to curve25519"); return; } uint8_t shared_secret_Ab[crypto_scalarmult_curve25519_SCALARBYTES]; if (crypto_scalarmult_curve25519(shared_secret_Ab, clientcurvepriv, connection->serverepub) != 0) { _tf_ssb_connection_close(connection, "unable to compute shared_secret_Ab"); return; } uint8_t tohash[sizeof(connection->ssb->network_key) + sizeof(shared_secret_ab) + sizeof(shared_secret_aB) + sizeof(shared_secret_Ab)]; memcpy(tohash, connection->ssb->network_key, sizeof(connection->ssb->network_key)); memcpy(tohash + sizeof(connection->ssb->network_key), shared_secret_ab, sizeof(shared_secret_ab)); memcpy(tohash + sizeof(connection->ssb->network_key) + sizeof(shared_secret_ab), shared_secret_aB, sizeof(shared_secret_aB)); memcpy(tohash + sizeof(connection->ssb->network_key) + sizeof(shared_secret_ab) + sizeof(shared_secret_aB), shared_secret_Ab, sizeof(shared_secret_Ab)); uint8_t hash2[crypto_hash_sha256_BYTES]; crypto_hash_sha256(hash2, tohash, sizeof(tohash)); uint8_t hash3a[crypto_hash_sha256_BYTES + crypto_sign_PUBLICKEYBYTES]; crypto_hash_sha256(hash3a, hash2, sizeof(hash2)); memcpy(hash3a + crypto_hash_sha256_BYTES, connection->ssb->pub, sizeof(connection->ssb->pub)); crypto_hash_sha256(connection->s_to_c_box_key, hash3a, sizeof(hash3a)); uint8_t hash3b[crypto_hash_sha256_BYTES + crypto_sign_PUBLICKEYBYTES]; crypto_hash_sha256(hash3b, hash2, sizeof(hash2)); memcpy(hash3b + crypto_hash_sha256_BYTES, connection->serverpub, sizeof(connection->serverpub)); crypto_hash_sha256(connection->c_to_s_box_key, hash3b, sizeof(hash3b)); uint8_t m[80]; if (crypto_secretbox_open_easy(m, message, len, nonce, hash2) != 0) { _tf_ssb_connection_close(connection, "unable to open initial secret box as client"); return; } uint8_t hash3[crypto_hash_sha256_BYTES]; crypto_hash_sha256(hash3, shared_secret_ab, sizeof(shared_secret_ab)); uint8_t msg[sizeof(connection->ssb->network_key) + sizeof(connection->detached_signature_A) + sizeof(connection->ssb->pub) + sizeof(hash3)]; memcpy(msg, connection->ssb->network_key, sizeof(connection->ssb->network_key)); memcpy(msg + sizeof(connection->ssb->network_key), connection->detached_signature_A, sizeof(connection->detached_signature_A)); memcpy(msg + sizeof(connection->ssb->network_key) + sizeof(connection->detached_signature_A), connection->ssb->pub, sizeof(connection->ssb->pub)); memcpy(msg + sizeof(connection->ssb->network_key) + sizeof(connection->detached_signature_A) + sizeof(connection->ssb->pub), hash3, sizeof(hash3)); if (crypto_sign_verify_detached(m, msg, sizeof(msg), connection->serverpub) != 0) { _tf_ssb_connection_close(connection, "unable to verify server identity"); return; } uint8_t nonce2[crypto_auth_hmacsha512256_BYTES]; if (crypto_auth_hmacsha512256(nonce2, connection->epub, sizeof(connection->epub), connection->ssb->network_key) != 0) { _tf_ssb_connection_close(connection, "unable to compute client recv nonce"); return; } memcpy(connection->nonce, nonce2, sizeof(connection->nonce)); uint8_t nonce3[crypto_auth_hmacsha512256_BYTES]; if (crypto_auth_hmacsha512256(nonce3, connection->serverepub, sizeof(connection->serverepub), connection->ssb->network_key) != 0) { _tf_ssb_connection_close(connection, "unable to compute client send nonce"); return; } memcpy(connection->send_nonce, nonce3, sizeof(connection->send_nonce)); char fullid[k_id_base64_len]; tf_ssb_id_bin_to_str(fullid, sizeof(fullid), connection->serverpub); JSContext* context = connection->ssb->context; JS_SetPropertyStr(context, connection->object, "id", JS_NewString(context, fullid)); JS_SetPropertyStr(context, connection->object, "is_client", JS_TRUE); connection->state = k_tf_ssb_state_verified; if (connection->handshake_timer.data) { uv_timer_stop(&connection->handshake_timer); } _tf_ssb_notify_connections_changed(connection->ssb, k_tf_ssb_change_connect, connection); } bool tf_ssb_connection_is_client(tf_ssb_connection_t* connection) { return connection->state == k_tf_ssb_state_verified; } bool tf_ssb_connection_is_connected(tf_ssb_connection_t* connection) { return connection->state == k_tf_ssb_state_verified || connection->state == k_tf_ssb_state_server_verified; } const char* tf_ssb_connection_get_host(tf_ssb_connection_t* connection) { return connection->host; } int tf_ssb_connection_get_port(tf_ssb_connection_t* connection) { return connection->port; } bool tf_ssb_connection_get_id(tf_ssb_connection_t* connection, char* out_id, size_t out_id_size) { return connection && memcmp(connection->serverpub, (uint8_t[k_id_bin_len]) { 0 }, k_id_bin_len) != 0 && tf_ssb_id_bin_to_str(out_id, out_id_size, connection->serverpub); } tf_ssb_connection_t* tf_ssb_connection_get_tunnel(tf_ssb_connection_t* connection) { return connection ? connection->tunnel_connection : NULL; } static bool _tf_ssb_is_already_connected(tf_ssb_t* ssb, uint8_t* id, tf_ssb_connection_t* ignore_connection) { for (tf_ssb_connection_t* connection = ssb->connections; connection; connection = connection->next) { if (!ignore_connection || connection != ignore_connection) { if (memcmp(connection->serverpub, id, k_id_bin_len) == 0) { return true; } else if (memcmp(ssb->pub, id, k_id_bin_len) == 0) { return true; } } } return false; } static void _tf_ssb_connection_verify_client_identity(tf_ssb_connection_t* connection, const uint8_t* message, size_t len) { uint8_t nonce[crypto_secretbox_NONCEBYTES] = { 0 }; /* ** shared_secret_ab = nacl_scalarmult( ** server_ephemeral_sk, ** client_ephemeral_pk ** ) */ uint8_t shared_secret_ab[crypto_scalarmult_curve25519_SCALARBYTES]; if (crypto_scalarmult_curve25519(shared_secret_ab, connection->epriv, connection->serverepub) != 0) { _tf_ssb_connection_close(connection, "unable to compute shared_secret_ab"); return; } /* ** shared_secret_aB = nacl_scalarmult( ** sk_to_curve25519(server_longterm_sk), ** client_ephemeral_pk ** ) */ uint8_t curvepriv[crypto_scalarmult_curve25519_SCALARBYTES]; if (crypto_sign_ed25519_sk_to_curve25519(curvepriv, connection->ssb->priv) != 0) { _tf_ssb_connection_close(connection, "unable to convert key to curve25519"); return; } static_assert(sizeof(connection->ssb->priv) == crypto_sign_ed25519_SECRETKEYBYTES, "size"); uint8_t shared_secret_aB[crypto_scalarmult_curve25519_SCALARBYTES] = { 0 }; if (crypto_scalarmult(shared_secret_aB, curvepriv, connection->serverepub) != 0) { _tf_ssb_connection_close(connection, "unable to compute shared_secret_aB"); return; } static_assert(sizeof(connection->ssb->network_key) == crypto_auth_KEYBYTES, "network key size"); uint8_t tohash[sizeof(connection->ssb->network_key) + sizeof(shared_secret_ab) + sizeof(shared_secret_aB)]; memcpy(tohash, connection->ssb->network_key, sizeof(connection->ssb->network_key)); memcpy(tohash + sizeof(connection->ssb->network_key), shared_secret_ab, sizeof(shared_secret_ab)); memcpy(tohash + sizeof(connection->ssb->network_key) + sizeof(shared_secret_ab), shared_secret_aB, sizeof(shared_secret_aB)); uint8_t hash2[crypto_hash_sha256_BYTES]; crypto_hash_sha256(hash2, tohash, sizeof(tohash)); /* ** msg3_plaintext = assert_nacl_secretbox_open( ** ciphertext: msg3, ** nonce: 24_bytes_of_zeros, ** key: sha256( ** concat( ** network_identifier, ** shared_secret_ab, ** shared_secret_aB ** ) ** ) ** ) */ uint8_t m[96]; if (crypto_secretbox_open_easy(m, message, len, nonce, hash2) != 0) { _tf_ssb_connection_close(connection, "unable to open initial secret box as server"); return; } uint8_t* detached_signature_A = m; if (_tf_ssb_is_already_connected(connection->ssb, m + 64, connection)) { char id_base64[k_id_base64_len] = { 0 }; tf_ssb_id_bin_to_str(id_base64, sizeof(id_base64), m + 64); char reason[256]; snprintf(reason, sizeof(reason), "already connected: %s\n", id_base64); _tf_ssb_connection_close(connection, reason); return; } memcpy(connection->serverpub, m + 64, sizeof(connection->serverpub)); uint8_t hash3[crypto_hash_sha256_BYTES]; crypto_hash_sha256(hash3, shared_secret_ab, sizeof(shared_secret_ab)); uint8_t msg[sizeof(connection->ssb->network_key) + sizeof(connection->ssb->pub) + sizeof(hash3)]; memcpy(msg, connection->ssb->network_key, sizeof(connection->ssb->network_key)); memcpy(msg + sizeof(connection->ssb->network_key), connection->ssb->pub, sizeof(connection->ssb->pub)); memcpy(msg + sizeof(connection->ssb->network_key) + sizeof(connection->ssb->pub), hash3, sizeof(hash3)); if (crypto_sign_verify_detached(detached_signature_A, msg, sizeof(msg), connection->serverpub) != 0) { _tf_ssb_connection_close(connection, "unable to verify client identity"); return; } uint8_t nonce2[crypto_auth_hmacsha512256_BYTES]; if (crypto_auth_hmacsha512256(nonce2, connection->epub, sizeof(connection->epub), connection->ssb->network_key) != 0) { _tf_ssb_connection_close(connection, "unable to compute initial recv nonce as server"); return; } memcpy(connection->nonce, nonce2, sizeof(connection->nonce)); uint8_t nonce3[crypto_auth_hmacsha512256_BYTES]; if (crypto_auth_hmacsha512256(nonce3, connection->serverepub, sizeof(connection->serverepub), connection->ssb->network_key) != 0) { _tf_ssb_connection_close(connection, "unable to compute initial send nonce as server"); return; } memcpy(connection->send_nonce, nonce3, sizeof(connection->send_nonce)); int detached_signature_A_size = 64; uint8_t sign_b[sizeof(connection->ssb->network_key) + detached_signature_A_size + sizeof(connection->serverpub) + sizeof(hash3)]; memcpy(sign_b, connection->ssb->network_key, sizeof(connection->ssb->network_key)); memcpy(sign_b + sizeof(connection->ssb->network_key), detached_signature_A, detached_signature_A_size); memcpy(sign_b + sizeof(connection->ssb->network_key) + detached_signature_A_size, connection->serverpub, sizeof(connection->serverpub)); memcpy(sign_b + sizeof(connection->ssb->network_key) + detached_signature_A_size + sizeof(connection->serverpub), hash3, sizeof(hash3)); uint8_t detached_signature_B[crypto_sign_BYTES]; unsigned long long siglen; if (crypto_sign_detached(detached_signature_B, &siglen, sign_b, sizeof(sign_b), connection->ssb->priv) != 0) { _tf_ssb_connection_close(connection, "unable to compute detached_signature_B as server"); return; } uint8_t clientcurvepub[crypto_scalarmult_curve25519_SCALARBYTES]; if (crypto_sign_ed25519_pk_to_curve25519(clientcurvepub, connection->serverpub) != 0) { _tf_ssb_connection_close(connection, "unable to convert key to curve25519"); return; } uint8_t shared_secret_Ab[crypto_scalarmult_curve25519_SCALARBYTES]; if (crypto_scalarmult_curve25519(shared_secret_Ab, connection->epriv, clientcurvepub) != 0) { _tf_ssb_connection_close(connection, "unable to compute shared_secret_Ab as server"); return; } uint8_t key_buf[sizeof(connection->ssb->network_key) + sizeof(shared_secret_ab) + sizeof(shared_secret_aB) + sizeof(shared_secret_Ab)]; memcpy(key_buf, connection->ssb->network_key, sizeof(connection->ssb->network_key)); memcpy(key_buf + sizeof(connection->ssb->network_key), shared_secret_ab, sizeof(shared_secret_ab)); memcpy(key_buf + sizeof(connection->ssb->network_key) + sizeof(shared_secret_ab), shared_secret_aB, sizeof(shared_secret_aB)); memcpy(key_buf + sizeof(connection->ssb->network_key) + sizeof(shared_secret_ab) + sizeof(shared_secret_aB), shared_secret_Ab, sizeof(shared_secret_Ab)); uint8_t key_hash[crypto_hash_sha256_BYTES]; crypto_hash_sha256(key_hash, key_buf, sizeof(key_buf)); uint8_t hash3a[crypto_hash_sha256_BYTES + crypto_sign_PUBLICKEYBYTES]; crypto_hash_sha256(hash3a, key_hash, sizeof(key_hash)); memcpy(hash3a + crypto_hash_sha256_BYTES, connection->ssb->pub, sizeof(connection->ssb->pub)); crypto_hash_sha256(connection->s_to_c_box_key, hash3a, sizeof(hash3a)); uint8_t hash3b[crypto_hash_sha256_BYTES + crypto_sign_PUBLICKEYBYTES]; crypto_hash_sha256(hash3b, key_hash, sizeof(key_hash)); memcpy(hash3b + crypto_hash_sha256_BYTES, connection->serverpub, sizeof(connection->serverpub)); crypto_hash_sha256(connection->c_to_s_box_key, hash3b, sizeof(hash3b)); uint8_t c[crypto_secretbox_MACBYTES + sizeof(detached_signature_B)]; if (crypto_secretbox_easy(c, detached_signature_B, sizeof(detached_signature_B), nonce, key_hash) != 0) { _tf_ssb_connection_close(connection, "unable to create initial secret box as server"); return; } static_assert(sizeof(c) == 80, "server send size"); _tf_ssb_write(connection, c, sizeof(c)); char fullid[k_id_base64_len]; tf_ssb_id_bin_to_str(fullid, sizeof(fullid), connection->serverpub); JSContext* context = connection->ssb->context; JS_SetPropertyStr(context, connection->object, "id", JS_NewString(context, fullid)); JS_SetPropertyStr(context, connection->object, "is_client", JS_FALSE); connection->state = k_tf_ssb_state_server_verified; if (connection->handshake_timer.data) { uv_timer_stop(&connection->handshake_timer); } _tf_ssb_notify_connections_changed(connection->ssb, k_tf_ssb_change_connect, connection); } static bool _tf_ssb_connection_recv_pop(tf_ssb_connection_t* connection, uint8_t* buffer, size_t size) { if (size >= sizeof(connection->recv_buffer)) { char message[256]; snprintf(message, sizeof(message), "Trying to pop a message (%zd) larger than the connection's receive buffer (%zd).", size, sizeof(connection->recv_buffer)); _tf_ssb_connection_close(connection, message); } if (connection->recv_size < size) { return false; } memcpy(buffer, connection->recv_buffer, size); if (connection->recv_size - size) { memmove(connection->recv_buffer, connection->recv_buffer + size, connection->recv_size - size); } connection->recv_size -= size; return true; } static void _tf_ssb_name_to_string(JSContext* context, JSValue object, char* buffer, size_t size) { JSValue name = JS_GetPropertyStr(context, object, "name"); if (JS_IsArray(context, name)) { int length = tf_util_get_length(context, name); int offset = 0; for (int i = 0; i < length; i++) { JSValue part = JS_GetPropertyUint32(context, name, i); const char* part_str = JS_ToCString(context, part); offset += snprintf(buffer + offset, size - offset, "%s%s", i == 0 ? "" : ".", part_str); JS_FreeCString(context, part_str); JS_FreeValue(context, part); } } else if (JS_IsString(name)) { const char* part_str = JS_ToCString(context, name); snprintf(buffer, size, "%s", part_str); JS_FreeCString(context, part_str); } JS_FreeValue(context, name); } static void _tf_ssb_connection_rpc_recv(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, const uint8_t* message, size_t size) { connection->ssb->rpc_in++; _tf_ssb_connection_add_debug_message(connection, false, flags, request_number, message, size); bool close_connection = false; if (size == 0) { _tf_ssb_connection_close(connection, "rpc recv zero"); return; } else if (flags & k_ssb_rpc_flag_json) { char id[k_id_base64_len] = ""; tf_ssb_id_bin_to_str(id, sizeof(id), connection->serverpub); if (connection->ssb->verbose) { tf_printf(CYAN "%s RPC RECV" RESET " end/error=%s stream=%s type=%s RN=%d: [%zd B] %.*s\n", connection->name, (flags & k_ssb_rpc_flag_end_error) ? "true" : "false", (flags & k_ssb_rpc_flag_stream) ? "true" : "false", k_ssb_type_names[flags & k_ssb_rpc_mask_type], request_number, size, (int)size, message); } JSContext* context = connection->ssb->context; JSValue val = JS_ParseJSON(context, (const char*)message, size, NULL); if (!JS_IsUndefined(val)) { tf_ssb_rpc_callback_t* callback = NULL; void* user_data = NULL; if (_tf_ssb_connection_get_request_callback(connection, -request_number, &callback, &user_data)) { if (callback) { char buffer[64]; snprintf(buffer, sizeof(buffer), "request %d", request_number); tf_trace_begin(connection->ssb->trace, buffer); PRE_CALLBACK(connection->ssb, callback); callback(connection, flags, request_number, val, message, size, user_data); POST_CALLBACK(connection->ssb, callback); tf_trace_end(connection->ssb->trace); if (!(flags & k_ssb_rpc_flag_stream)) { tf_ssb_connection_remove_request(connection, -request_number); } } } else if (JS_IsObject(val)) { bool found = false; char name[256] = ""; _tf_ssb_name_to_string(context, val, name, sizeof(name)); for (tf_ssb_rpc_callback_node_t* it = connection->ssb->rpc; it; it = it->next) { if (strcmp(name, it->name) == 0) { tf_ssb_connection_add_request(connection, -request_number, name, NULL, NULL, NULL, NULL); tf_trace_begin(connection->ssb->trace, it->name); PRE_CALLBACK(connection->ssb, it->callback); it->callback(connection, flags, request_number, val, message, size, it->user_data); POST_CALLBACK(connection->ssb, it->callback); tf_trace_end(connection->ssb->trace); found = true; break; } } if (!found && strcmp(name, "Error") != 0) { tf_ssb_connection_add_request(connection, -request_number, name, NULL, NULL, NULL, NULL); tf_ssb_connection_rpc_send_error_method_not_allowed(connection, flags, -request_number, name); } } } else { tf_printf("Failed to parse %.*s\n", (int)size, message); close_connection = true; } JS_FreeValue(context, val); } else if ((flags & k_ssb_rpc_mask_type) == k_ssb_rpc_flag_binary) { if (connection->ssb->verbose) { tf_printf(CYAN "%s RPC RECV" RESET " end/error=%s stream=%s type=%s RN=%d: [%zd B]\n", connection->name, (flags & k_ssb_rpc_flag_end_error) ? "true" : "false", (flags & k_ssb_rpc_flag_stream) ? "true" : "false", k_ssb_type_names[flags & k_ssb_rpc_mask_type], request_number, size); } tf_ssb_rpc_callback_t* callback = NULL; void* user_data = NULL; if (_tf_ssb_connection_get_request_callback(connection, -request_number, &callback, &user_data)) { if (callback) { char buffer[64]; snprintf(buffer, sizeof(buffer), "request %d", request_number); tf_trace_begin(connection->ssb->trace, buffer); PRE_CALLBACK(connection->ssb, callback); callback(connection, flags, request_number, JS_UNDEFINED, message, size, user_data); POST_CALLBACK(connection->ssb, callback); tf_trace_end(connection->ssb->trace); } } } if (close_connection) { tf_ssb_connection_close(connection); } } static void _tf_ssb_connection_rpc_recv_push(tf_ssb_connection_t* connection, const uint8_t* data, size_t size) { size_t size_left = size; size_t size_processed = 0; while (size_left > 0) { size_t copy_size = (connection->rpc_recv_size + size_left > sizeof(connection->rpc_recv_buffer)) ? sizeof(connection->rpc_recv_buffer) - connection->rpc_recv_size : size_left; if (copy_size == 0) { _tf_ssb_connection_close(connection, "recv buffer overflow"); return; } memcpy(connection->rpc_recv_buffer + connection->rpc_recv_size, data + size_processed, copy_size); connection->rpc_recv_size += copy_size; size_processed += copy_size; size_left -= copy_size; while (connection->rpc_recv_size >= 9) { uint8_t flags = *connection->rpc_recv_buffer; uint32_t body_len; int32_t request_number; memcpy(&body_len, connection->rpc_recv_buffer + 1, sizeof(body_len)); body_len = htonl(body_len); memcpy(&request_number, connection->rpc_recv_buffer + 1 + sizeof(body_len), sizeof(request_number)); request_number = htonl(request_number); size_t rpc_size = 9 + body_len; if (connection->rpc_recv_size >= rpc_size) { uint8_t* end = &connection->rpc_recv_buffer[9 + body_len]; uint8_t tmp = *end; *end = '\0'; _tf_ssb_connection_rpc_recv(connection, flags, request_number, connection->rpc_recv_buffer + 9, body_len); *end = tmp; memmove(connection->rpc_recv_buffer, connection->rpc_recv_buffer + rpc_size, connection->rpc_recv_size - rpc_size); connection->rpc_recv_size -= rpc_size; } else { /* Wait for more body. */ break; } } } } static bool _tf_ssb_connection_box_stream_recv(tf_ssb_connection_t* connection) { if (!connection->body_len) { uint8_t header_enc[34]; if (_tf_ssb_connection_recv_pop(connection, header_enc, sizeof(header_enc))) { uint8_t header[18]; if (crypto_secretbox_open_easy(header, header_enc, sizeof(header_enc), connection->nonce, connection->s_to_c_box_key) != 0) { _tf_ssb_connection_close(connection, "failed to open header secret box"); return false; } _tf_ssb_nonce_inc(connection->nonce); connection->body_len = htons(*(uint16_t*)header); memcpy(connection->body_auth_tag, header + sizeof(uint16_t), sizeof(connection->body_auth_tag)); if (!connection->body_len) { _tf_ssb_connection_close(connection, "empty body, graceful close"); } } else { return false; } } if (connection->body_len) { memcpy(connection->box_stream_buf, connection->body_auth_tag, sizeof(connection->body_auth_tag)); if (_tf_ssb_connection_recv_pop(connection, connection->box_stream_buf + 16, connection->body_len)) { if (crypto_secretbox_open_easy(connection->secretbox_buf, connection->box_stream_buf, 16 + connection->body_len, connection->nonce, connection->s_to_c_box_key) != 0) { _tf_ssb_connection_close(connection, "failed to open secret box"); return false; } _tf_ssb_nonce_inc(connection->nonce); _tf_ssb_connection_rpc_recv_push(connection, connection->secretbox_buf, connection->body_len); connection->body_len = 0; } else { return false; } } return true; } JSValue tf_ssb_sign_message(tf_ssb_t* ssb, const char* author, const uint8_t* private_key, JSValue message, const char* previous_id, int64_t previous_sequence) { char actual_previous_id[crypto_hash_sha256_BYTES * 2]; int64_t actual_previous_sequence = 0; bool have_previous = false; if (previous_id) { have_previous = *previous_id && previous_sequence > 0; snprintf(actual_previous_id, sizeof(actual_previous_id), "%s", previous_id); actual_previous_sequence = previous_sequence; } else { have_previous = tf_ssb_db_get_latest_message_by_author(ssb, author, &actual_previous_sequence, actual_previous_id, sizeof(actual_previous_id)); } JSContext* context = ssb->context; JSValue root = JS_NewObject(context); JS_SetPropertyStr(context, root, "previous", have_previous ? JS_NewString(context, actual_previous_id) : JS_NULL); JS_SetPropertyStr(context, root, "author", JS_NewString(context, author)); JS_SetPropertyStr(context, root, "sequence", JS_NewInt64(context, actual_previous_sequence + 1)); int64_t now = (int64_t)time(NULL); JS_SetPropertyStr(context, root, "timestamp", JS_NewInt64(context, now * 1000LL)); JSValue hashstr = JS_NewString(context, "sha256"); JS_SetPropertyStr(context, root, "hash", hashstr); JSValue content = JS_DupValue(context, message); JS_SetPropertyStr(context, root, "content", content); JSValue jsonval = JS_JSONStringify(context, root, JS_NULL, JS_NewInt32(context, 2)); size_t len = 0; const char* json = JS_ToCStringLen(context, &len, jsonval); JS_FreeValue(context, jsonval); uint8_t signature[crypto_sign_BYTES]; unsigned long long siglen; bool valid = crypto_sign_detached(signature, &siglen, (const uint8_t*)json, len, private_key) == 0; JS_FreeCString(context, json); if (!valid) { tf_printf("crypto_sign_detached failed\n"); JS_FreeValue(context, root); root = JS_UNDEFINED; } else { char signature_base64[crypto_sign_BYTES * 2]; tf_base64_encode(signature, sizeof(signature), signature_base64, sizeof(signature_base64)); strcat(signature_base64, ".sig.ed25519"); JSValue sigstr = JS_NewString(context, signature_base64); JS_SetPropertyStr(context, root, "signature", sigstr); } return root; } static void _tf_ssb_connection_destroy(tf_ssb_connection_t* connection, const char* reason) { tf_ssb_t* ssb = connection->ssb; connection->closing = true; if (!connection->destroy_reason) { connection->destroy_reason = reason; } _tf_ssb_connection_dispatch_scheduled(connection); tf_free(connection->scheduled); connection->scheduled = NULL; while (connection->requests) { tf_ssb_connection_remove_request(connection, connection->requests->request_number); } for (tf_ssb_broadcast_t* node = ssb->broadcasts; node; node = node->next) { if (node->tunnel_connection == connection) { node->tunnel_connection = NULL; node->mtime = 0; } } for (tf_ssb_connection_t** it = &connection->ssb->connections; *it; it = &(*it)->next) { for (int i = (*it)->requests_count - 1; i >= 0; i--) { if ((*it)->requests[i].dependent_connection == connection) { tf_ssb_connection_remove_request(*it, (*it)->requests[i].request_number); } } if (*it == connection) { *it = connection->next; connection->next = NULL; ssb->connections_count--; _tf_ssb_notify_connections_changed(ssb, k_tf_ssb_change_remove, connection); break; } } bool again = true; while (again) { again = false; for (tf_ssb_connection_t* it = connection->ssb->connections; it; it = it->next) { if (it->tunnel_connection == connection) { it->tunnel_connection = NULL; _tf_ssb_connection_close(it, "tunnel closed"); again = true; break; } else if (it == connection->tunnel_connection) { tf_ssb_connection_remove_request(it, connection->tunnel_request_number); connection->tunnel_connection = NULL; connection->tunnel_request_number = 0; } } } if (!JS_IsUndefined(connection->object)) { JSValue object = connection->object; connection->object = JS_UNDEFINED; JS_SetOpaque(object, NULL); JS_FreeValue(ssb->context, object); } if (connection->async.data && !uv_is_closing((uv_handle_t*)&connection->async)) { uv_close((uv_handle_t*)&connection->async, _tf_ssb_connection_on_close); } if (connection->tcp.data && !uv_is_closing((uv_handle_t*)&connection->tcp)) { uv_close((uv_handle_t*)&connection->tcp, _tf_ssb_connection_on_close); } if (connection->handshake_timer.data && !uv_is_closing((uv_handle_t*)&connection->handshake_timer)) { uv_close((uv_handle_t*)&connection->handshake_timer, _tf_ssb_connection_on_close); } if (JS_IsUndefined(connection->object) && !connection->async.data && !connection->tcp.data && !connection->connect.data && !connection->handshake_timer.data && connection->ref_count == 0) { tf_free(connection->message_requests); connection->message_requests = NULL; connection->message_requests_count = 0; for (int i = 0; i < k_debug_close_message_count; i++) { tf_free(connection->debug_messages[i]); connection->debug_messages[i] = NULL; } if (--connection->ssb->connection_ref_count == 0 && connection->ssb->shutting_down) { tf_ssb_destroy(connection->ssb); } tf_free(connection); } } static void _tf_ssb_connection_on_close(uv_handle_t* handle) { tf_ssb_connection_t* connection = handle->data; handle->data = NULL; if (connection && connection->closing) { _tf_ssb_connection_destroy(connection, "handle closed"); } } static void _tf_ssb_connection_on_tcp_recv_internal(tf_ssb_connection_t* connection, const void* data, ssize_t nread) { if (nread >= 0) { if (connection->recv_size + nread > sizeof(connection->recv_buffer)) { _tf_ssb_connection_close(connection, "recv buffer overflow"); return; } memcpy(connection->recv_buffer + connection->recv_size, data, nread); connection->recv_size += nread; switch (connection->state) { case k_tf_ssb_state_invalid: _tf_ssb_connection_close(connection, "received a message in invalid state"); break; case k_tf_ssb_state_connected: _tf_ssb_connection_close(connection, "received a message in connected state"); break; case k_tf_ssb_state_sent_hello: { uint8_t hello[64]; if (_tf_ssb_connection_recv_pop(connection, hello, sizeof(hello))) { _tf_ssb_connection_send_identity(connection, hello, hello + 32); } } break; case k_tf_ssb_state_sent_identity: { uint8_t identity[80]; if (_tf_ssb_connection_recv_pop(connection, identity, sizeof(identity))) { _tf_ssb_connection_verify_identity(connection, identity, sizeof(identity)); } } break; case k_tf_ssb_state_verified: uv_async_send(&connection->async); break; case k_tf_ssb_state_server_wait_hello: { uint8_t hello[64]; if (_tf_ssb_connection_recv_pop(connection, hello, sizeof(hello))) { uint8_t* hmac = hello; memcpy(connection->serverepub, hello + crypto_box_PUBLICKEYBYTES, crypto_box_PUBLICKEYBYTES); static_assert(sizeof(connection->serverepub) == crypto_box_PUBLICKEYBYTES, "serverepub size"); if (crypto_auth_hmacsha512256_verify(hmac, connection->serverepub, 32, connection->ssb->network_key) != 0) { _tf_ssb_connection_close(connection, "crypto_auth_hmacsha512256_verify failed"); } else { _tf_ssb_connection_client_send_hello(connection); connection->state = k_tf_ssb_state_server_wait_client_identity; } } } break; case k_tf_ssb_state_server_wait_client_identity: { uint8_t identity[112]; if (_tf_ssb_connection_recv_pop(connection, identity, sizeof(identity))) { _tf_ssb_connection_verify_client_identity(connection, identity, sizeof(identity)); } } break; case k_tf_ssb_state_server_verified: uv_async_send(&connection->async); break; case k_tf_ssb_state_closing: break; } } else if (nread == UV_ENOBUFS) { /* Our read buffer is full. Try harder to process messages. */ uv_async_send(&connection->async); } else { _tf_ssb_connection_close(connection, uv_strerror(nread)); } } static void _tf_ssb_connection_on_tcp_recv(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) { tf_ssb_connection_t* connection = stream->data; _tf_ssb_connection_on_tcp_recv_internal(connection, buf->base, nread); tf_free(buf->base); } static void _tf_ssb_connection_client_send_hello(tf_ssb_connection_t* connection) { char write[crypto_auth_BYTES + crypto_box_PUBLICKEYBYTES]; if (crypto_box_keypair(connection->epub, connection->epriv) != 0) { _tf_ssb_connection_close(connection, "failed to generate ephemeral keypair"); return; } uint8_t a[crypto_auth_hmacsha512256_BYTES]; if (crypto_auth_hmacsha512256(a, connection->epub, sizeof(connection->epub), connection->ssb->network_key) != 0) { _tf_ssb_connection_close(connection, "failed to create hello message"); return; } char* b = write; memcpy(b, a, crypto_auth_BYTES); memcpy(b + crypto_auth_BYTES, connection->epub, sizeof(connection->epub)); _tf_ssb_write(connection, b, crypto_auth_BYTES + sizeof(connection->epub)); connection->state = k_tf_ssb_state_sent_hello; } static bool _tf_ssb_connection_read_start(tf_ssb_connection_t* connection) { int result = uv_read_start((uv_stream_t*)&connection->tcp, _tf_ssb_connection_on_tcp_alloc, _tf_ssb_connection_on_tcp_recv); if (result && result != UV_EALREADY) { tf_printf("uv_read_start => %s\n", uv_strerror(result)); _tf_ssb_connection_close(connection, "uv_read_start failed"); return false; } return true; } static bool _tf_ssb_connection_read_stop(tf_ssb_connection_t* connection) { int result = uv_read_stop((uv_stream_t*)&connection->tcp); if (result && result != UV_EALREADY) { tf_printf("uv_read_stop => %s\n", uv_strerror(result)); _tf_ssb_connection_close(connection, "uv_read_stop failed"); return false; } return true; } static void _tf_ssb_connection_on_connect(uv_connect_t* connect, int status) { tf_ssb_connection_t* connection = connect->data; connect->data = NULL; if (status == 0) { connection->state = k_tf_ssb_state_connected; if (_tf_ssb_connection_read_start(connection)) { _tf_ssb_connection_client_send_hello(connection); } } else { _tf_ssb_connection_close(connection, "uv_tcp_connect failed"); } } static void _load_keys_callback(const char* identity, void* user_data) { tf_ssb_t* ssb = user_data; if (*ssb->pub) { return; } tf_ssb_id_str_to_bin(ssb->pub, identity); tf_ssb_db_identity_get_private_key(ssb, ":admin", identity, ssb->priv, sizeof(ssb->priv)); } static bool _tf_ssb_load_keys(tf_ssb_t* ssb) { tf_ssb_db_identity_visit(ssb, ":admin", _load_keys_callback, ssb); return *ssb->pub != '\0' && *ssb->priv != '\0'; } static void _tf_ssb_trace_timer(uv_timer_t* timer) { tf_ssb_t* ssb = timer->data; const char* names[] = { "connections", "broadcasts", "rpc", "connections_changed", "message_added", "blob_want_added", "broadcasts_changed", }; int64_t values[] = { ssb->connections_count, ssb->broadcasts_count, ssb->rpc_count, ssb->connections_changed_count, ssb->message_added_count, ssb->blob_want_added_count, ssb->broadcasts_changed_count, }; tf_trace_counter(ssb->trace, "ssb", _countof(values), names, values); } void tf_ssb_get_stats(tf_ssb_t* ssb, tf_ssb_stats_t* out_stats) { *out_stats = (tf_ssb_stats_t) { .connections = ssb->connections_count, .broadcasts = ssb->broadcasts_count, .messages_stored = ssb->messages_stored, .blobs_stored = ssb->blobs_stored, .rpc_in = ssb->rpc_in, .rpc_out = ssb->rpc_out, .request_count = ssb->request_count, .callbacks = { .rpc = ssb->rpc_count, .connections_changed = ssb->connections_changed_count, .message_added = ssb->message_added_count, .blob_want_added = ssb->blob_want_added_count, .broadcasts_changed = ssb->broadcasts_changed_count, }, }; ssb->messages_stored = 0; ssb->blobs_stored = 0; ssb->rpc_in = 0; ssb->rpc_out = 0; } tf_ssb_t* tf_ssb_create(uv_loop_t* loop, JSContext* context, const char* db_path, const char* network_key) { tf_ssb_t* ssb = tf_malloc(sizeof(tf_ssb_t)); memset(ssb, 0, sizeof(*ssb)); ssb->is_replicator = true; const char* actual_key = network_key ? network_key : k_ssb_network_string; if (sodium_hex2bin(ssb->network_key, sizeof(ssb->network_key), actual_key, strlen(actual_key), ": ", NULL, NULL)) { tf_printf("Error parsing network key: %s.", actual_key); } char buffer[8] = { 0 }; size_t buffer_size = sizeof(buffer); ssb->store_debug_messages = uv_os_getenv("TF_DEBUG_CLOSE", buffer, &buffer_size) == 0 && strcmp(buffer, "1") == 0; buffer_size = sizeof(buffer); ssb->verbose = uv_os_getenv("TF_SSB_VERBOSE", buffer, &buffer_size) == 0 && strcmp(buffer, "1") == 0; if (context) { ssb->context = context; } else { ssb->own_context = true; JSMallocFunctions funcs = { 0 }; tf_get_js_malloc_functions(&funcs); ssb->runtime = JS_NewRuntime2(&funcs, NULL); ssb->context = JS_NewContext(ssb->runtime); } uv_mutex_init(&ssb->db_readers_lock); uv_mutex_init(&ssb->db_writer_lock); JS_NewClassID(&_connection_class_id); JSClassDef def = { .class_name = "connection", .finalizer = _tf_ssb_connection_finalizer, }; JS_NewClass(JS_GetRuntime(ssb->context), _connection_class_id, &def); ssb->db_path = tf_strdup(db_path); sqlite3_open_v2(db_path, &ssb->db_writer, SQLITE_OPEN_READWRITE | SQLITE_OPEN_URI | SQLITE_OPEN_CREATE, NULL); tf_ssb_db_init(ssb); if (loop) { ssb->loop = loop; } else { uv_loop_init(&ssb->own_loop); ssb->loop = &ssb->own_loop; } ssb->broadcast_timer.data = ssb; uv_timer_init(ssb->loop, &ssb->broadcast_timer); ssb->trace_timer.data = ssb; uv_timer_init(ssb->loop, &ssb->trace_timer); uv_timer_start(&ssb->trace_timer, _tf_ssb_trace_timer, 100, 100); uv_unref((uv_handle_t*)&ssb->trace_timer); ssb->request_activity_timer.data = ssb; uv_timer_init(ssb->loop, &ssb->request_activity_timer); uv_timer_start(&ssb->request_activity_timer, _tf_ssb_request_activity_timer, k_rpc_active_ms, 0); uv_unref((uv_handle_t*)&ssb->request_activity_timer); if (!_tf_ssb_load_keys(ssb)) { tf_printf("Generating a new keypair.\n"); tf_ssb_db_identity_create(ssb, ":admin", ssb->pub, ssb->priv); } ssb->connections_tracker = tf_ssb_connections_create(ssb); _tf_ssb_update_settings(ssb); tf_ssb_rpc_register(ssb); _tf_ssb_start_update_settings(ssb); return ssb; } void tf_ssb_start_periodic(tf_ssb_t* ssb) { tf_ssb_rpc_start_periodic(ssb); } static void _tf_ssb_assert_not_main_thread(tf_ssb_t* ssb) { if (uv_thread_self() == ssb->thread_self) { const char* bt = tf_util_backtrace_string(); tf_printf("Acquiring DB from the main thread:\n%s\n", bt); tf_free((void*)bt); abort(); } } sqlite3* tf_ssb_acquire_db_reader(tf_ssb_t* ssb) { tf_trace_begin(ssb->trace, "db_reader"); _tf_ssb_assert_not_main_thread(ssb); sqlite3* db = NULL; uv_mutex_lock(&ssb->db_readers_lock); if (ssb->db_readers_count) { db = ssb->db_readers[--ssb->db_readers_count]; } else { sqlite3_open_v2(ssb->db_path, &db, SQLITE_OPEN_READONLY | SQLITE_OPEN_URI, NULL); tf_ssb_db_init_reader(db); } tf_trace_sqlite(ssb->trace, db); uv_mutex_unlock(&ssb->db_readers_lock); sqlite3_set_authorizer(db, NULL, NULL); return db; } sqlite3* tf_ssb_acquire_db_reader_restricted(tf_ssb_t* ssb) { sqlite3* db = tf_ssb_acquire_db_reader(ssb); sqlite3_set_authorizer(db, tf_ssb_sqlite_authorizer, ssb); return db; } void tf_ssb_release_db_reader(tf_ssb_t* ssb, sqlite3* db) { sqlite3_db_release_memory(db); uv_mutex_lock(&ssb->db_readers_lock); ssb->db_readers = tf_resize_vec(ssb->db_readers, sizeof(sqlite3*) * (ssb->db_readers_count + 1)); ssb->db_readers[ssb->db_readers_count++] = db; uv_mutex_unlock(&ssb->db_readers_lock); tf_trace_end(ssb->trace); } sqlite3* tf_ssb_acquire_db_writer(tf_ssb_t* ssb) { tf_trace_begin(ssb->trace, "db_writer"); _tf_ssb_assert_not_main_thread(ssb); uv_mutex_lock(&ssb->db_writer_lock); sqlite3* writer = ssb->db_writer; assert(writer); ssb->db_writer = NULL; return writer; } void tf_ssb_release_db_writer(tf_ssb_t* ssb, sqlite3* db) { assert(ssb->db_writer == NULL); sqlite3_db_release_memory(db); ssb->db_writer = db; uv_mutex_unlock(&ssb->db_writer_lock); tf_trace_end(ssb->trace); } uv_loop_t* tf_ssb_get_loop(tf_ssb_t* ssb) { return ssb->loop; } void tf_ssb_generate_keys(tf_ssb_t* ssb) { crypto_sign_ed25519_keypair(ssb->pub, ssb->priv); } void tf_ssb_generate_keys_buffer(char* out_public, size_t public_size, char* out_private, size_t private_size) { uint8_t public[crypto_sign_PUBLICKEYBYTES]; uint8_t private[crypto_sign_SECRETKEYBYTES]; crypto_sign_ed25519_keypair(public, private); char buffer[512]; tf_base64_encode(public, sizeof(public), buffer, sizeof(buffer)); snprintf(out_public, public_size, "%s.ed25519", buffer); tf_base64_encode(private, sizeof(private), buffer, sizeof(buffer)); snprintf(out_private, private_size, "%s.ed25519", buffer); } void tf_ssb_get_private_key(tf_ssb_t* ssb, uint8_t* out_private, size_t private_size) { memcpy(out_private, ssb->priv, tf_min(private_size, sizeof(ssb->priv))); } void tf_ssb_set_trace(tf_ssb_t* ssb, tf_trace_t* trace) { ssb->trace = trace; sqlite3* db = tf_ssb_acquire_db_writer(ssb); if (trace && db) { tf_trace_sqlite(trace, db); } tf_ssb_release_db_writer(ssb, db); if (trace) { uv_mutex_lock(&ssb->db_readers_lock); for (int i = 0; i < ssb->db_readers_count; i++) { tf_trace_sqlite(trace, ssb->db_readers[i]); } uv_mutex_unlock(&ssb->db_readers_lock); } } tf_trace_t* tf_ssb_get_trace(tf_ssb_t* ssb) { return ssb->trace; } JSContext* tf_ssb_get_context(tf_ssb_t* ssb) { if (ssb->thread_self && uv_thread_self() != ssb->thread_self) { const char* bt = tf_util_backtrace_string(); tf_printf("Acquiring JS context from non-main thread:\n%s\n", bt); tf_free((void*)bt); abort(); } return ssb->context; } static void _tf_ssb_on_handle_close(uv_handle_t* handle) { handle->data = NULL; } static void _tf_ssb_on_timer_close(uv_handle_t* handle) { tf_free(handle->data); } void tf_ssb_destroy(tf_ssb_t* ssb) { tf_printf("tf_ssb_destroy\n"); ssb->shutting_down = true; if (ssb->connections_tracker) { tf_ssb_connections_destroy(ssb->connections_tracker); ssb->connections_tracker = NULL; } if (ssb->broadcast_listener.data && !uv_is_closing((uv_handle_t*)&ssb->broadcast_listener)) { uv_close((uv_handle_t*)&ssb->broadcast_listener, _tf_ssb_on_handle_close); } if (ssb->broadcast_sender.data && !uv_is_closing((uv_handle_t*)&ssb->broadcast_sender)) { uv_close((uv_handle_t*)&ssb->broadcast_sender, _tf_ssb_on_handle_close); } if (ssb->broadcast_timer.data && !uv_is_closing((uv_handle_t*)&ssb->broadcast_timer)) { uv_close((uv_handle_t*)&ssb->broadcast_timer, _tf_ssb_on_handle_close); } if (ssb->broadcast_cleanup_timer.data && !uv_is_closing((uv_handle_t*)&ssb->broadcast_cleanup_timer)) { uv_close((uv_handle_t*)&ssb->broadcast_cleanup_timer, _tf_ssb_on_handle_close); } if (ssb->trace_timer.data && !uv_is_closing((uv_handle_t*)&ssb->trace_timer)) { uv_close((uv_handle_t*)&ssb->trace_timer, _tf_ssb_on_handle_close); } if (ssb->request_activity_timer.data && !uv_is_closing((uv_handle_t*)&ssb->request_activity_timer)) { uv_close((uv_handle_t*)&ssb->request_activity_timer, _tf_ssb_on_handle_close); } if (ssb->server.data && !uv_is_closing((uv_handle_t*)&ssb->server)) { uv_close((uv_handle_t*)&ssb->server, _tf_ssb_on_handle_close); } for (int i = 0; i < ssb->timers_count; i++) { uv_close((uv_handle_t*)&ssb->timers[i]->timer, _tf_ssb_on_timer_close); } ssb->timers_count = 0; tf_free(ssb->timers); ssb->timers = NULL; tf_printf("Waiting for closes.\n"); while (ssb->broadcast_listener.data || ssb->broadcast_sender.data || ssb->broadcast_timer.data || ssb->broadcast_cleanup_timer.data || ssb->trace_timer.data || ssb->server.data || ssb->ref_count || ssb->request_activity_timer.data) { uv_run(ssb->loop, UV_RUN_ONCE); } tf_printf("Waiting for rpc.\n"); while (ssb->rpc) { tf_ssb_rpc_callback_node_t* node = ssb->rpc; ssb->rpc = node->next; ssb->rpc_count--; if (node->cleanup) { node->cleanup(ssb, node->user_data); node->cleanup = NULL; } tf_free(node); } while (ssb->connections_changed) { tf_ssb_connections_changed_callback_node_t* node = ssb->connections_changed; ssb->connections_changed = node->next; ssb->connections_changed_count--; if (node->cleanup) { node->cleanup(ssb, node->user_data); } tf_free(node); } while (ssb->message_added) { tf_ssb_message_added_callback_node_t* node = ssb->message_added; ssb->message_added = node->next; ssb->message_added_count--; if (node->cleanup) { node->cleanup(ssb, node->user_data); } tf_free(node); } while (ssb->blob_want_added) { tf_ssb_blob_want_added_callback_node_t* node = ssb->blob_want_added; ssb->blob_want_added = node->next; ssb->blob_want_added_count--; if (node->cleanup) { node->cleanup(ssb, node->user_data); } tf_free(node); } while (ssb->broadcasts_changed) { tf_ssb_broadcasts_changed_callback_node_t* node = ssb->broadcasts_changed; ssb->broadcasts_changed = node->next; ssb->broadcasts_changed_count--; if (node->cleanup) { node->cleanup(ssb, node->user_data); } tf_free(node); } tf_printf("Closing connections.\n"); tf_ssb_connection_t* connection = ssb->connections; while (connection) { tf_ssb_connection_t* next = connection->next; tf_ssb_connection_close(connection); connection = next; } tf_printf("Closed.\n"); if (ssb->loop == &ssb->own_loop) { int r = uv_loop_close(ssb->loop); if (r != 0) { tf_printf("uv_loop_close: %s\n", uv_strerror(r)); } } tf_printf("uv loop closed.\n"); if (ssb->own_context) { tf_printf("closing ssb context\n"); JS_FreeContext(ssb->context); JS_FreeRuntime(ssb->runtime); ssb->own_context = false; } if (ssb->db_writer) { sqlite3_close(ssb->db_writer); ssb->db_writer = NULL; } while (ssb->broadcasts) { tf_ssb_broadcast_t* broadcast = ssb->broadcasts; ssb->broadcasts = broadcast->next; ssb->broadcasts_count--; tf_free(broadcast); } for (int i = 0; i < k_debug_close_connection_count; i++) { for (int j = 0; j < k_debug_close_message_count; j++) { tf_free(ssb->debug_close[i].messages[j]); } } for (int i = 0; i < ssb->db_readers_count; i++) { sqlite3_close(ssb->db_readers[i]); } ssb->db_readers_count = 0; if (ssb->db_readers) { tf_free(ssb->db_readers); ssb->db_readers = NULL; } if (ssb->db_path) { tf_free((void*)ssb->db_path); ssb->db_path = NULL; } if (ssb->room_name) { tf_free(ssb->room_name); ssb->room_name = NULL; } if (ssb->connection_ref_count == 0) { uv_mutex_destroy(&ssb->db_readers_lock); uv_mutex_destroy(&ssb->db_writer_lock); tf_free(ssb); } } bool tf_ssb_is_shutting_down(tf_ssb_t* ssb) { return ssb->shutting_down; } void tf_ssb_run(tf_ssb_t* ssb) { uv_run(ssb->loop, UV_RUN_DEFAULT); } static void _tf_ssb_connection_finalizer(JSRuntime* runtime, JSValue value) { tf_ssb_connection_t* connection = JS_GetOpaque(value, _connection_class_id); if (connection) { connection->object = JS_UNDEFINED; _tf_ssb_connection_destroy(connection, "object finalized"); } } static void _tf_ssb_connection_process_message_async(uv_async_t* async) { tf_ssb_connection_t* connection = async->data; if (_tf_ssb_connection_box_stream_recv(connection)) { uv_async_send(&connection->async); } } static void _tf_ssb_connection_handshake_timer_callback(uv_timer_t* timer) { tf_ssb_connection_t* connection = timer->data; if (connection && connection->state != k_tf_ssb_state_verified && connection->state != k_tf_ssb_state_server_verified) { _tf_ssb_connection_destroy(connection, "handshake timeout"); } } tf_ssb_connection_t* tf_ssb_connection_create(tf_ssb_t* ssb, const char* host, const struct sockaddr_in* addr, const uint8_t* public_key) { for (tf_ssb_connection_t* connection = ssb->connections; connection; connection = connection->next) { if (memcmp(connection->serverpub, public_key, k_id_bin_len) == 0 && connection->state != k_tf_ssb_state_invalid) { char id[k_id_base64_len]; tf_ssb_id_bin_to_str(id, sizeof(id), public_key); tf_printf("Not connecting to %s:%d, because we are already connected to %s (state = %d).\n", host, ntohs(addr->sin_port), id, connection->state); return NULL; } else if (memcmp(ssb->pub, public_key, k_id_bin_len) == 0) { char id[k_id_base64_len]; tf_ssb_id_bin_to_str(id, sizeof(id), public_key); tf_printf("Not connecting to %s:%d, because they appear to be ourselves %s.\n", host, ntohs(addr->sin_port), id); return NULL; } } JSContext* context = ssb->context; tf_ssb_connection_t* connection = tf_malloc(sizeof(tf_ssb_connection_t)); ssb->connection_ref_count++; memset(connection, 0, sizeof(*connection)); snprintf(connection->name, sizeof(connection->name), "cli%d", s_connection_index++); connection->ssb = ssb; connection->tcp.data = connection; connection->connect.data = connection; connection->send_request_number = 1; snprintf(connection->host, sizeof(connection->host), "%s", host); connection->port = ntohs(addr->sin_port); connection->async.data = connection; uv_async_init(ssb->loop, &connection->async, _tf_ssb_connection_process_message_async); connection->handshake_timer.data = connection; uv_timer_init(ssb->loop, &connection->handshake_timer); uv_timer_start(&connection->handshake_timer, _tf_ssb_connection_handshake_timer_callback, k_handshake_timeout_ms, 0); connection->object = JS_NewObjectClass(ssb->context, _connection_class_id); JS_SetOpaque(connection->object, connection); char public_key_str[k_id_base64_len] = { 0 }; if (tf_ssb_id_bin_to_str(public_key_str, sizeof(public_key_str), public_key)) { JS_SetPropertyStr(context, connection->object, "id", JS_NewString(context, public_key_str)); JS_SetPropertyStr(context, connection->object, "is_client", JS_TRUE); } memcpy(connection->serverpub, public_key, sizeof(connection->serverpub)); uv_tcp_init(ssb->loop, &connection->tcp); int result = uv_tcp_connect(&connection->connect, &connection->tcp, (const struct sockaddr*)addr, _tf_ssb_connection_on_connect); if (result) { tf_printf("uv_tcp_connect(%s): %s\n", host, uv_strerror(result)); connection->connect.data = NULL; _tf_ssb_connection_destroy(connection, "connect failed"); } else { connection->next = ssb->connections; ssb->connections = connection; ssb->connections_count++; _tf_ssb_notify_connections_changed(ssb, k_tf_ssb_change_create, connection); } return connection; } static void _tf_ssb_connection_tunnel_callback( tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { tf_ssb_connection_t* tunnel = user_data; if (flags & k_ssb_rpc_flag_end_error) { tf_ssb_connection_remove_request(connection, -request_number); tf_ssb_connection_rpc_send(connection, flags, -request_number, NULL, (const uint8_t*)"false", strlen("false"), NULL, NULL, NULL); tf_ssb_connection_close(tunnel); } else { _tf_ssb_connection_on_tcp_recv_internal(tunnel, message, size); } } tf_ssb_connection_t* tf_ssb_connection_tunnel_create(tf_ssb_t* ssb, const char* portal_id, int32_t request_number, const char* target_id, int connect_flags) { tf_ssb_connection_t* connection = tf_ssb_connection_get(ssb, portal_id); JSContext* context = ssb->context; tf_ssb_connection_t* tunnel = tf_malloc(sizeof(tf_ssb_connection_t)); ssb->connection_ref_count++; memset(tunnel, 0, sizeof(*tunnel)); snprintf(tunnel->name, sizeof(tunnel->name), "tun%d", s_tunnel_index++); tunnel->ssb = ssb; tunnel->flags = connect_flags; tunnel->tunnel_connection = connection; tunnel->tunnel_request_number = -request_number; tunnel->send_request_number = 1; tunnel->async.data = tunnel; uv_async_init(ssb->loop, &tunnel->async, _tf_ssb_connection_process_message_async); tunnel->handshake_timer.data = tunnel; uv_timer_init(ssb->loop, &tunnel->handshake_timer); uv_timer_start(&tunnel->handshake_timer, _tf_ssb_connection_handshake_timer_callback, k_handshake_timeout_ms, 0); tunnel->object = JS_NewObjectClass(ssb->context, _connection_class_id); JS_SetOpaque(tunnel->object, tunnel); JS_SetPropertyStr(context, tunnel->object, "id", JS_NewString(context, target_id)); JS_SetPropertyStr(context, tunnel->object, "is_client", JS_TRUE); tf_ssb_id_str_to_bin(tunnel->serverpub, target_id); tunnel->next = ssb->connections; ssb->connections = tunnel; ssb->connections_count++; _tf_ssb_notify_connections_changed(ssb, k_tf_ssb_change_create, tunnel); tf_ssb_connection_add_request(connection, request_number, "tunnel.connect", _tf_ssb_connection_tunnel_callback, NULL, tunnel, tunnel); if (request_number > 0) { tunnel->state = k_tf_ssb_state_connected; _tf_ssb_connection_client_send_hello(tunnel); } else { tunnel->state = k_tf_ssb_state_server_wait_hello; } return tunnel; } typedef struct _connect_t { tf_ssb_t* ssb; uv_getaddrinfo_t req; char host[256]; int port; int flags; uint8_t key[k_id_bin_len]; } connect_t; static void _tf_on_connect_getaddrinfo(uv_getaddrinfo_t* addrinfo, int result, struct addrinfo* info) { connect_t* connect = addrinfo->data; if (!connect->ssb->shutting_down) { if (result == 0 && info) { struct sockaddr_in addr = *(struct sockaddr_in*)info->ai_addr; addr.sin_port = htons(connect->port); tf_ssb_connection_t* connection = tf_ssb_connection_create(connect->ssb, connect->host, &addr, connect->key); if (connection) { connection->flags = connect->flags; } } else { tf_printf("getaddrinfo(%s) => %s\n", connect->host, uv_strerror(result)); } } uv_freeaddrinfo(info); tf_ssb_unref(connect->ssb); tf_free(connect); } void tf_ssb_connect(tf_ssb_t* ssb, const char* host, int port, const uint8_t* key, int connect_flags) { if (ssb->shutting_down) { return; } connect_t* connect = tf_malloc(sizeof(connect_t)); *connect = (connect_t) { .ssb = ssb, .port = port, .flags = connect_flags, .req.data = connect, }; char id[k_id_base64_len] = { 0 }; tf_ssb_id_bin_to_str(id, sizeof(id), key); tf_ssb_connections_store(ssb->connections_tracker, host, port, id); snprintf(connect->host, sizeof(connect->host), "%s", host); memcpy(connect->key, key, k_id_bin_len); tf_ssb_ref(ssb); int r = uv_getaddrinfo(ssb->loop, &connect->req, _tf_on_connect_getaddrinfo, host, NULL, &(struct addrinfo) { .ai_family = AF_INET }); if (r < 0) { tf_printf("uv_getaddrinfo(%s): %s\n", host, uv_strerror(r)); tf_free(connect); tf_ssb_unref(ssb); } } void tf_ssb_connection_close(tf_ssb_connection_t* connection) { _tf_ssb_connection_close(connection, "tf_ssb_connection_close"); } static void _tf_ssb_on_connection(uv_stream_t* stream, int status) { tf_ssb_t* ssb = stream->data; if (status < 0) { tf_printf("uv_listen failed: %s\n", uv_strerror(status)); return; } tf_ssb_connection_t* connection = tf_malloc(sizeof(tf_ssb_connection_t)); ssb->connection_ref_count++; memset(connection, 0, sizeof(*connection)); snprintf(connection->name, sizeof(connection->name), "srv%d", s_connection_index++); connection->ssb = ssb; connection->tcp.data = connection; connection->send_request_number = 1; connection->async.data = connection; uv_async_init(ssb->loop, &connection->async, _tf_ssb_connection_process_message_async); connection->object = JS_NewObjectClass(ssb->context, _connection_class_id); JS_SetOpaque(connection->object, connection); if (uv_tcp_init(ssb->loop, &connection->tcp) != 0) { _tf_ssb_connection_destroy(connection, "init failed"); return; } if (uv_accept(stream, (uv_stream_t*)&connection->tcp) != 0) { _tf_ssb_connection_destroy(connection, "accept failed"); return; } connection->handshake_timer.data = connection; uv_timer_init(ssb->loop, &connection->handshake_timer); uv_timer_start(&connection->handshake_timer, _tf_ssb_connection_handshake_timer_callback, k_handshake_timeout_ms, 0); struct sockaddr_storage addr = { 0 }; int size = sizeof(addr); if (uv_tcp_getpeername(&connection->tcp, (struct sockaddr*)&addr, &size) == 0) { uv_ip_name((struct sockaddr*)&addr, connection->host, sizeof(connection->host)); connection->port = ntohs(((struct sockaddr_in*)&addr)->sin_port); } connection->next = ssb->connections; ssb->connections = connection; ssb->connections_count++; connection->state = k_tf_ssb_state_server_wait_hello; _tf_ssb_connection_read_start(connection); _tf_ssb_notify_connections_changed(ssb, k_tf_ssb_change_create, connection); } static void _tf_ssb_send_broadcast(tf_ssb_t* ssb, struct sockaddr_in* address, struct sockaddr_in* netmask) { struct sockaddr server_addr; int len = (int)sizeof(server_addr); int r = uv_tcp_getsockname(&ssb->server, &server_addr, &len); if (r != 0) { tf_printf("Unable to get server's address: %s.\n", uv_strerror(r)); } else if (server_addr.sa_family != AF_INET) { tf_printf("Unexpected address family: %d\n", server_addr.sa_family); } char address_str[256]; if (uv_ip4_name(address, address_str, sizeof(address_str)) != 0) { tf_printf("Unable to convert address to string.\n"); } char fullid[k_id_base64_len]; tf_base64_encode(ssb->pub, sizeof(ssb->pub), fullid, sizeof(fullid)); char message[512]; snprintf(message, sizeof(message), "net:%s:%d~shs:%s", address_str, ntohs(((struct sockaddr_in*)&server_addr)->sin_port), fullid); uv_buf_t buf = { .base = message, .len = strlen(message) }; struct sockaddr_in broadcast_addr = { 0 }; broadcast_addr.sin_family = AF_INET; broadcast_addr.sin_port = htons(8008); broadcast_addr.sin_addr.s_addr = (address->sin_addr.s_addr & netmask->sin_addr.s_addr) | (INADDR_BROADCAST & ~netmask->sin_addr.s_addr); r = uv_udp_try_send(&ssb->broadcast_sender, &buf, 1, (struct sockaddr*)&broadcast_addr); if (r < 0) { char broadcast_str[256] = { 0 }; uv_ip4_name(&broadcast_addr, broadcast_str, sizeof(broadcast_str)); tf_printf("failed to send broadcast for %s via %s (%d): %s\n", address_str, broadcast_str, r, uv_strerror(r)); } } typedef struct _seeds_t { char** seeds; int seeds_count; } seeds_t; static void _tf_ssb_update_seed_callback(void* arg, ares_status_t status, size_t timeouts, const ares_dns_record_t* record) { seeds_t* seeds = arg; for (int i = 0; i < (int)ares_dns_record_rr_cnt(record, ARES_SECTION_ANSWER); i++) { const ares_dns_rr_t* rr = ares_dns_record_rr_get_const(record, ARES_SECTION_ANSWER, i); size_t len = 0; const unsigned char* str = ares_dns_rr_get_bin(rr, ARES_RR_TXT_DATA, &len); seeds->seeds = tf_resize_vec(seeds->seeds, sizeof(char*) * (seeds->seeds_count + 1)); seeds->seeds[seeds->seeds_count++] = tf_strdup((const char*)str); } } static void _tf_ssb_update_seeds_work(tf_ssb_t* ssb, void* user_data) { if (ares_library_init(0) == ARES_SUCCESS) { ares_channel channel; struct ares_options options = { .evsys = ARES_EVSYS_DEFAULT }; int result = ares_init_options(&channel, &options, ARES_OPT_EVENT_THREAD); if (result == ARES_SUCCESS) { if (ares_query_dnsrec(channel, ssb->seeds_host, ARES_CLASS_IN, ARES_REC_TYPE_TXT, _tf_ssb_update_seed_callback, user_data, NULL) == ARES_SUCCESS) { ares_queue_wait_empty(channel, -1); } ares_destroy(channel); } ares_library_cleanup(); } } static void _tf_ssb_update_seeds_after_work(tf_ssb_t* ssb, int status, void* user_data) { seeds_t* seeds = user_data; for (int i = 0; i < seeds->seeds_count; i++) { tf_ssb_broadcast_t broadcast = { .origin = k_tf_ssb_broadcast_origin_peer_exchange }; if (_tf_ssb_parse_broadcast(seeds->seeds[i], &broadcast)) { _tf_ssb_add_broadcast(ssb, &broadcast, k_seed_expire_seconds); } tf_free(seeds->seeds[i]); } tf_free(seeds->seeds); tf_free(seeds); } static void _tf_ssb_broadcast_timer(uv_timer_t* timer) { tf_ssb_t* ssb = timer->data; uv_interface_address_t* info = NULL; int count = 0; if (uv_interface_addresses(&info, &count) == 0) { for (int i = 0; i < count; i++) { if (!info[i].is_internal && info[i].address.address4.sin_family == AF_INET) { _tf_ssb_send_broadcast(ssb, &info[i].address.address4, &info[i].netmask.netmask4); } } uv_free_interface_addresses(info, count); } time_t now = time(NULL); if (ssb->is_peer_exchange && *ssb->seeds_host && now - ssb->last_seed_check > k_seed_check_interval_seconds) { seeds_t* seeds = tf_malloc(sizeof(seeds_t)); *seeds = (seeds_t) { 0 }; ssb->last_seed_check = now; tf_ssb_run_work(ssb, _tf_ssb_update_seeds_work, _tf_ssb_update_seeds_after_work, seeds); } } int tf_ssb_server_open(tf_ssb_t* ssb, int port) { if (ssb->server.data) { tf_printf("Already listening.\n"); return 0; } ssb->server.data = ssb; if (uv_tcp_init(ssb->loop, &ssb->server) != 0) { tf_printf("uv_tcp_init failed\n"); return 0; } struct sockaddr_in addr = { 0 }; addr.sin_family = AF_INET; addr.sin_port = htons(port); addr.sin_addr.s_addr = INADDR_ANY; int status = uv_tcp_bind(&ssb->server, (struct sockaddr*)&addr, 0); if (status != 0) { tf_printf("%s:%d: uv_tcp_bind failed: %s\n", __FILE__, __LINE__, uv_strerror(status)); return 0; } status = uv_listen((uv_stream_t*)&ssb->server, SOMAXCONN, _tf_ssb_on_connection); if (status != 0) { tf_printf("uv_listen failed: %s\n", uv_strerror(status)); /* TODO: cleanup */ return 0; } struct sockaddr_storage name = { 0 }; int size = (int)sizeof(name); status = uv_tcp_getsockname(&ssb->server, (struct sockaddr*)&name, &size); int assigned_port = ntohs(((struct sockaddr_in*)&name)->sin_port); return status == 0 ? assigned_port : 0; } int tf_ssb_server_get_port(tf_ssb_t* ssb) { int port = 0; if (ssb && ssb->server.data) { struct sockaddr_storage name = { 0 }; int size = (int)sizeof(name); if (uv_tcp_getsockname(&ssb->server, (struct sockaddr*)&name, &size) == 0) { port = ntohs(((struct sockaddr_in*)&name)->sin_port); } } return port; } void tf_ssb_server_close(tf_ssb_t* ssb) { if (ssb->server.data && !uv_is_closing((uv_handle_t*)&ssb->server)) { uv_close((uv_handle_t*)&ssb->server, _tf_ssb_on_handle_close); } uv_timer_stop(&ssb->broadcast_timer); tf_printf("Stopped broadcasts.\n"); } bool tf_ssb_whoami(tf_ssb_t* ssb, char* out_id, size_t out_id_size) { return tf_ssb_id_bin_to_str(out_id, out_id_size, ssb->pub); } static bool _tf_ssb_parse_broadcast(const char* in_broadcast, tf_ssb_broadcast_t* out_broadcast) { char public_key_str[45] = { 0 }; int port = 0; static_assert(sizeof(out_broadcast->host) == 256, "host field size"); if (sscanf(in_broadcast, "net:%255[0-9A-Za-z.-]:%d~shs:%44s", out_broadcast->host, &port, public_key_str) == 3) { out_broadcast->addr.sin_family = AF_INET; out_broadcast->addr.sin_port = htons((uint16_t)port); int r = tf_base64_decode(public_key_str, strlen(public_key_str), out_broadcast->pub, crypto_sign_PUBLICKEYBYTES); return r != -1; } else if (strncmp(in_broadcast, "ws:", 3) == 0) { tf_printf("Unsupported broadcast: %s\n", in_broadcast); } return false; } void tf_ssb_connect_str(tf_ssb_t* ssb, const char* address, int connect_flags) { tf_ssb_broadcast_t broadcast = { 0 }; if (_tf_ssb_parse_broadcast(address, &broadcast)) { tf_ssb_connect(ssb, broadcast.host, ntohs(broadcast.addr.sin_port), broadcast.pub, connect_flags); } else { tf_printf("Unable to parse: %s\n", address); } } static void _tf_ssb_on_broadcast_listener_alloc(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { buf->base = tf_malloc(suggested_size); buf->len = suggested_size; } static void _tf_ssb_notify_broadcasts_changed(tf_ssb_t* ssb) { tf_ssb_broadcasts_changed_callback_node_t* next = NULL; for (tf_ssb_broadcasts_changed_callback_node_t* node = ssb->broadcasts_changed; node; node = next) { next = node->next; if (node->callback) { tf_trace_begin(ssb->trace, "broadcasts changed"); node->callback(ssb, node->user_data); tf_trace_end(ssb->trace); } } } static void _tf_ssb_add_broadcast(tf_ssb_t* ssb, const tf_ssb_broadcast_t* broadcast, int expires_seconds) { if (memcmp(broadcast->pub, ssb->pub, sizeof(ssb->pub)) == 0) { return; } if (broadcast->tunnel_connection) { for (tf_ssb_broadcast_t* node = ssb->broadcasts; node; node = node->next) { if (node->tunnel_connection == broadcast->tunnel_connection && memcmp(node->pub, broadcast->pub, sizeof(node->pub)) == 0) { node->mtime = time(NULL); node->expires_at = node->mtime + expires_seconds; return; } } } else { for (tf_ssb_broadcast_t* node = ssb->broadcasts; node; node = node->next) { if (node->addr.sin_family == broadcast->addr.sin_family && node->addr.sin_port == broadcast->addr.sin_port && node->addr.sin_addr.s_addr == broadcast->addr.sin_addr.s_addr && memcmp(node->pub, broadcast->pub, sizeof(node->pub)) == 0) { node->mtime = time(NULL); node->expires_at = node->mtime + expires_seconds; return; } } char key[k_id_base64_len]; if (tf_ssb_id_bin_to_str(key, sizeof(key), broadcast->pub)) { tf_printf("Received new broadcast: host=%s, pub=%s.\n", broadcast->host, key); } } tf_ssb_broadcast_t* node = tf_malloc(sizeof(tf_ssb_broadcast_t)); *node = *broadcast; node->next = ssb->broadcasts; node->ctime = time(NULL); node->mtime = node->ctime; node->expires_at = node->mtime + expires_seconds; ssb->broadcasts = node; ssb->broadcasts_count++; _tf_ssb_notify_broadcasts_changed(ssb); } void tf_ssb_add_broadcast(tf_ssb_t* ssb, const char* connection, tf_ssb_broadcast_origin_t origin, int64_t expires_seconds) { tf_ssb_broadcast_t broadcast = { .origin = origin }; if (_tf_ssb_parse_broadcast(connection, &broadcast)) { _tf_ssb_add_broadcast(ssb, &broadcast, expires_seconds); } } static void _tf_ssb_on_broadcast_listener_recv(uv_udp_t* handle, ssize_t nread, const uv_buf_t* buf, const struct sockaddr* addr, unsigned flags) { if (nread <= 0) { tf_free(buf->base); return; } tf_ssb_t* ssb = handle->data; ((char*)buf->base)[nread] = '\0'; const char* k_delim = ";"; char* state = NULL; char* entry = strtok_r(buf->base, k_delim, &state); while (entry) { tf_ssb_broadcast_t broadcast = { .origin = k_tf_ssb_broadcast_origin_discovery }; if (_tf_ssb_parse_broadcast(entry, &broadcast)) { _tf_ssb_add_broadcast(ssb, &broadcast, k_udp_discovery_expires_seconds); } entry = strtok_r(NULL, k_delim, &state); } tf_free(buf->base); } void tf_ssb_visit_broadcasts(tf_ssb_t* ssb, void (*callback)(const char* host, const struct sockaddr_in* addr, tf_ssb_broadcast_origin_t origin, tf_ssb_connection_t* tunnel, const uint8_t* pub, void* user_data), void* user_data) { time_t now = time(NULL); tf_ssb_broadcast_t* next = NULL; for (tf_ssb_broadcast_t* node = ssb->broadcasts; node; node = next) { next = node->next; if (node->mtime - now < 60) { tf_trace_begin(ssb->trace, "broadcast"); callback(node->host, &node->addr, node->origin, node->tunnel_connection, node->pub, user_data); tf_trace_end(ssb->trace); } } } static void _tf_ssb_broadcast_cleanup_timer(uv_timer_t* timer) { tf_ssb_t* ssb = timer->data; int modified = 0; time_t now = time(NULL); for (tf_ssb_broadcast_t** it = &ssb->broadcasts; *it;) { if (!(*it)->tunnel_connection && now > (*it)->expires_at) { tf_ssb_broadcast_t* node = *it; *it = node->next; tf_free(node); ssb->broadcasts_count--; modified++; } else { it = &(*it)->next; } } if (modified) { _tf_ssb_notify_broadcasts_changed(ssb); } } void tf_ssb_broadcast_listener_start(tf_ssb_t* ssb, bool linger) { if (ssb->broadcast_listener.data) { return; } ssb->broadcast_listener.data = ssb; uv_udp_init(ssb->loop, &ssb->broadcast_listener); struct sockaddr_in addr = { 0 }; addr.sin_family = AF_INET; addr.sin_port = htons(8008); addr.sin_addr.s_addr = INADDR_ANY; int result = uv_udp_bind(&ssb->broadcast_listener, (const struct sockaddr*)&addr, UV_UDP_REUSEADDR); if (result != 0) { tf_printf("bind: %s\n", uv_strerror(result)); } result = uv_udp_recv_start(&ssb->broadcast_listener, _tf_ssb_on_broadcast_listener_alloc, _tf_ssb_on_broadcast_listener_recv); if (result != 0) { tf_printf("uv_udp_recv_start: %s\n", uv_strerror(result)); } if (!linger) { uv_unref((uv_handle_t*)&ssb->broadcast_listener); } ssb->broadcast_cleanup_timer.data = ssb; uv_timer_init(ssb->loop, &ssb->broadcast_cleanup_timer); uv_timer_start(&ssb->broadcast_cleanup_timer, _tf_ssb_broadcast_cleanup_timer, 2000, 2000); } void tf_ssb_broadcast_sender_start(tf_ssb_t* ssb) { if (ssb->broadcast_sender.data) { return; } ssb->broadcast_sender.data = ssb; uv_udp_init(ssb->loop, &ssb->broadcast_sender); struct sockaddr_in broadcast_from = { .sin_family = AF_INET, .sin_addr.s_addr = INADDR_ANY, }; uv_udp_bind(&ssb->broadcast_sender, (struct sockaddr*)&broadcast_from, 0); uv_udp_set_broadcast(&ssb->broadcast_sender, 1); tf_printf("Starting broadcasts.\n"); uv_timer_start(&ssb->broadcast_timer, _tf_ssb_broadcast_timer, 2000, 2000); } tf_ssb_connection_t* tf_ssb_connection_get(tf_ssb_t* ssb, const char* id) { uint8_t pub[k_id_bin_len] = { 0 }; tf_ssb_id_str_to_bin(pub, id); for (tf_ssb_connection_t* connection = ssb->connections; connection; connection = connection->next) { if (connection->tunnel_connection) { continue; } if (memcmp(connection->serverpub, pub, k_id_bin_len) == 0) { return connection; } else if (memcmp(ssb->pub, pub, k_id_bin_len) == 0) { return connection; } } return NULL; } const char** tf_ssb_get_connection_ids(tf_ssb_t* ssb) { int count = 0; for (tf_ssb_connection_t* connection = ssb->connections; connection; connection = connection->next) { if (connection->state == k_tf_ssb_state_verified || connection->state == k_tf_ssb_state_server_verified) { count++; } } char* buffer = tf_malloc(sizeof(char*) * (count + 1) + k_id_base64_len * count); char** array = (char**)buffer; char* strings = buffer + sizeof(char*) * (count + 1); int i = 0; for (tf_ssb_connection_t* connection = ssb->connections; connection; connection = connection->next) { if (connection->state == k_tf_ssb_state_verified || connection->state == k_tf_ssb_state_server_verified) { char* write_pos = strings + k_id_base64_len * i; tf_ssb_id_bin_to_str(write_pos, k_id_base64_len, connection->serverpub); array[i++] = write_pos; } } array[i] = NULL; return (const char**)array; } int tf_ssb_get_connections(tf_ssb_t* ssb, tf_ssb_connection_t** out_connections, int out_connections_count) { int i = 0; for (tf_ssb_connection_t* connection = ssb->connections; connection && i < out_connections_count; connection = connection->next) { out_connections[i++] = connection; } return i; } void tf_ssb_add_broadcasts_changed_callback(tf_ssb_t* ssb, tf_ssb_broadcasts_changed_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data) { tf_ssb_broadcasts_changed_callback_node_t* node = tf_malloc(sizeof(tf_ssb_broadcasts_changed_callback_node_t)); *node = (tf_ssb_broadcasts_changed_callback_node_t) { .callback = callback, .cleanup = cleanup, .user_data = user_data, .next = ssb->broadcasts_changed, }; ssb->broadcasts_changed = node; ssb->broadcasts_changed_count++; } void tf_ssb_remove_broadcasts_changed_callback(tf_ssb_t* ssb, tf_ssb_broadcasts_changed_callback_t* callback, void* user_data) { tf_ssb_broadcasts_changed_callback_node_t** it = &ssb->broadcasts_changed; while (*it) { if ((*it)->callback == callback && (*it)->user_data == user_data) { tf_ssb_broadcasts_changed_callback_node_t* node = *it; *it = node->next; ssb->broadcasts_changed_count--; if (node->cleanup) { node->cleanup(ssb, node->user_data); } tf_free(node); } else { it = &(*it)->next; } } } void tf_ssb_add_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_connections_changed_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data) { tf_ssb_connections_changed_callback_node_t* node = tf_malloc(sizeof(tf_ssb_connections_changed_callback_node_t)); *node = (tf_ssb_connections_changed_callback_node_t) { .callback = callback, .cleanup = cleanup, .user_data = user_data, .next = ssb->connections_changed, }; ssb->connections_changed = node; ssb->connections_changed_count++; } void tf_ssb_remove_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_connections_changed_callback_t* callback, void* user_data) { tf_ssb_connections_changed_callback_node_t** it = &ssb->connections_changed; while (*it) { if ((*it)->callback == callback && (*it)->user_data == user_data) { tf_ssb_connections_changed_callback_node_t* node = *it; *it = node->next; ssb->connections_changed_count--; if (node->cleanup) { node->cleanup(ssb, node->user_data); } tf_free(node); } else { it = &(*it)->next; } } } void tf_ssb_add_rpc_callback(tf_ssb_t* ssb, const char* name, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data) { size_t name_len = strlen(name); tf_ssb_rpc_callback_node_t* node = tf_malloc(sizeof(tf_ssb_rpc_callback_node_t) + name_len + 1); *node = (tf_ssb_rpc_callback_node_t) { .name = (const char*)(node + 1), .callback = callback, .cleanup = cleanup, .user_data = user_data, .next = ssb->rpc, }; memcpy((char*)node->name, name, name_len + 1); ssb->rpc = node; ssb->rpc_count++; } JSContext* tf_ssb_connection_get_context(tf_ssb_connection_t* connection) { return connection->ssb->context; } tf_ssb_t* tf_ssb_connection_get_ssb(tf_ssb_connection_t* connection) { return connection->ssb; } int32_t tf_ssb_connection_next_request_number(tf_ssb_connection_t* connection) { return connection->send_request_number++; } JSClassID tf_ssb_get_connection_class_id() { return _connection_class_id; } JSValue tf_ssb_connection_get_object(tf_ssb_connection_t* connection) { return connection ? connection->object : JS_UNDEFINED; } void tf_ssb_add_message_added_callback( tf_ssb_t* ssb, void (*callback)(tf_ssb_t* ssb, const char* id, void* user_data), void (*cleanup)(tf_ssb_t* ssb, void* user_data), void* user_data) { tf_ssb_message_added_callback_node_t* node = tf_malloc(sizeof(tf_ssb_message_added_callback_node_t)); *node = (tf_ssb_message_added_callback_node_t) { .callback = callback, .cleanup = cleanup, .user_data = user_data, .next = ssb->message_added, }; ssb->message_added = node; ssb->message_added_count++; } void tf_ssb_remove_message_added_callback(tf_ssb_t* ssb, tf_ssb_message_added_callback_t* callback, void* user_data) { tf_ssb_message_added_callback_node_t** it = &ssb->message_added; while (*it) { if ((*it)->callback == callback && (*it)->user_data == user_data) { tf_ssb_message_added_callback_node_t* node = *it; *it = node->next; ssb->message_added_count--; if (node->cleanup) { node->cleanup(ssb, node->user_data); } tf_free(node); } else { it = &(*it)->next; } } } void tf_ssb_notify_blob_stored(tf_ssb_t* ssb, const char* id) { ssb->blobs_stored++; } void tf_ssb_notify_message_added(tf_ssb_t* ssb, const char* id, JSValue message_keys) { tf_ssb_message_added_callback_node_t* next = NULL; ssb->messages_stored++; for (tf_ssb_message_added_callback_node_t* node = ssb->message_added; node; node = next) { next = node->next; tf_trace_begin(ssb->trace, "message added callback"); node->callback(ssb, id, node->user_data); tf_trace_end(ssb->trace); } JSContext* context = ssb->context; if (!JS_IsUndefined(message_keys)) { JSValue message = JS_GetPropertyStr(context, message_keys, "value"); JSValue author = JS_GetPropertyStr(context, message, "author"); const char* author_string = JS_ToCString(context, author); for (tf_ssb_connection_t* connection = ssb->connections; connection; connection = connection->next) { if (!connection->message_requests_count) { continue; } tf_ssb_connection_message_request_t* message_request = bsearch(author_string, connection->message_requests, connection->message_requests_count, sizeof(tf_ssb_connection_message_request_t), _message_request_compare); if (message_request) { tf_ssb_connection_rpc_send_json( connection, k_ssb_rpc_flag_stream, message_request->request_number, NULL, message_request->keys ? message_keys : message, NULL, NULL, NULL); } } JS_FreeCString(context, author_string); JS_FreeValue(context, author); JS_FreeValue(context, message); } } void tf_ssb_add_blob_want_added_callback( tf_ssb_t* ssb, void (*callback)(tf_ssb_t* ssb, const char* id, void* user_data), void (*cleanup)(tf_ssb_t* ssb, void* user_data), void* user_data) { tf_ssb_blob_want_added_callback_node_t* node = tf_malloc(sizeof(tf_ssb_blob_want_added_callback_node_t)); *node = (tf_ssb_blob_want_added_callback_node_t) { .callback = callback, .cleanup = cleanup, .user_data = user_data, .next = ssb->blob_want_added, }; ssb->blob_want_added = node; ssb->blob_want_added_count++; } void tf_ssb_remove_blob_want_added_callback(tf_ssb_t* ssb, tf_ssb_blob_want_added_callback_t* callback, void* user_data) { tf_ssb_blob_want_added_callback_node_t** it = &ssb->blob_want_added; while (*it) { if ((*it)->callback == callback && (*it)->user_data == user_data) { tf_ssb_blob_want_added_callback_node_t* node = *it; *it = node->next; ssb->blob_want_added_count--; if (node->cleanup) { node->cleanup(ssb, node->user_data); } tf_free(node); } else { it = &(*it)->next; } } } void tf_ssb_notify_blob_want_added(tf_ssb_t* ssb, const char* id) { tf_ssb_blob_want_added_callback_node_t* next = NULL; for (tf_ssb_blob_want_added_callback_node_t* node = ssb->blob_want_added; node; node = next) { next = node->next; tf_trace_begin(ssb->trace, "blob want added callback"); node->callback(ssb, id, node->user_data); tf_trace_end(ssb->trace); } } void tf_ssb_connection_add_room_attendant(tf_ssb_connection_t* connection, const char* id) { tf_ssb_broadcast_t broadcast = { .origin = k_tf_ssb_broadcast_origin_room, .tunnel_connection = connection, }; tf_ssb_id_str_to_bin(broadcast.pub, id); _tf_ssb_add_broadcast(connection->ssb, &broadcast, 0); } void tf_ssb_connection_remove_room_attendant(tf_ssb_connection_t* connection, const char* id) { uint8_t pub[k_id_bin_len] = { 0 }; tf_ssb_id_str_to_bin(pub, id); int modified = 0; for (tf_ssb_broadcast_t** it = &connection->ssb->broadcasts; *it;) { if ((*it)->tunnel_connection == connection && memcmp((*it)->pub, pub, k_id_bin_len) == 0) { tf_ssb_broadcast_t* node = *it; *it = node->next; tf_free(node); connection->ssb->broadcasts_count--; modified++; } else { it = &(*it)->next; } } if (modified) { _tf_ssb_notify_broadcasts_changed(connection->ssb); } } bool tf_ssb_connection_is_attendant(tf_ssb_connection_t* connection) { return connection->is_attendant; } int32_t tf_ssb_connection_get_attendant_request_number(tf_ssb_connection_t* connection) { return connection->attendant_request_number; } void tf_ssb_connection_set_attendant(tf_ssb_connection_t* connection, bool attendant, int request_number) { connection->is_attendant = attendant; connection->attendant_request_number = request_number; _tf_ssb_notify_broadcasts_changed(connection->ssb); } void tf_ssb_connection_clear_room_attendants(tf_ssb_connection_t* connection) { int modified = 0; for (tf_ssb_broadcast_t** it = &connection->ssb->broadcasts; *it;) { if ((*it)->tunnel_connection == connection) { tf_ssb_broadcast_t* node = *it; *it = node->next; tf_free(node); connection->ssb->broadcasts_count--; modified++; } else { it = &(*it)->next; } } if (modified) { _tf_ssb_notify_broadcasts_changed(connection->ssb); } } tf_ssb_blob_wants_t* tf_ssb_connection_get_blob_wants_state(tf_ssb_connection_t* connection) { return connection ? &connection->blob_wants : NULL; } typedef struct _store_t { tf_ssb_t* ssb; tf_ssb_verify_strip_store_callback_t* callback; void* user_data; bool verified; bool stored; char id[crypto_hash_sha256_BYTES * 2 + 1]; } store_t; static void _tf_ssb_verify_strip_and_store_finish(store_t* store) { if (store->callback) { store->callback(store->id, store->verified, store->stored, store->user_data); } tf_free(store); } static void _tf_ssb_verify_strip_and_store_callback(const char* id, bool stored, void* user_data) { store_t* store = user_data; store->stored = stored; _tf_ssb_verify_strip_and_store_finish(store); } void tf_ssb_verify_strip_and_store_message(tf_ssb_t* ssb, JSValue value, tf_ssb_verify_strip_store_callback_t* callback, void* user_data) { JSContext* context = tf_ssb_get_context(ssb); store_t* async = tf_malloc(sizeof(store_t)); *async = (store_t) { .ssb = ssb, .callback = callback, .user_data = user_data, }; char signature[crypto_sign_BYTES + 128] = { 0 }; int flags = 0; if (tf_ssb_verify_and_strip_signature(context, value, async->id, sizeof(async->id), signature, sizeof(signature), &flags)) { async->verified = true; tf_ssb_db_store_message(ssb, context, async->id, value, signature, flags, _tf_ssb_verify_strip_and_store_callback, async); } else { _tf_ssb_verify_strip_and_store_finish(async); } } bool tf_ssb_connection_get_sent_clock(tf_ssb_connection_t* connection) { return connection->sent_clock; } void tf_ssb_connection_set_sent_clock(tf_ssb_connection_t* connection, bool sent_clock) { connection->sent_clock = sent_clock; } int32_t tf_ssb_connection_get_ebt_request_number(tf_ssb_connection_t* connection) { return connection->ebt_request_number; } void tf_ssb_connection_set_ebt_request_number(tf_ssb_connection_t* connection, int32_t request_number) { connection->ebt_request_number = request_number; } JSValue tf_ssb_get_disconnection_debug(tf_ssb_t* ssb, JSContext* context) { JSValue result = JS_NewObject(context); JSValue disconnections = JS_NewArray(context); for (int i = 0; i < k_debug_close_connection_count; i++) { JSValue disconnection = JS_NewObject(context); if (*ssb->debug_close[i].id) { JS_SetPropertyStr(context, disconnection, "id", JS_NewString(context, ssb->debug_close[i].id)); if (*ssb->debug_close[i].tunnel) { JS_SetPropertyStr(context, disconnection, "tunnel", JS_NewString(context, ssb->debug_close[i].tunnel)); } JS_SetPropertyStr(context, disconnection, "reason", JS_NewString(context, ssb->debug_close[i].reason)); JSValue messages = JS_NewArray(context); for (int j = 0; j < k_debug_close_message_count; j++) { if (ssb->debug_close[i].messages[j]) { JSValue message = JS_NewObject(context); JS_SetPropertyStr(context, message, "direction", JS_NewString(context, ssb->debug_close[i].messages[j]->outgoing ? "out" : "in")); JS_SetPropertyStr(context, message, "flags", JS_NewInt32(context, ssb->debug_close[i].messages[j]->flags)); JS_SetPropertyStr(context, message, "request_number", JS_NewInt32(context, ssb->debug_close[i].messages[j]->request_number)); JS_SetPropertyStr(context, message, "timestamp", JS_NewFloat64(context, ssb->debug_close[i].messages[j]->timestamp)); JS_SetPropertyStr( context, message, "payload", JS_NewStringLen(context, (const char*)ssb->debug_close[i].messages[j]->data, ssb->debug_close[i].messages[j]->size)); JS_SetPropertyUint32(context, messages, j, message); } } JS_SetPropertyStr(context, disconnection, "messages", messages); } JS_SetPropertyUint32(context, disconnections, i, disconnection); } JS_SetPropertyStr(context, result, "disconnections", disconnections); return result; } void tf_ssb_record_thread_busy(tf_ssb_t* ssb, bool busy) { int32_t busy_value = __atomic_add_fetch(&ssb->thread_busy_count, busy ? 1 : -1, __ATOMIC_RELAXED); int32_t current = ssb->thread_busy_max; while (busy_value > current && !__atomic_compare_exchange_n(&ssb->thread_busy_max, ¤t, busy_value, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) { current = ssb->thread_busy_max; } } float tf_ssb_get_average_thread_percent(tf_ssb_t* ssb) { if (!ssb || !ssb->thread_busy_max) { return 0.0f; } return 100.0f * ssb->thread_busy_count / ssb->thread_busy_max; } void tf_ssb_set_hitch_callback(tf_ssb_t* ssb, void (*callback)(const char* name, uint64_t duration_ns, void* user_data), void* user_data) { ssb->hitch_callback = callback; ssb->hitch_user_data = user_data; } tf_ssb_store_queue_t* tf_ssb_get_store_queue(tf_ssb_t* ssb) { return &ssb->store_queue; } void tf_ssb_ref(tf_ssb_t* ssb) { ssb->ref_count++; } void tf_ssb_unref(tf_ssb_t* ssb) { int new_count = --ssb->ref_count; if (new_count < 0) { tf_printf("tf_ssb_unref past 0: %d\n", new_count); abort(); } } void tf_ssb_set_main_thread(tf_ssb_t* ssb, bool main_thread) { ssb->thread_self = main_thread ? uv_thread_self() : 0; } typedef struct _connection_work_t { uv_work_t work; tf_ssb_connection_t* connection; const char* name; const char* after_name; void (*work_callback)(tf_ssb_connection_t* connection, void* user_data); void (*after_work_callback)(tf_ssb_connection_t* connection, int result, void* user_data); void* user_data; } connection_work_t; static void _tf_ssb_connection_work_callback(uv_work_t* work) { connection_work_t* data = work->data; tf_ssb_record_thread_busy(data->connection->ssb, true); if (data->work_callback) { tf_trace_begin(data->connection->ssb->trace, data->name); data->work_callback(data->connection, data->user_data); tf_trace_end(data->connection->ssb->trace); } tf_ssb_record_thread_busy(data->connection->ssb, false); } static void _tf_ssb_connection_after_work_callback(uv_work_t* work, int status) { connection_work_t* data = work->data; if (data->after_work_callback) { tf_trace_begin(data->connection->ssb->trace, data->after_name); data->after_work_callback(data->connection, status, data->user_data); tf_trace_end(data->connection->ssb->trace); } data->connection->ref_count--; if (data->connection->ref_count == 0 && data->connection->closing) { _tf_ssb_connection_destroy(data->connection, "work completed"); } tf_free((void*)data->name); tf_free((void*)data->after_name); tf_free(data); } void tf_ssb_connection_run_work(tf_ssb_connection_t* connection, void (*work_callback)(tf_ssb_connection_t* connection, void* user_data), void (*after_work_callback)(tf_ssb_connection_t* connection, int result, void* user_data), void* user_data) { connection_work_t* work = tf_malloc(sizeof(connection_work_t)); *work = (connection_work_t) { .work = { .data = work, }, .connection = connection, .work_callback = work_callback, .after_work_callback = after_work_callback, .user_data = user_data, }; connection->ref_count++; int result = uv_queue_work(connection->ssb->loop, &work->work, _tf_ssb_connection_work_callback, _tf_ssb_connection_after_work_callback); if (result) { _tf_ssb_connection_after_work_callback(&work->work, result); } } typedef struct _ssb_work_t { uv_work_t work; tf_ssb_t* ssb; const char* name; const char* after_name; void (*work_callback)(tf_ssb_t* ssb, void* user_data); void (*after_work_callback)(tf_ssb_t* ssb, int result, void* user_data); void* user_data; } ssb_work_t; static void _tf_ssb_work_callback(uv_work_t* work) { ssb_work_t* data = work->data; tf_ssb_record_thread_busy(data->ssb, true); if (data->work_callback) { tf_trace_begin(data->ssb->trace, data->name); data->work_callback(data->ssb, data->user_data); tf_trace_end(data->ssb->trace); } tf_ssb_record_thread_busy(data->ssb, false); } static void _tf_ssb_after_work_callback(uv_work_t* work, int status) { ssb_work_t* data = work->data; if (data->after_work_callback) { tf_trace_begin(data->ssb->trace, data->after_name); data->after_work_callback(data->ssb, status, data->user_data); tf_trace_end(data->ssb->trace); } tf_ssb_unref(data->ssb); tf_free((void*)data->name); tf_free((void*)data->after_name); tf_free(data); } void tf_ssb_run_work(tf_ssb_t* ssb, void (*work_callback)(tf_ssb_t* ssb, void* user_data), void (*after_work_callback)(tf_ssb_t* ssb, int result, void* user_data), void* user_data) { ssb_work_t* work = tf_malloc(sizeof(ssb_work_t)); *work = (ssb_work_t) { .work = { .data = work, }, .name = tf_util_function_to_string(work_callback), .after_name = tf_util_function_to_string(after_work_callback), .ssb = ssb, .work_callback = work_callback, .after_work_callback = after_work_callback, .user_data = user_data, }; tf_ssb_ref(ssb); int result = uv_queue_work(ssb->loop, &work->work, _tf_ssb_work_callback, _tf_ssb_after_work_callback); if (result) { _tf_ssb_connection_after_work_callback(&work->work, result); } } bool tf_ssb_is_room(tf_ssb_t* ssb) { return ssb->is_room; } void tf_ssb_set_is_room(tf_ssb_t* ssb, bool is_room) { ssb->is_room = is_room; } const char* tf_ssb_get_room_name(tf_ssb_t* ssb) { return ssb->room_name; } bool tf_ssb_is_replicator(tf_ssb_t* ssb) { return ssb->is_replicator; } void tf_ssb_set_is_replicator(tf_ssb_t* ssb, bool is_replicator) { ssb->is_replicator = is_replicator; } bool tf_ssb_is_peer_exchange(tf_ssb_t* ssb) { return ssb->is_peer_exchange; } void tf_ssb_set_is_peer_exchange(tf_ssb_t* ssb, bool is_peer_exchange) { ssb->is_peer_exchange = is_peer_exchange; } void tf_ssb_set_room_name(tf_ssb_t* ssb, const char* room_name) { tf_free(ssb->room_name); ssb->room_name = tf_strdup(room_name); } typedef struct _update_settings_t { bool is_room; bool is_replicator; bool is_peer_exchange; char seeds_host[256]; char room_name[1024]; } update_settings_t; static bool _get_global_setting_string(tf_ssb_t* ssb, const char* name, char* out_value, size_t size) { bool result = false; sqlite3* db = tf_ssb_acquire_db_reader(ssb); sqlite3_stmt* statement; if (sqlite3_prepare(db, "SELECT json_extract(value, '$.' || ?) FROM properties WHERE id = 'core' AND key = 'settings'", -1, &statement, NULL) == SQLITE_OK) { if (sqlite3_bind_text(statement, 1, name, -1, NULL) == SQLITE_OK) { if (sqlite3_step(statement) == SQLITE_ROW) { snprintf(out_value, size, "%s", sqlite3_column_text(statement, 0)); result = true; } } sqlite3_finalize(statement); } else { tf_printf("prepare failed: %s\n", sqlite3_errmsg(db)); } tf_ssb_release_db_reader(ssb, db); return result; } static bool _get_global_setting_bool(tf_ssb_t* ssb, const char* name, bool default_value) { bool result = default_value; sqlite3* db = tf_ssb_acquire_db_reader(ssb); sqlite3_stmt* statement; if (sqlite3_prepare(db, "SELECT json_extract(value, '$.' || ?) FROM properties WHERE id = 'core' AND key = 'settings'", -1, &statement, NULL) == SQLITE_OK) { if (sqlite3_bind_text(statement, 1, name, -1, NULL) == SQLITE_OK) { if (sqlite3_step(statement) == SQLITE_ROW) { result = sqlite3_column_int(statement, 0) != 0; } } sqlite3_finalize(statement); } else { tf_printf("prepare failed: %s\n", sqlite3_errmsg(db)); } tf_ssb_release_db_reader(ssb, db); return result; } static void _tf_ssb_update_settings_work(tf_ssb_t* ssb, void* user_data) { update_settings_t* update = user_data; update->is_room = _get_global_setting_bool(ssb, "room", true); update->is_replicator = _get_global_setting_bool(ssb, "replicator", true); update->is_peer_exchange = _get_global_setting_bool(ssb, "peer_exchange", true); _get_global_setting_string(ssb, "room_name", update->room_name, sizeof(update->room_name)); _get_global_setting_string(ssb, "seeds_host", update->seeds_host, sizeof(update->seeds_host)); } static void _tf_ssb_update_settings_after_work(tf_ssb_t* ssb, int result, void* user_data) { update_settings_t* update = user_data; tf_ssb_set_is_room(ssb, update->is_room); tf_ssb_set_room_name(ssb, update->room_name); tf_ssb_set_is_peer_exchange(ssb, update->is_peer_exchange); tf_ssb_set_is_replicator(ssb, update->is_replicator); snprintf(ssb->seeds_host, sizeof(ssb->seeds_host), "%s", update->seeds_host); _tf_ssb_start_update_settings(ssb); tf_free(update); } static void _tf_ssb_start_update_settings_timer(tf_ssb_t* ssb, void* user_data) { update_settings_t* update = tf_malloc(sizeof(update_settings_t)); *update = (update_settings_t) { 0 }; tf_ssb_run_work(ssb, _tf_ssb_update_settings_work, _tf_ssb_update_settings_after_work, update); } static void _tf_ssb_update_settings(tf_ssb_t* ssb) { update_settings_t* update = tf_malloc(sizeof(update_settings_t)); *update = (update_settings_t) { 0 }; _tf_ssb_update_settings_work(ssb, update); _tf_ssb_update_settings_after_work(ssb, 0, update); } static void _tf_ssb_start_update_settings(tf_ssb_t* ssb) { tf_ssb_schedule_work(ssb, 5000, _tf_ssb_start_update_settings_timer, NULL); } void tf_ssb_set_verbose(tf_ssb_t* ssb, bool verbose) { ssb->verbose = verbose; } static void _tf_ssb_scheduled_timer(uv_timer_t* handle) { tf_ssb_timer_t* timer = handle->data; timer->callback(timer->ssb, timer->user_data); for (int i = 0; i < timer->ssb->timers_count; i++) { if (timer->ssb->timers[i] == timer) { timer->ssb->timers[i] = timer->ssb->timers[--timer->ssb->timers_count]; break; } } uv_close((uv_handle_t*)handle, _tf_ssb_on_timer_close); } void tf_ssb_schedule_work(tf_ssb_t* ssb, int delay_ms, void (*callback)(tf_ssb_t* ssb, void* user_data), void* user_data) { ssb->timers = tf_resize_vec(ssb->timers, sizeof(uv_timer_t*) * (ssb->timers_count + 1)); tf_ssb_timer_t* timer = tf_malloc(sizeof(tf_ssb_timer_t)); *timer = (tf_ssb_timer_t) { .ssb = ssb, .timer = { .data = timer, }, .callback = callback, .user_data = user_data, }; ssb->timers[ssb->timers_count++] = timer; uv_timer_init(ssb->loop, &timer->timer); uv_timer_start(&timer->timer, _tf_ssb_scheduled_timer, delay_ms, 0); uv_unref((uv_handle_t*)&timer->timer); } bool tf_ssb_hmacsha256_verify(const char* public_key, const void* payload, size_t payload_length, const char* signature, bool signature_is_urlb64) { bool result = false; const char* public_key_start = public_key && *public_key == '@' ? public_key + 1 : public_key; const char* public_key_end = public_key_start ? strstr(public_key_start, ".ed25519") : NULL; if (public_key_start && !public_key_end) { public_key_end = public_key_start + strlen(public_key_start); } uint8_t bin_public_key[crypto_sign_PUBLICKEYBYTES] = { 0 }; if (tf_base64_decode(public_key_start, public_key_end - public_key_start, bin_public_key, sizeof(bin_public_key)) > 0) { uint8_t bin_signature[crypto_sign_BYTES] = { 0 }; if (sodium_base642bin(bin_signature, sizeof(bin_signature), signature, strlen(signature), NULL, NULL, NULL, signature_is_urlb64 ? sodium_base64_VARIANT_URLSAFE_NO_PADDING : sodium_base64_VARIANT_ORIGINAL) == 0) { if (crypto_sign_verify_detached(bin_signature, (const uint8_t*)payload, payload_length, bin_public_key) == 0) { result = true; } } } return result; } JSValue tf_ssb_connection_requests_to_object(tf_ssb_connection_t* connection) { JSContext* context = connection->ssb->context; JSValue object = JS_NewArray(context); uint64_t now_ms = uv_now(connection->ssb->loop); for (int i = 0; i < connection->requests_count; i++) { JSValue request = JS_NewObject(context); JS_SetPropertyStr(context, request, "name", JS_NewString(context, connection->requests[i].name)); JS_SetPropertyStr(context, request, "request_number", JS_NewInt32(context, connection->requests[i].request_number)); JS_SetPropertyStr(context, request, "active", JS_NewBool(context, (now_ms - connection->requests[i].last_active) < k_rpc_active_ms)); JS_SetPropertyUint32(context, object, i, request); } return object; } void tf_ssb_connection_adjust_read_backpressure(tf_ssb_connection_t* connection, int delta) { const int k_threshold = 256; int old_pressure = connection->read_back_pressure; connection->read_back_pressure += delta; if (!connection->closing) { if (old_pressure < k_threshold && connection->read_back_pressure >= k_threshold) { _tf_ssb_connection_read_stop(connection); } else if (old_pressure >= k_threshold && connection->read_back_pressure < k_threshold) { _tf_ssb_connection_read_start(connection); } } connection->ref_count += delta; if (connection->ref_count == 0 && connection->closing) { _tf_ssb_connection_destroy(connection, "backpressure released"); } } void tf_ssb_connection_adjust_write_count(tf_ssb_connection_t* connection, int delta) { connection->active_write_count += delta; _tf_ssb_connection_dispatch_scheduled(connection); } void tf_ssb_sync_start(tf_ssb_t* ssb) { tf_ssb_connections_sync_start(ssb->connections_tracker); } bool tf_ssb_tunnel_create(tf_ssb_t* ssb, const char* portal_id, const char* target_id, int connect_flags) { tf_ssb_connection_t* connection = tf_ssb_connection_get(ssb, portal_id); if (connection) { JSContext* context = ssb->context; int32_t request_number = tf_ssb_connection_next_request_number(connection); JSValue message = JS_NewObject(context); JSValue name = JS_NewArray(context); JS_SetPropertyUint32(context, name, 0, JS_NewString(context, "tunnel")); JS_SetPropertyUint32(context, name, 1, JS_NewString(context, "connect")); JS_SetPropertyStr(context, message, "name", name); JSValue arg = JS_NewObject(context); JS_SetPropertyStr(context, arg, "portal", JS_NewString(context, portal_id)); JS_SetPropertyStr(context, arg, "target", JS_NewString(context, target_id)); JSValue args = JS_NewArray(context); JS_SetPropertyUint32(context, args, 0, arg); JS_SetPropertyStr(context, message, "args", args); JS_SetPropertyStr(context, message, "type", JS_NewString(context, "duplex")); tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, request_number, "tunnel.connect", message, NULL, NULL, NULL); JS_FreeValue(context, message); tf_ssb_connection_tunnel_create(ssb, portal_id, request_number, target_id, connect_flags); } return connection != NULL; } int tf_ssb_connection_get_flags(tf_ssb_connection_t* connection) { return connection->flags; }