forked from cory/tildefriends
Make storing messages async. Phew.
git-svn-id: https://www.unprompted.com/svn/projects/tildefriends/trunk@4355 ed5197a5-7fde-0310-b194-c3ffbd925b24
This commit is contained in:
282
src/ssb.db.c
282
src/ssb.db.c
@ -17,13 +17,15 @@
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
|
||||
#include <unistd.h>
|
||||
|
||||
static void _tf_ssb_db_exec(sqlite3* db, const char* statement)
|
||||
{
|
||||
char* error = NULL;
|
||||
int result = sqlite3_exec(db, statement, NULL, NULL, &error);
|
||||
if (result != SQLITE_OK)
|
||||
{
|
||||
tf_printf("Error running '%s': %s.\n", statement, error);
|
||||
tf_printf("Error running '%s': %s.\n", statement, error ? error : sqlite3_errmsg(db));
|
||||
abort();
|
||||
}
|
||||
}
|
||||
@ -253,41 +255,25 @@ static bool _tf_ssb_db_previous_message_exists(sqlite3* db, const char* author,
|
||||
return exists;
|
||||
}
|
||||
|
||||
bool tf_ssb_db_store_message(tf_ssb_t* ssb, JSContext* context, const char* id, JSValue val, const char* signature, bool sequence_before_author)
|
||||
static int64_t _tf_ssb_db_store_message_raw(
|
||||
tf_ssb_t* ssb,
|
||||
const char* id,
|
||||
const char* previous,
|
||||
const char* author,
|
||||
int64_t sequence,
|
||||
double timestamp,
|
||||
const char* content,
|
||||
size_t content_len,
|
||||
const char* signature,
|
||||
bool sequence_before_author)
|
||||
{
|
||||
bool stored = false;
|
||||
|
||||
JSValue previousval = JS_GetPropertyStr(context, val, "previous");
|
||||
const char* previous = JS_IsNull(previousval) ? NULL : JS_ToCString(context, previousval);
|
||||
JS_FreeValue(context, previousval);
|
||||
|
||||
JSValue authorval = JS_GetPropertyStr(context, val, "author");
|
||||
const char* author = JS_ToCString(context, authorval);
|
||||
JS_FreeValue(context, authorval);
|
||||
|
||||
int64_t sequence = -1;
|
||||
JSValue sequenceval = JS_GetPropertyStr(context, val, "sequence");
|
||||
JS_ToInt64(context, &sequence, sequenceval);
|
||||
JS_FreeValue(context, sequenceval);
|
||||
|
||||
sqlite3* db = tf_ssb_acquire_db_writer(ssb);
|
||||
sqlite3_stmt* statement;
|
||||
int64_t last_row_id = -1;
|
||||
|
||||
if (_tf_ssb_db_previous_message_exists(db, author, sequence, previous))
|
||||
{
|
||||
double timestamp = -1.0;
|
||||
JSValue timestampval = JS_GetPropertyStr(context, val, "timestamp");
|
||||
JS_ToFloat64(context, ×tamp, timestampval);
|
||||
JS_FreeValue(context, timestampval);
|
||||
|
||||
JSValue contentval = JS_GetPropertyStr(context, val, "content");
|
||||
JSValue content = JS_JSONStringify(context, contentval, JS_NULL, JS_NULL);
|
||||
size_t content_len;
|
||||
const char* contentstr = JS_ToCStringLen(context, &content_len, content);
|
||||
JS_FreeValue(context, contentval);
|
||||
|
||||
const char* query = "INSERT INTO messages (id, previous, author, sequence, timestamp, content, hash, signature, sequence_before_author) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT DO NOTHING";
|
||||
sqlite3_stmt* statement;
|
||||
if (sqlite3_prepare(db, query, -1, &statement, NULL) == SQLITE_OK)
|
||||
{
|
||||
if (sqlite3_bind_text(statement, 1, id, -1, NULL) == SQLITE_OK &&
|
||||
@ -295,7 +281,7 @@ bool tf_ssb_db_store_message(tf_ssb_t* ssb, JSContext* context, const char* id,
|
||||
sqlite3_bind_text(statement, 3, author, -1, NULL) == SQLITE_OK &&
|
||||
sqlite3_bind_int64(statement, 4, sequence) == SQLITE_OK &&
|
||||
sqlite3_bind_double(statement, 5, timestamp) == SQLITE_OK &&
|
||||
sqlite3_bind_text(statement, 6, contentstr, content_len, NULL) == SQLITE_OK &&
|
||||
sqlite3_bind_text(statement, 6, content, content_len, NULL) == SQLITE_OK &&
|
||||
sqlite3_bind_text(statement, 7, "sha256", 6, NULL) == SQLITE_OK &&
|
||||
sqlite3_bind_text(statement, 8, signature, -1, NULL) == SQLITE_OK &&
|
||||
sqlite3_bind_int(statement, 9, sequence_before_author) == SQLITE_OK)
|
||||
@ -303,10 +289,10 @@ bool tf_ssb_db_store_message(tf_ssb_t* ssb, JSContext* context, const char* id,
|
||||
int r = sqlite3_step(statement);
|
||||
if (r != SQLITE_DONE)
|
||||
{
|
||||
tf_printf("%s\n", sqlite3_errmsg(db));
|
||||
tf_printf("_tf_ssb_db_store_message_raw: %s\n", sqlite3_errmsg(db));
|
||||
abort();
|
||||
}
|
||||
stored = r == SQLITE_DONE && sqlite3_changes(db) != 0;
|
||||
if (stored)
|
||||
if (r == SQLITE_DONE && sqlite3_changes(db) != 0)
|
||||
{
|
||||
last_row_id = sqlite3_last_insert_rowid(db);
|
||||
}
|
||||
@ -321,45 +307,211 @@ bool tf_ssb_db_store_message(tf_ssb_t* ssb, JSContext* context, const char* id,
|
||||
{
|
||||
tf_printf("%s: prepare failed: %s\n", __FUNCTION__, sqlite3_errmsg(db));
|
||||
}
|
||||
|
||||
JS_FreeCString(context, contentstr);
|
||||
JS_FreeValue(context, content);
|
||||
}
|
||||
else
|
||||
{
|
||||
tf_printf("Previous message doesn't exist.\n");
|
||||
}
|
||||
|
||||
if (last_row_id != -1)
|
||||
{
|
||||
const char* query = "SELECT DISTINCT json.value FROM messages, json_tree(messages.content) AS json LEFT OUTER JOIN blobs ON json.value = blobs.id WHERE messages.rowid = ?1 AND json.value LIKE '&%%.sha256' AND length(json.value) = ?2 AND blobs.content IS NULL";
|
||||
if (sqlite3_prepare(db, query, -1, &statement, NULL) == SQLITE_OK)
|
||||
{
|
||||
if (sqlite3_bind_int64(statement, 1, last_row_id) == SQLITE_OK &&
|
||||
sqlite3_bind_int(statement, 2, k_blob_id_len - 1) == SQLITE_OK)
|
||||
{
|
||||
int r = SQLITE_OK;
|
||||
while ((r = sqlite3_step(statement)) == SQLITE_ROW)
|
||||
{
|
||||
tf_ssb_notify_blob_want_added(ssb, (const char*)sqlite3_column_text(statement, 0));
|
||||
}
|
||||
if (r != SQLITE_DONE)
|
||||
{
|
||||
tf_printf("%s\n", sqlite3_errmsg(db));
|
||||
}
|
||||
}
|
||||
sqlite3_finalize(statement);
|
||||
}
|
||||
else
|
||||
{
|
||||
tf_printf("%s: prepare failed: %s\n", __FUNCTION__, sqlite3_errmsg(db));
|
||||
}
|
||||
tf_printf("%p: Previous message doesn't exist for author=%s sequence=%" PRId64 ".\n", db, author, sequence);
|
||||
}
|
||||
tf_ssb_release_db_writer(ssb, db);
|
||||
return last_row_id;
|
||||
}
|
||||
|
||||
static char* _tf_ssb_db_get_message_blob_wants(tf_ssb_t* ssb, int64_t rowid)
|
||||
{
|
||||
sqlite3* db = tf_ssb_acquire_db_reader(ssb);
|
||||
sqlite3_stmt* statement;
|
||||
char* result = NULL;
|
||||
size_t size = 0;
|
||||
|
||||
if (sqlite3_prepare(db, "SELECT DISTINCT json.value FROM messages, json_tree(messages.content) AS json LEFT OUTER JOIN blobs ON json.value = blobs.id WHERE messages.rowid = ?1 AND json.value LIKE '&%%.sha256' AND length(json.value) = ?2 AND blobs.content IS NULL", -1, &statement, NULL) == SQLITE_OK)
|
||||
{
|
||||
if (sqlite3_bind_int64(statement, 1, rowid) == SQLITE_OK &&
|
||||
sqlite3_bind_int(statement, 2, k_blob_id_len - 1) == SQLITE_OK)
|
||||
{
|
||||
int r = SQLITE_OK;
|
||||
while ((r = sqlite3_step(statement)) == SQLITE_ROW)
|
||||
{
|
||||
int id_size = sqlite3_column_bytes(statement, 0);
|
||||
const uint8_t* id = sqlite3_column_text(statement, 0);
|
||||
result = tf_realloc(result, size + id_size + 1);
|
||||
memcpy(result + size, id, id_size + 1);
|
||||
size += id_size + 1;
|
||||
}
|
||||
if (r != SQLITE_DONE)
|
||||
{
|
||||
tf_printf("%s\n", sqlite3_errmsg(db));
|
||||
}
|
||||
}
|
||||
sqlite3_finalize(statement);
|
||||
}
|
||||
else
|
||||
{
|
||||
tf_printf("%s: prepare failed: %s\n", __FUNCTION__, sqlite3_errmsg(db));
|
||||
}
|
||||
result = tf_realloc(result, size + 1);
|
||||
result[size] = '\0';
|
||||
|
||||
tf_ssb_release_db_reader(ssb, db);
|
||||
return result;
|
||||
}
|
||||
|
||||
typedef struct _message_store_t message_store_t;
|
||||
typedef struct _message_store_t
|
||||
{
|
||||
uv_work_t work;
|
||||
tf_ssb_t* ssb;
|
||||
char id[k_id_base64_len];
|
||||
char signature[512];
|
||||
bool sequence_before_author;
|
||||
char previous[k_id_base64_len];
|
||||
char author[k_id_base64_len];
|
||||
int64_t sequence;
|
||||
double timestamp;
|
||||
const char* content;
|
||||
size_t length;
|
||||
|
||||
bool out_stored;
|
||||
char* out_blob_wants;
|
||||
|
||||
tf_ssb_db_store_message_callback_t* callback;
|
||||
void* user_data;
|
||||
|
||||
message_store_t* next;
|
||||
} message_store_t;
|
||||
|
||||
static void _tf_ssb_db_store_message_work(uv_work_t* work)
|
||||
{
|
||||
message_store_t* store = work->data;
|
||||
int64_t last_row_id = _tf_ssb_db_store_message_raw(store->ssb, store->id, *store->previous ? store->previous : NULL, store->author, store->sequence, store->timestamp, store->content, store->length, store->signature, store->sequence_before_author);
|
||||
if (last_row_id != -1)
|
||||
{
|
||||
store->out_stored = true;
|
||||
store->out_blob_wants = _tf_ssb_db_get_message_blob_wants(store->ssb, last_row_id);
|
||||
}
|
||||
}
|
||||
|
||||
static void _tf_ssb_db_store_message_work_finish(message_store_t* store);
|
||||
static void _tf_ssb_db_store_message_after_work(uv_work_t* work, int status);
|
||||
|
||||
static void _wake_up_queue(tf_ssb_t* ssb, tf_ssb_store_queue_t* queue)
|
||||
{
|
||||
if (!queue->running)
|
||||
{
|
||||
message_store_t* next = queue->head;
|
||||
if (next)
|
||||
{
|
||||
queue->head = next->next;
|
||||
if (queue->tail == next)
|
||||
{
|
||||
queue->tail = NULL;
|
||||
}
|
||||
next->next = NULL;
|
||||
queue->running = true;
|
||||
int r = uv_queue_work(tf_ssb_get_loop(ssb), &next->work, _tf_ssb_db_store_message_work, _tf_ssb_db_store_message_after_work);
|
||||
if (r)
|
||||
{
|
||||
_tf_ssb_db_store_message_work_finish(next);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void _tf_ssb_db_store_message_work_finish(message_store_t* store)
|
||||
{
|
||||
JSContext* context = tf_ssb_get_context(store->ssb);
|
||||
if (store->callback)
|
||||
{
|
||||
store->callback(store->id, store->out_stored, store->user_data);
|
||||
}
|
||||
JS_FreeCString(context, store->content);
|
||||
tf_ssb_store_queue_t* queue = tf_ssb_get_store_queue(store->ssb);
|
||||
queue->running = false;
|
||||
_wake_up_queue(store->ssb, queue);
|
||||
tf_free(store);
|
||||
}
|
||||
|
||||
static void _tf_ssb_db_store_message_after_work(uv_work_t* work, int status)
|
||||
{
|
||||
message_store_t* store = work->data;
|
||||
if (store->out_stored)
|
||||
{
|
||||
tf_ssb_notify_message_added(store->ssb, store->id);
|
||||
}
|
||||
if (store->out_blob_wants)
|
||||
{
|
||||
for (char* p = store->out_blob_wants; *p; p = p + strlen(p))
|
||||
{
|
||||
tf_ssb_notify_blob_want_added(store->ssb, p);
|
||||
}
|
||||
tf_free(store->out_blob_wants);
|
||||
}
|
||||
_tf_ssb_db_store_message_work_finish(store);
|
||||
}
|
||||
|
||||
void tf_ssb_db_store_message(tf_ssb_t* ssb, JSContext* context, const char* id, JSValue val, const char* signature, bool sequence_before_author, tf_ssb_db_store_message_callback_t* callback, void* user_data)
|
||||
{
|
||||
JSValue previousval = JS_GetPropertyStr(context, val, "previous");
|
||||
const char* previous = JS_IsNull(previousval) ? NULL : JS_ToCString(context, previousval);
|
||||
JS_FreeValue(context, previousval);
|
||||
|
||||
JSValue authorval = JS_GetPropertyStr(context, val, "author");
|
||||
const char* author = JS_ToCString(context, authorval);
|
||||
JS_FreeValue(context, authorval);
|
||||
|
||||
int64_t sequence = -1;
|
||||
JSValue sequenceval = JS_GetPropertyStr(context, val, "sequence");
|
||||
JS_ToInt64(context, &sequence, sequenceval);
|
||||
JS_FreeValue(context, sequenceval);
|
||||
|
||||
double timestamp = -1.0;
|
||||
JSValue timestampval = JS_GetPropertyStr(context, val, "timestamp");
|
||||
JS_ToFloat64(context, ×tamp, timestampval);
|
||||
JS_FreeValue(context, timestampval);
|
||||
|
||||
JSValue contentval = JS_GetPropertyStr(context, val, "content");
|
||||
JSValue content = JS_JSONStringify(context, contentval, JS_NULL, JS_NULL);
|
||||
size_t content_len;
|
||||
const char* contentstr = JS_ToCStringLen(context, &content_len, content);
|
||||
JS_FreeValue(context, content);
|
||||
JS_FreeValue(context, contentval);
|
||||
|
||||
message_store_t* store = tf_malloc(sizeof(message_store_t));
|
||||
*store = (message_store_t)
|
||||
{
|
||||
.work =
|
||||
{
|
||||
.data = store,
|
||||
},
|
||||
.ssb = ssb,
|
||||
.sequence = sequence,
|
||||
.timestamp = timestamp,
|
||||
.content = contentstr,
|
||||
.length = content_len,
|
||||
.sequence_before_author = sequence_before_author,
|
||||
|
||||
.callback = callback,
|
||||
.user_data = user_data,
|
||||
};
|
||||
snprintf(store->id, sizeof(store->id), "%s", id);
|
||||
snprintf(store->previous, sizeof(store->previous), "%s", previous ? previous : "");
|
||||
snprintf(store->author, sizeof(store->author), "%s", author);
|
||||
snprintf(store->signature, sizeof(store->signature), "%s", signature);
|
||||
JS_FreeCString(context, author);
|
||||
JS_FreeCString(context, previous);
|
||||
return stored;
|
||||
|
||||
tf_ssb_store_queue_t* queue = tf_ssb_get_store_queue(ssb);
|
||||
if (queue->tail)
|
||||
{
|
||||
message_store_t* tail = queue->tail;
|
||||
tail->next = store;
|
||||
queue->tail = store;
|
||||
}
|
||||
else
|
||||
{
|
||||
queue->head = store;
|
||||
queue->tail = store;
|
||||
}
|
||||
_wake_up_queue(ssb, queue);
|
||||
}
|
||||
|
||||
bool tf_ssb_db_message_content_get(tf_ssb_t* ssb, const char* id, uint8_t** out_blob, size_t* out_size)
|
||||
@ -812,7 +964,7 @@ JSValue tf_ssb_db_visit_query(tf_ssb_t* ssb, const char* query, const JSValue bi
|
||||
JSValue tf_ssb_format_message(JSContext* context, const char* previous, const char* author, int64_t sequence, double timestamp, const char* hash, const char* content, const char* signature, bool sequence_before_author)
|
||||
{
|
||||
JSValue value = JS_NewObject(context);
|
||||
JS_SetPropertyStr(context, value, "previous", previous ? JS_NewString(context, previous) : JS_NULL);
|
||||
JS_SetPropertyStr(context, value, "previous", (previous && *previous) ? JS_NewString(context, previous) : JS_NULL);
|
||||
if (sequence_before_author)
|
||||
{
|
||||
JS_SetPropertyStr(context, value, "sequence", JS_NewInt64(context, sequence));
|
||||
|
Reference in New Issue
Block a user