#include "ssb.db.h" #include "mem.h" #include "ssb.h" #include "trace.h" #include #include #include #include #include static void _tf_ssb_db_exec(sqlite3* db, const char* statement) { char* error = NULL; int result = sqlite3_exec(db, statement, NULL, NULL, &error); if (result != SQLITE_OK) { printf("Error running '%s': %s.\n", statement, error); abort(); } } void tf_ssb_db_init(tf_ssb_t* ssb) { sqlite3* db = tf_ssb_get_db(ssb); _tf_ssb_db_exec(db, "PRAGMA journal_mode = WAL"); _tf_ssb_db_exec(db, "PRAGMA synchronous = NORMAL"); _tf_ssb_db_exec(db, "CREATE TABLE IF NOT EXISTS messages (" " author TEXT," " id TEXT PRIMARY KEY," " sequence INTEGER," " timestamp REAL," " previous TEXT," " hash TEXT," " content TEXT," " signature TEXT," " sequence_before_author INTEGER," " UNIQUE(author, sequence)" ")"); _tf_ssb_db_exec(db, "CREATE INDEX IF NOT EXISTS messages_author_id_index ON messages (author, id)"); _tf_ssb_db_exec(db, "CREATE INDEX IF NOT EXISTS messages_author_sequence_index ON messages (author, sequence)"); _tf_ssb_db_exec(db, "CREATE INDEX IF NOT EXISTS messages_author_timestamp_index ON messages (author, timestamp)"); _tf_ssb_db_exec(db, "CREATE TABLE IF NOT EXISTS blobs (" " id TEXT PRIMARY KEY," " content BLOB," " created INTEGER" ")"); _tf_ssb_db_exec(db, "CREATE TABLE IF NOT EXISTS blob_wants (" " id TEXT PRIMARY KEY" ")"); _tf_ssb_db_exec(db, "CREATE TABLE IF NOT EXISTS properties (" " id TEXT," " key TEXT," " value TEXT," " UNIQUE(id, key)" ")"); _tf_ssb_db_exec(db, "CREATE TABLE IF NOT EXISTS connections (" " host TEXT," " port INTEGER," " key TEXT," " last_attempt INTEGER," " last_success INTEGER," " UNIQUE(host, port, key)" ")"); bool need_add_sequence_before_author = true; bool need_convert_timestamp_to_real = false; sqlite3_stmt* statement = NULL; if (sqlite3_prepare(db, "PRAGMA table_info(messages)", -1, &statement, NULL) == SQLITE_OK) { int result = SQLITE_OK; while ((result = sqlite3_step(statement)) == SQLITE_ROW) { const char* name = (const char*)sqlite3_column_text(statement, 1); const char* type = (const char*)sqlite3_column_text(statement, 2); if (name && type && strcmp(name, "timestamp") == 0 && strcmp(type, "INTEGER") == 0) { need_convert_timestamp_to_real = true; } if (name && strcmp(name, "sequence_before_author") == 0) { need_add_sequence_before_author = false; } } sqlite3_finalize(statement); } if (need_convert_timestamp_to_real) { printf("Converting timestamp column from INTEGER to REAL.\n"); _tf_ssb_db_exec(db, "BEGIN TRANSACTION"); _tf_ssb_db_exec(db, "DROP INDEX IF EXISTS messages_author_timestamp_index"); _tf_ssb_db_exec(db, "ALTER TABLE messages ADD COLUMN timestamp_real REAL"); _tf_ssb_db_exec(db, "UPDATE messages SET timestamp_real = timestamp"); _tf_ssb_db_exec(db, "ALTER TABLE messages DROP COLUMN timestamp"); _tf_ssb_db_exec(db, "ALTER TABLE messages RENAME COLUMN timestamp_real TO timestamp"); _tf_ssb_db_exec(db, "CREATE INDEX IF NOT EXISTS messages_author_timestamp_index ON messages (author, timestamp)"); _tf_ssb_db_exec(db, "COMMIT TRANSACTION"); } if (need_add_sequence_before_author) { printf("Adding sequence_before_author column.\n"); _tf_ssb_db_exec(db, "ALTER TABLE messages ADD COLUMN sequence_before_author INTEGER"); } } static bool _tf_ssb_db_previous_message_exists(sqlite3* db, const char* author, int64_t sequence, const char* previous) { bool exists = false; if (sequence == 1) { exists = true; } else { sqlite3_stmt* statement; if (sqlite3_prepare(db, "SELECT COUNT(*) FROM messages WHERE author = ?1 AND sequence = ?2 AND id = ?3", -1, &statement, NULL) == SQLITE_OK) { if (sqlite3_bind_text(statement, 1, author, -1, NULL) == SQLITE_OK && sqlite3_bind_int64(statement, 2, sequence - 1) == SQLITE_OK && sqlite3_bind_text(statement, 3, previous, -1, NULL) == SQLITE_OK && sqlite3_step(statement) == SQLITE_ROW) { exists = sqlite3_column_int(statement, 0) != 0; } sqlite3_finalize(statement); } } return exists; } bool tf_ssb_db_store_message(tf_ssb_t* ssb, JSContext* context, const char* id, JSValue val, const char* signature, bool sequence_before_author) { bool stored = false; JSValue previousval = JS_GetPropertyStr(context, val, "previous"); const char* previous = JS_IsNull(previousval) ? NULL : JS_ToCString(context, previousval); JSValue authorval = JS_GetPropertyStr(context, val, "author"); const char* author = JS_ToCString(context, authorval); int64_t sequence = -1; JS_ToInt64(context, &sequence, JS_GetPropertyStr(context, val, "sequence")); double timestamp = -1.0; JS_ToFloat64(context, ×tamp, JS_GetPropertyStr(context, val, "timestamp")); JSValue contentval = JS_GetPropertyStr(context, val, "content"); JSValue content = JS_JSONStringify(context, contentval, JS_NULL, JS_NULL); size_t content_len; const char* contentstr = JS_ToCStringLen(context, &content_len, content); JS_FreeValue(context, contentval); sqlite3* db = tf_ssb_get_db(ssb); sqlite3_stmt* statement; int64_t last_row_id = -1; if (_tf_ssb_db_previous_message_exists(db, author, sequence, previous)) { const char* query = "INSERT INTO messages (id, previous, author, sequence, timestamp, content, hash, signature, sequence_before_author) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT DO NOTHING"; if (sqlite3_prepare(db, query, -1, &statement, NULL) == SQLITE_OK) { if (sqlite3_bind_text(statement, 1, id, -1, NULL) == SQLITE_OK && (previous ? sqlite3_bind_text(statement, 2, previous, -1, NULL) : sqlite3_bind_null(statement, 2)) == SQLITE_OK && sqlite3_bind_text(statement, 3, author, -1, NULL) == SQLITE_OK && sqlite3_bind_int64(statement, 4, sequence) == SQLITE_OK && sqlite3_bind_double(statement, 5, timestamp) == SQLITE_OK && sqlite3_bind_text(statement, 6, contentstr, content_len, NULL) == SQLITE_OK && sqlite3_bind_text(statement, 7, "sha256", 6, NULL) == SQLITE_OK && sqlite3_bind_text(statement, 8, signature, -1, NULL) == SQLITE_OK && sqlite3_bind_int(statement, 9, sequence_before_author) == SQLITE_OK) { int r = sqlite3_step(statement); if (r != SQLITE_DONE) { printf("%s\n", sqlite3_errmsg(db)); } stored = r == SQLITE_DONE && sqlite3_changes(db) != 0; if (stored) { last_row_id = sqlite3_last_insert_rowid(db); } } else { printf("bind failed\n"); } sqlite3_finalize(statement); } else { printf("prepare failed: %s\n", sqlite3_errmsg(db)); } } else { printf("Previous message doesn't exist.\n"); } if (last_row_id != -1) { const char* query = "INSERT INTO blob_wants (id) SELECT DISTINCT json.value FROM messages, json_tree(messages.content) AS json LEFT OUTER JOIN blobs ON json.value = blobs.id WHERE messages.rowid = ?1 AND json.value LIKE '&%%.sha256' AND length(json.value) = ?2 AND blobs.content IS NULL ON CONFLICT DO NOTHING RETURNING id"; if (sqlite3_prepare(db, query, -1, &statement, NULL) == SQLITE_OK) { if (sqlite3_bind_int64(statement, 1, last_row_id) == SQLITE_OK && sqlite3_bind_int(statement, 2, k_blob_id_len - 1) == SQLITE_OK) { int r = SQLITE_OK; while ((r = sqlite3_step(statement)) == SQLITE_ROW) { tf_ssb_notify_blob_want_added(ssb, (const char*)sqlite3_column_text(statement, 0)); } if (r != SQLITE_DONE) { printf("%s\n", sqlite3_errmsg(db)); } } sqlite3_finalize(statement); } else { printf("prepare failed: %s\n", sqlite3_errmsg(db)); } } JS_FreeValue(context, previousval); JS_FreeCString(context, author); JS_FreeValue(context, authorval); JS_FreeCString(context, previous); JS_FreeCString(context, contentstr); JS_FreeValue(context, content); return stored; } bool tf_ssb_db_message_content_get(tf_ssb_t* ssb, const char* id, uint8_t** out_blob, size_t* out_size) { bool result = false; sqlite3_stmt* statement; const char* query = "SELECT content FROM messages WHERE id = ?"; if (sqlite3_prepare(tf_ssb_get_db(ssb), query, -1, &statement, NULL) == SQLITE_OK) { if (sqlite3_bind_text(statement, 1, id, -1, NULL) == SQLITE_OK && sqlite3_step(statement) == SQLITE_ROW) { const uint8_t* blob = sqlite3_column_blob(statement, 0); int size = sqlite3_column_bytes(statement, 0); if (out_blob) { *out_blob = tf_malloc(size + 1); memcpy(*out_blob, blob, size); (*out_blob)[size] = '\0'; } if (out_size) { *out_size = size; } result = true; } sqlite3_finalize(statement); } return result; } bool tf_ssb_db_blob_get(tf_ssb_t* ssb, const char* id, uint8_t** out_blob, size_t* out_size) { bool result = false; sqlite3_stmt* statement; const char* query = "SELECT content FROM blobs WHERE id = $1"; if (sqlite3_prepare(tf_ssb_get_db(ssb), query, -1, &statement, NULL) == SQLITE_OK) { if (sqlite3_bind_text(statement, 1, id, -1, NULL) == SQLITE_OK && sqlite3_step(statement) == SQLITE_ROW) { const uint8_t* blob = sqlite3_column_blob(statement, 0); int size = sqlite3_column_bytes(statement, 0); if (out_blob) { *out_blob = tf_malloc(size + 1); if (size) { memcpy(*out_blob, blob, size); } (*out_blob)[size] = '\0'; } if (out_size) { *out_size = size; } result = true; } sqlite3_finalize(statement); } return result; } bool tf_ssb_db_blob_store(tf_ssb_t* ssb, const uint8_t* blob, size_t size, char* out_id, size_t out_id_size) { bool result = false; sqlite3* db = tf_ssb_get_db(ssb); sqlite3_stmt* statement; uint8_t hash[crypto_hash_sha256_BYTES]; crypto_hash_sha256(hash, blob, size); char hash64[256]; base64c_encode(hash, sizeof(hash), (uint8_t*)hash64, sizeof(hash64)); char id[512]; snprintf(id, sizeof(id), "&%s.sha256", hash64); int rows = 0; if (sqlite3_prepare(db, "INSERT INTO blobs (id, content, created) VALUES ($1, $2, CAST(strftime('%s') AS INTEGER)) ON CONFLICT DO NOTHING", -1, &statement, NULL) == SQLITE_OK) { if (sqlite3_bind_text(statement, 1, id, -1, NULL) == SQLITE_OK && sqlite3_bind_blob(statement, 2, blob, size, NULL) == SQLITE_OK) { result = sqlite3_step(statement) == SQLITE_DONE; rows = sqlite3_changes(db); } else { printf("bind failed: %s\n", sqlite3_errmsg(db)); } sqlite3_finalize(statement); } else { printf("prepare failed: %s\n", sqlite3_errmsg(db)); } if (rows) { printf("blob stored %s %zd => %d\n", id, size, result); } if (result) { if (sqlite3_prepare(db, "DELETE FROM blob_wants WHERE id = ?1", -1, &statement, NULL) == SQLITE_OK) { if (sqlite3_bind_text(statement, 1, id, -1, NULL) == SQLITE_OK) { sqlite3_step(statement); } sqlite3_finalize(statement); } } if (result && out_id) { snprintf(out_id, out_id_size, "%s", id); } return result; } bool tf_ssb_db_get_message_by_author_and_sequence(tf_ssb_t* ssb, const char* author, int64_t sequence, char* out_message_id, size_t out_message_id_size, double* out_timestamp, char** out_content) { bool found = false; sqlite3_stmt* statement; const char* query = "SELECT id, timestamp, content FROM messages WHERE author = $1 AND sequence = $2"; if (sqlite3_prepare(tf_ssb_get_db(ssb), query, -1, &statement, NULL) == SQLITE_OK) { if (sqlite3_bind_text(statement, 1, author, -1, NULL) == SQLITE_OK && sqlite3_bind_int64(statement, 2, sequence) == SQLITE_OK && sqlite3_step(statement) == SQLITE_ROW) { if (out_message_id) { strncpy(out_message_id, (const char*)sqlite3_column_text(statement, 0), out_message_id_size - 1); } if (out_timestamp) { *out_timestamp = sqlite3_column_double(statement, 1); } if (out_content) { *out_content = tf_strdup((const char*)sqlite3_column_text(statement, 2)); } found = true; } sqlite3_finalize(statement); } else { printf("prepare failed: %s\n", sqlite3_errmsg(tf_ssb_get_db(ssb))); } return found; } bool tf_ssb_db_get_latest_message_by_author(tf_ssb_t* ssb, const char* author, int64_t* out_sequence, char* out_message_id, size_t out_message_id_size) { bool found = false; sqlite3_stmt* statement; const char* query = "SELECT id, sequence FROM messages WHERE author = $1 AND sequence = (SELECT MAX(sequence) FROM messages WHERE author = $1)"; if (sqlite3_prepare(tf_ssb_get_db(ssb), query, -1, &statement, NULL) == SQLITE_OK) { if (sqlite3_bind_text(statement, 1, author, -1, NULL) == SQLITE_OK && sqlite3_step(statement) == SQLITE_ROW) { if (out_sequence) { *out_sequence = sqlite3_column_int64(statement, 1); } if (out_message_id) { strncpy(out_message_id, (const char*)sqlite3_column_text(statement, 0), out_message_id_size - 1); } found = true; } sqlite3_finalize(statement); } else { printf("prepare failed: %s\n", sqlite3_errmsg(tf_ssb_get_db(ssb))); } return found; } static bool _tf_ssb_sqlite_bind_json(JSContext* context, sqlite3* db, sqlite3_stmt* statement, JSValue binds) { bool all_bound = true; int32_t length = 0; if (JS_IsUndefined(binds)) { return true; } JSValue lengthval = JS_GetPropertyStr(context, binds, "length"); if (JS_ToInt32(context, &length, lengthval) == 0) { for (int i = 0; i < length; i++) { JSValue value = JS_GetPropertyUint32(context, binds, i); if (JS_IsString(value)) { size_t str_len = 0; const char* str = JS_ToCStringLen(context, &str_len, value); if (str) { if (sqlite3_bind_text(statement, i + 1, str, str_len, SQLITE_TRANSIENT) != SQLITE_OK) { printf("failed to bind: %s\n", sqlite3_errmsg(db)); all_bound = false; } JS_FreeCString(context, str); } else { printf("expected cstring\n"); } } else if (JS_IsNumber(value)) { int64_t number = 0; JS_ToInt64(context, &number, value); if (sqlite3_bind_int64(statement, i + 1, number) != SQLITE_OK) { printf("failed to bind: %s\n", sqlite3_errmsg(db)); all_bound = false; } } else if (JS_IsNull(value)) { if (sqlite3_bind_null(statement, i + 1) != SQLITE_OK) { printf("failed to bind: %s\n", sqlite3_errmsg(db)); all_bound = false; } } else { const char* str = JS_ToCString(context, value); printf("expected string: %s\n", str); JS_FreeCString(context, str); } JS_FreeValue(context, value); } } else { printf("expected array\n"); } JS_FreeValue(context, lengthval); return all_bound; } static JSValue _tf_ssb_sqlite_row_to_json(JSContext* context, sqlite3_stmt* row) { JSValue result = JS_NewObject(context); for (int i = 0; i < sqlite3_column_count(row); i++) { const char* name = sqlite3_column_name(row, i); switch (sqlite3_column_type(row, i)) { case SQLITE_INTEGER: JS_SetPropertyStr(context, result, name, JS_NewInt64(context, sqlite3_column_int64(row, i))); break; case SQLITE_FLOAT: JS_SetPropertyStr(context, result, name, JS_NewFloat64(context, sqlite3_column_double(row, i))); break; case SQLITE_TEXT: JS_SetPropertyStr(context, result, name, JS_NewStringLen(context, (const char*)sqlite3_column_text(row, i), sqlite3_column_bytes(row, i))); break; case SQLITE_BLOB: JS_SetPropertyStr(context, result, name, JS_NewArrayBufferCopy(context, sqlite3_column_blob(row, i), sqlite3_column_bytes(row, i))); break; case SQLITE_NULL: JS_SetPropertyStr(context, result, name, JS_NULL); break; } } return result; } static int _tf_ssb_sqlite_authorizer(void* user_data, int action_code, const char* arg0, const char* arg1, const char* arg2, const char* arg3) { switch (action_code) { case SQLITE_SELECT: case SQLITE_FUNCTION: return SQLITE_OK; case SQLITE_READ: return (strcmp(arg0, "messages") == 0 || strcmp(arg0, "blob_wants") == 0) ? SQLITE_OK : SQLITE_DENY; break; } return SQLITE_DENY; } JSValue tf_ssb_db_visit_query(tf_ssb_t* ssb, const char* query, const JSValue binds, void (*callback)(JSValue row, void* user_data), void* user_data) { JSValue result = JS_UNDEFINED; sqlite3* db = tf_ssb_get_db(ssb); JSContext* context = tf_ssb_get_context(ssb); sqlite3_stmt* statement; sqlite3_set_authorizer(db, _tf_ssb_sqlite_authorizer, ssb); if (sqlite3_prepare(db, query, -1, &statement, NULL) == SQLITE_OK) { if (_tf_ssb_sqlite_bind_json(context, db, statement, binds)) { int r = SQLITE_OK; while ((r = sqlite3_step(statement)) == SQLITE_ROW) { JSValue row = _tf_ssb_sqlite_row_to_json(context, statement); tf_trace_t* trace = tf_ssb_get_trace(ssb); tf_trace_begin(trace, "callback"); callback(row, user_data); tf_trace_end(trace); JS_FreeValue(context, row); } if (r != SQLITE_DONE) { result = JS_ThrowInternalError(context, "SQL Error %s: running \"%s\".", sqlite3_errmsg(db), query); } } sqlite3_finalize(statement); } else { result = JS_ThrowInternalError(context, "SQL Error %s: preparing \"%s\".", sqlite3_errmsg(db), query); } sqlite3_set_authorizer(db, NULL, NULL); return result; } static JSValue _tf_ssb_format_message(JSContext* context, const char* previous, const char* author, int64_t sequence, double timestamp, const char* hash, const char* content, const char* signature, bool sequence_before_author) { JSValue value = JS_NewObject(context); JS_SetPropertyStr(context, value, "previous", previous ? JS_NewString(context, previous) : JS_NULL); if (sequence_before_author) { JS_SetPropertyStr(context, value, "sequence", JS_NewInt64(context, sequence)); JS_SetPropertyStr(context, value, "author", JS_NewString(context, author)); } else { JS_SetPropertyStr(context, value, "author", JS_NewString(context, author)); JS_SetPropertyStr(context, value, "sequence", JS_NewInt64(context, sequence)); } JS_SetPropertyStr(context, value, "timestamp", JS_NewFloat64(context, timestamp)); JS_SetPropertyStr(context, value, "hash", JS_NewString(context, hash)); JS_SetPropertyStr(context, value, "content", JS_ParseJSON(context, content, strlen(content), NULL)); JS_SetPropertyStr(context, value, "signature", JS_NewString(context, signature)); return value; } bool _tf_ssb_update_message_id(sqlite3* db, const char* old_id, const char* new_id) { bool success = false; sqlite3_stmt* statement = NULL; if (sqlite3_prepare(db, "UPDATE messages SET id = ? WHERE id = ?", -1, &statement, NULL) == SQLITE_OK) { if (sqlite3_bind_text(statement, 1, new_id, -1, NULL) == SQLITE_OK && sqlite3_bind_text(statement, 2, old_id, -1, NULL) == SQLITE_OK) { success = sqlite3_step(statement) == SQLITE_DONE; } sqlite3_finalize(statement); } return success; } bool tf_ssb_db_check(sqlite3* db, const char* check_author) { JSMallocFunctions funcs = { 0 }; tf_get_js_malloc_functions(&funcs); JSRuntime* runtime = JS_NewRuntime2(&funcs, NULL); JSContext* context = JS_NewContext(runtime); sqlite3_stmt* statement = NULL; int result = check_author ? sqlite3_prepare(db, "SELECT id, previous, author, sequence, timestamp, hash, content, signature, sequence_before_author FROM messages WHERE author = ? ORDER BY author, sequence", -1, &statement, NULL) : sqlite3_prepare(db, "SELECT id, previous, author, sequence, timestamp, hash, content, signature, sequence_before_author FROM messages ORDER BY author, sequence", -1, &statement, NULL); if (result == SQLITE_OK) { if (check_author) { sqlite3_bind_text(statement, 1, check_author, -1, NULL); } char previous_id[k_id_base64_len]; while (sqlite3_step(statement) == SQLITE_ROW) { const char* id = (const char*)sqlite3_column_text(statement, 0); const char* previous = (const char*)sqlite3_column_text(statement, 1); const char* author = (const char*)sqlite3_column_text(statement, 2); int64_t sequence = sqlite3_column_int64(statement, 3); double timestamp = sqlite3_column_double(statement, 4); const char* hash = (const char*)sqlite3_column_text(statement, 5); const char* content = (const char*)sqlite3_column_text(statement, 6); const char* signature = (const char*)sqlite3_column_text(statement, 7); bool sequence_before_author = sqlite3_column_int(statement, 8); JSValue message = _tf_ssb_format_message(context, previous, author, sequence, timestamp, hash, content, signature, sequence_before_author); char out_signature[512]; char actual_id[k_id_base64_len]; bool actual_sequence_before_author = false; JSValue j = JS_JSONStringify(context, message, JS_NULL, JS_NewInt32(context, 2)); const char* jv = JS_ToCString(context, j); if (tf_ssb_verify_and_strip_signature(context, message, actual_id, sizeof(actual_id), out_signature, sizeof(out_signature), &actual_sequence_before_author)) { if (previous && strcmp(previous, previous_id)) { printf("%s:%d previous was %s should be %s\n", id, (int)sequence, previous_id, previous); } if (strcmp(id, actual_id)) { if (_tf_ssb_update_message_id(db, id, actual_id)) { printf("updated %s to %s\n", id, actual_id); } else { printf("failed to update %s to %s\n", id, actual_id); } } } else { printf("%s sequence=%" PRId64 " unable to verify signature for %s sequence_before_author=%d message=[%.*s]\n", author, sequence, id, sequence_before_author, (int)strlen(jv), jv); printf("Deleting author = %s sequence >= %" PRId64 ".\n", author, sequence); sqlite3_stmt* delete_statement = NULL; if (sqlite3_prepare(db, "DELETE FROM messages WHERE author = ? AND sequence >= ?", -1, &delete_statement, NULL) == SQLITE_OK) { if (sqlite3_bind_text(delete_statement, 1, author, -1, NULL) == SQLITE_OK && sqlite3_bind_int64(delete_statement, 2, sequence) == SQLITE_OK) { if (sqlite3_step(delete_statement) != SQLITE_DONE) { printf("Error deleting author = %s sequence >= %" PRId64 ".\n", author, sequence); } } sqlite3_finalize(delete_statement); } } JS_FreeCString(context, jv); JS_FreeValue(context, j); snprintf(previous_id, sizeof(previous_id), "%s", id); JS_FreeValue(context, message); } sqlite3_finalize(statement); } JS_FreeContext(context); JS_FreeRuntime(runtime); return false; }