Make blob store actually not block the main thread.

git-svn-id: https://www.unprompted.com/svn/projects/tildefriends/trunk@4352 ed5197a5-7fde-0310-b194-c3ffbd925b24
This commit is contained in:
Cory McWilliams 2023-07-18 23:46:15 +00:00
parent 7fe8f66fd3
commit b0cd58f5aa
4 changed files with 123 additions and 44 deletions

View File

@ -12,6 +12,7 @@
#include "sodium/crypto_secretbox.h" #include "sodium/crypto_secretbox.h"
#include "sodium/crypto_sign.h" #include "sodium/crypto_sign.h"
#include "sqlite3.h" #include "sqlite3.h"
#include "uv.h"
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
@ -335,7 +336,7 @@ bool tf_ssb_db_store_message(tf_ssb_t* ssb, JSContext* context, const char* id,
if (sqlite3_prepare(db, query, -1, &statement, NULL) == SQLITE_OK) if (sqlite3_prepare(db, query, -1, &statement, NULL) == SQLITE_OK)
{ {
if (sqlite3_bind_int64(statement, 1, last_row_id) == SQLITE_OK && if (sqlite3_bind_int64(statement, 1, last_row_id) == SQLITE_OK &&
sqlite3_bind_int(statement, 2, k_blob_id_len - 1) == SQLITE_OK) sqlite3_bind_int(statement, 2, k_blob_id_len - 1) == SQLITE_OK)
{ {
int r = SQLITE_OK; int r = SQLITE_OK;
while ((r = sqlite3_step(statement)) == SQLITE_ROW) while ((r = sqlite3_step(statement)) == SQLITE_ROW)
@ -445,6 +446,65 @@ bool tf_ssb_db_blob_get(tf_ssb_t* ssb, const char* id, uint8_t** out_blob, size_
return result; return result;
} }
typedef struct _blob_store_work_t
{
uv_work_t work;
tf_ssb_t* ssb;
const uint8_t* blob;
size_t size;
char id[k_blob_id_len];
bool is_new;
tf_ssb_db_blob_store_callback_t* callback;
void* user_data;
} blob_store_work_t;
static void _tf_ssb_db_blob_store_work(uv_work_t* work)
{
blob_store_work_t* blob_work = work->data;
tf_ssb_db_blob_store(blob_work->ssb, blob_work->blob, blob_work->size, blob_work->id, sizeof(blob_work->id), &blob_work->is_new);
}
static void _tf_ssb_db_blob_store_after_work(uv_work_t* work, int status)
{
blob_store_work_t* blob_work = work->data;
if (status != 0)
{
tf_printf("tf_ssb_db_blob_store_async -> uv_queue_work failed asynchronously: %s\n", uv_strerror(status));
}
if (blob_work->callback)
{
blob_work->callback(status == 0 ? blob_work->id : NULL, blob_work->is_new, blob_work->user_data);
}
tf_free(blob_work);
}
void tf_ssb_db_blob_store_async(tf_ssb_t* ssb, const uint8_t* blob, size_t size, tf_ssb_db_blob_store_callback_t* callback, void* user_data)
{
blob_store_work_t* work = tf_malloc(sizeof(blob_store_work_t));
*work = (blob_store_work_t)
{
.work =
{
.data = work,
},
.ssb = ssb,
.blob = blob,
.size = size,
.callback = callback,
.user_data = user_data,
};
int r = uv_queue_work(tf_ssb_get_loop(ssb), &work->work, _tf_ssb_db_blob_store_work, _tf_ssb_db_blob_store_after_work);
if (r)
{
tf_printf("tf_ssb_db_blob_store_async -> uv_queue_work failed immediately: %s\n", uv_strerror(r));
if (callback)
{
callback(NULL, false, user_data);
}
tf_free(work);
}
}
bool tf_ssb_db_blob_store(tf_ssb_t* ssb, const uint8_t* blob, size_t size, char* out_id, size_t out_id_size, bool* out_new) bool tf_ssb_db_blob_store(tf_ssb_t* ssb, const uint8_t* blob, size_t size, char* out_id, size_t out_id_size, bool* out_new)
{ {
bool result = false; bool result = false;

View File

@ -14,6 +14,9 @@ bool tf_ssb_db_store_message(tf_ssb_t* ssb, JSContext* context, const char* id,
bool tf_ssb_db_message_content_get(tf_ssb_t* ssb, const char* id, uint8_t** out_blob, size_t* out_size); bool tf_ssb_db_message_content_get(tf_ssb_t* ssb, const char* id, uint8_t** out_blob, size_t* out_size);
bool tf_ssb_db_blob_has(tf_ssb_t* ssb, const char* id); bool tf_ssb_db_blob_has(tf_ssb_t* ssb, const char* id);
bool tf_ssb_db_blob_get(tf_ssb_t* ssb, const char* id, uint8_t** out_blob, size_t* out_size); bool tf_ssb_db_blob_get(tf_ssb_t* ssb, const char* id, uint8_t** out_blob, size_t* out_size);
typedef void (tf_ssb_db_blob_store_callback_t)(const char* id, bool is_new, void* user_data);
void tf_ssb_db_blob_store_async(tf_ssb_t* ssb, const uint8_t* blob, size_t size, tf_ssb_db_blob_store_callback_t* callback, void* user_data);
bool tf_ssb_db_blob_store(tf_ssb_t* ssb, const uint8_t* blob, size_t size, char* out_id, size_t out_id_size, bool* out_new); bool tf_ssb_db_blob_store(tf_ssb_t* ssb, const uint8_t* blob, size_t size, char* out_id, size_t out_id_size, bool* out_new);
JSValue tf_ssb_db_get_message_by_id( tf_ssb_t* ssb, const char* id, bool is_keys); JSValue tf_ssb_db_get_message_by_id( tf_ssb_t* ssb, const char* id, bool is_keys);

View File

@ -174,30 +174,54 @@ static JSValue _tf_ssb_blobGet(JSContext* context, JSValueConst this_val, int ar
return result; return result;
} }
typedef struct _blob_store_t
{
JSContext* context;
JSValue promise[2];
uint8_t* buffer;
} blob_store_t;
void _tf_ssb_blob_store_complete(blob_store_t* store, const char* id)
{
JSValue result = JS_Call(store->context, id ? store->promise[0] : store->promise[1], JS_UNDEFINED, 0, NULL);
tf_util_report_error(store->context, result);
JS_FreeValue(store->context, result);
JS_FreeValue(store->context, store->promise[0]);
JS_FreeValue(store->context, store->promise[1]);
tf_free(store->buffer);
tf_free(store);
}
void _tf_ssb_blob_store_callback(const char* id, bool is_new, void* user_data)
{
blob_store_t* store = user_data;
_tf_ssb_blob_store_complete(store, id);
}
static JSValue _tf_ssb_blobStore(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) static JSValue _tf_ssb_blobStore(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv)
{ {
JSValue result = JS_NULL; blob_store_t* store = tf_malloc(sizeof(blob_store_t));
*store = (blob_store_t) { 0 };
JSValue result = JS_NewPromiseCapability(context, store->promise);
tf_ssb_t* ssb = JS_GetOpaque(this_val, _tf_ssb_classId); tf_ssb_t* ssb = JS_GetOpaque(this_val, _tf_ssb_classId);
if (ssb) if (ssb)
{ {
uint8_t* blob = NULL; uint8_t* blob = NULL;
size_t size = 0; size_t size = 0;
char id[512];
if (JS_IsString(argv[0])) if (JS_IsString(argv[0]))
{ {
const char* text = JS_ToCStringLen(context, &size, argv[0]); const char* text = JS_ToCStringLen(context, &size, argv[0]);
if (tf_ssb_db_blob_store(ssb, (const uint8_t*)text, size, id, sizeof(id), NULL)) store->buffer = tf_malloc(size);
{ memcpy(store->buffer, text, size);
result = JS_NewString(context, id); tf_ssb_db_blob_store_async(ssb, store->buffer, size, _tf_ssb_blob_store_callback, store);
}
JS_FreeCString(context, text); JS_FreeCString(context, text);
} }
else if ((blob = tf_util_try_get_array_buffer(context, &size, argv[0])) != 0) else if ((blob = tf_util_try_get_array_buffer(context, &size, argv[0])) != 0)
{ {
if (tf_ssb_db_blob_store(ssb, blob, size, id, sizeof(id), NULL)) store->buffer = tf_malloc(size);
{ memcpy(store->buffer, blob, size);
result = JS_NewString(context, id); tf_ssb_db_blob_store_async(ssb, store->buffer, size, _tf_ssb_blob_store_callback, store);
}
} }
else else
{ {
@ -209,11 +233,18 @@ static JSValue _tf_ssb_blobStore(JSContext* context, JSValueConst this_val, int
blob = tf_util_try_get_array_buffer(context, &size, buffer); blob = tf_util_try_get_array_buffer(context, &size, buffer);
if (blob) if (blob)
{ {
if (tf_ssb_db_blob_store(ssb, blob, size, id, sizeof(id), NULL)) store->buffer = tf_malloc(size);
{ memcpy(store->buffer, blob, size);
result = JS_NewString(context, id); tf_ssb_db_blob_store_async(ssb, store->buffer, size, _tf_ssb_blob_store_callback, store);
}
} }
else
{
_tf_ssb_blob_store_complete(store, NULL);
}
}
else
{
_tf_ssb_blob_store_complete(store, NULL);
} }
JS_FreeValue(context, buffer); JS_FreeValue(context, buffer);
} }
@ -1270,17 +1301,17 @@ void tf_ssb_register(JSContext* context, tf_ssb_t* ssb)
/* Requires an identity. */ /* Requires an identity. */
JS_SetPropertyStr(context, object, "createIdentity", JS_NewCFunction(context, _tf_ssb_createIdentity, "createIdentity", 1)); JS_SetPropertyStr(context, object, "createIdentity", JS_NewCFunction(context, _tf_ssb_createIdentity, "createIdentity", 1));
JS_SetPropertyStr(context, object, "getIdentities", JS_NewCFunction(context, _tf_ssb_getIdentities, "getIdentities", 1)); JS_SetPropertyStr(context, object, "getIdentities", JS_NewCFunction(context, _tf_ssb_getIdentities, "getIdentities", 1));
JS_SetPropertyStr(context, object, "appendMessageWithIdentity", JS_NewCFunction(context, _tf_ssb_appendMessageWithIdentity, "appendMessageWithIdentity", 3));
JS_SetPropertyStr(context, object, "hmacsha256sign", JS_NewCFunction(context, _tf_ssb_hmacsha256_sign, "hmacsha256sign", 3)); JS_SetPropertyStr(context, object, "hmacsha256sign", JS_NewCFunction(context, _tf_ssb_hmacsha256_sign, "hmacsha256sign", 3));
JS_SetPropertyStr(context, object, "hmacsha256verify", JS_NewCFunction(context, _tf_ssb_hmacsha256_verify, "hmacsha256verify", 3)); JS_SetPropertyStr(context, object, "hmacsha256verify", JS_NewCFunction(context, _tf_ssb_hmacsha256_verify, "hmacsha256verify", 3));
JS_SetPropertyStr(context, object, "privateMessageEncrypt", JS_NewCFunction(context, _tf_ssb_private_message_encrypt, "privateMessageEncrypt", 4)); JS_SetPropertyStr(context, object, "privateMessageEncrypt", JS_NewCFunction(context, _tf_ssb_private_message_encrypt, "privateMessageEncrypt", 4));
JS_SetPropertyStr(context, object, "privateMessageDecrypt", JS_NewCFunction(context, _tf_ssb_private_message_decrypt, "privateMessageDecrypt", 3)); JS_SetPropertyStr(context, object, "privateMessageDecrypt", JS_NewCFunction(context, _tf_ssb_private_message_decrypt, "privateMessageDecrypt", 3));
/* Write. */
JS_SetPropertyStr(context, object, "appendMessageWithIdentity", JS_NewCFunction(context, _tf_ssb_appendMessageWithIdentity, "appendMessageWithIdentity", 3));
/* Does not require an identity. */ /* Does not require an identity. */
JS_SetPropertyStr(context, object, "getAllIdentities", JS_NewCFunction(context, _tf_ssb_getAllIdentities, "getAllIdentities", 0)); JS_SetPropertyStr(context, object, "getAllIdentities", JS_NewCFunction(context, _tf_ssb_getAllIdentities, "getAllIdentities", 0));
JS_SetPropertyStr(context, object, "getMessage", JS_NewCFunction(context, _tf_ssb_getMessage, "getMessage", 2)); JS_SetPropertyStr(context, object, "getMessage", JS_NewCFunction(context, _tf_ssb_getMessage, "getMessage", 2));
JS_SetPropertyStr(context, object, "blobGet", JS_NewCFunction(context, _tf_ssb_blobGet, "blobGet", 1)); JS_SetPropertyStr(context, object, "blobGet", JS_NewCFunction(context, _tf_ssb_blobGet, "blobGet", 1));
JS_SetPropertyStr(context, object, "blobStore", JS_NewCFunction(context, _tf_ssb_blobStore, "blobStore", 2));
JS_SetPropertyStr(context, object, "messageContentGet", JS_NewCFunction(context, _tf_ssb_messageContentGet, "messageContentGet", 1)); JS_SetPropertyStr(context, object, "messageContentGet", JS_NewCFunction(context, _tf_ssb_messageContentGet, "messageContentGet", 1));
JS_SetPropertyStr(context, object, "connections", JS_NewCFunction(context, _tf_ssb_connections, "connections", 0)); JS_SetPropertyStr(context, object, "connections", JS_NewCFunction(context, _tf_ssb_connections, "connections", 0));
JS_SetPropertyStr(context, object, "storedConnections", JS_NewCFunction(context, _tf_ssb_storedConnections, "storedConnections", 0)); JS_SetPropertyStr(context, object, "storedConnections", JS_NewCFunction(context, _tf_ssb_storedConnections, "storedConnections", 0));
@ -1288,10 +1319,12 @@ void tf_ssb_register(JSContext* context, tf_ssb_t* ssb)
JS_SetPropertyStr(context, object, "closeConnection", JS_NewCFunction(context, _tf_ssb_closeConnection, "closeConnection", 1)); JS_SetPropertyStr(context, object, "closeConnection", JS_NewCFunction(context, _tf_ssb_closeConnection, "closeConnection", 1));
JS_SetPropertyStr(context, object, "forgetStoredConnection", JS_NewCFunction(context, _tf_ssb_forgetStoredConnection, "forgetStoredConnection", 1)); JS_SetPropertyStr(context, object, "forgetStoredConnection", JS_NewCFunction(context, _tf_ssb_forgetStoredConnection, "forgetStoredConnection", 1));
JS_SetPropertyStr(context, object, "sqlAsync", JS_NewCFunction(context, _tf_ssb_sqlAsync, "sqlAsync", 3)); JS_SetPropertyStr(context, object, "sqlAsync", JS_NewCFunction(context, _tf_ssb_sqlAsync, "sqlAsync", 3));
JS_SetPropertyStr(context, object, "storeMessage", JS_NewCFunction(context, _tf_ssb_storeMessage, "storeMessage", 1));
JS_SetPropertyStr(context, object, "getBroadcasts", JS_NewCFunction(context, _tf_ssb_getBroadcasts, "getBroadcasts", 0)); JS_SetPropertyStr(context, object, "getBroadcasts", JS_NewCFunction(context, _tf_ssb_getBroadcasts, "getBroadcasts", 0));
JS_SetPropertyStr(context, object, "connect", JS_NewCFunction(context, _tf_ssb_connect, "connect", 1)); JS_SetPropertyStr(context, object, "connect", JS_NewCFunction(context, _tf_ssb_connect, "connect", 1));
JS_SetPropertyStr(context, object, "createTunnel", JS_NewCFunction(context, _tf_ssb_createTunnel, "createTunnel", 3)); JS_SetPropertyStr(context, object, "createTunnel", JS_NewCFunction(context, _tf_ssb_createTunnel, "createTunnel", 3));
/* Write. */
JS_SetPropertyStr(context, object, "storeMessage", JS_NewCFunction(context, _tf_ssb_storeMessage, "storeMessage", 1));
JS_SetPropertyStr(context, object, "blobStore", JS_NewCFunction(context, _tf_ssb_blobStore, "blobStore", 2));
/* Should be trusted only. */ /* Should be trusted only. */
JS_SetPropertyStr(context, object, "addEventListener", JS_NewCFunction(context, _tf_ssb_add_event_listener, "addEventListener", 2)); JS_SetPropertyStr(context, object, "addEventListener", JS_NewCFunction(context, _tf_ssb_add_event_listener, "addEventListener", 2));

View File

@ -423,24 +423,15 @@ typedef struct _blobs_get_t
size_t received; size_t received;
size_t expected_size; size_t expected_size;
bool done; bool done;
bool storing;
tf_ssb_t* ssb; tf_ssb_t* ssb;
uv_work_t work;
uint8_t buffer[]; uint8_t buffer[];
} blobs_get_t; } blobs_get_t;
static void _tf_ssb_rpc_blob_store_work(uv_work_t* work) static void _tf_ssb_rpc_blob_store_callback(const char* id, bool is_new, void* user_data)
{ {
blobs_get_t* get = work->data; blobs_get_t* get = user_data;
tf_ssb_db_blob_store(get->ssb, get->buffer, get->received, NULL, 0, NULL); get->storing = false;
}
static void _tf_ssb_rpc_blob_store_after_work(uv_work_t* work, int status)
{
blobs_get_t* get = work->data;
if (status != 0)
{
tf_printf("uv_queue_work failed: %s\n", uv_strerror(status));
}
if (get->done) if (get->done)
{ {
tf_free(get); tf_free(get);
@ -459,21 +450,13 @@ static void _tf_ssb_rpc_connection_blobs_get_callback(tf_ssb_connection_t* conne
} }
else if ((flags & k_ssb_rpc_mask_type) == k_ssb_rpc_flag_json) else if ((flags & k_ssb_rpc_mask_type) == k_ssb_rpc_flag_json)
{ {
bool stored = false;
if (JS_ToBool(context, args)) if (JS_ToBool(context, args))
{ {
get->work.data = get; get->storing = true;
int r = uv_queue_work(tf_ssb_get_loop(ssb), &get->work, _tf_ssb_rpc_blob_store_work, _tf_ssb_rpc_blob_store_after_work); tf_ssb_db_blob_store_async(ssb, get->buffer, get->received, _tf_ssb_rpc_blob_store_callback, get);
if (r)
{
tf_printf("uv_queue_work failed: %s\n", uv_strerror(r));
get->work.data = NULL;
}
else
{
stored = true;
}
} }
/* TODO: Should we send the response in the callback? */
bool stored = true;
tf_ssb_connection_rpc_send( tf_ssb_connection_rpc_send(
connection, connection,
k_ssb_rpc_flag_json | k_ssb_rpc_flag_stream | k_ssb_rpc_flag_end_error, k_ssb_rpc_flag_json | k_ssb_rpc_flag_stream | k_ssb_rpc_flag_end_error,
@ -490,7 +473,7 @@ static void _tf_ssb_rpc_connection_blobs_get_cleanup(tf_ssb_t* ssb, void* user_d
{ {
blobs_get_t* get = user_data; blobs_get_t* get = user_data;
get->done = true; get->done = true;
if (!get->work.data) if (!get->storing)
{ {
tf_free(get); tf_free(get);
} }