tildefriends/src/ssb.db.c

1525 lines
48 KiB
C

#include "ssb.db.h"
#include "log.h"
#include "mem.h"
#include "ssb.h"
#include "trace.h"
#include "util.js.h"
#include "sodium/crypto_hash_sha256.h"
#include "sodium/crypto_scalarmult.h"
#include "sodium/crypto_scalarmult_curve25519.h"
#include "sodium/crypto_secretbox.h"
#include "sodium/crypto_sign.h"
#include "sqlite3.h"
#include "uv.h"
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
typedef struct _message_store_t message_store_t;
static void _tf_ssb_db_store_message_work_finish(message_store_t* store);
static void _tf_ssb_db_store_message_after_work(uv_work_t* work, int status);
static void _tf_ssb_db_exec(sqlite3* db, const char* statement)
{
char* error = NULL;
int result = sqlite3_exec(db, statement, NULL, NULL, &error);
if (result != SQLITE_OK)
{
tf_printf("Error running '%s': %s.\n", statement, error ? error : sqlite3_errmsg(db));
abort();
}
}
static bool _tf_ssb_db_has_rows(sqlite3* db, const char* query)
{
bool found = false;
sqlite3_stmt* statement = NULL;
if (sqlite3_prepare(db, query, -1, &statement, NULL) == SQLITE_OK)
{
int result = SQLITE_OK;
while ((result = sqlite3_step(statement)) == SQLITE_ROW)
{
found = true;
}
if (result != SQLITE_DONE)
{
tf_printf("%s\n", sqlite3_errmsg(db));
abort();
}
sqlite3_finalize(statement);
}
else
{
tf_printf("%s\n", sqlite3_errmsg(db));
abort();
}
return found;
}
static void _tf_ssb_db_init_internal(sqlite3* db)
{
sqlite3_extended_result_codes(db, 1);
_tf_ssb_db_exec(db, "PRAGMA journal_mode = WAL");
_tf_ssb_db_exec(db, "PRAGMA synchronous = NORMAL");
}
void tf_ssb_db_init_reader(sqlite3* db)
{
_tf_ssb_db_init_internal(db);
}
void tf_ssb_db_init(tf_ssb_t* ssb)
{
sqlite3* db = tf_ssb_acquire_db_writer(ssb);
_tf_ssb_db_init_internal(db);
sqlite3_stmt* statement = NULL;
int auto_vacuum = 0;
if (sqlite3_prepare(db, "PRAGMA auto_vacuum", -1, &statement, NULL) == SQLITE_OK)
{
if (sqlite3_step(statement) == SQLITE_ROW)
{
auto_vacuum = sqlite3_column_int(statement, 0);
}
sqlite3_finalize(statement);
}
if (auto_vacuum != 1 /* FULL */)
{
tf_printf("Enabling auto-vacuum and performing full vacuum.\n");
_tf_ssb_db_exec(db, "PRAGMA auto_vacuum = FULL");
_tf_ssb_db_exec(db, "VACUUM main");
tf_printf("All clean.\n");
}
_tf_ssb_db_exec(db,
"CREATE TABLE IF NOT EXISTS messages ("
" author TEXT,"
" id TEXT PRIMARY KEY,"
" sequence INTEGER,"
" timestamp REAL,"
" previous TEXT,"
" hash TEXT,"
" content TEXT,"
" signature TEXT,"
" sequence_before_author INTEGER,"
" UNIQUE(author, sequence)"
")");
_tf_ssb_db_exec(db, "CREATE INDEX IF NOT EXISTS messages_author_id_index ON messages (author, id)");
_tf_ssb_db_exec(db, "CREATE INDEX IF NOT EXISTS messages_author_sequence_index ON messages (author, sequence)");
_tf_ssb_db_exec(db, "CREATE INDEX IF NOT EXISTS messages_author_timestamp_index ON messages (author, timestamp)");
_tf_ssb_db_exec(db,
"CREATE TABLE IF NOT EXISTS blobs ("
" id TEXT PRIMARY KEY,"
" content BLOB,"
" created INTEGER"
")");
_tf_ssb_db_exec(db,"DROP TABLE IF EXISTS blob_wants");
_tf_ssb_db_exec(db,
"CREATE TABLE IF NOT EXISTS properties ("
" id TEXT,"
" key TEXT,"
" value TEXT,"
" UNIQUE(id, key)"
")");
_tf_ssb_db_exec(db,
"CREATE TABLE IF NOT EXISTS connections ("
" host TEXT,"
" port INTEGER,"
" key TEXT,"
" last_attempt INTEGER,"
" last_success INTEGER,"
" UNIQUE(host, port, key)"
")");
_tf_ssb_db_exec(db,
"CREATE TABLE IF NOT EXISTS identities ("
" user TEXT,"
" public_key TEXT UNIQUE,"
" private_key TEXT UNIQUE"
")");
_tf_ssb_db_exec(db, "CREATE INDEX IF NOT EXISTS identities_user ON identities (user, public_key)");
bool populate_fts = false;
if (!_tf_ssb_db_has_rows(db, "PRAGMA table_list('messages_fts')"))
{
_tf_ssb_db_exec(db, "CREATE VIRTUAL TABLE IF NOT EXISTS messages_fts USING fts5(content, content=messages, content_rowid=rowid)");
populate_fts = true;
}
if (!populate_fts && /* HACK */ false)
{
tf_printf("Checking FTS5 integrity...\n");
if (sqlite3_exec(db, "INSERT INTO messages_fts(messages_fts, rank) VALUES ('integrity-check', 0)", NULL, NULL, NULL) == SQLITE_CORRUPT_VTAB)
{
populate_fts = true;
}
tf_printf("Done.\n");
}
if (populate_fts)
{
tf_printf("Populating full-text search...\n");
_tf_ssb_db_exec(db, "INSERT INTO messages_fts (rowid, content) SELECT rowid, content FROM messages");
tf_printf("Done.\n");
}
_tf_ssb_db_exec(db, "CREATE TRIGGER IF NOT EXISTS messages_ai AFTER INSERT ON messages BEGIN INSERT INTO messages_fts(rowid, content) VALUES (new.rowid, new.content); END");
_tf_ssb_db_exec(db, "CREATE TRIGGER IF NOT EXISTS messages_ad AFTER DELETE ON messages BEGIN INSERT INTO messages_fts(messages_fts, rowid, content) VALUES ('delete', old.rowid, old.content); END");
if (!_tf_ssb_db_has_rows(db, "PRAGMA table_list('messages_refs')"))
{
_tf_ssb_db_exec(db,
"CREATE TABLE IF NOT EXISTS messages_refs ("
" message TEXT, "
" ref TEXT, "
" UNIQUE(message, ref)"
")");
tf_printf("Populating messages_refs...\n");
_tf_ssb_db_exec(db, "INSERT INTO messages_refs(message, ref) "
"SELECT messages.id, j.value FROM messages, json_tree(messages.content) as j WHERE "
"j.value LIKE '&%.sha256' OR "
"j.value LIKE '%%%.sha256' OR "
"j.value LIKE '@%.ed25519' "
"ON CONFLICT DO NOTHING");
tf_printf("Done.\n");
}
_tf_ssb_db_exec(db, "DROP TRIGGER IF EXISTS messages_ai_refs");
_tf_ssb_db_exec(db, "CREATE TRIGGER IF NOT EXISTS messages_ai_refs AFTER INSERT ON messages BEGIN "
"INSERT INTO messages_refs(message, ref) "
"SELECT DISTINCT new.id, j.value FROM json_tree(new.content) as j WHERE "
"j.value LIKE '&%.sha256' OR "
"j.value LIKE '%%%.sha256' OR "
"j.value LIKE '@%.ed25519' "
"ON CONFLICT DO NOTHING; END");
_tf_ssb_db_exec(db, "DROP TRIGGER IF EXISTS messages_ad_refs");
_tf_ssb_db_exec(db, "CREATE TRIGGER IF NOT EXISTS messages_ad_refs AFTER DELETE ON messages BEGIN DELETE FROM messages_refs WHERE messages_refs.message = old.id; END");
_tf_ssb_db_exec(db, "CREATE INDEX IF NOT EXISTS messages_refs_message_idx ON messages_refs (message)");
_tf_ssb_db_exec(db, "CREATE INDEX IF NOT EXISTS messages_refs_ref_idx ON messages_refs (ref)");
_tf_ssb_db_exec(db, "DROP VIEW IF EXISTS blob_wants_view");
_tf_ssb_db_exec(db,
"CREATE VIEW IF NOT EXISTS blob_wants_view (id, timestamp) AS "
" SELECT messages_refs.ref AS id, messages.timestamp AS timestamp "
" FROM messages_refs "
" JOIN messages ON messages.id = messages_refs.message "
" LEFT OUTER JOIN blobs ON messages_refs.ref = blobs.id "
" WHERE blobs.id IS NULL "
" AND LENGTH(messages_refs.ref) = 52 "
" AND messages_refs.ref LIKE '&%.sha256'");
bool need_add_sequence_before_author = true;
bool need_convert_timestamp_to_real = false;
if (sqlite3_prepare(db, "PRAGMA table_info(messages)", -1, &statement, NULL) == SQLITE_OK)
{
int result = SQLITE_OK;
while ((result = sqlite3_step(statement)) == SQLITE_ROW)
{
const char* name = (const char*)sqlite3_column_text(statement, 1);
const char* type = (const char*)sqlite3_column_text(statement, 2);
if (name && type && strcmp(name, "timestamp") == 0 && strcmp(type, "INTEGER") == 0)
{
need_convert_timestamp_to_real = true;
}
if (name && strcmp(name, "sequence_before_author") == 0)
{
need_add_sequence_before_author = false;
}
}
sqlite3_finalize(statement);
}
if (need_convert_timestamp_to_real)
{
tf_printf("Converting timestamp column from INTEGER to REAL.\n");
_tf_ssb_db_exec(db, "BEGIN TRANSACTION");
_tf_ssb_db_exec(db, "DROP INDEX IF EXISTS messages_author_timestamp_index");
_tf_ssb_db_exec(db, "ALTER TABLE messages ADD COLUMN timestamp_real REAL");
_tf_ssb_db_exec(db, "UPDATE messages SET timestamp_real = timestamp");
_tf_ssb_db_exec(db, "ALTER TABLE messages DROP COLUMN timestamp");
_tf_ssb_db_exec(db, "ALTER TABLE messages RENAME COLUMN timestamp_real TO timestamp");
_tf_ssb_db_exec(db, "CREATE INDEX IF NOT EXISTS messages_author_timestamp_index ON messages (author, timestamp)");
_tf_ssb_db_exec(db, "COMMIT TRANSACTION");
}
if (need_add_sequence_before_author)
{
tf_printf("Adding sequence_before_author column.\n");
_tf_ssb_db_exec(db, "ALTER TABLE messages ADD COLUMN sequence_before_author INTEGER");
}
tf_ssb_release_db_writer(ssb, db);
}
static bool _tf_ssb_db_previous_message_exists(sqlite3* db, const char* author, int64_t sequence, const char* previous)
{
bool exists = false;
if (sequence == 1)
{
exists = true;
}
else
{
sqlite3_stmt* statement;
if (sqlite3_prepare(db, "SELECT COUNT(*) FROM messages WHERE author = ?1 AND sequence = ?2 AND id = ?3", -1, &statement, NULL) == SQLITE_OK)
{
if (sqlite3_bind_text(statement, 1, author, -1, NULL) == SQLITE_OK &&
sqlite3_bind_int64(statement, 2, sequence - 1) == SQLITE_OK &&
sqlite3_bind_text(statement, 3, previous, -1, NULL) == SQLITE_OK &&
sqlite3_step(statement) == SQLITE_ROW)
{
exists = sqlite3_column_int(statement, 0) != 0;
}
sqlite3_finalize(statement);
}
}
return exists;
}
static int64_t _tf_ssb_db_store_message_raw(
tf_ssb_t* ssb,
const char* id,
const char* previous,
const char* author,
int64_t sequence,
double timestamp,
const char* content,
size_t content_len,
const char* signature,
bool sequence_before_author)
{
sqlite3* db = tf_ssb_acquire_db_writer(ssb);
int64_t last_row_id = -1;
if (_tf_ssb_db_previous_message_exists(db, author, sequence, previous))
{
const char* query = "INSERT INTO messages (id, previous, author, sequence, timestamp, content, hash, signature, sequence_before_author) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT DO NOTHING";
sqlite3_stmt* statement;
if (sqlite3_prepare(db, query, -1, &statement, NULL) == SQLITE_OK)
{
if (sqlite3_bind_text(statement, 1, id, -1, NULL) == SQLITE_OK &&
(previous ? sqlite3_bind_text(statement, 2, previous, -1, NULL) : sqlite3_bind_null(statement, 2)) == SQLITE_OK &&
sqlite3_bind_text(statement, 3, author, -1, NULL) == SQLITE_OK &&
sqlite3_bind_int64(statement, 4, sequence) == SQLITE_OK &&
sqlite3_bind_double(statement, 5, timestamp) == SQLITE_OK &&
sqlite3_bind_text(statement, 6, content, content_len, NULL) == SQLITE_OK &&
sqlite3_bind_text(statement, 7, "sha256", 6, NULL) == SQLITE_OK &&
sqlite3_bind_text(statement, 8, signature, -1, NULL) == SQLITE_OK &&
sqlite3_bind_int(statement, 9, sequence_before_author) == SQLITE_OK)
{
int r = sqlite3_step(statement);
if (r != SQLITE_DONE)
{
tf_printf("_tf_ssb_db_store_message_raw: %s\n", sqlite3_errmsg(db));
}
if (r == SQLITE_DONE && sqlite3_changes(db) != 0)
{
last_row_id = sqlite3_last_insert_rowid(db);
}
}
else
{
tf_printf("bind failed\n");
}
sqlite3_finalize(statement);
}
else
{
tf_printf("%s: prepare failed: %s\n", __FUNCTION__, sqlite3_errmsg(db));
}
}
else
{
tf_printf("%p: Previous message doesn't exist for author=%s sequence=%" PRId64 ".\n", db, author, sequence);
}
tf_ssb_release_db_writer(ssb, db);
return last_row_id;
}
static char* _tf_ssb_db_get_message_blob_wants(tf_ssb_t* ssb, int64_t rowid)
{
sqlite3* db = tf_ssb_acquire_db_reader(ssb);
sqlite3_stmt* statement;
char* result = NULL;
size_t size = 0;
if (sqlite3_prepare(db, "SELECT DISTINCT json.value FROM messages, json_tree(messages.content) AS json LEFT OUTER JOIN blobs ON json.value = blobs.id WHERE messages.rowid = ?1 AND json.value LIKE '&%%.sha256' AND length(json.value) = ?2 AND blobs.content IS NULL", -1, &statement, NULL) == SQLITE_OK)
{
if (sqlite3_bind_int64(statement, 1, rowid) == SQLITE_OK &&
sqlite3_bind_int(statement, 2, k_blob_id_len - 1) == SQLITE_OK)
{
int r = SQLITE_OK;
while ((r = sqlite3_step(statement)) == SQLITE_ROW)
{
int id_size = sqlite3_column_bytes(statement, 0);
const uint8_t* id = sqlite3_column_text(statement, 0);
result = tf_realloc(result, size + id_size + 1);
memcpy(result + size, id, id_size + 1);
size += id_size + 1;
}
if (r != SQLITE_DONE)
{
tf_printf("%s\n", sqlite3_errmsg(db));
}
}
else
{
tf_printf("bind failed: %s\n", sqlite3_errmsg(db));
}
sqlite3_finalize(statement);
}
else
{
tf_printf("%s: prepare failed: %s\n", __FUNCTION__, sqlite3_errmsg(db));
}
result = tf_realloc(result, size + 1);
result[size] = '\0';
tf_ssb_release_db_reader(ssb, db);
return result;
}
typedef struct _message_store_t
{
uv_work_t work;
uv_thread_t thread_id;
tf_ssb_t* ssb;
char id[k_id_base64_len];
char signature[512];
bool sequence_before_author;
char previous[k_id_base64_len];
char author[k_id_base64_len];
int64_t sequence;
double timestamp;
const char* content;
size_t length;
bool out_stored;
char* out_blob_wants;
tf_ssb_db_store_message_callback_t* callback;
void* user_data;
uint64_t start_time;
uint64_t end_time;
message_store_t* next;
} message_store_t;
static void _tf_ssb_db_store_message_work(uv_work_t* work)
{
message_store_t* store = work->data;
store->start_time = uv_hrtime();
store->thread_id = uv_thread_self();
tf_trace_t* trace = tf_ssb_get_trace(store->ssb);
tf_trace_begin(trace, "message_store_work");
int64_t last_row_id = _tf_ssb_db_store_message_raw(store->ssb, store->id, *store->previous ? store->previous : NULL, store->author, store->sequence, store->timestamp, store->content, store->length, store->signature, store->sequence_before_author);
if (last_row_id != -1)
{
store->out_stored = true;
store->out_blob_wants = _tf_ssb_db_get_message_blob_wants(store->ssb, last_row_id);
}
tf_trace_end(trace);
store->end_time = uv_hrtime();
}
static void _wake_up_queue(tf_ssb_t* ssb, tf_ssb_store_queue_t* queue)
{
if (!queue->running)
{
message_store_t* next = queue->head;
if (next)
{
queue->head = next->next;
if (queue->tail == next)
{
queue->tail = NULL;
}
next->next = NULL;
queue->running = true;
int r = uv_queue_work(tf_ssb_get_loop(ssb), &next->work, _tf_ssb_db_store_message_work, _tf_ssb_db_store_message_after_work);
if (r)
{
_tf_ssb_db_store_message_work_finish(next);
}
}
}
}
static void _tf_ssb_db_store_message_work_finish(message_store_t* store)
{
JSContext* context = tf_ssb_get_context(store->ssb);
if (store->callback)
{
store->callback(store->id, store->out_stored, store->user_data);
}
JS_FreeCString(context, store->content);
tf_ssb_store_queue_t* queue = tf_ssb_get_store_queue(store->ssb);
queue->running = false;
_wake_up_queue(store->ssb, queue);
tf_free(store);
}
static void _tf_ssb_db_store_message_after_work(uv_work_t* work, int status)
{
message_store_t* store = work->data;
tf_ssb_record_thread_time(store->ssb, (int64_t)store->thread_id, store->end_time - store->start_time);
tf_trace_t* trace = tf_ssb_get_trace(store->ssb);
tf_trace_begin(trace, "message_store_after_work");
if (store->out_stored)
{
tf_trace_begin(trace, "notify_message_added");
tf_ssb_notify_message_added(store->ssb, store->id);
tf_trace_end(trace);
}
if (store->out_blob_wants)
{
tf_trace_begin(trace, "notify_blob_wants_added");
for (char* p = store->out_blob_wants; *p; p = p + strlen(p))
{
tf_ssb_notify_blob_want_added(store->ssb, p);
}
tf_free(store->out_blob_wants);
tf_trace_end(trace);
}
_tf_ssb_db_store_message_work_finish(store);
tf_trace_end(trace);
}
void tf_ssb_db_store_message(tf_ssb_t* ssb, JSContext* context, const char* id, JSValue val, const char* signature, bool sequence_before_author, tf_ssb_db_store_message_callback_t* callback, void* user_data)
{
JSValue previousval = JS_GetPropertyStr(context, val, "previous");
const char* previous = JS_IsNull(previousval) ? NULL : JS_ToCString(context, previousval);
JS_FreeValue(context, previousval);
JSValue authorval = JS_GetPropertyStr(context, val, "author");
const char* author = JS_ToCString(context, authorval);
JS_FreeValue(context, authorval);
int64_t sequence = -1;
JSValue sequenceval = JS_GetPropertyStr(context, val, "sequence");
JS_ToInt64(context, &sequence, sequenceval);
JS_FreeValue(context, sequenceval);
double timestamp = -1.0;
JSValue timestampval = JS_GetPropertyStr(context, val, "timestamp");
JS_ToFloat64(context, &timestamp, timestampval);
JS_FreeValue(context, timestampval);
JSValue contentval = JS_GetPropertyStr(context, val, "content");
JSValue content = JS_JSONStringify(context, contentval, JS_NULL, JS_NULL);
size_t content_len;
const char* contentstr = JS_ToCStringLen(context, &content_len, content);
JS_FreeValue(context, content);
JS_FreeValue(context, contentval);
message_store_t* store = tf_malloc(sizeof(message_store_t));
*store = (message_store_t)
{
.work =
{
.data = store,
},
.ssb = ssb,
.sequence = sequence,
.timestamp = timestamp,
.content = contentstr,
.length = content_len,
.sequence_before_author = sequence_before_author,
.callback = callback,
.user_data = user_data,
};
snprintf(store->id, sizeof(store->id), "%s", id);
snprintf(store->previous, sizeof(store->previous), "%s", previous ? previous : "");
snprintf(store->author, sizeof(store->author), "%s", author);
snprintf(store->signature, sizeof(store->signature), "%s", signature);
JS_FreeCString(context, author);
JS_FreeCString(context, previous);
tf_ssb_store_queue_t* queue = tf_ssb_get_store_queue(ssb);
if (queue->tail)
{
message_store_t* tail = queue->tail;
tail->next = store;
queue->tail = store;
}
else
{
queue->head = store;
queue->tail = store;
}
_wake_up_queue(ssb, queue);
}
bool tf_ssb_db_message_content_get(tf_ssb_t* ssb, const char* id, uint8_t** out_blob, size_t* out_size)
{
bool result = false;
sqlite3_stmt* statement;
sqlite3* db = tf_ssb_acquire_db_reader(ssb);
const char* query = "SELECT content FROM messages WHERE id = ?";
if (sqlite3_prepare(db, query, -1, &statement, NULL) == SQLITE_OK)
{
if (sqlite3_bind_text(statement, 1, id, -1, NULL) == SQLITE_OK &&
sqlite3_step(statement) == SQLITE_ROW)
{
const uint8_t* blob = sqlite3_column_blob(statement, 0);
int size = sqlite3_column_bytes(statement, 0);
if (out_blob)
{
*out_blob = tf_malloc(size + 1);
memcpy(*out_blob, blob, size);
(*out_blob)[size] = '\0';
}
if (out_size)
{
*out_size = size;
}
result = true;
}
sqlite3_finalize(statement);
}
tf_ssb_release_db_reader(ssb, db);
return result;
}
bool tf_ssb_db_blob_has(tf_ssb_t* ssb, const char* id)
{
bool result = false;
sqlite3_stmt* statement;
sqlite3* db = tf_ssb_acquire_db_reader(ssb);
const char* query = "SELECT COUNT(*) FROM blobs WHERE id = $1";
if (sqlite3_prepare(db, query, -1, &statement, NULL) == SQLITE_OK)
{
if (sqlite3_bind_text(statement, 1, id, -1, NULL) == SQLITE_OK &&
sqlite3_step(statement) == SQLITE_ROW)
{
result = sqlite3_column_int64(statement, 0) != 0;
}
sqlite3_finalize(statement);
}
tf_ssb_release_db_reader(ssb, db);
return result;
}
bool tf_ssb_db_blob_get(tf_ssb_t* ssb, const char* id, uint8_t** out_blob, size_t* out_size)
{
bool result = false;
sqlite3_stmt* statement;
sqlite3* db = tf_ssb_acquire_db_reader(ssb);
const char* query = "SELECT content FROM blobs WHERE id = $1";
if (sqlite3_prepare(db, query, -1, &statement, NULL) == SQLITE_OK)
{
if (sqlite3_bind_text(statement, 1, id, -1, NULL) == SQLITE_OK &&
sqlite3_step(statement) == SQLITE_ROW)
{
const uint8_t* blob = sqlite3_column_blob(statement, 0);
int size = sqlite3_column_bytes(statement, 0);
if (out_blob)
{
*out_blob = tf_malloc(size + 1);
if (size)
{
memcpy(*out_blob, blob, size);
}
(*out_blob)[size] = '\0';
}
if (out_size)
{
*out_size = size;
}
result = true;
}
sqlite3_finalize(statement);
}
tf_ssb_release_db_reader(ssb, db);
return result;
}
typedef struct _blob_store_work_t
{
uv_work_t work;
uv_thread_t thread_id;
tf_ssb_t* ssb;
const uint8_t* blob;
size_t size;
char id[k_blob_id_len];
bool is_new;
tf_ssb_db_blob_store_callback_t* callback;
void* user_data;
uint64_t start_time;
uint64_t end_time;
} blob_store_work_t;
static void _tf_ssb_db_blob_store_work(uv_work_t* work)
{
blob_store_work_t* blob_work = work->data;
blob_work->start_time = uv_hrtime();
blob_work->thread_id = uv_thread_self();
tf_trace_t* trace = tf_ssb_get_trace(blob_work->ssb);
tf_trace_begin(trace, "blob_store_work");
tf_ssb_db_blob_store(blob_work->ssb, blob_work->blob, blob_work->size, blob_work->id, sizeof(blob_work->id), &blob_work->is_new);
tf_trace_end(trace);
blob_work->end_time = uv_hrtime();
}
static void _tf_ssb_db_blob_store_after_work(uv_work_t* work, int status)
{
blob_store_work_t* blob_work = work->data;
tf_ssb_record_thread_time(blob_work->ssb, (int64_t)blob_work->thread_id, blob_work->end_time - blob_work->start_time);
tf_trace_t* trace = tf_ssb_get_trace(blob_work->ssb);
tf_trace_begin(trace, "blob_store_after_work");
if (status == 0 && *blob_work->id)
{
tf_ssb_notify_blob_stored(blob_work->ssb, blob_work->id);
}
if (status != 0)
{
tf_printf("tf_ssb_db_blob_store_async -> uv_queue_work failed asynchronously: %s\n", uv_strerror(status));
}
if (blob_work->callback)
{
blob_work->callback(status == 0 ? blob_work->id : NULL, blob_work->is_new, blob_work->user_data);
}
tf_trace_end(trace);
tf_free(blob_work);
}
void tf_ssb_db_blob_store_async(tf_ssb_t* ssb, const uint8_t* blob, size_t size, tf_ssb_db_blob_store_callback_t* callback, void* user_data)
{
blob_store_work_t* work = tf_malloc(sizeof(blob_store_work_t));
*work = (blob_store_work_t)
{
.work =
{
.data = work,
},
.ssb = ssb,
.blob = blob,
.size = size,
.callback = callback,
.user_data = user_data,
};
int r = uv_queue_work(tf_ssb_get_loop(ssb), &work->work, _tf_ssb_db_blob_store_work, _tf_ssb_db_blob_store_after_work);
if (r)
{
tf_printf("tf_ssb_db_blob_store_async -> uv_queue_work failed immediately: %s\n", uv_strerror(r));
if (callback)
{
callback(NULL, false, user_data);
}
tf_free(work);
}
}
bool tf_ssb_db_blob_store(tf_ssb_t* ssb, const uint8_t* blob, size_t size, char* out_id, size_t out_id_size, bool* out_new)
{
bool result = false;
uint8_t hash[crypto_hash_sha256_BYTES];
crypto_hash_sha256(hash, blob, size);
char hash64[256];
tf_base64_encode(hash, sizeof(hash), hash64, sizeof(hash64));
char id[512];
snprintf(id, sizeof(id), "&%s.sha256", hash64);
int rows = 0;
sqlite3* db = tf_ssb_acquire_db_writer(ssb);
sqlite3_stmt* statement;
if (sqlite3_prepare(db, "INSERT INTO blobs (id, content, created) VALUES ($1, $2, CAST(strftime('%s') AS INTEGER)) ON CONFLICT DO NOTHING", -1, &statement, NULL) == SQLITE_OK)
{
if (sqlite3_bind_text(statement, 1, id, -1, NULL) == SQLITE_OK &&
sqlite3_bind_blob(statement, 2, blob, size, NULL) == SQLITE_OK)
{
result = sqlite3_step(statement) == SQLITE_DONE;
rows = sqlite3_changes(db);
}
else
{
tf_printf("bind failed: %s\n", sqlite3_errmsg(db));
}
sqlite3_finalize(statement);
}
else
{
tf_printf("%s: prepare failed: %s\n", __FUNCTION__, sqlite3_errmsg(db));
}
tf_ssb_release_db_writer(ssb, db);
if (rows)
{
if (!out_new)
{
tf_printf("blob stored %s %zd => %d\n", id, size, result);
}
}
if (result && out_id)
{
snprintf(out_id, out_id_size, "%s", id);
}
if (out_new)
{
*out_new = rows != 0;
}
return result;
}
bool tf_ssb_db_get_message_by_author_and_sequence(tf_ssb_t* ssb, const char* author, int64_t sequence, char* out_message_id, size_t out_message_id_size, double* out_timestamp, char** out_content)
{
bool found = false;
sqlite3_stmt* statement;
const char* query = "SELECT id, timestamp, content FROM messages WHERE author = $1 AND sequence = $2";
sqlite3* db = tf_ssb_acquire_db_reader(ssb);
if (sqlite3_prepare(db, query, -1, &statement, NULL) == SQLITE_OK)
{
if (sqlite3_bind_text(statement, 1, author, -1, NULL) == SQLITE_OK &&
sqlite3_bind_int64(statement, 2, sequence) == SQLITE_OK &&
sqlite3_step(statement) == SQLITE_ROW)
{
if (out_message_id)
{
strncpy(out_message_id, (const char*)sqlite3_column_text(statement, 0), out_message_id_size - 1);
}
if (out_timestamp)
{
*out_timestamp = sqlite3_column_double(statement, 1);
}
if (out_content)
{
*out_content = tf_strdup((const char*)sqlite3_column_text(statement, 2));
}
found = true;
}
sqlite3_finalize(statement);
}
else
{
tf_printf("%s: prepare failed: %s\n", __FUNCTION__, sqlite3_errmsg(db));
}
tf_ssb_release_db_reader(ssb, db);
return found;
}
bool tf_ssb_db_get_latest_message_by_author(tf_ssb_t* ssb, const char* author, int64_t* out_sequence, char* out_message_id, size_t out_message_id_size)
{
bool found = false;
sqlite3_stmt* statement;
sqlite3* db = tf_ssb_acquire_db_reader(ssb);
const char* query = "SELECT id, sequence FROM messages WHERE author = $1 AND sequence = (SELECT MAX(sequence) FROM messages WHERE author = $1)";
if (sqlite3_prepare(db, query, -1, &statement, NULL) == SQLITE_OK)
{
if (sqlite3_bind_text(statement, 1, author, -1, NULL) == SQLITE_OK &&
sqlite3_step(statement) == SQLITE_ROW)
{
if (out_sequence)
{
*out_sequence = sqlite3_column_int64(statement, 1);
}
if (out_message_id)
{
strncpy(out_message_id, (const char*)sqlite3_column_text(statement, 0), out_message_id_size - 1);
}
found = true;
}
sqlite3_finalize(statement);
}
else
{
tf_printf("%s: prepare failed: %s\n", __FUNCTION__, sqlite3_errmsg(db));
}
tf_ssb_release_db_reader(ssb, db);
return found;
}
static JSValue _tf_ssb_sqlite_bind_json(JSContext* context, sqlite3* db, sqlite3_stmt* statement, JSValue binds)
{
if (JS_IsUndefined(binds))
{
return JS_UNDEFINED;
}
if (!JS_IsArray(context, binds))
{
return JS_ThrowTypeError(context, "Expected bind parameters to be an array.");
}
JSValue result = JS_UNDEFINED;
int32_t length = tf_util_get_length(context, binds);
for (int i = 0; i < length && JS_IsUndefined(result); i++)
{
JSValue value = JS_GetPropertyUint32(context, binds, i);
if (JS_IsNumber(value))
{
int64_t number = 0;
JS_ToInt64(context, &number, value);
if (sqlite3_bind_int64(statement, i + 1, number) != SQLITE_OK)
{
result = JS_ThrowInternalError(context, "Failed to bind: %s.", sqlite3_errmsg(db));
}
}
else if (JS_IsBool(value))
{
if (sqlite3_bind_int(statement, i + 1, JS_ToBool(context, value) ? 1 : 0) != SQLITE_OK)
{
result = JS_ThrowInternalError(context, "Failed to bind: %s.", sqlite3_errmsg(db));
}
}
else if (JS_IsNull(value))
{
if (sqlite3_bind_null(statement, i + 1) != SQLITE_OK)
{
result = JS_ThrowInternalError(context, "Failed to bind: %s.", sqlite3_errmsg(db));
}
}
else
{
size_t str_len = 0;
const char* str = JS_ToCStringLen(context, &str_len, value);
if (str)
{
if (sqlite3_bind_text(statement, i + 1, str, str_len, SQLITE_TRANSIENT) != SQLITE_OK)
{
result = JS_ThrowInternalError(context, "Failed to bind: %s.", sqlite3_errmsg(db));
}
JS_FreeCString(context, str);
}
else
{
result = JS_ThrowInternalError(context, "Could not convert bind argument %d to string.", i);
}
}
JS_FreeValue(context, value);
}
return result;
}
static JSValue _tf_ssb_sqlite_row_to_json(JSContext* context, sqlite3_stmt* row)
{
JSValue result = JS_NewObject(context);
for (int i = 0; i < sqlite3_column_count(row); i++)
{
const char* name = sqlite3_column_name(row, i);
switch (sqlite3_column_type(row, i))
{
case SQLITE_INTEGER:
JS_SetPropertyStr(context, result, name, JS_NewInt64(context, sqlite3_column_int64(row, i)));
break;
case SQLITE_FLOAT:
JS_SetPropertyStr(context, result, name, JS_NewFloat64(context, sqlite3_column_double(row, i)));
break;
case SQLITE_TEXT:
JS_SetPropertyStr(context, result, name, JS_NewStringLen(context, (const char*)sqlite3_column_text(row, i), sqlite3_column_bytes(row, i)));
break;
case SQLITE_BLOB:
JS_SetPropertyStr(context, result, name, JS_NewArrayBufferCopy(context, sqlite3_column_blob(row, i), sqlite3_column_bytes(row, i)));
break;
case SQLITE_NULL:
JS_SetPropertyStr(context, result, name, JS_NULL);
break;
}
}
return result;
}
static int _tf_ssb_sqlite_authorizer(void* user_data, int action_code, const char* arg0, const char* arg1, const char* arg2, const char* arg3)
{
int result = SQLITE_DENY;
switch (action_code)
{
case SQLITE_SELECT:
case SQLITE_FUNCTION:
result = SQLITE_OK;
break;
case SQLITE_READ:
result = (
strcmp(arg0, "blob_wants_view") == 0 ||
strcmp(arg0, "json_each") == 0 ||
strcmp(arg0, "json_tree") == 0 ||
strcmp(arg0, "messages") == 0 ||
strcmp(arg0, "messages_fts") == 0 ||
strcmp(arg0, "messages_fts_idx") == 0 ||
strcmp(arg0, "messages_fts_config") == 0 ||
strcmp(arg0, "messages_refs") == 0 ||
strcmp(arg0, "messages_refs_message_idx") == 0 ||
strcmp(arg0, "messages_refs_ref_idx") == 0 ||
strcmp(arg0, "sqlite_master") == 0 ||
false)
? SQLITE_OK : SQLITE_DENY;
break;
case SQLITE_PRAGMA:
result = strcmp(arg0, "data_version") == 0 ? SQLITE_OK : SQLITE_DENY;
break;
case SQLITE_UPDATE:
result = strcmp(arg0, "sqlite_master") == 0 ? SQLITE_OK : SQLITE_DENY;
break;
}
if (result != SQLITE_OK)
{
tf_printf("Denying sqlite access to %d %s %s %s %s\n", action_code, arg0, arg1, arg2, arg3);
fflush(stdout);
}
return result;
}
JSValue tf_ssb_db_visit_query(tf_ssb_t* ssb, const char* query, const JSValue binds, void (*callback)(JSValue row, void* user_data), void* user_data)
{
JSValue result = JS_UNDEFINED;
sqlite3* db = tf_ssb_acquire_db_reader(ssb);
JSContext* context = tf_ssb_get_context(ssb);
sqlite3_stmt* statement;
sqlite3_set_authorizer(db, _tf_ssb_sqlite_authorizer, ssb);
if (sqlite3_prepare(db, query, -1, &statement, NULL) == SQLITE_OK)
{
JSValue bind_result = _tf_ssb_sqlite_bind_json(context, db, statement, binds);
if (JS_IsUndefined(bind_result))
{
int r = SQLITE_OK;
while ((r = sqlite3_step(statement)) == SQLITE_ROW)
{
JSValue row = _tf_ssb_sqlite_row_to_json(context, statement);
tf_trace_t* trace = tf_ssb_get_trace(ssb);
tf_trace_begin(trace, "callback");
callback(row, user_data);
tf_trace_end(trace);
JS_FreeValue(context, row);
}
if (r != SQLITE_DONE)
{
result = JS_ThrowInternalError(context, "SQL Error %s: running \"%s\".", sqlite3_errmsg(db), query);
}
}
else
{
result = bind_result;
}
sqlite3_finalize(statement);
}
else
{
result = JS_ThrowInternalError(context, "SQL Error %s: preparing \"%s\".", sqlite3_errmsg(db), query);
}
sqlite3_set_authorizer(db, NULL, NULL);
tf_ssb_release_db_reader(ssb, db);
return result;
}
JSValue tf_ssb_format_message(JSContext* context, const char* previous, const char* author, int64_t sequence, double timestamp, const char* hash, const char* content, const char* signature, bool sequence_before_author)
{
JSValue value = JS_NewObject(context);
JS_SetPropertyStr(context, value, "previous", (previous && *previous) ? JS_NewString(context, previous) : JS_NULL);
if (sequence_before_author)
{
JS_SetPropertyStr(context, value, "sequence", JS_NewInt64(context, sequence));
JS_SetPropertyStr(context, value, "author", JS_NewString(context, author));
}
else
{
JS_SetPropertyStr(context, value, "author", JS_NewString(context, author));
JS_SetPropertyStr(context, value, "sequence", JS_NewInt64(context, sequence));
}
JS_SetPropertyStr(context, value, "timestamp", JS_NewFloat64(context, timestamp));
JS_SetPropertyStr(context, value, "hash", JS_NewString(context, hash));
JS_SetPropertyStr(context, value, "content", JS_ParseJSON(context, content, strlen(content), NULL));
JS_SetPropertyStr(context, value, "signature", JS_NewString(context, signature));
return value;
}
bool _tf_ssb_update_message_id(sqlite3* db, const char* old_id, const char* new_id)
{
bool success = false;
sqlite3_stmt* statement = NULL;
if (sqlite3_prepare(db, "UPDATE messages SET id = ? WHERE id = ?", -1, &statement, NULL) == SQLITE_OK)
{
if (sqlite3_bind_text(statement, 1, new_id, -1, NULL) == SQLITE_OK &&
sqlite3_bind_text(statement, 2, old_id, -1, NULL) == SQLITE_OK)
{
success = sqlite3_step(statement) == SQLITE_DONE;
}
sqlite3_finalize(statement);
}
return success;
}
bool tf_ssb_db_check(sqlite3* db, const char* check_author)
{
JSMallocFunctions funcs = { 0 };
tf_get_js_malloc_functions(&funcs);
JSRuntime* runtime = JS_NewRuntime2(&funcs, NULL);
JSContext* context = JS_NewContext(runtime);
sqlite3_stmt* statement = NULL;
int result = check_author ?
sqlite3_prepare(db, "SELECT id, previous, author, sequence, timestamp, hash, content, signature, sequence_before_author FROM messages WHERE author = ? ORDER BY author, sequence", -1, &statement, NULL) :
sqlite3_prepare(db, "SELECT id, previous, author, sequence, timestamp, hash, content, signature, sequence_before_author FROM messages ORDER BY author, sequence", -1, &statement, NULL);
if (result == SQLITE_OK)
{
if (check_author)
{
sqlite3_bind_text(statement, 1, check_author, -1, NULL);
}
char previous_id[k_id_base64_len];
int64_t previous_sequence = -1;
char previous_author[k_id_base64_len] = { 0 };
while (sqlite3_step(statement) == SQLITE_ROW)
{
const char* id = (const char*)sqlite3_column_text(statement, 0);
const char* previous = (const char*)sqlite3_column_text(statement, 1);
const char* author = (const char*)sqlite3_column_text(statement, 2);
int64_t sequence = sqlite3_column_int64(statement, 3);
double timestamp = sqlite3_column_double(statement, 4);
const char* hash = (const char*)sqlite3_column_text(statement, 5);
const char* content = (const char*)sqlite3_column_text(statement, 6);
const char* signature = (const char*)sqlite3_column_text(statement, 7);
bool sequence_before_author = sqlite3_column_int(statement, 8);
JSValue message = tf_ssb_format_message(context, previous, author, sequence, timestamp, hash, content, signature, sequence_before_author);
char out_signature[512];
char actual_id[k_id_base64_len];
bool actual_sequence_before_author = false;
JSValue j = JS_JSONStringify(context, message, JS_NULL, JS_NewInt32(context, 2));
const char* jv = JS_ToCString(context, j);
bool delete_following = false;
if (strcmp(author, previous_author))
{
tf_printf("%s\n", author);
}
if (strcmp(author, previous_author) == 0 && sequence != previous_sequence + 1)
{
tf_printf("Detected gap in messages for %s at sequence = %" PRId64 " => %" PRId64 ".\n", author, previous_sequence, sequence);
delete_following = true;
}
else
{
if (tf_ssb_verify_and_strip_signature(context, message, actual_id, sizeof(actual_id), out_signature, sizeof(out_signature), &actual_sequence_before_author))
{
if (previous && strcmp(previous, previous_id))
{
tf_printf("%s:%d previous was %s should be %s\n", id, (int)sequence, previous_id, previous);
}
if (strcmp(id, actual_id))
{
if (_tf_ssb_update_message_id(db, id, actual_id))
{
tf_printf("updated %s to %s\n", id, actual_id);
}
else
{
tf_printf("failed to update %s to %s\n", id, actual_id);
}
}
}
else
{
tf_printf("%s sequence=%" PRId64 " unable to verify signature for %s sequence_before_author=%d message=[%.*s]\n", author, sequence, id, sequence_before_author, (int)strlen(jv), jv);
delete_following = true;
}
}
if (delete_following)
{
tf_printf("Deleting author = %s sequence >= %" PRId64 ".\n", author, sequence);
sqlite3_stmt* delete_statement = NULL;
if (sqlite3_prepare(db, "DELETE FROM messages WHERE author = ? AND sequence >= ?", -1, &delete_statement, NULL) == SQLITE_OK)
{
if (sqlite3_bind_text(delete_statement, 1, author, -1, NULL) == SQLITE_OK &&
sqlite3_bind_int64(delete_statement, 2, sequence) == SQLITE_OK)
{
if (sqlite3_step(delete_statement) != SQLITE_DONE)
{
tf_printf("Error deleting author = %s sequence >= %" PRId64 ".\n", author, sequence);
}
}
sqlite3_finalize(delete_statement);
}
}
snprintf(previous_author, sizeof(previous_author), "%s", author);
previous_sequence = sequence;
JS_FreeCString(context, jv);
JS_FreeValue(context, j);
snprintf(previous_id, sizeof(previous_id), "%s", id);
JS_FreeValue(context, message);
}
sqlite3_finalize(statement);
}
JS_FreeContext(context);
JS_FreeRuntime(runtime);
return false;
}
int tf_ssb_db_identity_get_count_for_user(tf_ssb_t* ssb, const char* user)
{
int count = 0;
sqlite3* db = tf_ssb_acquire_db_reader(ssb);
sqlite3_stmt* statement = NULL;
if (sqlite3_prepare(db, "SELECT COUNT(*) FROM identities WHERE user = ?", -1, &statement, NULL) == SQLITE_OK)
{
if (sqlite3_bind_text(statement, 1, user, -1, NULL) == SQLITE_OK)
{
if (sqlite3_step(statement) == SQLITE_ROW)
{
count = sqlite3_column_int(statement, 0);
}
}
sqlite3_finalize(statement);
}
tf_ssb_release_db_reader(ssb, db);
return count;
}
bool tf_ssb_db_identity_create(tf_ssb_t* ssb, const char* user, uint8_t* out_public_key, uint8_t* out_private_key)
{
int count = tf_ssb_db_identity_get_count_for_user(ssb, user);
if (count < 16)
{
char public[512];
char private[512];
tf_ssb_generate_keys_buffer(public, sizeof(public), private, sizeof(private));
if (tf_ssb_db_identity_add(ssb, user, public, private))
{
tf_ssb_id_str_to_bin(out_public_key, public);
tf_ssb_id_str_to_bin(out_private_key, private);
return true;
}
}
return false;
}
bool tf_ssb_db_identity_add(tf_ssb_t* ssb, const char* user, const char* public_key, const char* private_key)
{
bool added = false;
sqlite3* db = tf_ssb_acquire_db_writer(ssb);
sqlite3_stmt* statement = NULL;
if (sqlite3_prepare(db, "INSERT INTO identities (user, public_key, private_key) VALUES (?, ?, ?) ON CONFLICT DO NOTHING", -1, &statement, NULL) == SQLITE_OK)
{
if (sqlite3_bind_text(statement, 1, user, -1, NULL) == SQLITE_OK &&
sqlite3_bind_text(statement, 2, public_key, -1, NULL) == SQLITE_OK &&
sqlite3_bind_text(statement, 3, private_key, -1, NULL) == SQLITE_OK)
{
added =
sqlite3_step(statement) == SQLITE_DONE &&
sqlite3_changes(db) != 0;
if (!added)
{
tf_printf("Unable to add identity: %s.\n", sqlite3_errmsg(db));
}
}
sqlite3_finalize(statement);
}
tf_ssb_release_db_writer(ssb, db);
return added;
}
void tf_ssb_db_identity_visit(tf_ssb_t* ssb, const char* user, void (*callback)(const char* identity, void* user_data), void* user_data)
{
sqlite3* db = tf_ssb_acquire_db_reader(ssb);
sqlite3_stmt* statement = NULL;
if (sqlite3_prepare(db, "SELECT public_key FROM identities WHERE user = ? ORDER BY public_key", -1, &statement, NULL) == SQLITE_OK)
{
if (sqlite3_bind_text(statement, 1, user, -1, NULL) == SQLITE_OK)
{
while (sqlite3_step(statement) == SQLITE_ROW)
{
callback((const char*)sqlite3_column_text(statement, 0), user_data);
}
}
sqlite3_finalize(statement);
}
tf_ssb_release_db_reader(ssb, db);
}
void tf_ssb_db_identity_visit_all(tf_ssb_t* ssb, void (*callback)(const char* identity, void* user_data), void* user_data)
{
sqlite3* db = tf_ssb_acquire_db_reader(ssb);
sqlite3_stmt* statement = NULL;
if (sqlite3_prepare(db, "SELECT public_key FROM identities ORDER BY public_key", -1, &statement, NULL) == SQLITE_OK)
{
while (sqlite3_step(statement) == SQLITE_ROW)
{
callback((const char*)sqlite3_column_text(statement, 0), user_data);
}
sqlite3_finalize(statement);
}
tf_ssb_release_db_reader(ssb, db);
}
bool tf_ssb_db_identity_get_private_key(tf_ssb_t* ssb, const char* user, const char* public_key, uint8_t* out_private_key, size_t private_key_size)
{
bool success = false;
memset(out_private_key, 0, crypto_sign_SECRETKEYBYTES);
sqlite3* db = tf_ssb_acquire_db_reader(ssb);
sqlite3_stmt* statement = NULL;
if (sqlite3_prepare(db, "SELECT private_key FROM identities WHERE user = ? AND public_key = ?", -1, &statement, NULL) == SQLITE_OK)
{
if (sqlite3_bind_text(statement, 1, user, -1, NULL) == SQLITE_OK &&
sqlite3_bind_text(statement, 2, (public_key && *public_key == '@') ? public_key + 1 : public_key, -1, NULL) == SQLITE_OK)
{
if (sqlite3_step(statement) == SQLITE_ROW)
{
const char* key = (const char*)sqlite3_column_text(statement, 0);
int r = tf_base64_decode(key, sqlite3_column_bytes(statement, 0) - strlen(".ed25519"), out_private_key, private_key_size);
success = r > 0;
}
}
sqlite3_finalize(statement);
}
tf_ssb_release_db_reader(ssb, db);
return success;
}
typedef struct _following_t following_t;
typedef struct _following_t
{
char id[k_id_base64_len];
following_t** following;
following_t** blocking;
int following_count;
int blocking_count;
int depth;
} following_t;
static int _following_compare(const void* a, const void* b)
{
const char* ida = a;
const following_t* const* followingb = b;
return strcmp(ida, (*followingb)->id);
}
static void _add_following_entry(following_t*** list, int* count, following_t* add)
{
int index = tf_util_insert_index(add->id, *list, *count, sizeof(following_t*), _following_compare);
if (index >= *count || strcmp(add->id, (*list)[index]->id) == 0)
{
*list = tf_resize_vec(*list, sizeof(**list) * (*count + 1));
if (*count - index)
{
memmove(*list + index + 1, *list + index, sizeof(following_t*) * (*count - index));
}
(*list)[index] = add;
}
}
static following_t* _get_following(tf_ssb_t* ssb, const char* id, following_t*** following, int* following_count, int depth, int max_depth)
{
int index = tf_util_insert_index(id, *following, *following_count, sizeof(following_t*), _following_compare);
following_t* entry = NULL;
bool already_populated = false;
if (index < *following_count && strcmp(id, (*following)[index]->id) == 0)
{
entry = (*following)[index];
already_populated = entry->depth < max_depth;
if (depth < entry->depth)
{
entry->depth = depth;
}
}
else
{
*following = tf_resize_vec(*following, sizeof(*following) * (*following_count + 1));
if (*following_count - index)
{
memmove(*following + index + 1, *following + index, sizeof(following_t*) * (*following_count - index));
}
entry = tf_malloc(sizeof(following_t));
(*following)[index] = entry;
(*following_count)++;
memset(entry, 0, sizeof(*entry));
snprintf(entry->id, sizeof(entry->id), "%s", id);
entry->depth = depth;
}
if (depth < max_depth && !already_populated)
{
sqlite3* db = tf_ssb_acquire_db_reader(ssb);
sqlite3_stmt* statement = NULL;
if (sqlite3_prepare(db, "SELECT json_extract(content, '$.contact'), json_extract(content, '$.following'), json_extract(content, '$.blocking') FROM messages WHERE author = ? AND json_extract(content, '$.type') = 'contact' ORDER BY sequence", -1, &statement, NULL) == SQLITE_OK)
{
if (sqlite3_bind_text(statement, 1, id, -1, NULL) == SQLITE_OK)
{
while (sqlite3_step(statement) == SQLITE_ROW)
{
const char* contact = (const char*)sqlite3_column_text(statement, 0);
if (sqlite3_column_type(statement, 1) != SQLITE_NULL)
{
bool is_following = sqlite3_column_int(statement, 1);
following_t* next = _get_following(ssb, contact, following, following_count, depth + 1, max_depth);
if (is_following)
{
_add_following_entry(&entry->following, &entry->following_count, next);
}
}
if (sqlite3_column_type(statement, 2) != SQLITE_NULL)
{
bool is_blocking = sqlite3_column_int(statement, 2);
following_t* next = _get_following(ssb, contact, following, following_count, depth + 1, 0 /* don't dig deeper into blocked users */);
if (is_blocking)
{
_add_following_entry(&entry->blocking, &entry->blocking_count, next);
}
}
}
}
sqlite3_finalize(statement);
}
tf_ssb_release_db_reader(ssb, db);
}
return entry;
}
const char** tf_ssb_db_following_deep(tf_ssb_t* ssb, const char** ids, int count, int depth)
{
following_t** following = NULL;
int following_count = 0;
for (int i = 0; i < count; i++)
{
_get_following(ssb, ids[i], &following, &following_count, 0, depth);
}
char** result = tf_malloc(sizeof(char*) * (following_count + 1) + k_id_base64_len * following_count);
char* result_ids = (char*)result + sizeof(char*) * (following_count + 1);
for (int i = 0; i < following_count; i++)
{
result[i] = result_ids + k_id_base64_len * i;
snprintf(result[i], k_id_base64_len, "%s", following[i]->id);
}
result[following_count] = NULL;
for (int i = 0; i < following_count; i++)
{
tf_free(following[i]->following);
tf_free(following[i]->blocking);
tf_free(following[i]);
}
tf_free(following);
return (const char**)result;
}
typedef struct _identities_t
{
const char** ids;
int count;
} identities_t;
static void _add_identity(const char* identity, void* user_data)
{
identities_t* identities = user_data;
char full_id[k_id_base64_len];
snprintf(full_id, sizeof(full_id), "@%s", identity);
identities->ids = tf_resize_vec(identities->ids, sizeof(const char*) * (identities->count + 1));
identities->ids[identities->count++] = tf_strdup(full_id);
}
const char** tf_ssb_db_get_all_visible_identities(tf_ssb_t* ssb, int depth)
{
identities_t identities = { 0 };
tf_ssb_db_identity_visit_all(ssb, _add_identity, &identities);
const char** following = tf_ssb_db_following_deep(ssb, identities.ids, identities.count, depth);
for (int i = 0; i < identities.count; i++)
{
tf_free((void*)identities.ids[i]);
}
tf_free(identities.ids);
return following;
}
JSValue tf_ssb_db_get_message_by_id( tf_ssb_t* ssb, const char* id, bool is_keys)
{
JSValue result = JS_UNDEFINED;
JSContext* context = tf_ssb_get_context(ssb);
sqlite3* db = tf_ssb_acquire_db_reader(ssb);
sqlite3_stmt* statement;
if (sqlite3_prepare(db, "SELECT previous, author, id, sequence, timestamp, hash, content, signature, sequence_before_author FROM messages WHERE id = ?", -1, &statement, NULL) == SQLITE_OK)
{
if (sqlite3_bind_text(statement, 1, id, -1, NULL) == SQLITE_OK)
{
if (sqlite3_step(statement) == SQLITE_ROW)
{
JSValue message = JS_UNDEFINED;
JSValue formatted = tf_ssb_format_message(
context,
(const char*)sqlite3_column_text(statement, 0),
(const char*)sqlite3_column_text(statement, 1),
sqlite3_column_int64(statement, 3),
sqlite3_column_double(statement, 4),
(const char*)sqlite3_column_text(statement, 5),
(const char*)sqlite3_column_text(statement, 6),
(const char*)sqlite3_column_text(statement, 7),
sqlite3_column_int(statement, 8));
if (is_keys)
{
message = JS_NewObject(context);
JS_SetPropertyStr(context, message, "key", JS_NewString(context, (const char*)sqlite3_column_text(statement, 2)));
JS_SetPropertyStr(context, message, "value", formatted);
JS_SetPropertyStr(context, message, "timestamp", JS_NewString(context, (const char*)sqlite3_column_text(statement, 4)));
}
else
{
message = formatted;
}
result = message;
}
}
sqlite3_finalize(statement);
}
tf_ssb_release_db_reader(ssb, db);
return result;
}
tf_ssb_db_stored_connection_t* tf_ssb_db_get_stored_connections(tf_ssb_t* ssb, int* out_count)
{
sqlite3* db = tf_ssb_acquire_db_reader(ssb);
tf_ssb_db_stored_connection_t* result = NULL;
int count = 0;
sqlite3_stmt* statement;
if (sqlite3_prepare(db, "SELECT host, port, key FROM connections ORDER BY host, port, key", -1, &statement, NULL) == SQLITE_OK)
{
while (sqlite3_step(statement) == SQLITE_ROW)
{
result = tf_resize_vec(result, sizeof(tf_ssb_db_stored_connection_t) * (count + 1));
result[count] = (tf_ssb_db_stored_connection_t)
{
.port = sqlite3_column_int(statement, 1),
};
snprintf(result[count].address, sizeof(result[count].address), "%s", (const char*)sqlite3_column_text(statement, 0));
snprintf(result[count].pubkey, sizeof(result[count].pubkey), "%s", (const char*)sqlite3_column_text(statement, 2));
count++;
}
sqlite3_finalize(statement);
}
tf_ssb_release_db_reader(ssb, db);
*out_count = count;
return result;
}
void tf_ssb_db_forget_stored_connection(tf_ssb_t* ssb, const char* address, int port, const char* pubkey)
{
sqlite3* db = tf_ssb_acquire_db_writer(ssb);
sqlite3_stmt* statement;
if (sqlite3_prepare(db, "DELETE FROM connections WHERE host = ? AND port = ? AND key = ?", -1, &statement, NULL) == SQLITE_OK)
{
if (sqlite3_bind_text(statement, 1, address, -1, NULL) != SQLITE_OK ||
sqlite3_bind_int(statement, 2, port) != SQLITE_OK ||
sqlite3_bind_text(statement, 3, pubkey, -1, NULL) != SQLITE_OK ||
sqlite3_step(statement) != SQLITE_DONE)
{
tf_printf("Delete stored connection: %s.\n", sqlite3_errmsg(db));
}
sqlite3_finalize(statement);
}
tf_ssb_release_db_writer(ssb, db);
}