#include "ssb.rpc.h" #include "log.h" #include "mem.h" #include "ssb.db.h" #include "ssb.h" #include "util.js.h" #include "sqlite3.h" #include "uv.h" #include #include #include #include static void _tf_ssb_connection_send_history_stream( tf_ssb_connection_t* connection, int32_t request_number, const char* author, int64_t sequence, bool keys, bool live, bool end_request); static void _tf_ssb_rpc_send_peers_exchange(tf_ssb_connection_t* connection); static void _tf_ssb_rpc_start_delete_blobs(tf_ssb_t* ssb, int delay_ms); static void _tf_ssb_rpc_start_delete_feeds(tf_ssb_t* ssb, int delay_ms); static int64_t _get_global_setting_int64(tf_ssb_t* ssb, const char* name, int64_t default_value) { int64_t result = default_value; sqlite3* db = tf_ssb_acquire_db_reader(ssb); sqlite3_stmt* statement; if (sqlite3_prepare(db, "SELECT json_extract(value, '$.' || ?) FROM properties WHERE id = 'core' AND key = 'settings'", -1, &statement, NULL) == SQLITE_OK) { if (sqlite3_bind_text(statement, 1, name, -1, NULL) == SQLITE_OK) { if (sqlite3_step(statement) == SQLITE_ROW && sqlite3_column_type(statement, 0) != SQLITE_NULL) { result = sqlite3_column_int64(statement, 0); } } sqlite3_finalize(statement); } else { tf_printf("prepare failed: %s\n", sqlite3_errmsg(db)); } tf_ssb_release_db_reader(ssb, db); return result; } static bool _get_global_setting_bool(tf_ssb_t* ssb, const char* name, bool default_value) { bool result = default_value; sqlite3* db = tf_ssb_acquire_db_reader(ssb); sqlite3_stmt* statement; if (sqlite3_prepare(db, "SELECT json_extract(value, '$.' || ?) FROM properties WHERE id = 'core' AND key = 'settings'", -1, &statement, NULL) == SQLITE_OK) { if (sqlite3_bind_text(statement, 1, name, -1, NULL) == SQLITE_OK) { if (sqlite3_step(statement) == SQLITE_ROW) { result = sqlite3_column_int(statement, 0) != 0; } } sqlite3_finalize(statement); } else { tf_printf("prepare failed: %s\n", sqlite3_errmsg(db)); } tf_ssb_release_db_reader(ssb, db); return result; } static void _tf_ssb_rpc_gossip_ping_callback( tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { char buffer[256]; snprintf(buffer, sizeof(buffer), "%" PRId64, (int64_t)time(NULL) * 1000); tf_ssb_connection_rpc_send(connection, flags, -request_number, NULL, (const uint8_t*)buffer, strlen(buffer), NULL, NULL, NULL); if (flags & k_ssb_rpc_flag_end_error) { tf_ssb_connection_remove_request(connection, request_number); } } static void _tf_ssb_rpc_gossip_ping(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { tf_ssb_connection_add_request(connection, -request_number, "gossip.ping", _tf_ssb_rpc_gossip_ping_callback, NULL, NULL, NULL); } static void _tf_ssb_rpc_blobs_get_callback( tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { } typedef struct _blobs_get_work_t { int64_t request_number; char id[k_id_base64_len]; bool found; uint8_t* blob; size_t size; } blobs_get_work_t; static void _tf_ssb_rpc_blobs_get_work(tf_ssb_connection_t* connection, void* user_data) { blobs_get_work_t* work = user_data; tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); work->found = tf_ssb_db_blob_get(ssb, work->id, &work->blob, &work->size); } static void _tf_ssb_rpc_blobs_get_after_work(tf_ssb_connection_t* connection, int status, void* user_data) { blobs_get_work_t* work = user_data; if (work->found) { const size_t k_send_max = 8192; for (size_t offset = 0; offset < work->size; offset += k_send_max) { if (!tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_binary | k_ssb_rpc_flag_stream, -work->request_number, NULL, work->blob + offset, offset + k_send_max <= work->size ? k_send_max : (work->size - offset), NULL, NULL, NULL)) { break; } } tf_free(work->blob); } tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json | k_ssb_rpc_flag_end_error | k_ssb_rpc_flag_stream, -work->request_number, NULL, (const uint8_t*)(work->found ? "true" : "false"), strlen(work->found ? "true" : "false"), NULL, NULL, NULL); tf_free(work); } static void _tf_ssb_rpc_blobs_get(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { if (flags & k_ssb_rpc_flag_end_error) { tf_ssb_connection_remove_request(connection, -request_number); return; } tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); if (!tf_ssb_is_replicator(ssb)) { tf_ssb_connection_rpc_send_error_method_not_allowed(connection, flags, -request_number, "blobs.get"); return; } tf_ssb_connection_add_request(connection, -request_number, "blobs.get", _tf_ssb_rpc_blobs_get_callback, NULL, NULL, NULL); JSContext* context = tf_ssb_connection_get_context(connection); JSValue ids = JS_GetPropertyStr(context, args, "args"); int length = tf_util_get_length(context, ids); for (int i = 0; i < length; i++) { JSValue arg = JS_GetPropertyUint32(context, ids, i); const char* id = NULL; if (JS_IsString(arg)) { id = JS_ToCString(context, arg); } else { JSValue key = JS_GetPropertyStr(context, arg, "key"); id = JS_ToCString(context, key); JS_FreeValue(context, key); } blobs_get_work_t* work = tf_malloc(sizeof(blobs_get_work_t)); *work = (blobs_get_work_t) { .request_number = request_number, }; snprintf(work->id, sizeof(work->id), "%s", id); tf_ssb_connection_run_work(connection, _tf_ssb_rpc_blobs_get_work, _tf_ssb_rpc_blobs_get_after_work, work); JS_FreeCString(context, id); JS_FreeValue(context, arg); } JS_FreeValue(context, ids); } typedef struct _blobs_has_work_t { int64_t request_number; char id[k_id_base64_len]; bool found; } blobs_has_work_t; static void _tf_ssb_rpc_blobs_has_work(tf_ssb_connection_t* connection, void* user_data) { blobs_has_work_t* work = user_data; tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); work->found = tf_ssb_db_blob_has(ssb, work->id); } static void _tf_ssb_rpc_blobs_has_after_work(tf_ssb_connection_t* connection, int status, void* user_data) { blobs_has_work_t* work = user_data; tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json | k_ssb_rpc_flag_end_error | k_ssb_rpc_flag_stream, -work->request_number, NULL, (const uint8_t*)(work->found ? "true" : "false"), strlen(work->found ? "true" : "false"), NULL, NULL, NULL); tf_free(work); } static void _tf_ssb_rpc_blobs_has(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); if (!tf_ssb_is_replicator(ssb)) { tf_ssb_connection_rpc_send_error_method_not_allowed(connection, flags, -request_number, "blobs.has"); return; } JSContext* context = tf_ssb_connection_get_context(connection); JSValue ids = JS_GetPropertyStr(context, args, "args"); JSValue id = JS_GetPropertyUint32(context, ids, 0); const char* id_str = JS_ToCString(context, id); blobs_has_work_t* work = tf_malloc(sizeof(blobs_has_work_t)); *work = (blobs_has_work_t) { .request_number = request_number, }; snprintf(work->id, sizeof(work->id), "%s", id_str); tf_ssb_connection_run_work(connection, _tf_ssb_rpc_blobs_has_work, _tf_ssb_rpc_blobs_has_after_work, work); JS_FreeCString(context, id_str); JS_FreeValue(context, id); JS_FreeValue(context, ids); } static void _tf_ssb_rpc_blob_wants_added_callback(tf_ssb_t* ssb, const char* id, void* user_data) { tf_ssb_connection_t* connection = user_data; tf_ssb_blob_wants_t* blob_wants = tf_ssb_connection_get_blob_wants_state(connection); JSContext* context = tf_ssb_get_context(ssb); JSValue message = JS_NewObject(context); JS_SetPropertyStr(context, message, id, JS_NewInt64(context, -1)); tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream, -blob_wants->request_number, NULL, message, NULL, NULL, NULL); JS_FreeValue(context, message); } typedef struct _blob_wants_work_t { char out_id[32][k_blob_id_len]; int out_id_count; } blob_wants_work_t; static void _tf_ssb_request_blob_wants_work(tf_ssb_connection_t* connection, void* user_data) { blob_wants_work_t* work = user_data; tf_ssb_blob_wants_t* blob_wants = tf_ssb_connection_get_blob_wants_state(connection); tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); int64_t age = _get_global_setting_int64(ssb, "blob_fetch_age_seconds", -1); int64_t timestamp = -1; if (age == 0) { /* Don't fetch any blobs. */ return; } else if (age > 0) { int64_t now = (int64_t)time(NULL) * 1000ULL; timestamp = now - age * 1000ULL; } sqlite3* db = tf_ssb_acquire_db_reader(ssb); sqlite3_stmt* statement; if (sqlite3_prepare(db, "SELECT id FROM blob_wants_view WHERE id > ? AND timestamp > ? ORDER BY id LIMIT ?", -1, &statement, NULL) == SQLITE_OK) { if (sqlite3_bind_text(statement, 1, blob_wants->last_id, -1, NULL) == SQLITE_OK && sqlite3_bind_int64(statement, 2, timestamp) == SQLITE_OK && sqlite3_bind_int(statement, 3, tf_countof(work->out_id)) == SQLITE_OK) { while (sqlite3_step(statement) == SQLITE_ROW) { snprintf(work->out_id[work->out_id_count], sizeof(work->out_id[work->out_id_count]), "%s", (const char*)sqlite3_column_text(statement, 0)); work->out_id_count++; } } sqlite3_finalize(statement); } else { tf_printf("prepare failed: %s\n", sqlite3_errmsg(db)); } tf_ssb_release_db_reader(ssb, db); } static void _tf_ssb_request_blob_wants_after_work(tf_ssb_connection_t* connection, int result, void* user_data) { blob_wants_work_t* work = user_data; if (!tf_ssb_is_shutting_down(tf_ssb_connection_get_ssb(connection))) { JSContext* context = tf_ssb_connection_get_context(connection); tf_ssb_blob_wants_t* blob_wants = tf_ssb_connection_get_blob_wants_state(connection); bool send_failed = false; for (int i = 0; i < work->out_id_count && !send_failed; i++) { JSValue message = JS_NewObject(context); JS_SetPropertyStr(context, message, work->out_id[i], JS_NewInt32(context, -1)); send_failed = !tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream, -blob_wants->request_number, NULL, message, NULL, NULL, NULL); JS_FreeValue(context, message); blob_wants->wants_sent++; } if (work->out_id_count) { snprintf(blob_wants->last_id, sizeof(blob_wants->last_id), "%s", work->out_id[work->out_id_count - 1]); } } tf_free(work); } static void _tf_ssb_rpc_request_more_blobs(tf_ssb_connection_t* connection) { blob_wants_work_t* work = tf_malloc(sizeof(blob_wants_work_t)); memset(work, 0, sizeof(*work)); tf_ssb_connection_run_work(connection, _tf_ssb_request_blob_wants_work, _tf_ssb_request_blob_wants_after_work, work); } static void _tf_ssb_rpc_blobs_createWants( tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); if (!tf_ssb_is_replicator(ssb)) { tf_ssb_connection_rpc_send_error_method_not_allowed(connection, flags, -request_number, "blobs.createWants"); return; } tf_ssb_blob_wants_t* blob_wants = tf_ssb_connection_get_blob_wants_state(connection); tf_ssb_add_blob_want_added_callback(ssb, _tf_ssb_rpc_blob_wants_added_callback, NULL, connection); blob_wants->request_number = request_number; _tf_ssb_rpc_request_more_blobs(connection); } typedef struct tunnel_t { tf_ssb_connection_t* connection; int32_t request_number; } tunnel_t; static void _tf_ssb_rpc_tunnel_callback(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { tunnel_t* tun = user_data; if (flags & k_ssb_rpc_flag_end_error) { tf_ssb_connection_remove_request(connection, request_number); tf_ssb_connection_close(tun->connection); } else { tf_ssb_connection_rpc_send(tun->connection, flags, tun->request_number, NULL, message, size, NULL, NULL, NULL); } } static void _tf_ssb_rpc_tunnel_cleanup(tf_ssb_t* ssb, void* user_data) { tf_free(user_data); } static void _tf_ssb_rpc_tunnel_connect(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); if (!tf_ssb_is_room(ssb)) { tf_ssb_connection_rpc_send_error_method_not_allowed(connection, flags, -request_number, "tunnel.connect"); return; } JSContext* context = tf_ssb_connection_get_context(connection); JSValue arg_array = JS_GetPropertyStr(context, args, "args"); JSValue arg = JS_GetPropertyUint32(context, arg_array, 0); JSValue origin = JS_GetPropertyStr(context, arg, "origin"); JSValue portal = JS_GetPropertyStr(context, arg, "portal"); JSValue target = JS_GetPropertyStr(context, arg, "target"); if (JS_IsUndefined(origin) && !JS_IsUndefined(portal) && !JS_IsUndefined(target)) { const char* target_str = JS_ToCString(context, target); tf_ssb_connection_t* target_connection = tf_ssb_connection_get(ssb, target_str); if (target_connection) { int32_t tunnel_request_number = tf_ssb_connection_next_request_number(target_connection); const char* portal_str = JS_ToCString(context, portal); JSValue message = JS_NewObject(context); JSValue name = JS_NewArray(context); JS_SetPropertyUint32(context, name, 0, JS_NewString(context, "tunnel")); JS_SetPropertyUint32(context, name, 1, JS_NewString(context, "connect")); JS_SetPropertyStr(context, message, "name", name); JSValue arg_obj = JS_NewObject(context); char origin_str[k_id_base64_len] = ""; tf_ssb_connection_get_id(connection, origin_str, sizeof(origin_str)); JS_SetPropertyStr(context, arg_obj, "origin", JS_NewString(context, origin_str)); JS_SetPropertyStr(context, arg_obj, "portal", JS_NewString(context, portal_str)); JS_SetPropertyStr(context, arg_obj, "target", JS_NewString(context, target_str)); JSValue arg_array = JS_NewArray(context); JS_SetPropertyUint32(context, arg_array, 0, arg_obj); JS_SetPropertyStr(context, message, "args", arg_array); JS_SetPropertyStr(context, message, "type", JS_NewString(context, "duplex")); tf_ssb_connection_rpc_send_json( target_connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, tunnel_request_number, "tunnel.connect", message, NULL, NULL, NULL); tunnel_t* data0 = tf_malloc(sizeof(tunnel_t)); *data0 = (tunnel_t) { .connection = target_connection, .request_number = tunnel_request_number, }; tunnel_t* data1 = tf_malloc(sizeof(tunnel_t)); *data1 = (tunnel_t) { .connection = connection, .request_number = -request_number, }; tf_ssb_connection_add_request(connection, -request_number, "tunnel.connect", _tf_ssb_rpc_tunnel_callback, _tf_ssb_rpc_tunnel_cleanup, data0, target_connection); tf_ssb_connection_add_request(target_connection, tunnel_request_number, "tunnel.connect", _tf_ssb_rpc_tunnel_callback, _tf_ssb_rpc_tunnel_cleanup, data1, connection); JS_FreeValue(context, message); JS_FreeCString(context, portal_str); } else { tf_ssb_connection_rpc_send_error(connection, flags, -request_number, "Connection not found."); } JS_FreeCString(context, target_str); } else if (!JS_IsUndefined(origin) && !JS_IsUndefined(portal) && !JS_IsUndefined(target)) { const char* origin_str = JS_ToCString(context, origin); const char* portal_str = JS_ToCString(context, portal); const char* target_str = JS_ToCString(context, target); tf_ssb_connection_tunnel_create(ssb, portal_str, -request_number, origin_str, 0); JS_FreeCString(context, origin_str); JS_FreeCString(context, portal_str); JS_FreeCString(context, target_str); } JS_FreeValue(context, origin); JS_FreeValue(context, portal); JS_FreeValue(context, target); JS_FreeValue(context, arg); JS_FreeValue(context, arg_array); } static void _tf_ssb_rpc_room_meta(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); JSContext* context = tf_ssb_get_context(ssb); JSValue response = JS_FALSE; if (tf_ssb_is_room(ssb)) { response = JS_NewObject(context); JS_SetPropertyStr(context, response, "name", JS_NewString(context, tf_ssb_get_room_name(ssb))); JS_SetPropertyStr(context, response, "membership", JS_FALSE); JSValue features = JS_NewArray(context); JS_SetPropertyUint32(context, features, 0, JS_NewString(context, "tunnel")); JS_SetPropertyUint32(context, features, 1, JS_NewString(context, "room1")); JS_SetPropertyUint32(context, features, 2, JS_NewString(context, "room2")); JS_SetPropertyStr(context, response, "features", features); } tf_ssb_connection_rpc_send_json(connection, flags, -request_number, NULL, response, NULL, NULL, NULL); JS_FreeValue(context, response); } static void _tf_ssb_rpc_room_attendants(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); if (!tf_ssb_is_room(ssb)) { tf_ssb_connection_rpc_send_error_method_not_allowed(connection, flags, -request_number, "room.attendants"); return; } JSContext* context = tf_ssb_get_context(ssb); JSValue joined = JS_NewObject(context); JS_SetPropertyStr(context, joined, "type", JS_NewString(context, "joined")); char my_id[k_id_base64_len] = ""; if (tf_ssb_connection_get_id(connection, my_id, sizeof(my_id))) { JS_SetPropertyStr(context, joined, "id", JS_NewString(context, my_id)); } JSValue state = JS_NewObject(context); JS_SetPropertyStr(context, state, "type", JS_NewString(context, "state")); JSValue ids = JS_NewArray(context); int id_count = 0; tf_ssb_connection_t* connections[1024]; int count = tf_ssb_get_connections(ssb, connections, tf_countof(connections)); for (int i = 0; i < count; i++) { char id[k_id_base64_len] = { 0 }; if (tf_ssb_connection_is_attendant(connections[i]) && tf_ssb_connection_get_id(connections[i], id, sizeof(id))) { JS_SetPropertyUint32(context, ids, id_count++, JS_NewString(context, id)); tf_ssb_connection_rpc_send_json(connections[i], flags, -tf_ssb_connection_get_attendant_request_number(connections[i]), NULL, joined, NULL, NULL, NULL); } } JS_SetPropertyStr(context, state, "ids", ids); tf_ssb_connection_rpc_send_json(connection, flags, -request_number, NULL, state, NULL, NULL, NULL); JS_FreeValue(context, joined); JS_FreeValue(context, state); tf_ssb_connection_set_attendant(connection, true, request_number); } typedef struct _blobs_get_t { char id[k_blob_id_len]; size_t received; size_t expected_size; bool done; bool storing; tf_ssb_t* ssb; tf_ssb_connection_t* connection; uint8_t buffer[]; } blobs_get_t; static void _tf_ssb_rpc_blob_store_callback(const char* id, bool is_new, void* user_data) { blobs_get_t* get = user_data; get->storing = false; tf_ssb_connection_adjust_read_backpressure(get->connection, -1); if (get->done) { tf_free(get); } } static void _tf_ssb_rpc_connection_blobs_get_callback( tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); JSContext* context = tf_ssb_get_context(ssb); blobs_get_t* get = user_data; if ((flags & k_ssb_rpc_mask_type) == k_ssb_rpc_flag_binary && size > 0 && get->received + size <= get->expected_size) { memcpy(get->buffer + get->received, message, size); get->received += size; } else if ((flags & k_ssb_rpc_mask_type) == k_ssb_rpc_flag_json) { if (JS_ToBool(context, args)) { get->storing = true; tf_ssb_connection_adjust_read_backpressure(connection, 1); tf_ssb_db_blob_store_async(ssb, get->buffer, get->received, _tf_ssb_rpc_blob_store_callback, get); } /* TODO: Should we send the response in the callback? */ bool stored = true; tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json | k_ssb_rpc_flag_stream | k_ssb_rpc_flag_end_error, -request_number, NULL, (const uint8_t*)(stored ? "true" : "false"), strlen(stored ? "true" : "false"), NULL, NULL, NULL); } } static void _tf_ssb_rpc_connection_blobs_get_cleanup(tf_ssb_t* ssb, void* user_data) { blobs_get_t* get = user_data; get->done = true; if (!get->storing) { tf_free(get); } } static void _tf_ssb_rpc_connection_blobs_get(tf_ssb_connection_t* connection, const char* blob_id, size_t size) { blobs_get_t* get = tf_malloc(sizeof(blobs_get_t) + size); *get = (blobs_get_t) { .ssb = tf_ssb_connection_get_ssb(connection), .connection = connection, .expected_size = size }; snprintf(get->id, sizeof(get->id), "%s", blob_id); memset(get->buffer, 0, size); tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); JSContext* context = tf_ssb_get_context(ssb); JSValue message = JS_NewObject(context); JSValue name = JS_NewArray(context); JS_SetPropertyUint32(context, name, 0, JS_NewString(context, "blobs")); JS_SetPropertyUint32(context, name, 1, JS_NewString(context, "get")); JS_SetPropertyStr(context, message, "name", name); JS_SetPropertyStr(context, message, "type", JS_NewString(context, "source")); JSValue args = JS_NewArray(context); JS_SetPropertyUint32(context, args, 0, JS_NewString(context, blob_id)); JS_SetPropertyStr(context, message, "args", args); tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, tf_ssb_connection_next_request_number(connection), "blobs.get", message, _tf_ssb_rpc_connection_blobs_get_callback, _tf_ssb_rpc_connection_blobs_get_cleanup, get); JS_FreeValue(context, message); } typedef struct _blob_create_wants_work_t { tf_ssb_connection_t* connection; char blob_id[k_blob_id_len]; bool out_result; int64_t size; size_t out_size; } blob_create_wants_work_t; static void _tf_ssb_rpc_connection_blobs_create_wants_work(tf_ssb_connection_t* connection, void* user_data) { blob_create_wants_work_t* work = user_data; tf_ssb_t* ssb = tf_ssb_connection_get_ssb(work->connection); work->out_result = tf_ssb_db_blob_get(ssb, work->blob_id, NULL, &work->out_size); } static void _tf_ssb_rpc_connection_blobs_create_wants_after_work(tf_ssb_connection_t* connection, int result, void* user_data) { blob_create_wants_work_t* work = user_data; tf_ssb_t* ssb = tf_ssb_connection_get_ssb(work->connection); tf_ssb_blob_wants_t* blob_wants = tf_ssb_connection_get_blob_wants_state(connection); JSContext* context = tf_ssb_get_context(ssb); if (work->out_result) { JSValue message = JS_NewObject(context); JS_SetPropertyStr(context, message, work->blob_id, JS_NewInt64(context, work->out_size)); tf_ssb_connection_rpc_send_json(work->connection, k_ssb_rpc_flag_stream, -blob_wants->request_number, NULL, message, NULL, NULL, NULL); JS_FreeValue(context, message); } else if (work->size == -1LL) { JSValue message = JS_NewObject(context); JS_SetPropertyStr(context, message, work->blob_id, JS_NewInt64(context, -2)); tf_ssb_connection_rpc_send_json(work->connection, k_ssb_rpc_flag_stream, -blob_wants->request_number, NULL, message, NULL, NULL, NULL); JS_FreeValue(context, message); } tf_free(work); } static void _tf_ssb_rpc_connection_blobs_createWants_callback( tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { tf_ssb_blob_wants_t* blob_wants = tf_ssb_connection_get_blob_wants_state(connection); if (!blob_wants) { return; } JSContext* context = tf_ssb_connection_get_context(connection); JSValue name = JS_GetPropertyStr(context, args, "name"); if (!JS_IsUndefined(name)) { /* { name: "Error" } */ tf_ssb_connection_remove_request(connection, -request_number); JS_FreeValue(context, name); return; } JSPropertyEnum* ptab = NULL; uint32_t plen = 0; if (JS_GetOwnPropertyNames(context, &ptab, &plen, args, JS_GPN_STRING_MASK) == 0) { for (uint32_t i = 0; i < plen; ++i) { JSValue key = JS_AtomToString(context, ptab[i].atom); JSPropertyDescriptor desc; JSValue key_value = JS_NULL; if (JS_GetOwnProperty(context, &desc, args, ptab[i].atom) == 1) { key_value = desc.value; JS_FreeValue(context, desc.setter); JS_FreeValue(context, desc.getter); } const char* blob_id = JS_ToCString(context, key); int64_t size = 0; JS_ToInt64(context, &size, key_value); if (--blob_wants->wants_sent == 0) { _tf_ssb_rpc_request_more_blobs(connection); } if (size < 0) { blob_create_wants_work_t* work = tf_malloc(sizeof(blob_create_wants_work_t)); *work = (blob_create_wants_work_t) { .connection = connection, .size = size, }; snprintf(work->blob_id, sizeof(work->blob_id), "%s", blob_id); tf_ssb_connection_run_work(connection, _tf_ssb_rpc_connection_blobs_create_wants_work, _tf_ssb_rpc_connection_blobs_create_wants_after_work, work); } else { _tf_ssb_rpc_connection_blobs_get(connection, blob_id, size); } JS_FreeCString(context, blob_id); JS_FreeValue(context, key); JS_FreeValue(context, key_value); } for (uint32_t i = 0; i < plen; ++i) { JS_FreeAtom(context, ptab[i].atom); } js_free(context, ptab); } } static void _tf_ssb_rpc_connection_room_attendants_callback( tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); JSContext* context = tf_ssb_get_context(ssb); JSValue type = JS_GetPropertyStr(context, args, "type"); const char* type_string = JS_ToCString(context, type); if (!type_string) { tf_ssb_connection_rpc_send_error(connection, flags, -request_number, "Missing type."); } else if (strcmp(type_string, "state") == 0) { tf_ssb_connection_clear_room_attendants(connection); JSValue ids = JS_GetPropertyStr(context, args, "ids"); int length = tf_util_get_length(context, ids); for (int i = 0; i < length; i++) { JSValue id = JS_GetPropertyUint32(context, ids, i); const char* id_string = JS_ToCString(context, id); if (id_string) { tf_ssb_connection_add_room_attendant(connection, id_string); } JS_FreeCString(context, id_string); JS_FreeValue(context, id); } JS_FreeValue(context, ids); } else if (strcmp(type_string, "joined") == 0) { JSValue id = JS_GetPropertyStr(context, args, "id"); const char* id_string = JS_ToCString(context, id); if (id_string) { tf_ssb_connection_add_room_attendant(connection, id_string); } JS_FreeCString(context, id_string); JS_FreeValue(context, id); } else if (strcmp(type_string, "left") == 0) { JSValue id = JS_GetPropertyStr(context, args, "id"); const char* id_string = JS_ToCString(context, id); if (id_string) { tf_ssb_connection_remove_room_attendant(connection, id_string); } JS_FreeCString(context, id_string); JS_FreeValue(context, id); } else { char buffer[256]; snprintf(buffer, sizeof(buffer), "Unexpected room.attendants response type: '%s'.", type_string); tf_ssb_connection_rpc_send_error(connection, flags, -request_number, buffer); } JS_FreeCString(context, type_string); JS_FreeValue(context, type); } static bool _is_error(JSContext* context, JSValue message) { JSValue name = JS_GetPropertyStr(context, message, "name"); const char* name_string = JS_ToCString(context, name); bool is_error = false; if (name_string && strcmp(name_string, "Error") == 0) { is_error = true; } JS_FreeCString(context, name_string); JS_FreeValue(context, name); return is_error; } static void _tf_ssb_rpc_connection_tunnel_isRoom_callback( tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); JSContext* context = tf_ssb_get_context(ssb); if (_is_error(context, args)) { return; } if (JS_IsObject(args)) { JSValue message = JS_NewObject(context); JSValue name = JS_NewArray(context); JS_SetPropertyUint32(context, name, 0, JS_NewString(context, "room")); JS_SetPropertyUint32(context, name, 1, JS_NewString(context, "attendants")); JS_SetPropertyStr(context, message, "name", name); JS_SetPropertyStr(context, message, "type", JS_NewString(context, "source")); JS_SetPropertyStr(context, message, "args", JS_NewArray(context)); tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, tf_ssb_connection_next_request_number(connection), "room.attendants", message, _tf_ssb_rpc_connection_room_attendants_callback, NULL, NULL); JS_FreeValue(context, message); } } typedef struct _tf_ssb_connection_send_history_stream_t { int32_t request_number; char author[k_id_base64_len]; int64_t sequence; bool keys; bool live; bool end_request; bool out_finished; int64_t out_max_sequence_seen; char** out_messages; int out_messages_count; } tf_ssb_connection_send_history_stream_t; static void _tf_ssb_connection_send_history_stream_work(tf_ssb_connection_t* connection, void* user_data) { tf_ssb_connection_send_history_stream_t* request = user_data; if (!tf_ssb_connection_is_connected(connection)) { return; } tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); sqlite3* db = tf_ssb_acquire_db_reader(ssb); sqlite3_stmt* statement; const int k_max = 32; if (sqlite3_prepare(db, "SELECT previous, author, id, sequence, timestamp, hash, json(content), signature, flags FROM messages WHERE author = ?1 AND sequence > ?2 AND " "sequence " "< ?3 ORDER BY sequence", -1, &statement, NULL) == SQLITE_OK) { if (sqlite3_bind_text(statement, 1, request->author, -1, NULL) == SQLITE_OK && sqlite3_bind_int64(statement, 2, request->sequence) == SQLITE_OK && sqlite3_bind_int64(statement, 3, request->sequence + k_max) == SQLITE_OK) { JSMallocFunctions funcs = { 0 }; tf_get_js_malloc_functions(&funcs); JSRuntime* runtime = JS_NewRuntime2(&funcs, NULL); JSContext* context = JS_NewContext(runtime); while (sqlite3_step(statement) == SQLITE_ROW) { JSValue message = JS_UNDEFINED; request->out_max_sequence_seen = sqlite3_column_int64(statement, 3); JSValue formatted = tf_ssb_format_message(context, (const char*)sqlite3_column_text(statement, 0), (const char*)sqlite3_column_text(statement, 1), sqlite3_column_int64(statement, 3), sqlite3_column_double(statement, 4), (const char*)sqlite3_column_text(statement, 5), (const char*)sqlite3_column_text(statement, 6), (const char*)sqlite3_column_text(statement, 7), sqlite3_column_int(statement, 8)); if (request->keys) { message = JS_NewObject(context); JS_SetPropertyStr(context, message, "key", JS_NewString(context, (const char*)sqlite3_column_text(statement, 2))); JS_SetPropertyStr(context, message, "value", formatted); JS_SetPropertyStr(context, message, "timestamp", JS_NewString(context, (const char*)sqlite3_column_text(statement, 4))); } else { message = formatted; } JSValue json = JS_JSONStringify(context, message, JS_NULL, JS_NULL); size_t size = 0; const char* string = JS_ToCStringLen(context, &size, json); request->out_messages = tf_resize_vec(request->out_messages, sizeof(char*) * (request->out_messages_count + 1)); char* copy = tf_malloc(size + 1); memcpy(copy, string, size + 1); JS_FreeCString(context, string); request->out_messages[request->out_messages_count++] = copy; JS_FreeValue(context, json); JS_FreeValue(context, message); } JS_FreeContext(context); JS_FreeRuntime(runtime); } sqlite3_finalize(statement); } else { tf_printf("prepare failed: %s\n", sqlite3_errmsg(db)); } tf_ssb_release_db_reader(ssb, db); request->out_finished = request->out_max_sequence_seen != request->sequence + k_max - 1; } static void _tf_ssb_connection_send_history_stream_after_work(tf_ssb_connection_t* connection, int result, void* user_data) { tf_ssb_connection_send_history_stream_t* request = user_data; tf_ssb_connection_adjust_write_count(connection, -1); if (tf_ssb_connection_is_connected(connection)) { for (int i = 0; i < request->out_messages_count; i++) { if (!tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_json, request->request_number, NULL, (const uint8_t*)request->out_messages[i], strlen(request->out_messages[i]), NULL, NULL, NULL)) { break; } } if (!request->out_finished) { _tf_ssb_connection_send_history_stream( connection, request->request_number, request->author, request->out_max_sequence_seen, request->keys, request->live, request->end_request); } else if (!request->live && request->end_request) { tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json, request->request_number, NULL, (const uint8_t*)"false", strlen("false"), NULL, NULL, NULL); } } for (int i = 0; i < request->out_messages_count; i++) { tf_free(request->out_messages[i]); } tf_free(request->out_messages); tf_free(request); } static void _tf_ssb_connection_send_history_stream_callback(tf_ssb_connection_t* connection, void* user_data) { tf_ssb_connection_adjust_write_count(connection, 1); if (tf_ssb_connection_is_connected(connection)) { tf_ssb_connection_run_work(connection, _tf_ssb_connection_send_history_stream_work, _tf_ssb_connection_send_history_stream_after_work, user_data); } else { _tf_ssb_connection_send_history_stream_after_work(connection, -1, user_data); } } static void _tf_ssb_connection_send_history_stream( tf_ssb_connection_t* connection, int32_t request_number, const char* author, int64_t sequence, bool keys, bool live, bool end_request) { tf_ssb_connection_send_history_stream_t* async = tf_malloc(sizeof(tf_ssb_connection_send_history_stream_t)); *async = (tf_ssb_connection_send_history_stream_t) { .request_number = request_number, .sequence = sequence, .keys = keys, .live = live, .end_request = end_request, }; snprintf(async->author, sizeof(async->author), "%s", author); tf_ssb_connection_schedule_idle(connection, _tf_ssb_connection_send_history_stream_callback, async); } static void _tf_ssb_rpc_createHistoryStream( tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); if (!tf_ssb_is_replicator(ssb)) { tf_ssb_connection_rpc_send_error_method_not_allowed(connection, flags, -request_number, "createHistoryStream"); return; } JSContext* context = tf_ssb_get_context(ssb); JSValue arg_array = JS_GetPropertyStr(context, args, "args"); JSValue arg = JS_GetPropertyUint32(context, arg_array, 0); if (JS_IsUndefined(arg)) { tf_ssb_connection_rpc_send_error(connection, flags, -request_number, "Missing request.args in createHistoryStream."); } JSValue id = JS_GetPropertyStr(context, arg, "id"); JSValue seq = JS_GetPropertyStr(context, arg, "seq"); JSValue keys = JS_GetPropertyStr(context, arg, "keys"); JSValue live = JS_GetPropertyStr(context, arg, "live"); bool is_keys = JS_IsUndefined(keys) || JS_ToBool(context, keys) > 0; bool is_live = JS_ToBool(context, live) > 0 && (tf_ssb_connection_get_flags(connection) & k_tf_ssb_connect_flag_one_shot) == 0; int64_t sequence = 0; JS_ToInt64(context, &sequence, seq); const char* author = JS_ToCString(context, id); _tf_ssb_connection_send_history_stream(connection, -request_number, author, sequence, is_keys, is_live, true); if (is_live) { tf_ssb_connection_add_new_message_request(connection, author, -request_number, is_keys); } JS_FreeCString(context, author); JS_FreeValue(context, id); JS_FreeValue(context, seq); JS_FreeValue(context, keys); JS_FreeValue(context, live); JS_FreeValue(context, arg); JS_FreeValue(context, arg_array); } typedef struct _ebt_clock_row_t { char id[k_id_base64_len]; int64_t value; } ebt_clock_row_t; typedef struct _ebt_replicate_send_clock_t { int64_t request_number; ebt_clock_row_t* clock; int clock_count; char* out_clock; } ebt_replicate_send_clock_t; static void _tf_ssb_rpc_ebt_replicate_send_clock_work(tf_ssb_connection_t* connection, void* user_data) { ebt_replicate_send_clock_t* work = user_data; JSMallocFunctions funcs = { 0 }; tf_get_js_malloc_functions(&funcs); JSRuntime* runtime = JS_NewRuntime2(&funcs, NULL); JSContext* context = JS_NewContext(runtime); tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); JSValue full_clock = JS_NewObject(context); int64_t depth = _get_global_setting_int64(ssb, "replication_hops", 2); /* Ask for every identity we know is being followed from local accounts. */ const char** visible = tf_ssb_db_get_all_visible_identities(ssb, depth); for (int i = 0; visible[i]; i++) { int64_t sequence = 0; tf_ssb_db_get_latest_message_by_author(ssb, visible[i], &sequence, NULL, 0); JS_SetPropertyStr(context, full_clock, visible[i], JS_NewInt64(context, sequence == -1 ? -1 : (sequence << 1))); } tf_free(visible); /* Ask about the incoming connection, too. */ char id[k_id_base64_len] = ""; if (tf_ssb_connection_get_id(connection, id, sizeof(id))) { JSValue in_clock = JS_GetPropertyStr(context, full_clock, id); if (JS_IsUndefined(in_clock)) { int64_t sequence = 0; tf_ssb_db_get_latest_message_by_author(ssb, id, &sequence, NULL, 0); JS_SetPropertyStr(context, full_clock, id, JS_NewInt64(context, sequence == -1 ? -1 : (sequence << 1))); } JS_FreeValue(context, in_clock); } /* Also respond with what we know about all requested identities. */ for (int i = 0; i < work->clock_count; i++) { JSValue in_clock = JS_GetPropertyStr(context, full_clock, work->clock[i].id); if (JS_IsUndefined(in_clock)) { int64_t sequence = -1; tf_ssb_db_get_latest_message_by_author(ssb, work->clock[i].id, &sequence, NULL, 0); JS_SetPropertyStr(context, full_clock, work->clock[i].id, JS_NewInt64(context, sequence == -1 ? -1 : (sequence << 1))); } JS_FreeValue(context, in_clock); } JSValue json = JS_JSONStringify(context, full_clock, JS_NULL, JS_NULL); size_t size = 0; const char* string = JS_ToCStringLen(context, &size, json); char* copy = tf_malloc(size + 1); memcpy(copy, string, size + 1); work->out_clock = copy; JS_FreeCString(context, string); JS_FreeValue(context, json); JS_FreeValue(context, full_clock); JS_FreeContext(context); JS_FreeRuntime(runtime); } static void _tf_ssb_rpc_ebt_replicate_send_clock_after_work(tf_ssb_connection_t* connection, int result, void* user_data) { ebt_replicate_send_clock_t* work = user_data; tf_free(work->clock); if (work->out_clock) { tf_ssb_connection_rpc_send( connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_json, -work->request_number, NULL, (const uint8_t*)work->out_clock, strlen(work->out_clock), NULL, NULL, NULL); tf_free(work->out_clock); } tf_free(work); } static void _tf_ssb_rpc_ebt_replicate_send_clock(tf_ssb_connection_t* connection, int32_t request_number, JSValue message) { ebt_replicate_send_clock_t* work = tf_malloc(sizeof(ebt_replicate_send_clock_t)); *work = (ebt_replicate_send_clock_t) { .request_number = request_number, }; JSContext* context = tf_ssb_connection_get_context(connection); if (!JS_IsUndefined(message)) { JSPropertyEnum* ptab = NULL; uint32_t plen = 0; if (JS_GetOwnPropertyNames(context, &ptab, &plen, message, JS_GPN_STRING_MASK) == 0) { work->clock_count = (int)plen; work->clock = tf_malloc(sizeof(ebt_clock_row_t) * plen); memset(work->clock, 0, sizeof(ebt_clock_row_t) * plen); for (uint32_t i = 0; i < plen; ++i) { const char* id = JS_AtomToCString(context, ptab[i].atom); snprintf(work->clock[i].id, sizeof(work->clock[i].id), "%s", id); JS_FreeCString(context, id); JSPropertyDescriptor desc = { 0 }; JSValue key_value = JS_UNDEFINED; if (JS_GetOwnProperty(context, &desc, message, ptab[i].atom) == 1) { key_value = desc.value; JS_FreeValue(context, desc.setter); JS_FreeValue(context, desc.getter); } JS_ToInt64(context, &work->clock[i].value, key_value); JS_FreeValue(context, key_value); JS_FreeAtom(context, ptab[i].atom); } js_free(context, ptab); } } tf_ssb_connection_run_work(connection, _tf_ssb_rpc_ebt_replicate_send_clock_work, _tf_ssb_rpc_ebt_replicate_send_clock_after_work, work); } static void _tf_ssb_rpc_ebt_replicate_send_messages(tf_ssb_connection_t* connection, JSValue message) { if (JS_IsUndefined(message)) { return; } tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); JSContext* context = tf_ssb_get_context(ssb); JSPropertyEnum* ptab = NULL; uint32_t plen = 0; if (JS_GetOwnPropertyNames(context, &ptab, &plen, message, JS_GPN_STRING_MASK) == 0) { for (uint32_t i = 0; i < plen; ++i) { JSValue in_clock = JS_UNDEFINED; JSPropertyDescriptor desc = { 0 }; if (JS_GetOwnProperty(context, &desc, message, ptab[i].atom) == 1) { in_clock = desc.value; JS_FreeValue(context, desc.setter); JS_FreeValue(context, desc.getter); } if (!JS_IsUndefined(in_clock)) { JSValue key = JS_AtomToString(context, ptab[i].atom); int64_t sequence = -1; JS_ToInt64(context, &sequence, in_clock); const char* author = JS_ToCString(context, key); if (sequence >= 0 && (sequence & 1) == 0) { int32_t request_number = tf_ssb_connection_get_ebt_request_number(connection); bool live = (tf_ssb_connection_get_flags(connection) & k_tf_ssb_connect_flag_one_shot) == 0; _tf_ssb_connection_send_history_stream(connection, request_number, author, sequence >> 1, false, live, false); if (live) { tf_ssb_connection_add_new_message_request(connection, author, request_number, false); } } else { tf_ssb_connection_remove_new_message_request(connection, author); } JS_FreeCString(context, author); JS_FreeValue(context, key); } JS_FreeValue(context, in_clock); } for (uint32_t i = 0; i < plen; ++i) { JS_FreeAtom(context, ptab[i].atom); } js_free(context, ptab); } } static void _tf_ssb_rpc_ebt_replicate_store_callback(const char* id, bool verified, bool is_new, void* user_data) { tf_ssb_connection_t* connection = user_data; tf_ssb_connection_adjust_read_backpressure(connection, -1); } static void _tf_ssb_rpc_ebt_replicate(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); JSContext* context = tf_ssb_get_context(ssb); if (_is_error(context, args)) { /* TODO: Send createHistoryStream. */ tf_ssb_connection_remove_request(connection, -request_number); return; } if (!tf_ssb_connection_get_ebt_request_number(connection)) { tf_ssb_connection_set_ebt_request_number(connection, -request_number); } JSValue author = JS_GetPropertyStr(context, args, "author"); JSValue name = JS_GetPropertyStr(context, args, "name"); JSValue in_clock = JS_IsUndefined(name) ? args : JS_UNDEFINED; if (!JS_IsUndefined(author)) { /* Looks like a message. */ tf_ssb_connection_adjust_read_backpressure(connection, 1); tf_ssb_verify_strip_and_store_message(ssb, args, _tf_ssb_rpc_ebt_replicate_store_callback, connection); } else { /* EBT clock. */ if (!tf_ssb_connection_get_sent_clock(connection)) { _tf_ssb_rpc_ebt_replicate_send_clock(connection, request_number, in_clock); tf_ssb_connection_set_sent_clock(connection, true); } _tf_ssb_rpc_ebt_replicate_send_messages(connection, in_clock); } JS_FreeValue(context, name); JS_FreeValue(context, author); } static void _tf_ssb_rpc_ebt_replicate_client( tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { _tf_ssb_rpc_ebt_replicate(connection, flags, request_number, args, message, size, user_data); } static void _tf_ssb_rpc_send_ebt_replicate(tf_ssb_connection_t* connection) { tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); JSContext* context = tf_ssb_get_context(ssb); JSValue message = JS_NewObject(context); JSValue name = JS_NewArray(context); JS_SetPropertyUint32(context, name, 0, JS_NewString(context, "ebt")); JS_SetPropertyUint32(context, name, 1, JS_NewString(context, "replicate")); JS_SetPropertyStr(context, message, "name", name); JSValue arg = JS_NewObject(context); JS_SetPropertyStr(context, arg, "version", JS_NewInt32(context, 3)); JS_SetPropertyStr(context, arg, "format", JS_NewString(context, "classic")); JSValue args = JS_NewArray(context); JS_SetPropertyUint32(context, args, 0, arg); JS_SetPropertyStr(context, message, "args", args); JS_SetPropertyStr(context, message, "type", JS_NewString(context, "duplex")); int32_t request_number = tf_ssb_connection_next_request_number(connection); tf_ssb_connection_rpc_send_json( connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, request_number, "ebt.replicate", message, _tf_ssb_rpc_ebt_replicate_client, NULL, NULL); if (!tf_ssb_connection_get_ebt_request_number(connection)) { tf_ssb_connection_set_ebt_request_number(connection, request_number); } JS_FreeValue(context, message); } static void _tf_ssb_rpc_ebt_replicate_server( tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { if (flags & k_ssb_rpc_flag_end_error) { return; } tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); if (!tf_ssb_is_replicator(ssb)) { tf_ssb_connection_rpc_send_error_method_not_allowed(connection, flags, -request_number, "ebt.replicate"); return; } _tf_ssb_rpc_ebt_replicate(connection, flags, request_number, args, message, size, user_data); tf_ssb_connection_add_request(connection, -request_number, "ebt.replicate", _tf_ssb_rpc_ebt_replicate, NULL, NULL, NULL); } static void _tf_ssb_rpc_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_change_t change, tf_ssb_connection_t* connection, void* user_data) { JSContext* context = tf_ssb_get_context(ssb); if (change == k_tf_ssb_change_connect) { if (tf_ssb_is_replicator(ssb)) { JSValue message = JS_NewObject(context); JSValue name = JS_NewArray(context); JS_SetPropertyUint32(context, name, 0, JS_NewString(context, "blobs")); JS_SetPropertyUint32(context, name, 1, JS_NewString(context, "createWants")); JS_SetPropertyStr(context, message, "name", name); JS_SetPropertyStr(context, message, "type", JS_NewString(context, "source")); JS_SetPropertyStr(context, message, "args", JS_NewArray(context)); tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, tf_ssb_connection_next_request_number(connection), "blobs.createWants", message, _tf_ssb_rpc_connection_blobs_createWants_callback, NULL, NULL); JS_FreeValue(context, message); } if (tf_ssb_connection_is_client(connection)) { JSValue message = JS_NewObject(context); JSValue name = JS_NewArray(context); JS_SetPropertyUint32(context, name, 0, JS_NewString(context, "tunnel")); JS_SetPropertyUint32(context, name, 1, JS_NewString(context, "isRoom")); JS_SetPropertyStr(context, message, "name", name); JS_SetPropertyStr(context, message, "args", JS_NewArray(context)); tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_new_request, tf_ssb_connection_next_request_number(connection), "tunnel.isRoom", message, _tf_ssb_rpc_connection_tunnel_isRoom_callback, NULL, NULL); JS_FreeValue(context, message); if (tf_ssb_is_peer_exchange(ssb)) { _tf_ssb_rpc_send_peers_exchange(connection); } if (tf_ssb_is_replicator(ssb)) { _tf_ssb_rpc_send_ebt_replicate(connection); } } } else if (change == k_tf_ssb_change_remove) { tf_ssb_remove_blob_want_added_callback(ssb, _tf_ssb_rpc_blob_wants_added_callback, connection); char id[k_id_base64_len] = ""; if (tf_ssb_connection_get_id(connection, id, sizeof(id))) { JSValue left = JS_NewObject(context); JS_SetPropertyStr(context, left, "type", JS_NewString(context, "left")); JS_SetPropertyStr(context, left, "id", JS_NewString(context, id)); tf_ssb_connection_t* connections[1024]; int count = tf_ssb_get_connections(ssb, connections, tf_countof(connections)); for (int i = 0; i < count; i++) { if (tf_ssb_connection_is_attendant(connections[i])) { tf_ssb_connection_rpc_send_json( connections[i], k_ssb_rpc_flag_stream, -tf_ssb_connection_get_attendant_request_number(connections[i]), NULL, left, NULL, NULL, NULL); } } JS_FreeValue(context, left); } } } static void _tf_ssb_rpc_broadcasts_changed_visit( const char* host, const struct sockaddr_in* addr, tf_ssb_broadcast_origin_t origin, tf_ssb_connection_t* tunnel, const uint8_t* pub, void* user_data) { tf_ssb_t* ssb = user_data; if (tunnel && (tf_ssb_connection_get_flags(tunnel) & k_tf_ssb_connect_flag_one_shot) != 0 && !tf_ssb_connection_get_tunnel(tunnel)) { char target_id[k_id_base64_len] = { 0 }; char portal_id[k_id_base64_len] = { 0 }; if (tf_ssb_id_bin_to_str(target_id, sizeof(target_id), pub) && tf_ssb_connection_get_id(tunnel, portal_id, sizeof(portal_id))) { tf_ssb_tunnel_create(ssb, portal_id, target_id, k_tf_ssb_connect_flag_one_shot); } } } static void _tf_ssb_rpc_broadcasts_changed_callback(tf_ssb_t* ssb, void* user_data) { tf_ssb_visit_broadcasts(ssb, _tf_ssb_rpc_broadcasts_changed_visit, ssb); } static void _tf_ssb_rpc_checkpoint(tf_ssb_t* ssb) { int64_t checkpoint_start_ms = uv_hrtime(); sqlite3* db = tf_ssb_acquire_db_writer(ssb); int log = 0; int checkpointed = 0; if (sqlite3_wal_checkpoint_v2(db, NULL, SQLITE_CHECKPOINT_TRUNCATE, &log, &checkpointed) == SQLITE_OK) { tf_printf("Checkpointed %d frames in %d ms. Log is now %d frames.\n", (int)((uv_hrtime() - checkpoint_start_ms) / 1000000LL), checkpointed, log); } else { tf_printf("Checkpoint: %s.\n", sqlite3_errmsg(db)); } tf_ssb_release_db_writer(ssb, db); } typedef struct _delete_t { int deleted; int64_t duration_ms; } delete_t; static void _tf_ssb_rpc_delete_blobs_work(tf_ssb_t* ssb, void* user_data) { delete_t* delete = user_data; int64_t age = _get_global_setting_int64(ssb, "blob_expire_age_seconds", -1); if (age <= 0) { _tf_ssb_rpc_checkpoint(ssb); return; } int64_t start_ns = uv_hrtime(); sqlite3* db = tf_ssb_acquire_db_writer(ssb); sqlite3_stmt* statement; int64_t now = (int64_t)time(NULL) * 1000ULL; int64_t timestamp = now - age * 1000ULL; const int k_limit = 128; int deleted = 0; if (sqlite3_prepare(db, "DELETE FROM blobs WHERE blobs.id IN (" " SELECT blobs.id FROM blobs " " JOIN messages_refs ON blobs.id = messages_refs.ref " " JOIN messages ON messages.id = messages_refs.message " " WHERE blobs.created < ?1 / 1000 " " GROUP BY blobs.id HAVING MAX(messages.timestamp) < ?1 LIMIT ?2)", -1, &statement, NULL) == SQLITE_OK) { if (sqlite3_bind_int64(statement, 1, timestamp) == SQLITE_OK && sqlite3_bind_int(statement, 2, k_limit) == SQLITE_OK) { int r = sqlite3_step(statement); if (r != SQLITE_DONE) { tf_printf("_tf_ssb_rpc_delete_blobs_work: %s\n", sqlite3_errmsg(db)); } else { tf_printf("_tf_ssb_rpc_delete_blobs_work: %d rows\n", sqlite3_changes(db)); } deleted = sqlite3_changes(db); } } else { tf_printf("prepare failed: %s\n", sqlite3_errmsg(db)); } tf_ssb_release_db_writer(ssb, db); delete->duration_ms = (uv_hrtime() - start_ns) / 1000000LL; tf_printf("Deleted %d blobs in %d ms.\n", deleted, (int)delete->duration_ms); _tf_ssb_rpc_checkpoint(ssb); } static void _tf_ssb_rpc_delete_blobs_after_work(tf_ssb_t* ssb, int status, void* user_data) { delete_t* delete = user_data; _tf_ssb_rpc_start_delete_blobs(ssb, delete->deleted ? (int)delete->duration_ms : (15 * 60 * 1000)); tf_free(delete); } static void _tf_ssb_rpc_start_delete_blobs_callback(tf_ssb_t* ssb, void* user_data) { delete_t* delete = tf_malloc(sizeof(delete_t)); *delete = (delete_t) { 0 }; tf_ssb_run_work(ssb, _tf_ssb_rpc_delete_blobs_work, _tf_ssb_rpc_delete_blobs_after_work, delete); } static void _tf_ssb_rpc_start_delete_blobs(tf_ssb_t* ssb, int delay_ms) { tf_printf("will delete more blobs in %d ms\n", delay_ms); tf_ssb_schedule_work(ssb, delay_ms, _tf_ssb_rpc_start_delete_blobs_callback, NULL); } static void _tf_ssb_rpc_delete_feeds_work(tf_ssb_t* ssb, void* user_data) { delete_t* delete = user_data; if (!_get_global_setting_bool(ssb, "delete_stale_feeds", false)) { return; } int64_t start_ns = uv_hrtime(); int replication_hops = (int)_get_global_setting_int64(ssb, "replication_hops", 2); const char** identities = tf_ssb_db_get_all_visible_identities(ssb, replication_hops); JSMallocFunctions funcs = { 0 }; tf_get_js_malloc_functions(&funcs); JSRuntime* runtime = JS_NewRuntime2(&funcs, NULL); JSContext* context = JS_NewContext(runtime); JSValue array = JS_NewArray(context); for (int i = 0; identities[i]; i++) { JS_SetPropertyUint32(context, array, i, JS_NewString(context, identities[i])); } tf_free(identities); JSValue json = JS_JSONStringify(context, array, JS_NULL, JS_NULL); const char* arg = JS_ToCString(context, json); JS_FreeValue(context, json); JS_FreeValue(context, array); sqlite3* db = tf_ssb_acquire_db_writer(ssb); sqlite3_stmt* statement; if (sqlite3_prepare(db, "DELETE FROM messages WHERE author IN (" " SELECT author FROM messages WHERE author NOT IN (SELECT value FROM json_each(?)) GROUP BY author LIMIT 1" ") RETURNING author", -1, &statement, NULL) == SQLITE_OK) { int status = SQLITE_OK; bool printed = false; if (sqlite3_bind_text(statement, 1, arg, -1, NULL) == SQLITE_OK) { while ((status = sqlite3_step(statement)) == SQLITE_ROW) { if (!printed) { tf_printf("deleting %s\n", sqlite3_column_text(statement, 0)); printed = true; delete->deleted++; } } if (status != SQLITE_DONE) { tf_printf("deleting feeds: %s\n", sqlite3_errmsg(db)); } } sqlite3_finalize(statement); } tf_ssb_release_db_writer(ssb, db); JS_FreeCString(context, arg); JS_FreeContext(context); JS_FreeRuntime(runtime); delete->duration_ms = (uv_hrtime() - start_ns) / 1000000LL; tf_printf("Deleted %d feeds in %d ms.\n", delete->deleted, (int)delete->duration_ms); _tf_ssb_rpc_checkpoint(ssb); } static void _tf_ssb_rpc_delete_feeds_after_work(tf_ssb_t* ssb, int status, void* user_data) { delete_t* delete = user_data; _tf_ssb_rpc_start_delete_feeds(ssb, delete->deleted ? (int)delete->duration_ms : (15 * 60 * 1000)); tf_free(delete); } static void _tf_ssb_rpc_start_delete_feeds_callback(tf_ssb_t* ssb, void* user_data) { delete_t* delete = tf_malloc(sizeof(delete_t)); *delete = (delete_t) { 0 }; tf_ssb_run_work(ssb, _tf_ssb_rpc_delete_feeds_work, _tf_ssb_rpc_delete_feeds_after_work, delete); } static void _tf_ssb_rpc_start_delete_feeds(tf_ssb_t* ssb, int delay_ms) { tf_printf("will delete more feeds in %d ms\n", delay_ms); tf_ssb_schedule_work(ssb, delay_ms, _tf_ssb_rpc_start_delete_feeds_callback, NULL); } void tf_ssb_rpc_start_periodic(tf_ssb_t* ssb) { _tf_ssb_rpc_start_delete_blobs(ssb, 30 * 1000); _tf_ssb_rpc_start_delete_feeds(ssb, 25 * 1000); } typedef struct _peers_exchange_t { tf_ssb_t* ssb; JSValue peers; } peers_exchange_t; static void _tf_ssb_get_peers_exhange_callback( const char* host, const struct sockaddr_in* addr, tf_ssb_broadcast_origin_t origin, tf_ssb_connection_t* tunnel, const uint8_t* pub, void* user_data) { peers_exchange_t* data = user_data; if (origin == k_tf_ssb_broadcast_origin_peer_exchange) { char fullid[256] = { 0 }; tf_base64_encode(pub, k_id_bin_len, fullid, sizeof(fullid)); char* dot = strchr(fullid, '.'); if (dot) { *dot = '\0'; } char connection[1024] = { 0 }; snprintf(connection, sizeof(connection), "net:%s:%d~shs:%s", host, ntohs(addr->sin_port), fullid); JSContext* context = tf_ssb_get_context(data->ssb); JS_SetPropertyStr(context, data->peers, connection, JS_NewInt32(context, 0)); } } static JSValue _tf_ssb_get_peers_exchange(tf_ssb_t* ssb) { JSContext* context = tf_ssb_get_context(ssb); JSValue peers = JS_NewObject(context); tf_ssb_visit_broadcasts(ssb, _tf_ssb_get_peers_exhange_callback, &(peers_exchange_t) { .ssb = ssb, .peers = peers }); return peers; } static void _tf_ssb_rpc_peers_exchange_internal( tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { JSContext* context = tf_ssb_connection_get_context(connection); if (_is_error(context, args)) { return; } /* The peer that participated in the exchange is now a peer exchange entry, too. */ tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); if (*tf_ssb_connection_get_host(connection)) { char fullid[k_id_base64_len] = { 0 }; tf_ssb_connection_get_id(connection, fullid, sizeof(fullid)); char* dot = strchr(fullid, '.'); if (dot) { *dot = '\0'; } int port = tf_ssb_connection_get_port(connection); JSValue port_value = JS_GetPropertyStr(context, args, "port"); JS_ToInt32(context, &port, port_value); JS_FreeValue(context, port_value); char connection_string[1024] = { 0 }; snprintf(connection_string, sizeof(connection_string), "net:%s:%d~shs:%s", tf_ssb_connection_get_host(connection), port, fullid + 1); tf_ssb_add_broadcast(ssb, connection_string, k_tf_ssb_broadcast_origin_peer_exchange, k_ssb_peer_exchange_expires_seconds); } JSValue in_peers = JS_GetPropertyStr(context, args, "peers"); JSPropertyEnum* ptab = NULL; uint32_t plen = 0; if (JS_GetOwnPropertyNames(context, &ptab, &plen, in_peers, JS_GPN_STRING_MASK) == 0) { for (uint32_t i = 0; i < plen; ++i) { JSValue key = JS_AtomToString(context, ptab[i].atom); JSPropertyDescriptor desc; JSValue key_value = JS_NULL; if (JS_GetOwnProperty(context, &desc, args, ptab[i].atom) == 1) { key_value = desc.value; JS_FreeValue(context, desc.setter); JS_FreeValue(context, desc.getter); } const char* connection = JS_ToCString(context, key); int64_t timestamp = 0; JS_ToInt64(context, ×tamp, key_value); /* ADD BROADCAST connection: timestamp */ JS_FreeCString(context, connection); JS_FreeValue(context, key); JS_FreeValue(context, key_value); } for (uint32_t i = 0; i < plen; ++i) { JS_FreeAtom(context, ptab[i].atom); } js_free(context, ptab); } JS_FreeValue(context, in_peers); } static void _tf_ssb_rpc_send_peers_exchange(tf_ssb_connection_t* connection) { int32_t request_number = tf_ssb_connection_next_request_number(connection); JSContext* context = tf_ssb_connection_get_context(connection); JSValue message = JS_NewObject(context); JSValue name = JS_NewArray(context); JS_SetPropertyUint32(context, name, 0, JS_NewString(context, "peers")); JS_SetPropertyUint32(context, name, 1, JS_NewString(context, "exchange")); JS_SetPropertyStr(context, message, "name", name); tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); JS_SetPropertyStr(context, message, "port", JS_NewInt32(context, tf_ssb_server_get_port(ssb))); JS_SetPropertyStr(context, message, "peers", _tf_ssb_get_peers_exchange(ssb)); tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_new_request, request_number, "peers.exchange", message, _tf_ssb_rpc_peers_exchange_internal, NULL, NULL); JS_FreeValue(context, message); } static void _tf_ssb_rpc_peers_exchange(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); if (!tf_ssb_is_peer_exchange(ssb)) { tf_ssb_connection_rpc_send_error_method_not_allowed(connection, flags, -request_number, "peers.exchange"); return; } _tf_ssb_rpc_peers_exchange_internal(connection, flags, request_number, args, message, size, user_data); JSContext* context = tf_ssb_connection_get_context(connection); JSValue out_message = JS_NewObject(context); JS_SetPropertyStr(context, out_message, "port", JS_NewInt32(context, tf_ssb_server_get_port(ssb))); JS_SetPropertyStr(context, out_message, "peers", _tf_ssb_get_peers_exchange(ssb)); tf_ssb_connection_rpc_send_json(connection, flags, -request_number, NULL, out_message, NULL, NULL, NULL); JS_FreeValue(context, out_message); } void tf_ssb_rpc_register(tf_ssb_t* ssb) { tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_rpc_connections_changed_callback, NULL, NULL); tf_ssb_add_broadcasts_changed_callback(ssb, _tf_ssb_rpc_broadcasts_changed_callback, NULL, NULL); tf_ssb_add_rpc_callback(ssb, "gossip.ping", _tf_ssb_rpc_gossip_ping, NULL, NULL); /* DUPLEX */ tf_ssb_add_rpc_callback(ssb, "blobs.get", _tf_ssb_rpc_blobs_get, NULL, NULL); /* SOURCE */ tf_ssb_add_rpc_callback(ssb, "blobs.has", _tf_ssb_rpc_blobs_has, NULL, NULL); /* ASYNC */ tf_ssb_add_rpc_callback(ssb, "blobs.createWants", _tf_ssb_rpc_blobs_createWants, NULL, NULL); /* SOURCE */ tf_ssb_add_rpc_callback(ssb, "tunnel.connect", _tf_ssb_rpc_tunnel_connect, NULL, NULL); /* DUPLEX */ tf_ssb_add_rpc_callback(ssb, "tunnel.isRoom", _tf_ssb_rpc_room_meta, NULL, NULL); /* FAKE-ASYNC */ tf_ssb_add_rpc_callback(ssb, "room.metadata", _tf_ssb_rpc_room_meta, NULL, NULL); /* ASYNC */ tf_ssb_add_rpc_callback(ssb, "room.attendants", _tf_ssb_rpc_room_attendants, NULL, NULL); /* SOURCE */ tf_ssb_add_rpc_callback(ssb, "createHistoryStream", _tf_ssb_rpc_createHistoryStream, NULL, NULL); /* SOURCE */ tf_ssb_add_rpc_callback(ssb, "ebt.replicate", _tf_ssb_rpc_ebt_replicate_server, NULL, NULL); /* DUPLEX */ tf_ssb_add_rpc_callback(ssb, "peers.exchange", _tf_ssb_rpc_peers_exchange, NULL, NULL); /* ASYNC */ }