diff --git a/src/ssb.c b/src/ssb.c index 27a85fbc..e3cb5401 100644 --- a/src/ssb.c +++ b/src/ssb.c @@ -3236,6 +3236,10 @@ void tf_ssb_notify_message_added(tf_ssb_t* ssb, const char* id) for (tf_ssb_connection_t* connection = ssb->connections; connection; connection = connection->next) { + if (!connection->message_requests_count) + { + continue; + } tf_ssb_connection_message_request_t* message_request = bsearch( author_string, diff --git a/src/ssb.rpc.c b/src/ssb.rpc.c index 11884682..65f42009 100644 --- a/src/ssb.rpc.c +++ b/src/ssb.rpc.c @@ -1,11 +1,13 @@ #include "ssb.rpc.h" +#include "log.h" #include "mem.h" #include "ssb.h" #include "ssb.db.h" #include "util.js.h" #include "sqlite3.h" +#include "uv.h" #include #include @@ -420,9 +422,31 @@ typedef struct _blobs_get_t char id[k_blob_id_len]; size_t received; size_t expected_size; - char buffer[]; + bool done; + tf_ssb_t* ssb; + uv_work_t work; + uint8_t buffer[]; } blobs_get_t; +static void _tf_ssb_rpc_blob_store_work(uv_work_t* work) +{ + blobs_get_t* get = work->data; + tf_ssb_db_blob_store(get->ssb, get->buffer, get->received, NULL, 0, NULL); +} + +static void _tf_ssb_rpc_blob_store_after_work(uv_work_t* work, int status) +{ + blobs_get_t* get = work->data; + if (status != 0) + { + tf_printf("uv_queue_work failed: %s\n", uv_strerror(status)); + } + if (get->done) + { + tf_free(get); + } +} + static void _tf_ssb_rpc_connection_blobs_get_callback(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); @@ -438,8 +462,17 @@ static void _tf_ssb_rpc_connection_blobs_get_callback(tf_ssb_connection_t* conne bool stored = false; if (JS_ToBool(context, args)) { - char id[256]; - stored = tf_ssb_db_blob_store(ssb, (uint8_t*)get->buffer, get->expected_size, id, sizeof(id), NULL); + get->work.data = get; + int r = uv_queue_work(tf_ssb_get_loop(ssb), &get->work, _tf_ssb_rpc_blob_store_work, _tf_ssb_rpc_blob_store_after_work); + if (r) + { + tf_printf("uv_queue_work failed: %s\n", uv_strerror(r)); + get->work.data = NULL; + } + else + { + stored = true; + } } tf_ssb_connection_rpc_send( connection, @@ -455,13 +488,18 @@ static void _tf_ssb_rpc_connection_blobs_get_callback(tf_ssb_connection_t* conne static void _tf_ssb_rpc_connection_blobs_get_cleanup(tf_ssb_t* ssb, void* user_data) { - tf_free(user_data); + blobs_get_t* get = user_data; + get->done = true; + if (!get->work.data) + { + tf_free(get); + } } static void _tf_ssb_rpc_connection_blobs_get(tf_ssb_connection_t* connection, const char* blob_id, size_t size) { blobs_get_t* get = tf_malloc(sizeof(blobs_get_t) + size); - *get = (blobs_get_t) { .expected_size = size }; + *get = (blobs_get_t) { .ssb = tf_ssb_connection_get_ssb(connection), .expected_size = size }; snprintf(get->id, sizeof(get->id), "%s", blob_id); memset(get->buffer, 0, size);