Resuming work to move all DB access off the main thread.
This commit is contained in:
parent
9497d7cf64
commit
9d35b4bdfb
@ -381,6 +381,7 @@ static int _tf_run_task(const tf_run_args_t* args, int index)
|
||||
tf_ssb_import(tf_task_get_ssb(task), "core", "apps");
|
||||
}
|
||||
}
|
||||
tf_ssb_set_main_thread(tf_task_get_ssb(task), true);
|
||||
if (tf_task_execute(task, args->script))
|
||||
{
|
||||
tf_task_run(task);
|
||||
|
38
src/ssb.db.c
38
src/ssb.db.c
@ -635,6 +635,44 @@ bool tf_ssb_db_blob_get(tf_ssb_t* ssb, const char* id, uint8_t** out_blob, size_
|
||||
return result;
|
||||
}
|
||||
|
||||
typedef struct _blob_get_async_t
|
||||
{
|
||||
tf_ssb_t* ssb;
|
||||
char id[k_blob_id_len];
|
||||
tf_ssb_db_blob_get_callback_t* callback;
|
||||
void* user_data;
|
||||
|
||||
bool out_found;
|
||||
uint8_t* out_data;
|
||||
size_t out_size;
|
||||
} blob_get_async_t;
|
||||
|
||||
static void _tf_ssb_db_blob_get_async_work(tf_ssb_t* ssb, void* user_data)
|
||||
{
|
||||
blob_get_async_t* async = user_data;
|
||||
async->out_found = tf_ssb_db_blob_get(ssb, async->id, &async->out_data, &async->out_size);
|
||||
}
|
||||
|
||||
static void _tf_ssb_db_blob_get_async_after_work(tf_ssb_t* ssb, int status, void* user_data)
|
||||
{
|
||||
blob_get_async_t* async = user_data;
|
||||
async->callback(async->out_found, async->out_data, async->out_size, async->user_data);
|
||||
tf_free(async->out_data);
|
||||
tf_free(async);
|
||||
}
|
||||
|
||||
void tf_ssb_db_blob_get_async(tf_ssb_t* ssb, const char* id, tf_ssb_db_blob_get_callback_t* callback, void* user_data)
|
||||
{
|
||||
blob_get_async_t* async = tf_malloc(sizeof(blob_get_async_t));
|
||||
*async = (blob_get_async_t) {
|
||||
.ssb = ssb,
|
||||
.callback = callback,
|
||||
.user_data = user_data,
|
||||
};
|
||||
snprintf(async->id, sizeof(async->id), "%s", id);
|
||||
tf_ssb_run_work(ssb, _tf_ssb_db_blob_get_async_work, _tf_ssb_db_blob_get_async_after_work, async);
|
||||
}
|
||||
|
||||
typedef struct _blob_store_work_t
|
||||
{
|
||||
const uint8_t* blob;
|
||||
|
16
src/ssb.db.h
16
src/ssb.db.h
@ -55,6 +55,22 @@ bool tf_ssb_db_blob_has(tf_ssb_t* ssb, const char* id);
|
||||
*/
|
||||
bool tf_ssb_db_blob_get(tf_ssb_t* ssb, const char* id, uint8_t** out_blob, size_t* out_size);
|
||||
|
||||
/**
|
||||
** A function called when a blob is retrieved from the database.
|
||||
** @param id The blob identifier.
|
||||
** @param user_data The user data.
|
||||
*/
|
||||
typedef void(tf_ssb_db_blob_get_callback_t)(bool found, const uint8_t* data, size_t size, void* user_data);
|
||||
|
||||
/**
|
||||
** Retrieve a blob from the database asynchronously.
|
||||
** @param ssb The SSB instance.
|
||||
** @param id The blob identifier.
|
||||
** @param callback Callback called with the result.
|
||||
** @param user_data The user data.
|
||||
*/
|
||||
void tf_ssb_db_blob_get_async(tf_ssb_t* ssb, const char* id, tf_ssb_db_blob_get_callback_t* callback, void* user_data);
|
||||
|
||||
/**
|
||||
** A function called when a message is stored in the database.
|
||||
** @param id The message identifier.
|
||||
|
34
src/ssb.js.c
34
src/ssb.js.c
@ -581,6 +581,29 @@ static JSValue _tf_ssb_appendMessageWithIdentity(JSContext* context, JSValueCons
|
||||
return result;
|
||||
}
|
||||
|
||||
typedef struct _blob_get_t
|
||||
{
|
||||
JSContext* context;
|
||||
JSValue promise[2];
|
||||
} blob_get_t;
|
||||
|
||||
static void _tf_ssb_blobGet_callback(bool found, const uint8_t* data, size_t size, void* user_data)
|
||||
{
|
||||
blob_get_t* get = user_data;
|
||||
JSValue result = JS_UNDEFINED;
|
||||
if (found)
|
||||
{
|
||||
result = JS_NewArrayBufferCopy(get->context, data, size);
|
||||
}
|
||||
JSValue error = JS_Call(get->context, get->promise[0], JS_UNDEFINED, 1, &result);
|
||||
JS_FreeValue(get->context, result);
|
||||
JS_FreeValue(get->context, get->promise[0]);
|
||||
JS_FreeValue(get->context, get->promise[1]);
|
||||
tf_util_report_error(get->context, error);
|
||||
JS_FreeValue(get->context, error);
|
||||
tf_free(get);
|
||||
}
|
||||
|
||||
static JSValue _tf_ssb_blobGet(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv)
|
||||
{
|
||||
JSValue result = JS_NULL;
|
||||
@ -588,13 +611,10 @@ static JSValue _tf_ssb_blobGet(JSContext* context, JSValueConst this_val, int ar
|
||||
if (ssb)
|
||||
{
|
||||
const char* id = JS_ToCString(context, argv[0]);
|
||||
uint8_t* blob = NULL;
|
||||
size_t size = 0;
|
||||
if (tf_ssb_db_blob_get(ssb, id, &blob, &size))
|
||||
{
|
||||
result = JS_NewArrayBufferCopy(context, blob, size);
|
||||
tf_free(blob);
|
||||
}
|
||||
blob_get_t* get = tf_malloc(sizeof(blob_get_t));
|
||||
*get = (blob_get_t) { .context = context };
|
||||
result = JS_NewPromiseCapability(context, get->promise);
|
||||
tf_ssb_db_blob_get_async(ssb, id, _tf_ssb_blobGet_callback, get);
|
||||
JS_FreeCString(context, id);
|
||||
}
|
||||
return result;
|
||||
|
@ -480,6 +480,45 @@ static void _tf_ssb_rpc_connection_blobs_get(tf_ssb_connection_t* connection, co
|
||||
JS_FreeValue(context, message);
|
||||
}
|
||||
|
||||
typedef struct _blob_create_wants_work_t
|
||||
{
|
||||
tf_ssb_connection_t* connection;
|
||||
char blob_id[k_blob_id_len];
|
||||
bool out_result;
|
||||
int64_t size;
|
||||
size_t out_size;
|
||||
} blob_create_wants_work_t;
|
||||
|
||||
static void _tf_ssb_rpc_connection_blobs_create_wants_work(tf_ssb_connection_t* connection, void* user_data)
|
||||
{
|
||||
blob_create_wants_work_t* work = user_data;
|
||||
tf_ssb_t* ssb = tf_ssb_connection_get_ssb(work->connection);
|
||||
work->out_result = tf_ssb_db_blob_get(ssb, work->blob_id, NULL, &work->out_size);
|
||||
}
|
||||
|
||||
static void _tf_ssb_rpc_connection_blobs_create_wants_after_work(tf_ssb_connection_t* connection, int result, void* user_data)
|
||||
{
|
||||
blob_create_wants_work_t* work = user_data;
|
||||
tf_ssb_t* ssb = tf_ssb_connection_get_ssb(work->connection);
|
||||
tf_ssb_blob_wants_t* blob_wants = tf_ssb_connection_get_blob_wants_state(connection);
|
||||
JSContext* context = tf_ssb_get_context(ssb);
|
||||
if (work->out_result)
|
||||
{
|
||||
JSValue message = JS_NewObject(context);
|
||||
JS_SetPropertyStr(context, message, work->blob_id, JS_NewInt64(context, work->out_size));
|
||||
tf_ssb_connection_rpc_send_json(work->connection, k_ssb_rpc_flag_stream, -blob_wants->request_number, NULL, message, NULL, NULL, NULL);
|
||||
JS_FreeValue(context, message);
|
||||
}
|
||||
else if (work->size == -1LL)
|
||||
{
|
||||
JSValue message = JS_NewObject(context);
|
||||
JS_SetPropertyStr(context, message, work->blob_id, JS_NewInt64(context, -2));
|
||||
tf_ssb_connection_rpc_send_json(work->connection, k_ssb_rpc_flag_stream, -blob_wants->request_number, NULL, message, NULL, NULL, NULL);
|
||||
JS_FreeValue(context, message);
|
||||
}
|
||||
tf_free(work);
|
||||
}
|
||||
|
||||
static void _tf_ssb_rpc_connection_blobs_createWants_callback(
|
||||
tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
|
||||
{
|
||||
@ -489,7 +528,6 @@ static void _tf_ssb_rpc_connection_blobs_createWants_callback(
|
||||
return;
|
||||
}
|
||||
|
||||
tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
|
||||
JSContext* context = tf_ssb_connection_get_context(connection);
|
||||
|
||||
JSValue name = JS_GetPropertyStr(context, args, "name");
|
||||
@ -524,21 +562,14 @@ static void _tf_ssb_rpc_connection_blobs_createWants_callback(
|
||||
}
|
||||
if (size < 0)
|
||||
{
|
||||
size_t blob_size = 0;
|
||||
if (tf_ssb_db_blob_get(ssb, blob_id, NULL, &blob_size))
|
||||
{
|
||||
JSValue message = JS_NewObject(context);
|
||||
JS_SetPropertyStr(context, message, blob_id, JS_NewInt64(context, blob_size));
|
||||
tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream, -blob_wants->request_number, NULL, message, NULL, NULL, NULL);
|
||||
JS_FreeValue(context, message);
|
||||
}
|
||||
else if (size == -1LL)
|
||||
{
|
||||
JSValue message = JS_NewObject(context);
|
||||
JS_SetPropertyStr(context, message, blob_id, JS_NewInt64(context, -2));
|
||||
tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream, -blob_wants->request_number, NULL, message, NULL, NULL, NULL);
|
||||
JS_FreeValue(context, message);
|
||||
}
|
||||
blob_create_wants_work_t* work = tf_malloc(sizeof(blob_create_wants_work_t));
|
||||
*work = (blob_create_wants_work_t) {
|
||||
.connection = connection,
|
||||
.size = size,
|
||||
.out_size = -1ULL,
|
||||
};
|
||||
snprintf(work->blob_id, sizeof(work->blob_id), "%s", blob_id);
|
||||
tf_ssb_connection_run_work(connection, _tf_ssb_rpc_connection_blobs_create_wants_work, _tf_ssb_rpc_connection_blobs_create_wants_after_work, work);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
Loading…
x
Reference in New Issue
Block a user