#include "ssb.db.h" #include "log.h" #include "mem.h" #include "ssb.h" #include "trace.h" #include "util.js.h" #include "sodium/crypto_hash_sha256.h" #include "sodium/crypto_sign.h" #include "sqlite3.h" #include "uv.h" #include #include #include typedef struct _message_store_t message_store_t; static void _tf_ssb_db_store_message_work_finish(message_store_t* store); static void _tf_ssb_db_store_message_after_work(uv_work_t* work, int status); static void _tf_ssb_db_exec(sqlite3* db, const char* statement) { char* error = NULL; int result = sqlite3_exec(db, statement, NULL, NULL, &error); if (result != SQLITE_OK) { tf_printf("Error running '%s': %s.\n", statement, error ? error : sqlite3_errmsg(db)); abort(); } } static bool _tf_ssb_db_has_rows(sqlite3* db, const char* query) { bool found = false; sqlite3_stmt* statement = NULL; if (sqlite3_prepare(db, query, -1, &statement, NULL) == SQLITE_OK) { int result = SQLITE_OK; while ((result = sqlite3_step(statement)) == SQLITE_ROW) { found = true; } if (result != SQLITE_DONE) { tf_printf("%s\n", sqlite3_errmsg(db)); abort(); } sqlite3_finalize(statement); } else { tf_printf("%s\n", sqlite3_errmsg(db)); abort(); } return found; } static void _tf_ssb_db_init_internal(sqlite3* db) { sqlite3_extended_result_codes(db, 1); _tf_ssb_db_exec(db, "PRAGMA journal_mode = WAL"); _tf_ssb_db_exec(db, "PRAGMA synchronous = NORMAL"); } void tf_ssb_db_init_reader(sqlite3* db) { _tf_ssb_db_init_internal(db); } void tf_ssb_db_init(tf_ssb_t* ssb) { sqlite3* db = tf_ssb_acquire_db_writer(ssb); _tf_ssb_db_init_internal(db); sqlite3_stmt* statement = NULL; int auto_vacuum = 0; if (sqlite3_prepare(db, "PRAGMA auto_vacuum", -1, &statement, NULL) == SQLITE_OK) { if (sqlite3_step(statement) == SQLITE_ROW) { auto_vacuum = sqlite3_column_int(statement, 0); } sqlite3_finalize(statement); } if (auto_vacuum != 1 /* FULL */) { tf_printf("Enabling auto-vacuum and performing full vacuum.\n"); _tf_ssb_db_exec(db, "PRAGMA auto_vacuum = FULL"); _tf_ssb_db_exec(db, "VACUUM main"); tf_printf("All clean.\n"); } _tf_ssb_db_exec(db, "CREATE TABLE IF NOT EXISTS messages (" " author TEXT," " id TEXT PRIMARY KEY," " sequence INTEGER," " timestamp REAL," " previous TEXT," " hash TEXT," " content BLOB," " signature TEXT," " flags INTEGER," " UNIQUE(author, sequence)" ")"); if (_tf_ssb_db_has_rows(db, "SELECT name FROM pragma_table_info('messages') WHERE name = 'content' AND type == 'TEXT'")) { tf_printf("converting to JSONB\n"); _tf_ssb_db_exec(db, "DROP TRIGGER IF EXISTS messages_ai_refs"); _tf_ssb_db_exec(db, "DROP TRIGGER IF EXISTS messages_ad_refs"); _tf_ssb_db_exec(db, "DROP TRIGGER IF EXISTS messages_ai"); _tf_ssb_db_exec(db, "DROP TRIGGER IF EXISTS messages_ad"); _tf_ssb_db_exec(db, "DROP TABLE IF EXISTS messages_fts"); _tf_ssb_db_exec(db, "BEGIN TRANSACTION"); _tf_ssb_db_exec(db, "ALTER TABLE messages ADD COLUMN contentb BLOB"); _tf_ssb_db_exec(db, "UPDATE messages SET contentb = jsonb(content)"); _tf_ssb_db_exec(db, "ALTER TABLE messages DROP COLUMN content"); _tf_ssb_db_exec(db, "ALTER TABLE messages RENAME COLUMN contentb TO content"); _tf_ssb_db_exec(db, "COMMIT TRANSACTION"); } if (_tf_ssb_db_has_rows(db, "SELECT name FROM pragma_table_info('messages') WHERE name = 'sequence_before_author'")) { tf_printf("Renaming sequence_before_author -> flags.\n"); _tf_ssb_db_exec(db, "ALTER TABLE messages RENAME COLUMN sequence_before_author TO flags"); } _tf_ssb_db_exec(db, "CREATE INDEX IF NOT EXISTS messages_author_id_index ON messages (author, id)"); _tf_ssb_db_exec(db, "CREATE INDEX IF NOT EXISTS messages_author_sequence_index ON messages (author, sequence)"); _tf_ssb_db_exec(db, "CREATE INDEX IF NOT EXISTS messages_author_timestamp_index ON messages (author, timestamp)"); _tf_ssb_db_exec(db, "CREATE INDEX IF NOT EXISTS messages_timestamp_index ON messages (timestamp)"); _tf_ssb_db_exec(db, "CREATE TABLE IF NOT EXISTS blobs (" " id TEXT PRIMARY KEY," " content BLOB," " created INTEGER" ")"); _tf_ssb_db_exec(db, "DROP TABLE IF EXISTS blob_wants"); _tf_ssb_db_exec(db, "CREATE TABLE IF NOT EXISTS properties (" " id TEXT," " key TEXT," " value TEXT," " UNIQUE(id, key)" ")"); _tf_ssb_db_exec(db, "CREATE TABLE IF NOT EXISTS connections (" " host TEXT," " port INTEGER," " key TEXT," " last_attempt INTEGER," " last_success INTEGER," " UNIQUE(host, port, key)" ")"); _tf_ssb_db_exec(db, "CREATE TABLE IF NOT EXISTS identities (" " user TEXT," " public_key TEXT UNIQUE," " private_key TEXT UNIQUE" ")"); _tf_ssb_db_exec(db, "CREATE INDEX IF NOT EXISTS identities_user ON identities (user, public_key)"); bool populate_fts = false; if (!_tf_ssb_db_has_rows(db, "PRAGMA table_list('messages_fts')")) { _tf_ssb_db_exec(db, "CREATE VIRTUAL TABLE IF NOT EXISTS messages_fts USING fts5(content, content=messages, content_rowid=rowid)"); populate_fts = true; } if (!populate_fts && /* HACK */ false) { tf_printf("Checking FTS5 integrity...\n"); if (sqlite3_exec(db, "INSERT INTO messages_fts(messages_fts, rank) VALUES ('integrity-check', 0)", NULL, NULL, NULL) == SQLITE_CORRUPT_VTAB) { populate_fts = true; } tf_printf("Done.\n"); } if (populate_fts) { tf_printf("Populating full-text search...\n"); _tf_ssb_db_exec(db, "INSERT INTO messages_fts (rowid, content) SELECT rowid, json(content) FROM messages"); tf_printf("Done.\n"); } _tf_ssb_db_exec( db, "CREATE TRIGGER IF NOT EXISTS messages_ai AFTER INSERT ON messages BEGIN INSERT INTO messages_fts(rowid, content) VALUES (new.rowid, json(new.content)); END"); _tf_ssb_db_exec(db, "CREATE TRIGGER IF NOT EXISTS messages_ad AFTER DELETE ON messages BEGIN INSERT INTO messages_fts(messages_fts, rowid, content) VALUES ('delete', old.rowid, " "old.content); END"); if (!_tf_ssb_db_has_rows(db, "PRAGMA table_list('messages_refs')")) { _tf_ssb_db_exec(db, "CREATE TABLE IF NOT EXISTS messages_refs (" " message TEXT, " " ref TEXT, " " UNIQUE(message, ref)" ")"); tf_printf("Populating messages_refs...\n"); _tf_ssb_db_exec(db, "INSERT INTO messages_refs(message, ref) " "SELECT messages.id, j.value FROM messages, json_tree(messages.content) as j WHERE " "j.value LIKE '&%.sha256' OR " "j.value LIKE '%%%.sha256' OR " "j.value LIKE '@%.ed25519' " "ON CONFLICT DO NOTHING"); tf_printf("Done.\n"); } _tf_ssb_db_exec(db, "DROP TRIGGER IF EXISTS messages_ai_refs"); _tf_ssb_db_exec(db, "CREATE TRIGGER IF NOT EXISTS messages_ai_refs AFTER INSERT ON messages BEGIN " "INSERT INTO messages_refs(message, ref) " "SELECT DISTINCT new.id, j.value FROM json_tree(new.content) as j WHERE " "j.value LIKE '&%.sha256' OR " "j.value LIKE '%%%.sha256' OR " "j.value LIKE '@%.ed25519' " "ON CONFLICT DO NOTHING; END"); _tf_ssb_db_exec(db, "DROP TRIGGER IF EXISTS messages_ad_refs"); _tf_ssb_db_exec(db, "CREATE TRIGGER IF NOT EXISTS messages_ad_refs AFTER DELETE ON messages BEGIN DELETE FROM messages_refs WHERE messages_refs.message = old.id; END"); _tf_ssb_db_exec(db, "CREATE INDEX IF NOT EXISTS messages_refs_message_idx ON messages_refs (message)"); _tf_ssb_db_exec(db, "CREATE INDEX IF NOT EXISTS messages_refs_ref_idx ON messages_refs (ref)"); _tf_ssb_db_exec(db, "DROP VIEW IF EXISTS blob_wants_view"); _tf_ssb_db_exec(db, "CREATE VIEW IF NOT EXISTS blob_wants_view (id, timestamp) AS " " SELECT messages_refs.ref AS id, messages.timestamp AS timestamp " " FROM messages_refs " " JOIN messages ON messages.id = messages_refs.message " " LEFT OUTER JOIN blobs ON messages_refs.ref = blobs.id " " WHERE blobs.id IS NULL " " AND LENGTH(messages_refs.ref) = 52 " " AND messages_refs.ref LIKE '&%.sha256'"); bool need_add_flags = true; bool need_convert_timestamp_to_real = false; if (sqlite3_prepare(db, "PRAGMA table_info(messages)", -1, &statement, NULL) == SQLITE_OK) { int result = SQLITE_OK; while ((result = sqlite3_step(statement)) == SQLITE_ROW) { const char* name = (const char*)sqlite3_column_text(statement, 1); const char* type = (const char*)sqlite3_column_text(statement, 2); if (name && type && strcmp(name, "timestamp") == 0 && strcmp(type, "INTEGER") == 0) { need_convert_timestamp_to_real = true; } if (name && strcmp(name, "flags") == 0) { need_add_flags = false; } } sqlite3_finalize(statement); } if (need_convert_timestamp_to_real) { tf_printf("Converting timestamp column from INTEGER to REAL.\n"); _tf_ssb_db_exec(db, "BEGIN TRANSACTION"); _tf_ssb_db_exec(db, "DROP INDEX IF EXISTS messages_author_timestamp_index"); _tf_ssb_db_exec(db, "ALTER TABLE messages ADD COLUMN timestamp_real REAL"); _tf_ssb_db_exec(db, "UPDATE messages SET timestamp_real = timestamp"); _tf_ssb_db_exec(db, "ALTER TABLE messages DROP COLUMN timestamp"); _tf_ssb_db_exec(db, "ALTER TABLE messages RENAME COLUMN timestamp_real TO timestamp"); _tf_ssb_db_exec(db, "CREATE INDEX IF NOT EXISTS messages_author_timestamp_index ON messages (author, timestamp)"); _tf_ssb_db_exec(db, "COMMIT TRANSACTION"); } if (need_add_flags) { tf_printf("Adding flags column.\n"); _tf_ssb_db_exec(db, "ALTER TABLE messages ADD COLUMN flags INTEGER"); } tf_ssb_release_db_writer(ssb, db); } static bool _tf_ssb_db_previous_message_exists(sqlite3* db, const char* author, int64_t sequence, const char* previous) { bool exists = false; if (sequence == 1) { exists = true; } else { sqlite3_stmt* statement; if (sqlite3_prepare(db, "SELECT COUNT(*) FROM messages WHERE author = ?1 AND sequence = ?2 AND id = ?3", -1, &statement, NULL) == SQLITE_OK) { if (sqlite3_bind_text(statement, 1, author, -1, NULL) == SQLITE_OK && sqlite3_bind_int64(statement, 2, sequence - 1) == SQLITE_OK && sqlite3_bind_text(statement, 3, previous, -1, NULL) == SQLITE_OK && sqlite3_step(statement) == SQLITE_ROW) { exists = sqlite3_column_int(statement, 0) != 0; } sqlite3_finalize(statement); } } return exists; } static int64_t _tf_ssb_db_store_message_raw(tf_ssb_t* ssb, const char* id, const char* previous, const char* author, int64_t sequence, double timestamp, const char* content, size_t content_len, const char* signature, int flags) { sqlite3* db = tf_ssb_acquire_db_writer(ssb); int64_t last_row_id = -1; if (_tf_ssb_db_previous_message_exists(db, author, sequence, previous)) { const char* query = "INSERT INTO messages (id, previous, author, sequence, timestamp, content, hash, signature, flags) VALUES (?, ?, ?, ?, ?, jsonb(?), " "?, ?, ?) ON CONFLICT DO NOTHING"; sqlite3_stmt* statement; if (sqlite3_prepare(db, query, -1, &statement, NULL) == SQLITE_OK) { if (sqlite3_bind_text(statement, 1, id, -1, NULL) == SQLITE_OK && (previous ? sqlite3_bind_text(statement, 2, previous, -1, NULL) : sqlite3_bind_null(statement, 2)) == SQLITE_OK && sqlite3_bind_text(statement, 3, author, -1, NULL) == SQLITE_OK && sqlite3_bind_int64(statement, 4, sequence) == SQLITE_OK && sqlite3_bind_double(statement, 5, timestamp) == SQLITE_OK && sqlite3_bind_text(statement, 6, content, content_len, NULL) == SQLITE_OK && sqlite3_bind_text(statement, 7, "sha256", 6, NULL) == SQLITE_OK && sqlite3_bind_text(statement, 8, signature, -1, NULL) == SQLITE_OK && sqlite3_bind_int(statement, 9, flags) == SQLITE_OK) { int r = sqlite3_step(statement); if (r != SQLITE_DONE) { tf_printf("_tf_ssb_db_store_message_raw: %s\n", sqlite3_errmsg(db)); } if (r == SQLITE_DONE && sqlite3_changes(db) != 0) { last_row_id = sqlite3_last_insert_rowid(db); } } else { tf_printf("bind failed\n"); } sqlite3_finalize(statement); } else { tf_printf("%s: prepare failed: %s\n", __FUNCTION__, sqlite3_errmsg(db)); } } else { tf_printf("%p: Previous message doesn't exist for author=%s sequence=%" PRId64 ".\n", db, author, sequence); } tf_ssb_release_db_writer(ssb, db); return last_row_id; } static char* _tf_ssb_db_get_message_blob_wants(tf_ssb_t* ssb, int64_t rowid) { sqlite3* db = tf_ssb_acquire_db_reader(ssb); sqlite3_stmt* statement; char* result = NULL; size_t size = 0; if (sqlite3_prepare(db, "SELECT DISTINCT json.value FROM messages, json_tree(messages.content) AS json LEFT OUTER JOIN blobs ON json.value = blobs.id WHERE messages.rowid = ?1 AND " "json.value LIKE '&%%.sha256' AND length(json.value) = ?2 AND blobs.content IS NULL", -1, &statement, NULL) == SQLITE_OK) { if (sqlite3_bind_int64(statement, 1, rowid) == SQLITE_OK && sqlite3_bind_int(statement, 2, k_blob_id_len - 1) == SQLITE_OK) { int r = SQLITE_OK; while ((r = sqlite3_step(statement)) == SQLITE_ROW) { int id_size = sqlite3_column_bytes(statement, 0); const uint8_t* id = sqlite3_column_text(statement, 0); result = tf_resize_vec(result, size + id_size + 1); memcpy(result + size, id, id_size + 1); size += id_size + 1; } if (r != SQLITE_DONE) { tf_printf("%s\n", sqlite3_errmsg(db)); } } else { tf_printf("bind failed: %s\n", sqlite3_errmsg(db)); } sqlite3_finalize(statement); } else { tf_printf("%s: prepare failed: %s\n", __FUNCTION__, sqlite3_errmsg(db)); } result = tf_realloc(result, size + 1); result[size] = '\0'; tf_ssb_release_db_reader(ssb, db); return result; } typedef struct _message_store_t { uv_work_t work; tf_ssb_t* ssb; char id[k_id_base64_len]; char signature[512]; int flags; char previous[k_id_base64_len]; char author[k_id_base64_len]; int64_t sequence; double timestamp; const char* content; size_t length; bool out_stored; char* out_blob_wants; tf_ssb_db_store_message_callback_t* callback; void* user_data; message_store_t* next; } message_store_t; static void _tf_ssb_db_store_message_work(uv_work_t* work) { message_store_t* store = work->data; tf_ssb_record_thread_busy(store->ssb, true); tf_trace_t* trace = tf_ssb_get_trace(store->ssb); tf_trace_begin(trace, "message_store_work"); int64_t last_row_id = _tf_ssb_db_store_message_raw(store->ssb, store->id, *store->previous ? store->previous : NULL, store->author, store->sequence, store->timestamp, store->content, store->length, store->signature, store->flags); if (last_row_id != -1) { store->out_stored = true; store->out_blob_wants = _tf_ssb_db_get_message_blob_wants(store->ssb, last_row_id); } tf_trace_end(trace); tf_ssb_record_thread_busy(store->ssb, false); } static void _wake_up_queue(tf_ssb_t* ssb, tf_ssb_store_queue_t* queue) { if (!queue->running) { message_store_t* next = queue->head; if (next) { queue->head = next->next; if (queue->tail == next) { queue->tail = NULL; } next->next = NULL; queue->running = true; int r = uv_queue_work(tf_ssb_get_loop(ssb), &next->work, _tf_ssb_db_store_message_work, _tf_ssb_db_store_message_after_work); if (r) { _tf_ssb_db_store_message_work_finish(next); } } } } static void _tf_ssb_db_store_message_work_finish(message_store_t* store) { JSContext* context = tf_ssb_get_context(store->ssb); if (store->callback) { store->callback(store->id, store->out_stored, store->user_data); } JS_FreeCString(context, store->content); tf_ssb_store_queue_t* queue = tf_ssb_get_store_queue(store->ssb); queue->running = false; _wake_up_queue(store->ssb, queue); tf_free(store); } static void _tf_ssb_db_store_message_after_work(uv_work_t* work, int status) { message_store_t* store = work->data; tf_trace_t* trace = tf_ssb_get_trace(store->ssb); tf_trace_begin(trace, "message_store_after_work"); if (store->out_stored) { tf_trace_begin(trace, "notify_message_added"); JSContext* context = tf_ssb_get_context(store->ssb); JSValue formatted = tf_ssb_format_message(context, store->previous, store->author, store->sequence, store->timestamp, "sha256", store->content, store->signature, store->flags); JSValue message = JS_NewObject(context); JS_SetPropertyStr(context, message, "key", JS_NewString(context, store->id)); JS_SetPropertyStr(context, message, "value", formatted); char timestamp_string[256]; snprintf(timestamp_string, sizeof(timestamp_string), "%f", store->timestamp); JS_SetPropertyStr(context, message, "timestamp", JS_NewString(context, timestamp_string)); tf_ssb_notify_message_added(store->ssb, store->id, message); JS_FreeValue(context, message); tf_trace_end(trace); } if (store->out_blob_wants) { tf_trace_begin(trace, "notify_blob_wants_added"); for (char* p = store->out_blob_wants; *p; p = p + strlen(p)) { tf_ssb_notify_blob_want_added(store->ssb, p); } tf_free(store->out_blob_wants); tf_trace_end(trace); } _tf_ssb_db_store_message_work_finish(store); tf_trace_end(trace); } void tf_ssb_db_store_message( tf_ssb_t* ssb, JSContext* context, const char* id, JSValue val, const char* signature, int flags, tf_ssb_db_store_message_callback_t* callback, void* user_data) { JSValue previousval = JS_GetPropertyStr(context, val, "previous"); const char* previous = JS_IsNull(previousval) ? NULL : JS_ToCString(context, previousval); JS_FreeValue(context, previousval); JSValue authorval = JS_GetPropertyStr(context, val, "author"); const char* author = JS_ToCString(context, authorval); JS_FreeValue(context, authorval); int64_t sequence = -1; JSValue sequenceval = JS_GetPropertyStr(context, val, "sequence"); JS_ToInt64(context, &sequence, sequenceval); JS_FreeValue(context, sequenceval); double timestamp = -1.0; JSValue timestampval = JS_GetPropertyStr(context, val, "timestamp"); JS_ToFloat64(context, ×tamp, timestampval); JS_FreeValue(context, timestampval); JSValue contentval = JS_GetPropertyStr(context, val, "content"); JSValue content = JS_JSONStringify(context, contentval, JS_NULL, JS_NULL); size_t content_len; const char* contentstr = JS_ToCStringLen(context, &content_len, content); JS_FreeValue(context, content); JS_FreeValue(context, contentval); message_store_t* store = tf_malloc(sizeof(message_store_t)); *store = (message_store_t) { .work = { .data = store, }, .ssb = ssb, .sequence = sequence, .timestamp = timestamp, .content = contentstr, .length = content_len, .flags = flags, .callback = callback, .user_data = user_data, }; snprintf(store->id, sizeof(store->id), "%s", id); snprintf(store->previous, sizeof(store->previous), "%s", previous ? previous : ""); snprintf(store->author, sizeof(store->author), "%s", author); snprintf(store->signature, sizeof(store->signature), "%s", signature); JS_FreeCString(context, author); JS_FreeCString(context, previous); tf_ssb_store_queue_t* queue = tf_ssb_get_store_queue(ssb); if (queue->tail) { message_store_t* tail = queue->tail; tail->next = store; queue->tail = store; } else { queue->head = store; queue->tail = store; } _wake_up_queue(ssb, queue); } bool tf_ssb_db_message_content_get(tf_ssb_t* ssb, const char* id, uint8_t** out_blob, size_t* out_size) { bool result = false; sqlite3_stmt* statement; sqlite3* db = tf_ssb_acquire_db_reader(ssb); const char* query = "SELECT json(content) FROM messages WHERE id = ?"; if (sqlite3_prepare(db, query, -1, &statement, NULL) == SQLITE_OK) { if (sqlite3_bind_text(statement, 1, id, -1, NULL) == SQLITE_OK && sqlite3_step(statement) == SQLITE_ROW) { const uint8_t* blob = sqlite3_column_blob(statement, 0); int size = sqlite3_column_bytes(statement, 0); if (out_blob) { *out_blob = tf_malloc(size + 1); memcpy(*out_blob, blob, size); (*out_blob)[size] = '\0'; } if (out_size) { *out_size = size; } result = true; } sqlite3_finalize(statement); } tf_ssb_release_db_reader(ssb, db); return result; } bool tf_ssb_db_blob_has(tf_ssb_t* ssb, const char* id) { bool result = false; sqlite3_stmt* statement; sqlite3* db = tf_ssb_acquire_db_reader(ssb); const char* query = "SELECT COUNT(*) FROM blobs WHERE id = ?1"; if (sqlite3_prepare(db, query, -1, &statement, NULL) == SQLITE_OK) { if (sqlite3_bind_text(statement, 1, id, -1, NULL) == SQLITE_OK && sqlite3_step(statement) == SQLITE_ROW) { result = sqlite3_column_int64(statement, 0) != 0; } sqlite3_finalize(statement); } tf_ssb_release_db_reader(ssb, db); return result; } bool tf_ssb_db_blob_get(tf_ssb_t* ssb, const char* id, uint8_t** out_blob, size_t* out_size) { bool result = false; sqlite3_stmt* statement; sqlite3* db = tf_ssb_acquire_db_reader(ssb); const char* query = "SELECT content FROM blobs WHERE id = ?1"; if (sqlite3_prepare(db, query, -1, &statement, NULL) == SQLITE_OK) { if (sqlite3_bind_text(statement, 1, id, -1, NULL) == SQLITE_OK && sqlite3_step(statement) == SQLITE_ROW) { const uint8_t* blob = sqlite3_column_blob(statement, 0); int size = sqlite3_column_bytes(statement, 0); if (out_blob) { *out_blob = tf_malloc(size + 1); if (size) { memcpy(*out_blob, blob, size); } (*out_blob)[size] = '\0'; } if (out_size) { *out_size = size; } result = true; } sqlite3_finalize(statement); } tf_ssb_release_db_reader(ssb, db); return result; } typedef struct _blob_store_work_t { uv_work_t work; tf_ssb_t* ssb; const uint8_t* blob; size_t size; char id[k_blob_id_len]; bool is_new; tf_ssb_db_blob_store_callback_t* callback; void* user_data; } blob_store_work_t; static void _tf_ssb_db_blob_store_work(uv_work_t* work) { blob_store_work_t* blob_work = work->data; tf_ssb_record_thread_busy(blob_work->ssb, true); tf_trace_t* trace = tf_ssb_get_trace(blob_work->ssb); tf_trace_begin(trace, "blob_store_work"); tf_ssb_db_blob_store(blob_work->ssb, blob_work->blob, blob_work->size, blob_work->id, sizeof(blob_work->id), &blob_work->is_new); tf_trace_end(trace); tf_ssb_record_thread_busy(blob_work->ssb, false); } static void _tf_ssb_db_blob_store_after_work(uv_work_t* work, int status) { blob_store_work_t* blob_work = work->data; tf_trace_t* trace = tf_ssb_get_trace(blob_work->ssb); tf_trace_begin(trace, "blob_store_after_work"); if (status == 0 && *blob_work->id) { tf_ssb_notify_blob_stored(blob_work->ssb, blob_work->id); } if (status != 0) { tf_printf("tf_ssb_db_blob_store_async -> uv_queue_work failed asynchronously: %s\n", uv_strerror(status)); } if (blob_work->callback) { blob_work->callback(status == 0 ? blob_work->id : NULL, blob_work->is_new, blob_work->user_data); } tf_trace_end(trace); tf_free(blob_work); } void tf_ssb_db_blob_store_async(tf_ssb_t* ssb, const uint8_t* blob, size_t size, tf_ssb_db_blob_store_callback_t* callback, void* user_data) { blob_store_work_t* work = tf_malloc(sizeof(blob_store_work_t)); *work = (blob_store_work_t) { .work = { .data = work, }, .ssb = ssb, .blob = blob, .size = size, .callback = callback, .user_data = user_data, }; int r = uv_queue_work(tf_ssb_get_loop(ssb), &work->work, _tf_ssb_db_blob_store_work, _tf_ssb_db_blob_store_after_work); if (r) { tf_printf("tf_ssb_db_blob_store_async -> uv_queue_work failed immediately: %s\n", uv_strerror(r)); if (callback) { callback(NULL, false, user_data); } tf_free(work); } } bool tf_ssb_db_blob_store(tf_ssb_t* ssb, const uint8_t* blob, size_t size, char* out_id, size_t out_id_size, bool* out_new) { bool result = false; uint8_t hash[crypto_hash_sha256_BYTES]; crypto_hash_sha256(hash, blob, size); char hash64[256]; tf_base64_encode(hash, sizeof(hash), hash64, sizeof(hash64)); char id[512]; snprintf(id, sizeof(id), "&%s.sha256", hash64); int rows = 0; sqlite3* db = tf_ssb_acquire_db_writer(ssb); sqlite3_stmt* statement; if (sqlite3_prepare(db, "INSERT INTO blobs (id, content, created) VALUES (?1, ?2, CAST(strftime('%s') AS INTEGER)) ON CONFLICT DO NOTHING", -1, &statement, NULL) == SQLITE_OK) { if (sqlite3_bind_text(statement, 1, id, -1, NULL) == SQLITE_OK && sqlite3_bind_blob(statement, 2, blob, size, NULL) == SQLITE_OK) { result = sqlite3_step(statement) == SQLITE_DONE; rows = sqlite3_changes(db); } else { tf_printf("bind failed: %s\n", sqlite3_errmsg(db)); } sqlite3_finalize(statement); } else { tf_printf("%s: prepare failed: %s\n", __FUNCTION__, sqlite3_errmsg(db)); } tf_ssb_release_db_writer(ssb, db); if (rows) { if (!out_new) { tf_printf("blob stored %s %zd => %d\n", id, size, result); } } if (result && out_id) { snprintf(out_id, out_id_size, "%s", id); } if (out_new) { *out_new = rows != 0; } return result; } bool tf_ssb_db_get_message_by_author_and_sequence( tf_ssb_t* ssb, const char* author, int64_t sequence, char* out_message_id, size_t out_message_id_size, double* out_timestamp, char** out_content) { bool found = false; sqlite3_stmt* statement; const char* query = "SELECT id, timestamp, json(content) FROM messages WHERE author = ?1 AND sequence = ?2"; sqlite3* db = tf_ssb_acquire_db_reader(ssb); if (sqlite3_prepare(db, query, -1, &statement, NULL) == SQLITE_OK) { if (sqlite3_bind_text(statement, 1, author, -1, NULL) == SQLITE_OK && sqlite3_bind_int64(statement, 2, sequence) == SQLITE_OK && sqlite3_step(statement) == SQLITE_ROW) { if (out_message_id) { strncpy(out_message_id, (const char*)sqlite3_column_text(statement, 0), out_message_id_size - 1); } if (out_timestamp) { *out_timestamp = sqlite3_column_double(statement, 1); } if (out_content) { *out_content = tf_strdup((const char*)sqlite3_column_text(statement, 2)); } found = true; } sqlite3_finalize(statement); } else { tf_printf("%s: prepare failed: %s\n", __FUNCTION__, sqlite3_errmsg(db)); } tf_ssb_release_db_reader(ssb, db); return found; } bool tf_ssb_db_get_latest_message_by_author(tf_ssb_t* ssb, const char* author, int64_t* out_sequence, char* out_message_id, size_t out_message_id_size) { bool found = false; sqlite3_stmt* statement; sqlite3* db = tf_ssb_acquire_db_reader(ssb); const char* query = "SELECT id, sequence FROM messages WHERE author = ?1 AND sequence = (SELECT MAX(sequence) FROM messages WHERE author = ?1)"; if (sqlite3_prepare(db, query, -1, &statement, NULL) == SQLITE_OK) { if (sqlite3_bind_text(statement, 1, author, -1, NULL) == SQLITE_OK && sqlite3_step(statement) == SQLITE_ROW) { if (out_sequence) { *out_sequence = sqlite3_column_int64(statement, 1); } if (out_message_id) { strncpy(out_message_id, (const char*)sqlite3_column_text(statement, 0), out_message_id_size - 1); } found = true; } sqlite3_finalize(statement); } else { tf_printf("%s: prepare failed: %s\n", __FUNCTION__, sqlite3_errmsg(db)); } tf_ssb_release_db_reader(ssb, db); return found; } static JSValue _tf_ssb_sqlite_bind_json(JSContext* context, sqlite3* db, sqlite3_stmt* statement, JSValue binds) { if (JS_IsUndefined(binds)) { return JS_UNDEFINED; } if (!JS_IsArray(context, binds)) { return JS_ThrowTypeError(context, "Expected bind parameters to be an array."); } JSValue result = JS_UNDEFINED; int32_t length = tf_util_get_length(context, binds); for (int i = 0; i < length && JS_IsUndefined(result); i++) { JSValue value = JS_GetPropertyUint32(context, binds, i); if (JS_IsNumber(value)) { int64_t number = 0; JS_ToInt64(context, &number, value); if (sqlite3_bind_int64(statement, i + 1, number) != SQLITE_OK) { result = JS_ThrowInternalError(context, "Failed to bind: %s.", sqlite3_errmsg(db)); } } else if (JS_IsBool(value)) { if (sqlite3_bind_int(statement, i + 1, JS_ToBool(context, value) ? 1 : 0) != SQLITE_OK) { result = JS_ThrowInternalError(context, "Failed to bind: %s.", sqlite3_errmsg(db)); } } else if (JS_IsNull(value)) { if (sqlite3_bind_null(statement, i + 1) != SQLITE_OK) { result = JS_ThrowInternalError(context, "Failed to bind: %s.", sqlite3_errmsg(db)); } } else { size_t str_len = 0; const char* str = JS_ToCStringLen(context, &str_len, value); if (str) { if (sqlite3_bind_text(statement, i + 1, str, str_len, SQLITE_TRANSIENT) != SQLITE_OK) { result = JS_ThrowInternalError(context, "Failed to bind: %s.", sqlite3_errmsg(db)); } JS_FreeCString(context, str); } else { result = JS_ThrowInternalError(context, "Could not convert bind argument %d to string.", i); } } JS_FreeValue(context, value); } return result; } static JSValue _tf_ssb_sqlite_row_to_json(JSContext* context, sqlite3_stmt* row) { JSValue result = JS_NewObject(context); for (int i = 0; i < sqlite3_column_count(row); i++) { const char* name = sqlite3_column_name(row, i); switch (sqlite3_column_type(row, i)) { case SQLITE_INTEGER: JS_SetPropertyStr(context, result, name, JS_NewInt64(context, sqlite3_column_int64(row, i))); break; case SQLITE_FLOAT: JS_SetPropertyStr(context, result, name, JS_NewFloat64(context, sqlite3_column_double(row, i))); break; case SQLITE_TEXT: JS_SetPropertyStr(context, result, name, JS_NewStringLen(context, (const char*)sqlite3_column_text(row, i), sqlite3_column_bytes(row, i))); break; case SQLITE_BLOB: JS_SetPropertyStr(context, result, name, JS_NewArrayBufferCopy(context, sqlite3_column_blob(row, i), sqlite3_column_bytes(row, i))); break; case SQLITE_NULL: JS_SetPropertyStr(context, result, name, JS_NULL); break; } } return result; } int tf_ssb_sqlite_authorizer(void* user_data, int action_code, const char* arg0, const char* arg1, const char* arg2, const char* arg3) { int result = SQLITE_DENY; switch (action_code) { case SQLITE_SELECT: case SQLITE_FUNCTION: result = SQLITE_OK; break; case SQLITE_READ: result = (strcmp(arg0, "blob_wants_view") == 0 || strcmp(arg0, "json_each") == 0 || strcmp(arg0, "json_tree") == 0 || strcmp(arg0, "messages") == 0 || strcmp(arg0, "messages_fts") == 0 || strcmp(arg0, "messages_fts_idx") == 0 || strcmp(arg0, "messages_fts_config") == 0 || strcmp(arg0, "messages_refs") == 0 || strcmp(arg0, "messages_refs_message_idx") == 0 || strcmp(arg0, "messages_refs_ref_idx") == 0 || strcmp(arg0, "sqlite_master") == 0 || false) ? SQLITE_OK : SQLITE_DENY; break; case SQLITE_PRAGMA: result = strcmp(arg0, "data_version") == 0 ? SQLITE_OK : SQLITE_DENY; break; case SQLITE_UPDATE: result = strcmp(arg0, "sqlite_master") == 0 ? SQLITE_OK : SQLITE_DENY; break; } if (result != SQLITE_OK) { tf_printf("Denying sqlite access to %d %s %s %s %s\n", action_code, arg0, arg1, arg2, arg3); fflush(stdout); } return result; } JSValue tf_ssb_db_visit_query(tf_ssb_t* ssb, const char* query, const JSValue binds, void (*callback)(JSValue row, void* user_data), void* user_data) { JSValue result = JS_UNDEFINED; sqlite3* db = tf_ssb_acquire_db_reader_restricted(ssb); JSContext* context = tf_ssb_get_context(ssb); sqlite3_stmt* statement; if (sqlite3_prepare(db, query, -1, &statement, NULL) == SQLITE_OK) { JSValue bind_result = _tf_ssb_sqlite_bind_json(context, db, statement, binds); if (JS_IsUndefined(bind_result)) { int r = SQLITE_OK; while ((r = sqlite3_step(statement)) == SQLITE_ROW) { JSValue row = _tf_ssb_sqlite_row_to_json(context, statement); tf_trace_t* trace = tf_ssb_get_trace(ssb); tf_trace_begin(trace, "callback"); callback(row, user_data); tf_trace_end(trace); JS_FreeValue(context, row); } if (r != SQLITE_DONE) { result = JS_ThrowInternalError(context, "SQL Error %s: running \"%s\".", sqlite3_errmsg(db), query); } } else { result = bind_result; } sqlite3_finalize(statement); } else { result = JS_ThrowInternalError(context, "SQL Error %s: preparing \"%s\".", sqlite3_errmsg(db), query); } tf_ssb_release_db_reader(ssb, db); return result; } JSValue tf_ssb_format_message( JSContext* context, const char* previous, const char* author, int64_t sequence, double timestamp, const char* hash, const char* content, const char* signature, int flags) { JSValue value = JS_NewObject(context); JS_SetPropertyStr(context, value, "previous", (previous && *previous) ? JS_NewString(context, previous) : JS_NULL); if (flags & k_tf_ssb_message_flag_sequence_before_author) { JS_SetPropertyStr(context, value, "sequence", JS_NewInt64(context, sequence)); JS_SetPropertyStr(context, value, "author", JS_NewString(context, author)); } else { JS_SetPropertyStr(context, value, "author", JS_NewString(context, author)); JS_SetPropertyStr(context, value, "sequence", JS_NewInt64(context, sequence)); } JS_SetPropertyStr(context, value, "timestamp", JS_NewFloat64(context, timestamp)); JS_SetPropertyStr(context, value, "hash", JS_NewString(context, hash)); JS_SetPropertyStr(context, value, "content", JS_ParseJSON(context, content, strlen(content), NULL)); JS_SetPropertyStr(context, value, "signature", JS_NewString(context, signature)); return value; } int tf_ssb_db_identity_get_count_for_user(tf_ssb_t* ssb, const char* user) { int count = 0; sqlite3* db = tf_ssb_acquire_db_reader(ssb); sqlite3_stmt* statement = NULL; if (sqlite3_prepare(db, "SELECT COUNT(*) FROM identities WHERE user = ?", -1, &statement, NULL) == SQLITE_OK) { if (sqlite3_bind_text(statement, 1, user, -1, NULL) == SQLITE_OK) { if (sqlite3_step(statement) == SQLITE_ROW) { count = sqlite3_column_int(statement, 0); } } sqlite3_finalize(statement); } tf_ssb_release_db_reader(ssb, db); return count; } bool tf_ssb_db_identity_create(tf_ssb_t* ssb, const char* user, uint8_t* out_public_key, uint8_t* out_private_key) { int count = tf_ssb_db_identity_get_count_for_user(ssb, user); if (count < 16) { char public[512]; char private[512]; tf_ssb_generate_keys_buffer(public, sizeof(public), private, sizeof(private)); if (tf_ssb_db_identity_add(ssb, user, public, private)) { tf_ssb_id_str_to_bin(out_public_key, public); tf_ssb_id_str_to_bin(out_private_key, private); return true; } } return false; } bool tf_ssb_db_identity_add(tf_ssb_t* ssb, const char* user, const char* public_key, const char* private_key) { bool added = false; sqlite3* db = tf_ssb_acquire_db_writer(ssb); sqlite3_stmt* statement = NULL; if (sqlite3_prepare(db, "INSERT INTO identities (user, public_key, private_key) VALUES (?, ?, ?) ON CONFLICT DO NOTHING", -1, &statement, NULL) == SQLITE_OK) { if (sqlite3_bind_text(statement, 1, user, -1, NULL) == SQLITE_OK && sqlite3_bind_text(statement, 2, public_key, -1, NULL) == SQLITE_OK && sqlite3_bind_text(statement, 3, private_key, -1, NULL) == SQLITE_OK) { added = sqlite3_step(statement) == SQLITE_DONE && sqlite3_changes(db) != 0; if (!added) { tf_printf("Unable to add identity: %s.\n", sqlite3_errmsg(db)); } } sqlite3_finalize(statement); } tf_ssb_release_db_writer(ssb, db); return added; } bool tf_ssb_db_identity_delete(tf_ssb_t* ssb, const char* user, const char* public_key) { bool removed = false; sqlite3* db = tf_ssb_acquire_db_writer(ssb); sqlite3_stmt* statement = NULL; tf_printf("deleting [%s] [%s]\n", user, public_key); if (sqlite3_prepare(db, "DELETE FROM identities WHERE user = ? AND public_key = ?", -1, &statement, NULL) == SQLITE_OK) { if (sqlite3_bind_text(statement, 1, user, -1, NULL) == SQLITE_OK && sqlite3_bind_text(statement, 2, public_key, -1, NULL) == SQLITE_OK) { removed = sqlite3_step(statement) == SQLITE_DONE && sqlite3_changes(db) != 0; if (!removed) { tf_printf("Unable to delete identity: %s.\n", sqlite3_errmsg(db)); } } sqlite3_finalize(statement); } tf_ssb_release_db_writer(ssb, db); return removed; } void tf_ssb_db_identity_visit(tf_ssb_t* ssb, const char* user, void (*callback)(const char* identity, void* user_data), void* user_data) { sqlite3* db = tf_ssb_acquire_db_reader(ssb); sqlite3_stmt* statement = NULL; if (sqlite3_prepare(db, "SELECT public_key FROM identities WHERE user = ? ORDER BY public_key", -1, &statement, NULL) == SQLITE_OK) { if (sqlite3_bind_text(statement, 1, user, -1, NULL) == SQLITE_OK) { while (sqlite3_step(statement) == SQLITE_ROW) { callback((const char*)sqlite3_column_text(statement, 0), user_data); } } sqlite3_finalize(statement); } tf_ssb_release_db_reader(ssb, db); } void tf_ssb_db_identity_visit_all(tf_ssb_t* ssb, void (*callback)(const char* identity, void* user_data), void* user_data) { sqlite3* db = tf_ssb_acquire_db_reader(ssb); sqlite3_stmt* statement = NULL; if (sqlite3_prepare(db, "SELECT public_key FROM identities ORDER BY public_key", -1, &statement, NULL) == SQLITE_OK) { while (sqlite3_step(statement) == SQLITE_ROW) { callback((const char*)sqlite3_column_text(statement, 0), user_data); } sqlite3_finalize(statement); } tf_ssb_release_db_reader(ssb, db); } bool tf_ssb_db_identity_get_private_key(tf_ssb_t* ssb, const char* user, const char* public_key, uint8_t* out_private_key, size_t private_key_size) { bool success = false; if (out_private_key) { memset(out_private_key, 0, private_key_size); } sqlite3* db = tf_ssb_acquire_db_reader(ssb); sqlite3_stmt* statement = NULL; if (sqlite3_prepare(db, "SELECT private_key FROM identities WHERE user = ? AND public_key = ?", -1, &statement, NULL) == SQLITE_OK) { if (sqlite3_bind_text(statement, 1, user, -1, NULL) == SQLITE_OK && sqlite3_bind_text(statement, 2, (public_key && *public_key == '@') ? public_key + 1 : public_key, -1, NULL) == SQLITE_OK) { if (sqlite3_step(statement) == SQLITE_ROW) { const char* key = (const char*)sqlite3_column_text(statement, 0); if (out_private_key && private_key_size) { int r = tf_base64_decode(key, sqlite3_column_bytes(statement, 0) - strlen(".ed25519"), out_private_key, private_key_size); success = r > 0; } else { success = true; } } } sqlite3_finalize(statement); } tf_ssb_release_db_reader(ssb, db); return success; } typedef struct _following_t following_t; typedef struct _following_t { char id[k_id_base64_len]; following_t** following; following_t** blocking; int following_count; int blocking_count; int depth; int ref_count; int block_ref_count; bool populated; } following_t; static int _following_compare(const void* a, const void* b) { const char* ida = a; const following_t* const* followingb = b; return strcmp(ida, (*followingb)->id); } static bool _has_following_entry(const char* id, following_t** list, int count) { return count ? bsearch(id, list, count, sizeof(following_t*), _following_compare) != 0 : false; } static bool _add_following_entry(following_t*** list, int* count, following_t* add) { int index = tf_util_insert_index(add->id, *list, *count, sizeof(following_t*), _following_compare); if (index >= *count || strcmp(add->id, (*list)[index]->id) != 0) { *list = tf_resize_vec(*list, sizeof(**list) * (*count + 1)); if (*count - index) { memmove(*list + index + 1, *list + index, sizeof(following_t*) * (*count - index)); } (*list)[index] = add; (*count)++; return true; } return false; } static bool _remove_following_entry(following_t*** list, int* count, following_t* remove) { int index = tf_util_insert_index(remove->id, *list, *count, sizeof(following_t*), _following_compare); if (index < *count && strcmp(remove->id, (*list)[index]->id) == 0) { if (*count - index > 1) { memmove(*list + index, *list + index + 1, sizeof(following_t*) * (*count - index - 1)); } *list = tf_resize_vec(*list, sizeof(**list) * (*count - 1)); (*count)--; return true; } return false; } typedef struct _block_node_t block_node_t; typedef struct _block_node_t { following_t* entry; block_node_t* parent; } block_node_t; static bool _is_blocked_by_active_blocks(const char* id, const block_node_t* blocks) { for (const block_node_t* b = blocks; b; b = b->parent) { if (_has_following_entry(id, b->entry->blocking, b->entry->blocking_count)) { return true; } } return false; } static following_t* _make_following_node(const char* id, following_t*** following, int* following_count, block_node_t* blocks) { if (_is_blocked_by_active_blocks(id, blocks)) { return NULL; } int index = tf_util_insert_index(id, *following, *following_count, sizeof(following_t*), _following_compare); following_t* entry = NULL; if (index < *following_count && strcmp(id, (*following)[index]->id) == 0) { entry = (*following)[index]; } else { *following = tf_resize_vec(*following, sizeof(*following) * (*following_count + 1)); if (*following_count - index) { memmove(*following + index + 1, *following + index, sizeof(following_t*) * (*following_count - index)); } entry = tf_malloc(sizeof(following_t)); (*following)[index] = entry; (*following_count)++; memset(entry, 0, sizeof(*entry)); entry->depth = INT_MAX; snprintf(entry->id, sizeof(entry->id), "%s", id); } return entry; } static void _populate_follows_and_blocks(tf_ssb_t* ssb, following_t* entry, following_t*** following, int* following_count, block_node_t* active_blocks) { sqlite3* db = tf_ssb_acquire_db_reader(ssb); sqlite3_stmt* statement = NULL; if (sqlite3_prepare(db, "SELECT json_extract(content, '$.contact') AS contact, json_extract(content, '$.following'), json_extract(content, '$.blocking') " "FROM messages " "WHERE contact IS NOT NULL AND author = ? AND json_extract(content, '$.type') = 'contact' " "ORDER BY sequence", -1, &statement, NULL) == SQLITE_OK) { if (sqlite3_bind_text(statement, 1, entry->id, -1, NULL) == SQLITE_OK) { while (sqlite3_step(statement) == SQLITE_ROW) { const char* contact = (const char*)sqlite3_column_text(statement, 0); if (sqlite3_column_type(statement, 1) != SQLITE_NULL) { bool is_following = sqlite3_column_int(statement, 1) != 0; following_t* next = _make_following_node(contact, following, following_count, active_blocks); if (next) { if (is_following) { if (_add_following_entry(&entry->following, &entry->following_count, next)) { next->ref_count++; } } else { if (_remove_following_entry(&entry->following, &entry->following_count, next)) { next->ref_count--; } } } } if (sqlite3_column_type(statement, 2) != SQLITE_NULL) { bool is_blocking = sqlite3_column_int(statement, 2) != 0; following_t* next = _make_following_node(contact, following, following_count, active_blocks); if (next) { if (is_blocking) { if (_add_following_entry(&entry->blocking, &entry->blocking_count, next)) { next->block_ref_count++; } } else { if (_remove_following_entry(&entry->blocking, &entry->blocking_count, next)) { next->block_ref_count--; } } } } } } sqlite3_finalize(statement); } tf_ssb_release_db_reader(ssb, db); } static void _get_following(tf_ssb_t* ssb, following_t* entry, following_t*** following, int* following_count, int depth, int max_depth, block_node_t* active_blocks) { entry->depth = tf_min(depth, entry->depth); if (depth < max_depth && !entry->populated && !_is_blocked_by_active_blocks(entry->id, active_blocks)) { entry->populated = true; _populate_follows_and_blocks(ssb, entry, following, following_count, active_blocks); if (depth < max_depth) { block_node_t blocks = { .entry = entry, .parent = active_blocks }; for (int i = 0; i < entry->following_count; i++) { if (!_has_following_entry(entry->following[i]->id, entry->blocking, entry->blocking_count)) { _get_following(ssb, entry->following[i], following, following_count, depth + 1, max_depth, &blocks); } } } } } tf_ssb_following_t* tf_ssb_db_following_deep(tf_ssb_t* ssb, const char** ids, int count, int depth) { following_t** following = NULL; int following_count = 0; for (int i = 0; i < count; i++) { following_t* entry = _make_following_node(ids[i], &following, &following_count, NULL); _get_following(ssb, entry, &following, &following_count, 0, depth, NULL); entry->ref_count++; } int actual_following_count = 0; for (int i = 0; i < following_count; i++) { if (following[i]->ref_count > 0) { actual_following_count++; } } tf_ssb_following_t* result = tf_malloc(sizeof(tf_ssb_following_t) * (actual_following_count + 1)); memset(result, 0, sizeof(tf_ssb_following_t) * (actual_following_count + 1)); int write_index = 0; for (int i = 0; i < following_count; i++) { if (following[i]->ref_count > 0) { snprintf(result[write_index].id, sizeof(result[write_index].id), "%s", following[i]->id); result[write_index].following_count = following[i]->following_count; result[write_index].blocking_count = following[i]->blocking_count; result[write_index].followed_by_count = following[i]->ref_count; result[write_index].blocked_by_count = following[i]->block_ref_count; write_index++; } } for (int i = 0; i < following_count; i++) { tf_free(following[i]->following); tf_free(following[i]->blocking); tf_free(following[i]); } tf_free(following); return result; } const char** tf_ssb_db_following_deep_ids(tf_ssb_t* ssb, const char** ids, int count, int depth) { following_t** following = NULL; int following_count = 0; for (int i = 0; i < count; i++) { following_t* entry = _make_following_node(ids[i], &following, &following_count, NULL); _get_following(ssb, entry, &following, &following_count, 0, depth, NULL); entry->ref_count++; } int actual_following_count = 0; for (int i = 0; i < following_count; i++) { if (following[i]->ref_count > 0) { actual_following_count++; } } char** result = tf_malloc(sizeof(char*) * (actual_following_count + 1) + k_id_base64_len * actual_following_count); char* result_ids = (char*)result + sizeof(char*) * (actual_following_count + 1); int write_index = 0; for (int i = 0; i < following_count; i++) { if (following[i]->ref_count > 0) { result[write_index] = result_ids + k_id_base64_len * write_index; snprintf(result[write_index], k_id_base64_len, "%s", following[i]->id); write_index++; } } result[actual_following_count] = NULL; for (int i = 0; i < following_count; i++) { tf_free(following[i]->following); tf_free(following[i]->blocking); tf_free(following[i]); } tf_free(following); return (const char**)result; } typedef struct _identities_t { const char** ids; int count; } identities_t; static void _add_identity(const char* identity, void* user_data) { identities_t* identities = user_data; char full_id[k_id_base64_len]; snprintf(full_id, sizeof(full_id), "@%s", identity); identities->ids = tf_resize_vec(identities->ids, sizeof(const char*) * (identities->count + 1)); identities->ids[identities->count++] = tf_strdup(full_id); } const char** tf_ssb_db_get_all_visible_identities(tf_ssb_t* ssb, int depth) { identities_t identities = { 0 }; tf_ssb_db_identity_visit_all(ssb, _add_identity, &identities); const char** following = tf_ssb_db_following_deep_ids(ssb, identities.ids, identities.count, depth); for (int i = 0; i < identities.count; i++) { tf_free((void*)identities.ids[i]); } tf_free(identities.ids); return following; } JSValue tf_ssb_db_get_message_by_id(tf_ssb_t* ssb, const char* id, bool is_keys) { JSValue result = JS_UNDEFINED; JSContext* context = tf_ssb_get_context(ssb); sqlite3* db = tf_ssb_acquire_db_reader(ssb); sqlite3_stmt* statement; if (sqlite3_prepare(db, "SELECT previous, author, id, sequence, timestamp, hash, json(content), signature, flags FROM messages WHERE id = ?", -1, &statement, NULL) == SQLITE_OK) { if (sqlite3_bind_text(statement, 1, id, -1, NULL) == SQLITE_OK) { if (sqlite3_step(statement) == SQLITE_ROW) { JSValue message = JS_UNDEFINED; JSValue formatted = tf_ssb_format_message(context, (const char*)sqlite3_column_text(statement, 0), (const char*)sqlite3_column_text(statement, 1), sqlite3_column_int64(statement, 3), sqlite3_column_double(statement, 4), (const char*)sqlite3_column_text(statement, 5), (const char*)sqlite3_column_text(statement, 6), (const char*)sqlite3_column_text(statement, 7), sqlite3_column_int(statement, 8)); if (is_keys) { message = JS_NewObject(context); JS_SetPropertyStr(context, message, "key", JS_NewString(context, (const char*)sqlite3_column_text(statement, 2))); JS_SetPropertyStr(context, message, "value", formatted); JS_SetPropertyStr(context, message, "timestamp", JS_NewString(context, (const char*)sqlite3_column_text(statement, 4))); } else { message = formatted; } result = message; } } sqlite3_finalize(statement); } tf_ssb_release_db_reader(ssb, db); return result; } tf_ssb_db_stored_connection_t* tf_ssb_db_get_stored_connections(tf_ssb_t* ssb, int* out_count) { sqlite3* db = tf_ssb_acquire_db_reader(ssb); tf_ssb_db_stored_connection_t* result = NULL; int count = 0; sqlite3_stmt* statement; if (sqlite3_prepare(db, "SELECT host, port, key FROM connections ORDER BY host, port, key", -1, &statement, NULL) == SQLITE_OK) { while (sqlite3_step(statement) == SQLITE_ROW) { result = tf_resize_vec(result, sizeof(tf_ssb_db_stored_connection_t) * (count + 1)); result[count] = (tf_ssb_db_stored_connection_t) { .port = sqlite3_column_int(statement, 1), }; snprintf(result[count].address, sizeof(result[count].address), "%s", (const char*)sqlite3_column_text(statement, 0)); snprintf(result[count].pubkey, sizeof(result[count].pubkey), "%s", (const char*)sqlite3_column_text(statement, 2)); count++; } sqlite3_finalize(statement); } tf_ssb_release_db_reader(ssb, db); *out_count = count; return result; } void tf_ssb_db_forget_stored_connection(tf_ssb_t* ssb, const char* address, int port, const char* pubkey) { sqlite3* db = tf_ssb_acquire_db_writer(ssb); sqlite3_stmt* statement; if (sqlite3_prepare(db, "DELETE FROM connections WHERE host = ? AND port = ? AND key = ?", -1, &statement, NULL) == SQLITE_OK) { if (sqlite3_bind_text(statement, 1, address, -1, NULL) != SQLITE_OK || sqlite3_bind_int(statement, 2, port) != SQLITE_OK || sqlite3_bind_text(statement, 3, pubkey, -1, NULL) != SQLITE_OK || sqlite3_step(statement) != SQLITE_DONE) { tf_printf("Delete stored connection: %s.\n", sqlite3_errmsg(db)); } sqlite3_finalize(statement); } tf_ssb_release_db_writer(ssb, db); }