Allow the DB writer to be used from a worker thread. Not well tested, just still trying to charge forward on moving all blocking work off the main thread.

git-svn-id: https://www.unprompted.com/svn/projects/tildefriends/trunk@4325 ed5197a5-7fde-0310-b194-c3ffbd925b24
This commit is contained in:
2023-06-15 00:27:49 +00:00
parent 51b317233a
commit 7d562ce85c
11 changed files with 163 additions and 92 deletions

View File

@ -67,7 +67,7 @@ void tf_ssb_db_init_reader(sqlite3* db)
void tf_ssb_db_init(tf_ssb_t* ssb)
{
sqlite3* db = tf_ssb_get_db(ssb);
sqlite3* db = tf_ssb_acquire_db_writer(ssb);
_tf_ssb_db_init_internal(db);
_tf_ssb_db_exec(db,
"CREATE TABLE IF NOT EXISTS messages ("
@ -222,6 +222,7 @@ void tf_ssb_db_init(tf_ssb_t* ssb)
tf_printf("Adding sequence_before_author column.\n");
_tf_ssb_db_exec(db, "ALTER TABLE messages ADD COLUMN sequence_before_author INTEGER");
}
tf_ssb_release_db_writer(ssb, db);
}
static bool _tf_ssb_db_previous_message_exists(sqlite3* db, const char* author, int64_t sequence, const char* previous)
@ -266,7 +267,7 @@ bool tf_ssb_db_store_message(tf_ssb_t* ssb, JSContext* context, const char* id,
JS_ToInt64(context, &sequence, sequenceval);
JS_FreeValue(context, sequenceval);
sqlite3* db = tf_ssb_get_db(ssb);
sqlite3* db = tf_ssb_acquire_db_writer(ssb);
sqlite3_stmt* statement;
int64_t last_row_id = -1;
@ -315,7 +316,7 @@ bool tf_ssb_db_store_message(tf_ssb_t* ssb, JSContext* context, const char* id,
}
else
{
tf_printf("prepare failed: %s\n", sqlite3_errmsg(db));
tf_printf("%s: prepare failed: %s\n", __FUNCTION__, sqlite3_errmsg(db));
}
JS_FreeCString(context, contentstr);
@ -348,9 +349,10 @@ bool tf_ssb_db_store_message(tf_ssb_t* ssb, JSContext* context, const char* id,
}
else
{
tf_printf("prepare failed: %s\n", sqlite3_errmsg(db));
tf_printf("%s: prepare failed: %s\n", __FUNCTION__, sqlite3_errmsg(db));
}
}
tf_ssb_release_db_writer(ssb, db);
JS_FreeCString(context, author);
JS_FreeCString(context, previous);
@ -361,8 +363,9 @@ bool tf_ssb_db_message_content_get(tf_ssb_t* ssb, const char* id, uint8_t** out_
{
bool result = false;
sqlite3_stmt* statement;
sqlite3* db = tf_ssb_acquire_db_reader(ssb);
const char* query = "SELECT content FROM messages WHERE id = ?";
if (sqlite3_prepare(tf_ssb_get_db(ssb), query, -1, &statement, NULL) == SQLITE_OK)
if (sqlite3_prepare(db, query, -1, &statement, NULL) == SQLITE_OK)
{
if (sqlite3_bind_text(statement, 1, id, -1, NULL) == SQLITE_OK &&
sqlite3_step(statement) == SQLITE_ROW)
@ -383,6 +386,7 @@ bool tf_ssb_db_message_content_get(tf_ssb_t* ssb, const char* id, uint8_t** out_
}
sqlite3_finalize(statement);
}
tf_ssb_release_db_reader(ssb, db);
return result;
}
@ -390,8 +394,9 @@ bool tf_ssb_db_blob_has(tf_ssb_t* ssb, const char* id)
{
bool result = false;
sqlite3_stmt* statement;
sqlite3* db = tf_ssb_acquire_db_reader(ssb);
const char* query = "SELECT COUNT(*) FROM blobs WHERE id = $1";
if (sqlite3_prepare(tf_ssb_get_db(ssb), query, -1, &statement, NULL) == SQLITE_OK)
if (sqlite3_prepare(db, query, -1, &statement, NULL) == SQLITE_OK)
{
if (sqlite3_bind_text(statement, 1, id, -1, NULL) == SQLITE_OK &&
sqlite3_step(statement) == SQLITE_ROW)
@ -400,6 +405,7 @@ bool tf_ssb_db_blob_has(tf_ssb_t* ssb, const char* id)
}
sqlite3_finalize(statement);
}
tf_ssb_release_db_reader(ssb, db);
return result;
}
@ -407,8 +413,9 @@ bool tf_ssb_db_blob_get(tf_ssb_t* ssb, const char* id, uint8_t** out_blob, size_
{
bool result = false;
sqlite3_stmt* statement;
sqlite3* db = tf_ssb_acquire_db_reader(ssb);
const char* query = "SELECT content FROM blobs WHERE id = $1";
if (sqlite3_prepare(tf_ssb_get_db(ssb), query, -1, &statement, NULL) == SQLITE_OK)
if (sqlite3_prepare(db, query, -1, &statement, NULL) == SQLITE_OK)
{
if (sqlite3_bind_text(statement, 1, id, -1, NULL) == SQLITE_OK &&
sqlite3_step(statement) == SQLITE_ROW)
@ -432,13 +439,14 @@ bool tf_ssb_db_blob_get(tf_ssb_t* ssb, const char* id, uint8_t** out_blob, size_
}
sqlite3_finalize(statement);
}
tf_ssb_release_db_reader(ssb, db);
return result;
}
bool tf_ssb_db_blob_store(tf_ssb_t* ssb, const uint8_t* blob, size_t size, char* out_id, size_t out_id_size, bool* out_new)
{
bool result = false;
sqlite3* db = tf_ssb_get_db(ssb);
sqlite3* db = tf_ssb_acquire_db_writer(ssb);
sqlite3_stmt* statement;
uint8_t hash[crypto_hash_sha256_BYTES];
@ -468,8 +476,9 @@ bool tf_ssb_db_blob_store(tf_ssb_t* ssb, const uint8_t* blob, size_t size, char*
}
else
{
tf_printf("prepare failed: %s\n", sqlite3_errmsg(db));
tf_printf("%s: prepare failed: %s\n", __FUNCTION__, sqlite3_errmsg(db));
}
tf_ssb_release_db_writer(ssb, db);
if (rows)
{
@ -496,7 +505,8 @@ bool tf_ssb_db_get_message_by_author_and_sequence(tf_ssb_t* ssb, const char* aut
bool found = false;
sqlite3_stmt* statement;
const char* query = "SELECT id, timestamp, content FROM messages WHERE author = $1 AND sequence = $2";
if (sqlite3_prepare(tf_ssb_get_db(ssb), query, -1, &statement, NULL) == SQLITE_OK)
sqlite3* db = tf_ssb_acquire_db_reader(ssb);
if (sqlite3_prepare(db, query, -1, &statement, NULL) == SQLITE_OK)
{
if (sqlite3_bind_text(statement, 1, author, -1, NULL) == SQLITE_OK &&
sqlite3_bind_int64(statement, 2, sequence) == SQLITE_OK &&
@ -520,8 +530,9 @@ bool tf_ssb_db_get_message_by_author_and_sequence(tf_ssb_t* ssb, const char* aut
}
else
{
tf_printf("prepare failed: %s\n", sqlite3_errmsg(tf_ssb_get_db(ssb)));
tf_printf("%s: prepare failed: %s\n", __FUNCTION__, sqlite3_errmsg(db));
}
tf_ssb_release_db_reader(ssb, db);
return found;
}
@ -529,8 +540,9 @@ bool tf_ssb_db_get_latest_message_by_author(tf_ssb_t* ssb, const char* author, i
{
bool found = false;
sqlite3_stmt* statement;
sqlite3* db = tf_ssb_acquire_db_reader(ssb);
const char* query = "SELECT id, sequence FROM messages WHERE author = $1 AND sequence = (SELECT MAX(sequence) FROM messages WHERE author = $1)";
if (sqlite3_prepare(tf_ssb_get_db(ssb), query, -1, &statement, NULL) == SQLITE_OK)
if (sqlite3_prepare(db, query, -1, &statement, NULL) == SQLITE_OK)
{
if (sqlite3_bind_text(statement, 1, author, -1, NULL) == SQLITE_OK &&
sqlite3_step(statement) == SQLITE_ROW)
@ -549,8 +561,9 @@ bool tf_ssb_db_get_latest_message_by_author(tf_ssb_t* ssb, const char* author, i
}
else
{
tf_printf("prepare failed: %s\n", sqlite3_errmsg(tf_ssb_get_db(ssb)));
tf_printf("%s: prepare failed: %s\n", __FUNCTION__, sqlite3_errmsg(db));
}
tf_ssb_release_db_reader(ssb, db);
return found;
}
@ -686,7 +699,7 @@ static int _tf_ssb_sqlite_authorizer(void* user_data, int action_code, const cha
JSValue tf_ssb_db_visit_query(tf_ssb_t* ssb, const char* query, const JSValue binds, void (*callback)(JSValue row, void* user_data), void* user_data)
{
JSValue result = JS_UNDEFINED;
sqlite3* db = tf_ssb_get_db(ssb);
sqlite3* db = tf_ssb_acquire_db_reader(ssb);
JSContext* context = tf_ssb_get_context(ssb);
sqlite3_stmt* statement;
sqlite3_set_authorizer(db, _tf_ssb_sqlite_authorizer, ssb);
@ -721,6 +734,7 @@ JSValue tf_ssb_db_visit_query(tf_ssb_t* ssb, const char* query, const JSValue bi
result = JS_ThrowInternalError(context, "SQL Error %s: preparing \"%s\".", sqlite3_errmsg(db), query);
}
sqlite3_set_authorizer(db, NULL, NULL);
tf_ssb_release_db_reader(ssb, db);
return result;
}
@ -874,7 +888,7 @@ bool tf_ssb_db_check(sqlite3* db, const char* check_author)
int tf_ssb_db_identity_get_count_for_user(tf_ssb_t* ssb, const char* user)
{
int count = 0;
sqlite3* db = tf_ssb_get_db(ssb);
sqlite3* db = tf_ssb_acquire_db_reader(ssb);
sqlite3_stmt* statement = NULL;
if (sqlite3_prepare(db, "SELECT COUNT(*) FROM identities WHERE user = ?", -1, &statement, NULL) == SQLITE_OK)
{
@ -887,6 +901,7 @@ int tf_ssb_db_identity_get_count_for_user(tf_ssb_t* ssb, const char* user)
}
sqlite3_finalize(statement);
}
tf_ssb_release_db_reader(ssb, db);
return count;
}
@ -911,7 +926,7 @@ bool tf_ssb_db_identity_create(tf_ssb_t* ssb, const char* user, uint8_t* out_pub
bool tf_ssb_db_identity_add(tf_ssb_t* ssb, const char* user, const char* public_key, const char* private_key)
{
bool added = false;
sqlite3* db = tf_ssb_get_db(ssb);
sqlite3* db = tf_ssb_acquire_db_writer(ssb);
sqlite3_stmt* statement = NULL;
if (sqlite3_prepare(db, "INSERT INTO identities (user, public_key, private_key) VALUES (?, ?, ?) ON CONFLICT DO NOTHING", -1, &statement, NULL) == SQLITE_OK)
{
@ -929,12 +944,13 @@ bool tf_ssb_db_identity_add(tf_ssb_t* ssb, const char* user, const char* public_
}
sqlite3_finalize(statement);
}
tf_ssb_release_db_writer(ssb, db);
return added;
}
void tf_ssb_db_identity_visit(tf_ssb_t* ssb, const char* user, void (*callback)(const char* identity, void* user_data), void* user_data)
{
sqlite3* db = tf_ssb_get_db(ssb);
sqlite3* db = tf_ssb_acquire_db_reader(ssb);
sqlite3_stmt* statement = NULL;
if (sqlite3_prepare(db, "SELECT public_key FROM identities WHERE user = ? ORDER BY public_key", -1, &statement, NULL) == SQLITE_OK)
{
@ -947,11 +963,12 @@ void tf_ssb_db_identity_visit(tf_ssb_t* ssb, const char* user, void (*callback)(
}
sqlite3_finalize(statement);
}
tf_ssb_release_db_reader(ssb, db);
}
void tf_ssb_db_identity_visit_all(tf_ssb_t* ssb, void (*callback)(const char* identity, void* user_data), void* user_data)
{
sqlite3* db = tf_ssb_get_db(ssb);
sqlite3* db = tf_ssb_acquire_db_reader(ssb);
sqlite3_stmt* statement = NULL;
if (sqlite3_prepare(db, "SELECT public_key FROM identities ORDER BY public_key", -1, &statement, NULL) == SQLITE_OK)
{
@ -961,13 +978,14 @@ void tf_ssb_db_identity_visit_all(tf_ssb_t* ssb, void (*callback)(const char* id
}
sqlite3_finalize(statement);
}
tf_ssb_release_db_reader(ssb, db);
}
bool tf_ssb_db_identity_get_private_key(tf_ssb_t* ssb, const char* user, const char* public_key, uint8_t* out_private_key, size_t private_key_size)
{
bool success = false;
memset(out_private_key, 0, crypto_sign_SECRETKEYBYTES);
sqlite3* db = tf_ssb_get_db(ssb);
sqlite3* db = tf_ssb_acquire_db_reader(ssb);
sqlite3_stmt* statement = NULL;
if (sqlite3_prepare(db, "SELECT private_key FROM identities WHERE user = ? AND public_key = ?", -1, &statement, NULL) == SQLITE_OK)
{
@ -983,6 +1001,7 @@ bool tf_ssb_db_identity_get_private_key(tf_ssb_t* ssb, const char* user, const c
}
sqlite3_finalize(statement);
}
tf_ssb_release_db_reader(ssb, db);
return success;
}
@ -1050,7 +1069,7 @@ static following_t* _get_following(tf_ssb_t* ssb, const char* id, following_t***
if (depth < max_depth && !already_populated)
{
sqlite3* db = tf_ssb_get_db(ssb);
sqlite3* db = tf_ssb_acquire_db_reader(ssb);
sqlite3_stmt* statement = NULL;
if (sqlite3_prepare(db, "SELECT json_extract(content, '$.contact'), json_extract(content, '$.following'), json_extract(content, '$.blocking') FROM messages WHERE author = ? AND json_extract(content, '$.type') = 'contact' ORDER BY sequence", -1, &statement, NULL) == SQLITE_OK)
{
@ -1081,6 +1100,7 @@ static following_t* _get_following(tf_ssb_t* ssb, const char* id, following_t***
}
sqlite3_finalize(statement);
}
tf_ssb_release_db_reader(ssb, db);
}
return entry;
}
@ -1147,7 +1167,7 @@ JSValue tf_ssb_db_get_message_by_id( tf_ssb_t* ssb, const char* id, bool is_keys
{
JSValue result = JS_UNDEFINED;
JSContext* context = tf_ssb_get_context(ssb);
sqlite3* db = tf_ssb_get_db(ssb);
sqlite3* db = tf_ssb_acquire_db_reader(ssb);
sqlite3_stmt* statement;
if (sqlite3_prepare(db, "SELECT previous, author, id, sequence, timestamp, hash, content, signature, sequence_before_author FROM messages WHERE id = ?", -1, &statement, NULL) == SQLITE_OK)
{
@ -1182,12 +1202,13 @@ JSValue tf_ssb_db_get_message_by_id( tf_ssb_t* ssb, const char* id, bool is_keys
}
sqlite3_finalize(statement);
}
tf_ssb_release_db_reader(ssb, db);
return result;
}
tf_ssb_db_stored_connection_t* tf_ssb_db_get_stored_connections(tf_ssb_t* ssb, int* out_count)
{
sqlite3* db = tf_ssb_get_db(ssb);
sqlite3* db = tf_ssb_acquire_db_reader(ssb);
tf_ssb_db_stored_connection_t* result = NULL;
int count = 0;
@ -1207,6 +1228,7 @@ tf_ssb_db_stored_connection_t* tf_ssb_db_get_stored_connections(tf_ssb_t* ssb, i
}
sqlite3_finalize(statement);
}
tf_ssb_release_db_reader(ssb, db);
*out_count = count;
return result;
@ -1214,7 +1236,7 @@ tf_ssb_db_stored_connection_t* tf_ssb_db_get_stored_connections(tf_ssb_t* ssb, i
void tf_ssb_db_forget_stored_connection(tf_ssb_t* ssb, const char* address, int port, const char* pubkey)
{
sqlite3* db = tf_ssb_get_db(ssb);
sqlite3* db = tf_ssb_acquire_db_writer(ssb);
sqlite3_stmt* statement;
if (sqlite3_prepare(db, "DELETE FROM connections WHERE host = ? AND port = ? AND key = ?", -1, &statement, NULL) == SQLITE_OK)
{
@ -1227,4 +1249,5 @@ void tf_ssb_db_forget_stored_connection(tf_ssb_t* ssb, const char* address, int
}
sqlite3_finalize(statement);
}
tf_ssb_release_db_writer(ssb, db);
}