#include "ssb.rpc.h" #include "log.h" #include "mem.h" #include "ssb.h" #include "ssb.db.h" #include "util.js.h" #include "sqlite3.h" #include "uv.h" #include #include #include #include #if !defined(_countof) #define _countof(a) ((int)(sizeof((a)) / sizeof(*(a)))) #endif static void _tf_ssb_connection_send_history_stream(tf_ssb_connection_t* connection, int32_t request_number, const char* author, int64_t sequence, bool keys, bool live); static void _tf_ssb_connection_send_history_stream_internal(tf_ssb_connection_t* connection, int32_t request_number, const char* author, int64_t sequence, bool keys, bool live); static void _tf_ssb_rpc_start_delete_blobs(tf_ssb_t* ssb, int delay_ms); static int64_t _get_global_setting_int64(tf_ssb_t* ssb, const char* name, int64_t default_value) { int64_t result = default_value; sqlite3* db = tf_ssb_acquire_db_reader(ssb); sqlite3_stmt* statement; if (sqlite3_prepare(db, "SELECT json_extract(value, '$.' || ?) FROM properties WHERE id = 'core' AND key = 'settings'", -1, &statement, NULL) == SQLITE_OK) { if (sqlite3_bind_text(statement, 1, name, -1, NULL) == SQLITE_OK) { if (sqlite3_step(statement) == SQLITE_ROW) { result = sqlite3_column_int64(statement, 0); } } sqlite3_finalize(statement); } else { tf_printf("prepare failed: %s\n", sqlite3_errmsg(db)); } tf_ssb_release_db_reader(ssb, db); return result; } static bool _get_global_setting_bool(tf_ssb_t* ssb, const char* name, bool default_value) { bool result = default_value; sqlite3* db = tf_ssb_acquire_db_reader(ssb); sqlite3_stmt* statement; if (sqlite3_prepare(db, "SELECT json_extract(value, '$.' || ?) FROM properties WHERE id = 'core' AND key = 'settings'", -1, &statement, NULL) == SQLITE_OK) { if (sqlite3_bind_text(statement, 1, name, -1, NULL) == SQLITE_OK) { if (sqlite3_step(statement) == SQLITE_ROW) { result = sqlite3_column_int(statement, 0) != 0; } } sqlite3_finalize(statement); } else { tf_printf("prepare failed: %s\n", sqlite3_errmsg(db)); } tf_ssb_release_db_reader(ssb, db); return result; } static bool _get_global_setting_string(tf_ssb_t* ssb, const char* name, char* out_value, size_t size) { bool result = false; sqlite3* db = tf_ssb_acquire_db_reader(ssb); sqlite3_stmt* statement; if (sqlite3_prepare(db, "SELECT json_extract(value, '$.' || ?) FROM properties WHERE id = 'core' AND key = 'settings'", -1, &statement, NULL) == SQLITE_OK) { if (sqlite3_bind_text(statement, 1, name, -1, NULL) == SQLITE_OK) { if (sqlite3_step(statement) == SQLITE_ROW) { snprintf(out_value, size, "%s", sqlite3_column_text(statement, 0)); result = true; } } sqlite3_finalize(statement); } else { tf_printf("prepare failed: %s\n", sqlite3_errmsg(db)); } tf_ssb_release_db_reader(ssb, db); return result; } static void _tf_ssb_rpc_gossip_ping_callback(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { char buffer[256]; snprintf(buffer, sizeof(buffer), "%" PRId64, (int64_t)time(NULL) * 1000); tf_ssb_connection_rpc_send( connection, flags, -request_number, (const uint8_t*)buffer, strlen(buffer), NULL, NULL, NULL); if (flags & k_ssb_rpc_flag_end_error) { tf_ssb_connection_remove_request(connection, request_number); } } static void _tf_ssb_rpc_gossip_ping(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { tf_ssb_connection_add_request(connection, -request_number, _tf_ssb_rpc_gossip_ping_callback, NULL, NULL, NULL); } static void _tf_ssb_rpc_blobs_get_callback(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { } static void _tf_ssb_rpc_blobs_get(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { if (flags & k_ssb_rpc_flag_end_error) { return; } tf_ssb_connection_add_request(connection, -request_number, _tf_ssb_rpc_blobs_get_callback, NULL, NULL, NULL); tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); JSContext* context = tf_ssb_connection_get_context(connection); JSValue ids = JS_GetPropertyStr(context, args, "args"); int length = tf_util_get_length(context, ids); bool success = false; for (int i = 0; i < length; i++) { JSValue arg = JS_GetPropertyUint32(context, ids, i); const char* id = NULL; if (JS_IsString(arg)) { id = JS_ToCString(context, arg); } else { JSValue key = JS_GetPropertyStr(context, arg, "key"); id = JS_ToCString(context, key); JS_FreeValue(context, key); } uint8_t* blob = NULL; size_t size = 0; const size_t k_send_max = 8192; if (tf_ssb_db_blob_get(ssb, id, &blob, &size)) { for (size_t offset = 0; offset < size; offset += k_send_max) { tf_ssb_connection_rpc_send( connection, k_ssb_rpc_flag_binary | k_ssb_rpc_flag_stream, -request_number, blob + offset, offset + k_send_max <= size ? k_send_max : (size - offset), NULL, NULL, NULL); } success = true; tf_free(blob); } JS_FreeCString(context, id); JS_FreeValue(context, arg); } JS_FreeValue(context, ids); tf_ssb_connection_rpc_send( connection, k_ssb_rpc_flag_json | k_ssb_rpc_flag_end_error | k_ssb_rpc_flag_stream, -request_number, (const uint8_t*)(success ? "true" : "false"), strlen(success ? "true" : "false"), NULL, NULL, NULL); } static void _tf_ssb_rpc_blobs_has(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); JSContext* context = tf_ssb_connection_get_context(connection); JSValue ids = JS_GetPropertyStr(context, args, "args"); JSValue id = JS_GetPropertyUint32(context, ids, 0); const char* id_str = JS_ToCString(context, id); bool has = tf_ssb_db_blob_has(ssb, id_str); JS_FreeCString(context, id_str); JS_FreeValue(context, id); JS_FreeValue(context, ids); tf_ssb_connection_rpc_send( connection, k_ssb_rpc_flag_json, -request_number, (const uint8_t*)(has ? "true" : "false"), strlen(has ? "true" : "false"), NULL, NULL, NULL); } static void _tf_ssb_rpc_blob_wants_added_callback(tf_ssb_t* ssb, const char* id, void* user_data) { tf_ssb_connection_t* connection = user_data; tf_ssb_blob_wants_t* blob_wants = tf_ssb_connection_get_blob_wants_state(connection); JSContext* context = tf_ssb_get_context(ssb); JSValue message = JS_NewObject(context); JS_SetPropertyStr(context, message, id, JS_NewInt64(context, -1)); tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream, -blob_wants->request_number, message, NULL, NULL, NULL); JS_FreeValue(context, message); } static void _tf_ssb_rpc_request_more_blobs(tf_ssb_connection_t* connection) { JSContext* context = tf_ssb_connection_get_context(connection); tf_ssb_blob_wants_t* blob_wants = tf_ssb_connection_get_blob_wants_state(connection); tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); int64_t age = _get_global_setting_int64(ssb, "blob_fetch_age_seconds", -1); int64_t timestamp = -1; if (age == 0) { /* Don't fetch any blobs. */ return; } else if (age > 0) { int64_t now = (int64_t)time(NULL) * 1000ULL; timestamp = now - age * 1000ULL; } sqlite3* db = tf_ssb_acquire_db_reader(ssb); sqlite3_stmt* statement; if (sqlite3_prepare(db, "SELECT id FROM blob_wants_view WHERE id > ? AND timestamp > ? ORDER BY id LIMIT 32", -1, &statement, NULL) == SQLITE_OK) { if (sqlite3_bind_text(statement, 1, blob_wants->last_id, -1, NULL) == SQLITE_OK && sqlite3_bind_int64(statement, 2, timestamp) == SQLITE_OK) { while (sqlite3_step(statement) == SQLITE_ROW) { const char* blob = (const char*)sqlite3_column_text(statement, 0); JSValue message = JS_NewObject(context); JS_SetPropertyStr(context, message, blob, JS_NewInt32(context, -1)); tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream, -blob_wants->request_number, message, NULL, NULL, NULL); JS_FreeValue(context, message); snprintf(blob_wants->last_id, sizeof(blob_wants->last_id), "%s", blob); blob_wants->wants_sent++; } } sqlite3_finalize(statement); } else { tf_printf("prepare failed: %s\n", sqlite3_errmsg(db)); } tf_ssb_release_db_reader(ssb, db); } static void _tf_ssb_rpc_blobs_createWants(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { tf_ssb_blob_wants_t* blob_wants = tf_ssb_connection_get_blob_wants_state(connection); tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); tf_ssb_add_blob_want_added_callback(ssb, _tf_ssb_rpc_blob_wants_added_callback, NULL, connection); blob_wants->request_number = request_number; _tf_ssb_rpc_request_more_blobs(connection); } typedef struct tunnel_t { tf_ssb_connection_t* connection; int32_t request_number; } tunnel_t; void _tf_ssb_rpc_tunnel_callback(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { tunnel_t* tun = user_data; if (flags & k_ssb_rpc_flag_end_error) { tf_ssb_connection_rpc_send( connection, flags, -request_number, (const uint8_t*)"false", strlen("false"), NULL, NULL, NULL); tf_ssb_connection_close(tun->connection); } else { tf_ssb_connection_rpc_send(tun->connection, flags, tun->request_number, message, size, NULL, NULL, NULL); } } void _tf_ssb_rpc_tunnel_cleanup(tf_ssb_t* ssb, void* user_data) { tf_free(user_data); } static void _tf_ssb_rpc_tunnel_connect(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); if (!_get_global_setting_bool(ssb, "room", true)) { tf_ssb_connection_rpc_send_error_method_not_allowed(connection, flags, -request_number, "tunnel.connect"); return; } JSContext* context = tf_ssb_connection_get_context(connection); JSValue arg_array = JS_GetPropertyStr(context, args, "args"); JSValue arg = JS_GetPropertyUint32(context, arg_array, 0); JSValue origin = JS_GetPropertyStr(context, arg, "origin"); JSValue portal = JS_GetPropertyStr(context, arg, "portal"); JSValue target = JS_GetPropertyStr(context, arg, "target"); if (JS_IsUndefined(origin) && !JS_IsUndefined(portal) && !JS_IsUndefined(target)) { const char* target_str = JS_ToCString(context, target); tf_ssb_connection_t* target_connection = tf_ssb_connection_get(ssb, target_str); if (target_connection) { int32_t tunnel_request_number = tf_ssb_connection_next_request_number(target_connection); const char* portal_str = JS_ToCString(context, portal); JSValue message = JS_NewObject(context); JSValue name = JS_NewArray(context); JS_SetPropertyUint32(context, name, 0, JS_NewString(context, "tunnel")); JS_SetPropertyUint32(context, name, 1, JS_NewString(context, "connect")); JS_SetPropertyStr(context, message, "name", name); JSValue arg_obj = JS_NewObject(context); char origin_str[k_id_base64_len] = ""; tf_ssb_connection_get_id(connection, origin_str, sizeof(origin_str)); JS_SetPropertyStr(context, arg_obj, "origin", JS_NewString(context, origin_str)); JS_SetPropertyStr(context, arg_obj, "portal", JS_NewString(context, portal_str)); JS_SetPropertyStr(context, arg_obj, "target", JS_NewString(context, target_str)); JSValue arg_array = JS_NewArray(context); JS_SetPropertyUint32(context, arg_array, 0, arg_obj); JS_SetPropertyStr(context, message, "args", arg_array); JS_SetPropertyStr(context, message, "type", JS_NewString(context, "duplex")); tf_ssb_connection_rpc_send_json( target_connection, k_ssb_rpc_flag_stream, tunnel_request_number, message, NULL, NULL, NULL); tunnel_t* data0 = tf_malloc(sizeof(tunnel_t)); *data0 = (tunnel_t) { .connection = target_connection, .request_number = tunnel_request_number, }; tunnel_t* data1 = tf_malloc(sizeof(tunnel_t)); *data1 = (tunnel_t) { .connection = connection, .request_number = -request_number, }; tf_ssb_connection_add_request(connection, -request_number, _tf_ssb_rpc_tunnel_callback, _tf_ssb_rpc_tunnel_cleanup, data0, target_connection); tf_ssb_connection_add_request(target_connection, tunnel_request_number, _tf_ssb_rpc_tunnel_callback, _tf_ssb_rpc_tunnel_cleanup, data1, connection); JS_FreeValue(context, message); JS_FreeCString(context, portal_str); } else { tf_ssb_connection_rpc_send_error(connection, flags, -request_number, "Connection not found."); } JS_FreeCString(context, target_str); } else if (!JS_IsUndefined(origin) && !JS_IsUndefined(portal) && !JS_IsUndefined(target)) { const char* origin_str = JS_ToCString(context, origin); const char* portal_str = JS_ToCString(context, portal); const char* target_str = JS_ToCString(context, target); tf_ssb_connection_tunnel_create(ssb, portal_str, -request_number, origin_str); JS_FreeCString(context, origin_str); JS_FreeCString(context, portal_str); JS_FreeCString(context, target_str); } JS_FreeValue(context, origin); JS_FreeValue(context, portal); JS_FreeValue(context, target); JS_FreeValue(context, arg); JS_FreeValue(context, arg_array); } static void _tf_ssb_rpc_room_meta(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); JSContext* context = tf_ssb_get_context(ssb); JSValue response = JS_FALSE; if (_get_global_setting_bool(ssb, "room", true)) { char room_name[1024] = "tilde friends tunnel"; _get_global_setting_string(ssb, "room_name", room_name, sizeof(room_name)); response = JS_NewObject(context); JS_SetPropertyStr(context, response, "name", JS_NewString(context, room_name)); JS_SetPropertyStr(context, response, "membership", JS_FALSE); JSValue features = JS_NewArray(context); JS_SetPropertyUint32(context, features, 0, JS_NewString(context, "tunnel")); JS_SetPropertyUint32(context, features, 1, JS_NewString(context, "room1")); JS_SetPropertyUint32(context, features, 2, JS_NewString(context, "room2")); JS_SetPropertyStr(context, response, "features", features); } tf_ssb_connection_rpc_send_json( connection, flags, -request_number, response, NULL, NULL, NULL); JS_FreeValue(context, response); } static void _tf_ssb_rpc_room_attendants(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); if (!_get_global_setting_bool(ssb, "room", true)) { tf_ssb_connection_rpc_send_error_method_not_allowed(connection, flags, -request_number, "room.attendants"); return; } JSContext* context = tf_ssb_get_context(ssb); JSValue joined = JS_NewObject(context); JS_SetPropertyStr(context, joined, "type", JS_NewString(context, "joined")); char my_id[k_id_base64_len] = ""; if (tf_ssb_connection_get_id(connection, my_id, sizeof(my_id))) { JS_SetPropertyStr(context, joined, "id", JS_NewString(context, my_id)); } JSValue state = JS_NewObject(context); JS_SetPropertyStr(context, state, "type", JS_NewString(context, "state")); JSValue ids = JS_NewArray(context); int id_count = 0; tf_ssb_connection_t* connections[1024]; int count = tf_ssb_get_connections(ssb, connections, _countof(connections)); for (int i = 0; i < count; i++) { char id[k_id_base64_len] = { 0 }; if (tf_ssb_connection_is_attendant(connections[i]) && tf_ssb_connection_get_id(connections[i], id, sizeof(id))) { JS_SetPropertyUint32(context, ids, id_count++, JS_NewString(context, id)); tf_ssb_connection_rpc_send_json( connections[i], flags, -tf_ssb_connection_get_attendant_request_number(connections[i]), joined, NULL, NULL, NULL); } } JS_SetPropertyStr(context, state, "ids", ids); tf_ssb_connection_rpc_send_json(connection, flags, -request_number, state, NULL, NULL, NULL); JS_FreeValue(context, joined); JS_FreeValue(context, state); tf_ssb_connection_set_attendant(connection, true, request_number); } typedef struct _blobs_get_t { char id[k_blob_id_len]; size_t received; size_t expected_size; bool done; bool storing; tf_ssb_t* ssb; uint8_t buffer[]; } blobs_get_t; static void _tf_ssb_rpc_blob_store_callback(const char* id, bool is_new, void* user_data) { blobs_get_t* get = user_data; get->storing = false; if (get->done) { tf_free(get); } } static void _tf_ssb_rpc_connection_blobs_get_callback(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); JSContext* context = tf_ssb_get_context(ssb); blobs_get_t* get = user_data; if ((flags & k_ssb_rpc_mask_type) == k_ssb_rpc_flag_binary && size > 0 && get->received + size <= get->expected_size) { memcpy(get->buffer + get->received, message, size); get->received += size; } else if ((flags & k_ssb_rpc_mask_type) == k_ssb_rpc_flag_json) { if (JS_ToBool(context, args)) { get->storing = true; tf_ssb_db_blob_store_async(ssb, get->buffer, get->received, _tf_ssb_rpc_blob_store_callback, get); } /* TODO: Should we send the response in the callback? */ bool stored = true; tf_ssb_connection_rpc_send( connection, k_ssb_rpc_flag_json | k_ssb_rpc_flag_stream | k_ssb_rpc_flag_end_error, -request_number, (const uint8_t*)(stored ? "true" : "false"), strlen(stored ? "true" : "false"), NULL, NULL, NULL); } } static void _tf_ssb_rpc_connection_blobs_get_cleanup(tf_ssb_t* ssb, void* user_data) { blobs_get_t* get = user_data; get->done = true; if (!get->storing) { tf_free(get); } } static void _tf_ssb_rpc_connection_blobs_get(tf_ssb_connection_t* connection, const char* blob_id, size_t size) { blobs_get_t* get = tf_malloc(sizeof(blobs_get_t) + size); *get = (blobs_get_t) { .ssb = tf_ssb_connection_get_ssb(connection), .expected_size = size }; snprintf(get->id, sizeof(get->id), "%s", blob_id); memset(get->buffer, 0, size); tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); JSContext* context = tf_ssb_get_context(ssb); JSValue message = JS_NewObject(context); JSValue name = JS_NewArray(context); JS_SetPropertyUint32(context, name, 0, JS_NewString(context, "blobs")); JS_SetPropertyUint32(context, name, 1, JS_NewString(context, "get")); JS_SetPropertyStr(context, message, "name", name); JS_SetPropertyStr(context, message, "type", JS_NewString(context, "source")); JSValue args = JS_NewArray(context); JS_SetPropertyUint32(context, args, 0, JS_NewString(context, blob_id)); JS_SetPropertyStr(context, message, "args", args); tf_ssb_connection_rpc_send_json( connection, k_ssb_rpc_flag_stream, tf_ssb_connection_next_request_number(connection), message, _tf_ssb_rpc_connection_blobs_get_callback, _tf_ssb_rpc_connection_blobs_get_cleanup, get); JS_FreeValue(context, message); } static void _tf_ssb_rpc_connection_blobs_createWants_callback(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { tf_ssb_blob_wants_t* blob_wants = tf_ssb_connection_get_blob_wants_state(connection); if (!blob_wants) { return; } tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); JSContext* context = tf_ssb_connection_get_context(connection); JSValue name = JS_GetPropertyStr(context, args, "name"); if (!JS_IsUndefined(name)) { /* { name: "Error" } */ JS_FreeValue(context, name); return; } JSPropertyEnum* ptab; uint32_t plen; JS_GetOwnPropertyNames(context, &ptab, &plen, args, JS_GPN_STRING_MASK); for (uint32_t i = 0; i < plen; ++i) { JSValue key = JS_AtomToString(context, ptab[i].atom); JSPropertyDescriptor desc; JSValue key_value = JS_NULL; if (JS_GetOwnProperty(context, &desc, args, ptab[i].atom) == 1) { key_value = desc.value; JS_FreeValue(context, desc.setter); JS_FreeValue(context, desc.getter); } const char* blob_id = JS_ToCString(context, key); int64_t size = 0; JS_ToInt64(context, &size, key_value); if (--blob_wants->wants_sent == 0) { _tf_ssb_rpc_request_more_blobs(connection); } if (size < 0) { size_t blob_size = 0; if (tf_ssb_db_blob_get(ssb, blob_id, NULL, &blob_size)) { JSValue message = JS_NewObject(context); JS_SetPropertyStr(context, message, blob_id, JS_NewInt64(context, blob_size)); tf_ssb_connection_rpc_send_json( connection, k_ssb_rpc_flag_stream, -blob_wants->request_number, message, NULL, NULL, NULL); JS_FreeValue(context, message); } else if (size == -1LL) { JSValue message = JS_NewObject(context); JS_SetPropertyStr(context, message, blob_id, JS_NewInt64(context, -2)); tf_ssb_connection_rpc_send_json( connection, k_ssb_rpc_flag_stream, -blob_wants->request_number, message, NULL, NULL, NULL); JS_FreeValue(context, message); } } else { _tf_ssb_rpc_connection_blobs_get(connection, blob_id, size); } JS_FreeCString(context, blob_id); JS_FreeValue(context, key); JS_FreeValue(context, key_value); } for (uint32_t i = 0; i < plen; ++i) { JS_FreeAtom(context, ptab[i].atom); } js_free(context, ptab); } static void _tf_ssb_rpc_connection_room_attendants_callback(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); JSContext* context = tf_ssb_get_context(ssb); JSValue type = JS_GetPropertyStr(context, args, "type"); const char* type_string = JS_ToCString(context, type); if (!type_string) { tf_ssb_connection_rpc_send_error(connection, flags, -request_number, "Missing type."); } else if (strcmp(type_string, "state") == 0) { tf_ssb_connection_clear_room_attendants(connection); JSValue ids = JS_GetPropertyStr(context, args, "ids"); int length = tf_util_get_length(context, ids); for (int i = 0; i < length; i++) { JSValue id = JS_GetPropertyUint32(context, ids, i); const char* id_string = JS_ToCString(context, id); if (id_string) { tf_ssb_connection_add_room_attendant(connection, id_string); } JS_FreeCString(context, id_string); JS_FreeValue(context, id); } JS_FreeValue(context, ids); } else if (strcmp(type_string, "joined") == 0) { JSValue id = JS_GetPropertyStr(context, args, "id"); const char* id_string = JS_ToCString(context, id); if (id_string) { tf_ssb_connection_add_room_attendant(connection, id_string); } JS_FreeCString(context, id_string); JS_FreeValue(context, id); } else if (strcmp(type_string, "left") == 0) { JSValue id = JS_GetPropertyStr(context, args, "id"); const char* id_string = JS_ToCString(context, id); if (id_string) { tf_ssb_connection_remove_room_attendant(connection, id_string); } JS_FreeCString(context, id_string); JS_FreeValue(context, id); } else { char buffer[256]; snprintf(buffer, sizeof(buffer), "Unexpected room.attendants response type: '%s'.", type_string); tf_ssb_connection_rpc_send_error(connection, flags, -request_number, buffer); } JS_FreeCString(context, type_string); JS_FreeValue(context, type); } static bool _is_error(JSContext* context, JSValue message) { JSValue name = JS_GetPropertyStr(context, message, "name"); const char* name_string = JS_ToCString(context, name); bool is_error = false; if (name_string && strcmp(name_string, "Error") == 0) { is_error = true; } JS_FreeCString(context, name_string); JS_FreeValue(context, name); return is_error; } static void _tf_ssb_rpc_connection_tunnel_isRoom_callback(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); JSContext* context = tf_ssb_get_context(ssb); if (_is_error(context, args)) { return; } if (JS_IsObject(args)) { JSValue message = JS_NewObject(context); JSValue name = JS_NewArray(context); JS_SetPropertyUint32(context, name, 0, JS_NewString(context, "room")); JS_SetPropertyUint32(context, name, 1, JS_NewString(context, "attendants")); JS_SetPropertyStr(context, message, "name", name); JS_SetPropertyStr(context, message, "type", JS_NewString(context, "source")); JS_SetPropertyStr(context, message, "args", JS_NewArray(context)); tf_ssb_connection_rpc_send_json( connection, k_ssb_rpc_flag_stream, tf_ssb_connection_next_request_number(connection), message, _tf_ssb_rpc_connection_room_attendants_callback, NULL, NULL); JS_FreeValue(context, message); } } typedef struct _tf_ssb_connection_send_history_stream_t { int32_t request_number; char author[k_id_base64_len]; int64_t sequence; bool keys; bool live; } tf_ssb_connection_send_history_stream_t; static void _tf_ssb_connection_send_history_stream_callback(tf_ssb_connection_t* connection, void* user_data) { tf_ssb_connection_send_history_stream_t* request = user_data; if (tf_ssb_connection_is_connected(connection)) { _tf_ssb_connection_send_history_stream_internal(connection, request->request_number, request->author, request->sequence, request->keys, request->live); } tf_free(request); } static void _tf_ssb_connection_send_history_stream_internal(tf_ssb_connection_t* connection, int32_t request_number, const char* author, int64_t sequence, bool keys, bool live) { tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); JSContext* context = tf_ssb_get_context(ssb); sqlite3* db = tf_ssb_acquire_db_reader(ssb); sqlite3_stmt* statement; const int k_max = 32; int64_t max_sequence_seen = 0; if (sqlite3_prepare(db, "SELECT previous, author, id, sequence, timestamp, hash, content, signature, sequence_before_author FROM messages WHERE author = ?1 AND sequence >= ?2 AND sequence < ?3 ORDER BY sequence", -1, &statement, NULL) == SQLITE_OK) { if (sqlite3_bind_text(statement, 1, author, -1, NULL) == SQLITE_OK && sqlite3_bind_int64(statement, 2, sequence) == SQLITE_OK && sqlite3_bind_int64(statement, 3, sequence + k_max) == SQLITE_OK) { while (sqlite3_step(statement) == SQLITE_ROW) { JSValue message = JS_UNDEFINED; max_sequence_seen = sqlite3_column_int64(statement, 3); JSValue formatted = tf_ssb_format_message( context, (const char*)sqlite3_column_text(statement, 0), (const char*)sqlite3_column_text(statement, 1), sqlite3_column_int64(statement, 3), sqlite3_column_double(statement, 4), (const char*)sqlite3_column_text(statement, 5), (const char*)sqlite3_column_text(statement, 6), (const char*)sqlite3_column_text(statement, 7), sqlite3_column_int(statement, 8)); if (keys) { message = JS_NewObject(context); JS_SetPropertyStr(context, message, "key", JS_NewString(context, (const char*)sqlite3_column_text(statement, 2))); JS_SetPropertyStr(context, message, "value", formatted); JS_SetPropertyStr(context, message, "timestamp", JS_NewString(context, (const char*)sqlite3_column_text(statement, 4))); } else { message = formatted; } tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream, request_number, message, NULL, NULL, NULL); JS_FreeValue(context, message); } } sqlite3_finalize(statement); } else { tf_printf("prepare failed: %s\n", sqlite3_errmsg(db)); } tf_ssb_release_db_reader(ssb, db); if (max_sequence_seen == sequence + k_max - 1) { _tf_ssb_connection_send_history_stream(connection, request_number, author, max_sequence_seen + 1, keys, live); } else if (!live) { tf_ssb_connection_rpc_send( connection, k_ssb_rpc_flag_json, request_number, (const uint8_t*)"false", strlen("false"), NULL, NULL, NULL); } } static void _tf_ssb_connection_send_history_stream(tf_ssb_connection_t* connection, int32_t request_number, const char* author, int64_t sequence, bool keys, bool live) { tf_ssb_connection_send_history_stream_t* async = tf_malloc(sizeof(tf_ssb_connection_send_history_stream_t)); *async = (tf_ssb_connection_send_history_stream_t) { .request_number = request_number, .sequence = sequence, .keys = keys, .live = live, }; snprintf(async->author, sizeof(async->author), "%s", author); tf_ssb_connection_schedule_idle(connection, _tf_ssb_connection_send_history_stream_callback, async); } static void _tf_ssb_rpc_createHistoryStream(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); JSContext* context = tf_ssb_get_context(ssb); JSValue arg_array = JS_GetPropertyStr(context, args, "args"); JSValue arg = JS_GetPropertyUint32(context, arg_array, 0); if (JS_IsUndefined(arg)) { tf_ssb_connection_rpc_send_error(connection, flags, -request_number, "Missing request.args in createHistoryStream."); } JSValue id = JS_GetPropertyStr(context, arg, "id"); JSValue seq = JS_GetPropertyStr(context, arg, "seq"); JSValue keys = JS_GetPropertyStr(context, arg, "keys"); JSValue live = JS_GetPropertyStr(context, arg, "live"); bool is_keys = JS_IsUndefined(keys) || JS_ToBool(context, keys) > 0; bool is_live = JS_ToBool(context, live) > 0; int64_t sequence = 0; JS_ToInt64(context, &sequence, seq); const char* author = JS_ToCString(context, id); _tf_ssb_connection_send_history_stream(connection, -request_number, author, sequence, is_keys, is_live); if (is_live) { tf_ssb_connection_add_new_message_request(connection, author, -request_number, is_keys); } JS_FreeCString(context, author); JS_FreeValue(context, id); JS_FreeValue(context, seq); JS_FreeValue(context, keys); JS_FreeValue(context, arg); JS_FreeValue(context, arg_array); } static void _tf_ssb_rpc_ebt_replicate_send_clock(tf_ssb_connection_t* connection, int32_t request_number, JSValue message) { tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); JSContext* context = tf_ssb_get_context(ssb); JSValue full_clock = JS_NewObject(context); /* Ask for every identity we know is being followed from local accounts. */ const char** visible = tf_ssb_db_get_all_visible_identities(ssb, 2); for (int i = 0; visible[i]; i++) { int64_t sequence = 0; tf_ssb_db_get_latest_message_by_author(ssb, visible[i], &sequence, NULL, 0); JS_SetPropertyStr(context, full_clock, visible[i], JS_NewInt64(context, sequence == -1 ? -1 : (sequence << 1))); } tf_free(visible); /* Ask about the incoming connection, too. */ char id[k_id_base64_len] = ""; if (tf_ssb_connection_get_id(connection, id, sizeof(id))) { JSValue in_clock = JS_GetPropertyStr(context, full_clock, id); if (JS_IsUndefined(in_clock)) { int64_t sequence = 0; tf_ssb_db_get_latest_message_by_author(ssb, id, &sequence, NULL, 0); JS_SetPropertyStr(context, full_clock, id, JS_NewInt64(context, sequence == -1 ? -1 : (sequence << 1))); } JS_FreeValue(context, in_clock); } /* Also respond with what we know about all requested identities. */ if (!JS_IsUndefined(message)) { JSPropertyEnum* ptab; uint32_t plen; JS_GetOwnPropertyNames(context, &ptab, &plen, message, JS_GPN_STRING_MASK); for (uint32_t i = 0; i < plen; ++i) { JSValue in_clock = JS_GetProperty(context, full_clock, ptab[i].atom); if (JS_IsUndefined(in_clock)) { JSValue key = JS_AtomToString(context, ptab[i].atom); const char* key_string = JS_ToCString(context, key); if (key_string) { int64_t sequence = -1; tf_ssb_db_get_latest_message_by_author(ssb, key_string, &sequence, NULL, 0); JS_SetPropertyStr(context, full_clock, key_string, JS_NewInt64(context, sequence == -1 ? -1 : (sequence << 1))); } JS_FreeCString(context, key_string); JS_FreeValue(context, key); } } for (uint32_t i = 0; i < plen; ++i) { JS_FreeAtom(context, ptab[i].atom); } js_free(context, ptab); } tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream, -request_number, full_clock, NULL, NULL, NULL); JS_FreeValue(context, full_clock); } static void _tf_ssb_rpc_ebt_replicate_send_messages(tf_ssb_connection_t* connection, JSValue message) { if (JS_IsUndefined(message)) { return; } tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); JSContext* context = tf_ssb_get_context(ssb); JSPropertyEnum* ptab = NULL; uint32_t plen = 0; JS_GetOwnPropertyNames(context, &ptab, &plen, message, JS_GPN_STRING_MASK); for (uint32_t i = 0; i < plen; ++i) { JSValue in_clock = JS_GetProperty(context, message, ptab[i].atom); if (!JS_IsUndefined(in_clock)) { JSValue key = JS_AtomToString(context, ptab[i].atom); int64_t sequence = -1; JS_ToInt64(context, &sequence, in_clock); const char* author = JS_ToCString(context, key); if (sequence >= 0 && (sequence & 1) == 0) { int32_t request_number = tf_ssb_connection_get_ebt_request_number(connection); _tf_ssb_connection_send_history_stream(connection, request_number, author, sequence >> 1, false, true); tf_ssb_connection_add_new_message_request(connection, author, request_number, false); } else { tf_ssb_connection_remove_new_message_request(connection, author); } JS_FreeCString(context, author); JS_FreeValue(context, key); } JS_FreeValue(context, in_clock); } for (uint32_t i = 0; i < plen; ++i) { JS_FreeAtom(context, ptab[i].atom); } js_free(context, ptab); } static void _tf_ssb_rpc_ebt_replicate(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); JSContext* context = tf_ssb_get_context(ssb); if (_is_error(context, args)) { /* TODO: Send createHistoryStream. */ return; } if (!tf_ssb_connection_get_ebt_request_number(connection)) { tf_ssb_connection_set_ebt_request_number(connection, -request_number); } JSValue author = JS_GetPropertyStr(context, args, "author"); JSValue name = JS_GetPropertyStr(context, args, "name"); JSValue in_clock = JS_IsUndefined(name) ? args : JS_UNDEFINED; if (!JS_IsUndefined(author)) { /* Looks like a message. */ tf_ssb_verify_strip_and_store_message(ssb, args, NULL, NULL); } else { /* EBT clock. */ tf_ssb_connection_set_ebt_send_clock(connection, args); if (!tf_ssb_connection_get_sent_clock(connection)) { _tf_ssb_rpc_ebt_replicate_send_clock(connection, request_number, in_clock); tf_ssb_connection_set_sent_clock(connection, true); } _tf_ssb_rpc_ebt_replicate_send_messages(connection, in_clock); } JS_FreeValue(context, name); JS_FreeValue(context, author); } static void _tf_ssb_rpc_ebt_replicate_client(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { _tf_ssb_rpc_ebt_replicate(connection, flags, request_number, args, message, size, user_data); } static void _tf_ssb_rpc_send_ebt_replicate(tf_ssb_connection_t* connection) { tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); JSContext* context = tf_ssb_get_context(ssb); JSValue message = JS_NewObject(context); JSValue name = JS_NewArray(context); JS_SetPropertyUint32(context, name, 0, JS_NewString(context, "ebt")); JS_SetPropertyUint32(context, name, 1, JS_NewString(context, "replicate")); JS_SetPropertyStr(context, message, "name", name); JSValue arg = JS_NewObject(context); JS_SetPropertyStr(context, arg, "version", JS_NewInt32(context, 3)); JS_SetPropertyStr(context, arg, "format", JS_NewString(context, "classic")); JSValue args = JS_NewArray(context); JS_SetPropertyUint32(context, args, 0, arg); JS_SetPropertyStr(context, message, "args", args); JS_SetPropertyStr(context, message, "type", JS_NewString(context, "duplex")); int32_t request_number = tf_ssb_connection_next_request_number(connection); tf_ssb_connection_rpc_send_json( connection, k_ssb_rpc_flag_stream, request_number, message, _tf_ssb_rpc_ebt_replicate_client, NULL, NULL); if (!tf_ssb_connection_get_ebt_request_number(connection)) { tf_ssb_connection_set_ebt_request_number(connection, request_number); } JS_FreeValue(context, message); } static void _tf_ssb_rpc_ebt_replicate_server(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { if (flags & k_ssb_rpc_flag_end_error) { return; } _tf_ssb_rpc_ebt_replicate(connection, flags, request_number, args, message, size, user_data); tf_ssb_connection_add_request(connection, -request_number, _tf_ssb_rpc_ebt_replicate, NULL, NULL, NULL); } static void _tf_ssb_rpc_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_change_t change, tf_ssb_connection_t* connection, void* user_data) { JSContext* context = tf_ssb_get_context(ssb); if (change == k_tf_ssb_change_connect) { JSValue message = JS_NewObject(context); JSValue name = JS_NewArray(context); JS_SetPropertyUint32(context, name, 0, JS_NewString(context, "blobs")); JS_SetPropertyUint32(context, name, 1, JS_NewString(context, "createWants")); JS_SetPropertyStr(context, message, "name", name); JS_SetPropertyStr(context, message, "type", JS_NewString(context, "source")); JS_SetPropertyStr(context, message, "args", JS_NewArray(context)); tf_ssb_connection_rpc_send_json( connection, k_ssb_rpc_flag_stream, tf_ssb_connection_next_request_number(connection), message, _tf_ssb_rpc_connection_blobs_createWants_callback, NULL, NULL); JS_FreeValue(context, message); if (tf_ssb_connection_is_client(connection)) { message = JS_NewObject(context); name = JS_NewArray(context); JS_SetPropertyUint32(context, name, 0, JS_NewString(context, "tunnel")); JS_SetPropertyUint32(context, name, 1, JS_NewString(context, "isRoom")); JS_SetPropertyStr(context, message, "name", name); JS_SetPropertyStr(context, message, "args", JS_NewArray(context)); tf_ssb_connection_rpc_send_json( connection, 0, tf_ssb_connection_next_request_number(connection), message, _tf_ssb_rpc_connection_tunnel_isRoom_callback, NULL, NULL); JS_FreeValue(context, message); _tf_ssb_rpc_send_ebt_replicate(connection); } } else if (change == k_tf_ssb_change_remove) { tf_ssb_remove_blob_want_added_callback(ssb, _tf_ssb_rpc_blob_wants_added_callback, connection); char id[k_id_base64_len] = ""; if (tf_ssb_connection_get_id(connection, id, sizeof(id))) { JSValue left = JS_NewObject(context); JS_SetPropertyStr(context, left, "type", JS_NewString(context, "left")); JS_SetPropertyStr(context, left, "id", JS_NewString(context, id)); tf_ssb_connection_t* connections[1024]; int count = tf_ssb_get_connections(ssb, connections, _countof(connections)); for (int i = 0; i < count; i++) { if (tf_ssb_connection_is_attendant(connections[i])) { tf_ssb_connection_rpc_send_json( connections[i], k_ssb_rpc_flag_stream, -tf_ssb_connection_get_attendant_request_number(connections[i]), left, NULL, NULL, NULL); } } JS_FreeValue(context, left); } } } typedef struct _delete_blobs_work_t { uv_work_t work; tf_ssb_t* ssb; } delete_blobs_work_t; static void _tf_ssb_rpc_delete_blobs_work(uv_work_t* work) { delete_blobs_work_t* delete = work->data; tf_ssb_t* ssb = delete->ssb; tf_ssb_record_thread_busy(ssb, true); int64_t age = _get_global_setting_int64(ssb, "blob_expire_age_seconds", -1); if (age <= 0) { tf_ssb_record_thread_busy(ssb, false); return; } int64_t start_ns = uv_hrtime(); sqlite3* db = tf_ssb_acquire_db_writer(ssb); sqlite3_stmt* statement; int64_t now = (int64_t)time(NULL) * 1000ULL; int64_t timestamp = now - age * 1000ULL; const int k_limit = 256; int deleted = 0; if (sqlite3_prepare(db, "DELETE FROM blobs WHERE blobs.id IN (" " SELECT blobs.id FROM blobs " " JOIN messages_refs ON blobs.id = messages_refs.ref " " JOIN messages ON messages.id = messages_refs.message " " GROUP BY blobs.id HAVING MAX(messages.timestamp) < ? LIMIT ?)", -1, &statement, NULL) == SQLITE_OK) { if (sqlite3_bind_int64(statement, 1, timestamp) == SQLITE_OK && sqlite3_bind_int(statement, 2, k_limit) == SQLITE_OK) { int r = sqlite3_step(statement); if (r != SQLITE_DONE) { tf_printf("_tf_ssb_rpc_delete_blobs_work: %s\n", sqlite3_errmsg(db)); } else { tf_printf("_tf_ssb_rpc_delete_blobs_work: %d rows\n", sqlite3_changes(db)); } deleted = sqlite3_changes(db); } } else { tf_printf("prepare failed: %s\n", sqlite3_errmsg(db)); } tf_ssb_release_db_writer(ssb, db); int64_t duration_ms = (uv_hrtime() - start_ns) / 1000000LL; tf_printf("Deleted %d blobs in %d ms.\n", deleted, (int)duration_ms); _tf_ssb_rpc_start_delete_blobs(ssb, deleted ? (int)duration_ms : (15 * 60 * 1000)); tf_ssb_record_thread_busy(ssb, false); } static void _tf_ssb_rpc_delete_blobs_after_work(uv_work_t* work, int status) { delete_blobs_work_t* delete = work->data; tf_ssb_unref(delete->ssb); tf_free(delete); } static void _tf_ssb_rpc_timer_on_close(uv_handle_t* handle) { tf_free(handle); } static void _tf_ssb_rpc_start_delete_timer(uv_timer_t* timer) { tf_ssb_t* ssb = timer->data; delete_blobs_work_t* work = tf_malloc(sizeof(delete_blobs_work_t)); *work = (delete_blobs_work_t) { .work = { .data = work}, .ssb = ssb }; int r = uv_queue_work(tf_ssb_get_loop(ssb), &work->work, _tf_ssb_rpc_delete_blobs_work, _tf_ssb_rpc_delete_blobs_after_work); if (r) { tf_printf("uv_queue_work: %s\n", uv_strerror(r)); tf_free(work); } uv_close((uv_handle_t*)timer, _tf_ssb_rpc_timer_on_close); } static void _tf_ssb_rpc_start_delete_blobs(tf_ssb_t* ssb, int delay_ms) { tf_printf("will delete more blobs in %d ms\n", delay_ms); uv_timer_t* timer = tf_malloc(sizeof(uv_timer_t)); *timer = (uv_timer_t) { .data = ssb }; uv_timer_init(tf_ssb_get_loop(ssb), timer); uv_timer_start(timer, _tf_ssb_rpc_start_delete_timer, delay_ms, 0); tf_ssb_ref(ssb); } void tf_ssb_rpc_register(tf_ssb_t* ssb) { tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_rpc_connections_changed_callback, NULL, NULL); tf_ssb_add_rpc_callback(ssb, (const char*[]) { "gossip", "ping", NULL }, _tf_ssb_rpc_gossip_ping, NULL, NULL); /* DUPLEX */ tf_ssb_add_rpc_callback(ssb, (const char*[]) { "blobs", "get", NULL }, _tf_ssb_rpc_blobs_get, NULL, NULL); /* SOURCE */ tf_ssb_add_rpc_callback(ssb, (const char*[]) { "blobs", "has", NULL }, _tf_ssb_rpc_blobs_has, NULL, NULL); /* ASYNC */ tf_ssb_add_rpc_callback(ssb, (const char*[]) { "blobs", "createWants", NULL }, _tf_ssb_rpc_blobs_createWants, NULL, NULL); /* SOURCE */ tf_ssb_add_rpc_callback(ssb, (const char*[]) { "tunnel", "connect", NULL }, _tf_ssb_rpc_tunnel_connect, NULL, NULL); /* DUPLEX */ tf_ssb_add_rpc_callback(ssb, (const char*[]) { "tunnel", "isRoom", NULL }, _tf_ssb_rpc_room_meta, NULL, NULL); /* FAKE-ASYNC */ tf_ssb_add_rpc_callback(ssb, (const char*[]) { "room", "metadata", NULL }, _tf_ssb_rpc_room_meta, NULL, NULL); /* ASYNC */ tf_ssb_add_rpc_callback(ssb, (const char*[]) { "room", "attendants", NULL }, _tf_ssb_rpc_room_attendants, NULL, NULL); /* SOURCE */ tf_ssb_add_rpc_callback(ssb, (const char*[]) { "createHistoryStream", NULL }, _tf_ssb_rpc_createHistoryStream, NULL, NULL); /* SOURCE */ tf_ssb_add_rpc_callback(ssb, (const char*[]) { "ebt", "replicate", NULL }, _tf_ssb_rpc_ebt_replicate_server, NULL, NULL); /* DUPLEX */ _tf_ssb_rpc_start_delete_blobs(ssb, 0); }