diff --git a/src/ssb.db.c b/src/ssb.db.c index 31202b11..f51e9db4 100644 --- a/src/ssb.db.c +++ b/src/ssb.db.c @@ -12,6 +12,7 @@ #include "sodium/crypto_secretbox.h" #include "sodium/crypto_sign.h" #include "sqlite3.h" +#include "uv.h" #include #include @@ -335,7 +336,7 @@ bool tf_ssb_db_store_message(tf_ssb_t* ssb, JSContext* context, const char* id, if (sqlite3_prepare(db, query, -1, &statement, NULL) == SQLITE_OK) { if (sqlite3_bind_int64(statement, 1, last_row_id) == SQLITE_OK && - sqlite3_bind_int(statement, 2, k_blob_id_len - 1) == SQLITE_OK) + sqlite3_bind_int(statement, 2, k_blob_id_len - 1) == SQLITE_OK) { int r = SQLITE_OK; while ((r = sqlite3_step(statement)) == SQLITE_ROW) @@ -445,6 +446,65 @@ bool tf_ssb_db_blob_get(tf_ssb_t* ssb, const char* id, uint8_t** out_blob, size_ return result; } +typedef struct _blob_store_work_t +{ + uv_work_t work; + tf_ssb_t* ssb; + const uint8_t* blob; + size_t size; + char id[k_blob_id_len]; + bool is_new; + tf_ssb_db_blob_store_callback_t* callback; + void* user_data; +} blob_store_work_t; + +static void _tf_ssb_db_blob_store_work(uv_work_t* work) +{ + blob_store_work_t* blob_work = work->data; + tf_ssb_db_blob_store(blob_work->ssb, blob_work->blob, blob_work->size, blob_work->id, sizeof(blob_work->id), &blob_work->is_new); +} + +static void _tf_ssb_db_blob_store_after_work(uv_work_t* work, int status) +{ + blob_store_work_t* blob_work = work->data; + if (status != 0) + { + tf_printf("tf_ssb_db_blob_store_async -> uv_queue_work failed asynchronously: %s\n", uv_strerror(status)); + } + if (blob_work->callback) + { + blob_work->callback(status == 0 ? blob_work->id : NULL, blob_work->is_new, blob_work->user_data); + } + tf_free(blob_work); +} + +void tf_ssb_db_blob_store_async(tf_ssb_t* ssb, const uint8_t* blob, size_t size, tf_ssb_db_blob_store_callback_t* callback, void* user_data) +{ + blob_store_work_t* work = tf_malloc(sizeof(blob_store_work_t)); + *work = (blob_store_work_t) + { + .work = + { + .data = work, + }, + .ssb = ssb, + .blob = blob, + .size = size, + .callback = callback, + .user_data = user_data, + }; + int r = uv_queue_work(tf_ssb_get_loop(ssb), &work->work, _tf_ssb_db_blob_store_work, _tf_ssb_db_blob_store_after_work); + if (r) + { + tf_printf("tf_ssb_db_blob_store_async -> uv_queue_work failed immediately: %s\n", uv_strerror(r)); + if (callback) + { + callback(NULL, false, user_data); + } + tf_free(work); + } +} + bool tf_ssb_db_blob_store(tf_ssb_t* ssb, const uint8_t* blob, size_t size, char* out_id, size_t out_id_size, bool* out_new) { bool result = false; diff --git a/src/ssb.db.h b/src/ssb.db.h index b16f75aa..a7d3f99f 100644 --- a/src/ssb.db.h +++ b/src/ssb.db.h @@ -14,6 +14,9 @@ bool tf_ssb_db_store_message(tf_ssb_t* ssb, JSContext* context, const char* id, bool tf_ssb_db_message_content_get(tf_ssb_t* ssb, const char* id, uint8_t** out_blob, size_t* out_size); bool tf_ssb_db_blob_has(tf_ssb_t* ssb, const char* id); bool tf_ssb_db_blob_get(tf_ssb_t* ssb, const char* id, uint8_t** out_blob, size_t* out_size); + +typedef void (tf_ssb_db_blob_store_callback_t)(const char* id, bool is_new, void* user_data); +void tf_ssb_db_blob_store_async(tf_ssb_t* ssb, const uint8_t* blob, size_t size, tf_ssb_db_blob_store_callback_t* callback, void* user_data); bool tf_ssb_db_blob_store(tf_ssb_t* ssb, const uint8_t* blob, size_t size, char* out_id, size_t out_id_size, bool* out_new); JSValue tf_ssb_db_get_message_by_id( tf_ssb_t* ssb, const char* id, bool is_keys); diff --git a/src/ssb.js.c b/src/ssb.js.c index b91d8d08..880fc4c2 100644 --- a/src/ssb.js.c +++ b/src/ssb.js.c @@ -174,30 +174,54 @@ static JSValue _tf_ssb_blobGet(JSContext* context, JSValueConst this_val, int ar return result; } +typedef struct _blob_store_t +{ + JSContext* context; + JSValue promise[2]; + uint8_t* buffer; +} blob_store_t; + +void _tf_ssb_blob_store_complete(blob_store_t* store, const char* id) +{ + JSValue result = JS_Call(store->context, id ? store->promise[0] : store->promise[1], JS_UNDEFINED, 0, NULL); + tf_util_report_error(store->context, result); + JS_FreeValue(store->context, result); + JS_FreeValue(store->context, store->promise[0]); + JS_FreeValue(store->context, store->promise[1]); + tf_free(store->buffer); + tf_free(store); +} + +void _tf_ssb_blob_store_callback(const char* id, bool is_new, void* user_data) +{ + blob_store_t* store = user_data; + _tf_ssb_blob_store_complete(store, id); +} + static JSValue _tf_ssb_blobStore(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) { - JSValue result = JS_NULL; + blob_store_t* store = tf_malloc(sizeof(blob_store_t)); + *store = (blob_store_t) { 0 }; + JSValue result = JS_NewPromiseCapability(context, store->promise); + tf_ssb_t* ssb = JS_GetOpaque(this_val, _tf_ssb_classId); if (ssb) { uint8_t* blob = NULL; size_t size = 0; - char id[512]; if (JS_IsString(argv[0])) { const char* text = JS_ToCStringLen(context, &size, argv[0]); - if (tf_ssb_db_blob_store(ssb, (const uint8_t*)text, size, id, sizeof(id), NULL)) - { - result = JS_NewString(context, id); - } + store->buffer = tf_malloc(size); + memcpy(store->buffer, text, size); + tf_ssb_db_blob_store_async(ssb, store->buffer, size, _tf_ssb_blob_store_callback, store); JS_FreeCString(context, text); } else if ((blob = tf_util_try_get_array_buffer(context, &size, argv[0])) != 0) { - if (tf_ssb_db_blob_store(ssb, blob, size, id, sizeof(id), NULL)) - { - result = JS_NewString(context, id); - } + store->buffer = tf_malloc(size); + memcpy(store->buffer, blob, size); + tf_ssb_db_blob_store_async(ssb, store->buffer, size, _tf_ssb_blob_store_callback, store); } else { @@ -209,11 +233,18 @@ static JSValue _tf_ssb_blobStore(JSContext* context, JSValueConst this_val, int blob = tf_util_try_get_array_buffer(context, &size, buffer); if (blob) { - if (tf_ssb_db_blob_store(ssb, blob, size, id, sizeof(id), NULL)) - { - result = JS_NewString(context, id); - } + store->buffer = tf_malloc(size); + memcpy(store->buffer, blob, size); + tf_ssb_db_blob_store_async(ssb, store->buffer, size, _tf_ssb_blob_store_callback, store); } + else + { + _tf_ssb_blob_store_complete(store, NULL); + } + } + else + { + _tf_ssb_blob_store_complete(store, NULL); } JS_FreeValue(context, buffer); } @@ -1270,17 +1301,17 @@ void tf_ssb_register(JSContext* context, tf_ssb_t* ssb) /* Requires an identity. */ JS_SetPropertyStr(context, object, "createIdentity", JS_NewCFunction(context, _tf_ssb_createIdentity, "createIdentity", 1)); JS_SetPropertyStr(context, object, "getIdentities", JS_NewCFunction(context, _tf_ssb_getIdentities, "getIdentities", 1)); - JS_SetPropertyStr(context, object, "appendMessageWithIdentity", JS_NewCFunction(context, _tf_ssb_appendMessageWithIdentity, "appendMessageWithIdentity", 3)); JS_SetPropertyStr(context, object, "hmacsha256sign", JS_NewCFunction(context, _tf_ssb_hmacsha256_sign, "hmacsha256sign", 3)); JS_SetPropertyStr(context, object, "hmacsha256verify", JS_NewCFunction(context, _tf_ssb_hmacsha256_verify, "hmacsha256verify", 3)); JS_SetPropertyStr(context, object, "privateMessageEncrypt", JS_NewCFunction(context, _tf_ssb_private_message_encrypt, "privateMessageEncrypt", 4)); JS_SetPropertyStr(context, object, "privateMessageDecrypt", JS_NewCFunction(context, _tf_ssb_private_message_decrypt, "privateMessageDecrypt", 3)); + /* Write. */ + JS_SetPropertyStr(context, object, "appendMessageWithIdentity", JS_NewCFunction(context, _tf_ssb_appendMessageWithIdentity, "appendMessageWithIdentity", 3)); /* Does not require an identity. */ JS_SetPropertyStr(context, object, "getAllIdentities", JS_NewCFunction(context, _tf_ssb_getAllIdentities, "getAllIdentities", 0)); JS_SetPropertyStr(context, object, "getMessage", JS_NewCFunction(context, _tf_ssb_getMessage, "getMessage", 2)); JS_SetPropertyStr(context, object, "blobGet", JS_NewCFunction(context, _tf_ssb_blobGet, "blobGet", 1)); - JS_SetPropertyStr(context, object, "blobStore", JS_NewCFunction(context, _tf_ssb_blobStore, "blobStore", 2)); JS_SetPropertyStr(context, object, "messageContentGet", JS_NewCFunction(context, _tf_ssb_messageContentGet, "messageContentGet", 1)); JS_SetPropertyStr(context, object, "connections", JS_NewCFunction(context, _tf_ssb_connections, "connections", 0)); JS_SetPropertyStr(context, object, "storedConnections", JS_NewCFunction(context, _tf_ssb_storedConnections, "storedConnections", 0)); @@ -1288,10 +1319,12 @@ void tf_ssb_register(JSContext* context, tf_ssb_t* ssb) JS_SetPropertyStr(context, object, "closeConnection", JS_NewCFunction(context, _tf_ssb_closeConnection, "closeConnection", 1)); JS_SetPropertyStr(context, object, "forgetStoredConnection", JS_NewCFunction(context, _tf_ssb_forgetStoredConnection, "forgetStoredConnection", 1)); JS_SetPropertyStr(context, object, "sqlAsync", JS_NewCFunction(context, _tf_ssb_sqlAsync, "sqlAsync", 3)); - JS_SetPropertyStr(context, object, "storeMessage", JS_NewCFunction(context, _tf_ssb_storeMessage, "storeMessage", 1)); JS_SetPropertyStr(context, object, "getBroadcasts", JS_NewCFunction(context, _tf_ssb_getBroadcasts, "getBroadcasts", 0)); JS_SetPropertyStr(context, object, "connect", JS_NewCFunction(context, _tf_ssb_connect, "connect", 1)); JS_SetPropertyStr(context, object, "createTunnel", JS_NewCFunction(context, _tf_ssb_createTunnel, "createTunnel", 3)); + /* Write. */ + JS_SetPropertyStr(context, object, "storeMessage", JS_NewCFunction(context, _tf_ssb_storeMessage, "storeMessage", 1)); + JS_SetPropertyStr(context, object, "blobStore", JS_NewCFunction(context, _tf_ssb_blobStore, "blobStore", 2)); /* Should be trusted only. */ JS_SetPropertyStr(context, object, "addEventListener", JS_NewCFunction(context, _tf_ssb_add_event_listener, "addEventListener", 2)); diff --git a/src/ssb.rpc.c b/src/ssb.rpc.c index 65f42009..db27b824 100644 --- a/src/ssb.rpc.c +++ b/src/ssb.rpc.c @@ -423,24 +423,15 @@ typedef struct _blobs_get_t size_t received; size_t expected_size; bool done; + bool storing; tf_ssb_t* ssb; - uv_work_t work; uint8_t buffer[]; } blobs_get_t; -static void _tf_ssb_rpc_blob_store_work(uv_work_t* work) +static void _tf_ssb_rpc_blob_store_callback(const char* id, bool is_new, void* user_data) { - blobs_get_t* get = work->data; - tf_ssb_db_blob_store(get->ssb, get->buffer, get->received, NULL, 0, NULL); -} - -static void _tf_ssb_rpc_blob_store_after_work(uv_work_t* work, int status) -{ - blobs_get_t* get = work->data; - if (status != 0) - { - tf_printf("uv_queue_work failed: %s\n", uv_strerror(status)); - } + blobs_get_t* get = user_data; + get->storing = false; if (get->done) { tf_free(get); @@ -459,21 +450,13 @@ static void _tf_ssb_rpc_connection_blobs_get_callback(tf_ssb_connection_t* conne } else if ((flags & k_ssb_rpc_mask_type) == k_ssb_rpc_flag_json) { - bool stored = false; if (JS_ToBool(context, args)) { - get->work.data = get; - int r = uv_queue_work(tf_ssb_get_loop(ssb), &get->work, _tf_ssb_rpc_blob_store_work, _tf_ssb_rpc_blob_store_after_work); - if (r) - { - tf_printf("uv_queue_work failed: %s\n", uv_strerror(r)); - get->work.data = NULL; - } - else - { - stored = true; - } + get->storing = true; + tf_ssb_db_blob_store_async(ssb, get->buffer, get->received, _tf_ssb_rpc_blob_store_callback, get); } + /* TODO: Should we send the response in the callback? */ + bool stored = true; tf_ssb_connection_rpc_send( connection, k_ssb_rpc_flag_json | k_ssb_rpc_flag_stream | k_ssb_rpc_flag_end_error, @@ -490,7 +473,7 @@ static void _tf_ssb_rpc_connection_blobs_get_cleanup(tf_ssb_t* ssb, void* user_d { blobs_get_t* get = user_data; get->done = true; - if (!get->work.data) + if (!get->storing) { tf_free(get); }