forked from cory/tildefriends
Allow running read-only sqlite queries from libuv worker threads. Needs so much more testing.
git-svn-id: https://www.unprompted.com/svn/projects/tildefriends/trunk@4172 ed5197a5-7fde-0310-b194-c3ffbd925b24
This commit is contained in:
264
src/ssb.js.c
264
src/ssb.js.c
@ -4,15 +4,17 @@
|
||||
#include "mem.h"
|
||||
#include "ssb.db.h"
|
||||
#include "ssb.h"
|
||||
#include "task.h"
|
||||
#include "util.js.h"
|
||||
|
||||
#include <base64c.h>
|
||||
#include <sodium/crypto_hash_sha256.h>
|
||||
#include <sodium/crypto_sign.h>
|
||||
#include <string.h>
|
||||
#include <sqlite3.h>
|
||||
#include <uv.h>
|
||||
|
||||
#include <inttypes.h>
|
||||
|
||||
#include "quickjs-libc.h"
|
||||
|
||||
static JSClassID _tf_ssb_classId;
|
||||
@ -332,6 +334,265 @@ static JSValue _tf_ssb_sqlStream(JSContext* context, JSValueConst this_val, int
|
||||
return result;
|
||||
}
|
||||
|
||||
typedef struct _sql_work_t
|
||||
{
|
||||
uv_work_t request;
|
||||
tf_ssb_t* ssb;
|
||||
const char* query;
|
||||
uint8_t* binds;
|
||||
size_t binds_count;
|
||||
uint8_t* rows;
|
||||
size_t rows_count;
|
||||
JSValue callback;
|
||||
JSValue promise[2];
|
||||
int result;
|
||||
} sql_work_t;
|
||||
|
||||
static void _tf_ssb_sql_append(uint8_t** rows, size_t* rows_count, const void* data, size_t size)
|
||||
{
|
||||
*rows = tf_resize_vec(*rows, *rows_count + size);
|
||||
memcpy(*rows + *rows_count, data, size);
|
||||
*rows_count += size;
|
||||
}
|
||||
|
||||
static void _tf_ssb_sqlAsync_work(uv_work_t* work)
|
||||
{
|
||||
sql_work_t* sql_work = work->data;
|
||||
sqlite3* db = tf_ssb_acquire_db_reader(sql_work->ssb);
|
||||
sqlite3_stmt* statement = NULL;
|
||||
if (sqlite3_prepare(db, sql_work->query, -1, &statement, NULL) == SQLITE_OK)
|
||||
{
|
||||
const uint8_t* p = sql_work->binds;
|
||||
int column = 0;
|
||||
while (p < sql_work->binds + sql_work->binds_count)
|
||||
{
|
||||
switch (*p++)
|
||||
{
|
||||
case SQLITE_INTEGER:
|
||||
{
|
||||
int64_t value = 0;
|
||||
memcpy(&value, p, sizeof(value));
|
||||
sqlite3_bind_int64(statement, column + 1, value);
|
||||
p += sizeof(value);
|
||||
}
|
||||
break;
|
||||
case SQLITE_TEXT:
|
||||
{
|
||||
size_t length = 0;
|
||||
memcpy(&length, p, sizeof(length));
|
||||
p += sizeof(length);
|
||||
sqlite3_bind_text(statement, column + 1, (const char*)p, length, NULL);
|
||||
}
|
||||
break;
|
||||
case SQLITE_NULL:
|
||||
sqlite3_bind_null(statement, column + 1);
|
||||
break;
|
||||
default:
|
||||
abort();
|
||||
}
|
||||
column++;
|
||||
}
|
||||
int r = SQLITE_OK;
|
||||
while ((r = sqlite3_step(statement)) == SQLITE_ROW)
|
||||
{
|
||||
_tf_ssb_sql_append(&sql_work->rows, &sql_work->rows_count, &(uint8_t[]) { 'r' }, 1);
|
||||
for (int i = 0; i < sqlite3_column_count(statement); i++)
|
||||
{
|
||||
_tf_ssb_sql_append(&sql_work->rows, &sql_work->rows_count, &(uint8_t[]) { 'c' }, 1);
|
||||
const char* name = sqlite3_column_name(statement, i);
|
||||
_tf_ssb_sql_append(&sql_work->rows, &sql_work->rows_count, name, strlen(name) + 1);
|
||||
uint8_t type = sqlite3_column_type(statement, i);
|
||||
_tf_ssb_sql_append(&sql_work->rows, &sql_work->rows_count, &type, sizeof(type));
|
||||
switch (type)
|
||||
{
|
||||
case SQLITE_INTEGER:
|
||||
{
|
||||
int64_t value = sqlite3_column_int64(statement, i);
|
||||
_tf_ssb_sql_append(&sql_work->rows, &sql_work->rows_count, &value, sizeof(value));
|
||||
}
|
||||
break;
|
||||
case SQLITE_FLOAT:
|
||||
{
|
||||
double value = sqlite3_column_double(statement, i);
|
||||
_tf_ssb_sql_append(&sql_work->rows, &sql_work->rows_count, &value, sizeof(value));
|
||||
}
|
||||
break;
|
||||
case SQLITE_TEXT:
|
||||
{
|
||||
size_t bytes = sqlite3_column_bytes(statement, i);
|
||||
_tf_ssb_sql_append(&sql_work->rows, &sql_work->rows_count, &bytes, sizeof(bytes));
|
||||
_tf_ssb_sql_append(&sql_work->rows, &sql_work->rows_count, sqlite3_column_text(statement, i), bytes);
|
||||
}
|
||||
break;
|
||||
case SQLITE_BLOB:
|
||||
{
|
||||
size_t bytes = sqlite3_column_bytes(statement, i);
|
||||
_tf_ssb_sql_append(&sql_work->rows, &sql_work->rows_count, &bytes, sizeof(bytes));
|
||||
_tf_ssb_sql_append(&sql_work->rows, &sql_work->rows_count, sqlite3_column_blob(statement, i), bytes);
|
||||
}
|
||||
break;
|
||||
case SQLITE_NULL:
|
||||
break;
|
||||
default:
|
||||
abort();
|
||||
}
|
||||
}
|
||||
}
|
||||
_tf_ssb_sql_append(&sql_work->rows, &sql_work->rows_count, &(uint8_t[]) { 0 }, 1);
|
||||
sqlite3_finalize(statement);
|
||||
}
|
||||
else
|
||||
{
|
||||
printf("prepare failed\n");
|
||||
}
|
||||
tf_ssb_release_db_reader(sql_work->ssb, db);
|
||||
}
|
||||
|
||||
static void _tf_ssb_sqlAsync_after_work(uv_work_t* work, int status)
|
||||
{
|
||||
sql_work_t* sql_work = work->data;
|
||||
JSContext* context = tf_ssb_get_context(sql_work->ssb);
|
||||
uint8_t* p = sql_work->rows;
|
||||
while (p < sql_work->rows + sql_work->rows_count)
|
||||
{
|
||||
if (*p++ == 'r')
|
||||
{
|
||||
JSValue row = JS_NewObject(context);
|
||||
|
||||
while (*p == 'c')
|
||||
{
|
||||
p++;
|
||||
const char* column_name = (const char*)p;
|
||||
size_t length = strlen((char*)p);
|
||||
p += length + 1;
|
||||
|
||||
switch (*p++)
|
||||
{
|
||||
case SQLITE_INTEGER:
|
||||
{
|
||||
int64_t value = 0;
|
||||
memcpy(&value, p, sizeof(value));
|
||||
JS_SetPropertyStr(context, row, column_name, JS_NewInt64(context, value));
|
||||
p += sizeof(value);
|
||||
}
|
||||
break;
|
||||
case SQLITE_FLOAT:
|
||||
{
|
||||
double value = 0.0;
|
||||
memcpy(&value, p, sizeof(value));
|
||||
JS_SetPropertyStr(context, row, column_name, JS_NewFloat64(context, value));
|
||||
p += sizeof(value);
|
||||
}
|
||||
break;
|
||||
case SQLITE_TEXT:
|
||||
case SQLITE_BLOB:
|
||||
{
|
||||
size_t length = 0;
|
||||
memcpy(&length, p, sizeof(length));
|
||||
p += sizeof(length);
|
||||
JS_SetPropertyStr(context, row, column_name, JS_NewStringLen(context, (const char*)p, length));
|
||||
p += length;
|
||||
}
|
||||
break;
|
||||
case SQLITE_NULL:
|
||||
JS_SetPropertyStr(context, row, column_name, JS_NULL);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
JSValue response = JS_Call(context, sql_work->callback, JS_UNDEFINED, 1, &row);
|
||||
tf_util_report_error(context, response);
|
||||
JS_FreeValue(context, row);
|
||||
}
|
||||
else
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
tf_free(sql_work->binds);
|
||||
tf_free(sql_work->rows);
|
||||
|
||||
JSValue result = JS_Call(context, sql_work->promise[0], JS_UNDEFINED, 0, NULL);
|
||||
tf_util_report_error(context, result);
|
||||
JS_FreeValue(context, sql_work->promise[0]);
|
||||
JS_FreeValue(context, sql_work->promise[1]);
|
||||
|
||||
tf_free(sql_work);
|
||||
}
|
||||
|
||||
static JSValue _tf_ssb_sqlAsync(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv)
|
||||
{
|
||||
tf_ssb_t* ssb = JS_GetOpaque(this_val, _tf_ssb_classId);
|
||||
JSValue result = JS_UNDEFINED;
|
||||
if (ssb)
|
||||
{
|
||||
const char* query = JS_ToCString(context, argv[0]);
|
||||
sql_work_t* work = tf_malloc(sizeof(sql_work_t));
|
||||
*work = (sql_work_t)
|
||||
{
|
||||
.request =
|
||||
{
|
||||
.data = work,
|
||||
},
|
||||
.ssb = ssb,
|
||||
.callback = JS_DupValue(context, argv[2]),
|
||||
.query = query,
|
||||
};
|
||||
result = JS_NewPromiseCapability(context, work->promise);
|
||||
int32_t length = tf_util_get_length(context, argv[1]);
|
||||
for (int i = 0; i < length; i++)
|
||||
{
|
||||
JSValue value = JS_GetPropertyUint32(context, argv[1], i);
|
||||
if (JS_IsNumber(value))
|
||||
{
|
||||
uint8_t type = SQLITE_INTEGER;
|
||||
int64_t number = 0;
|
||||
JS_ToInt64(context, &number, value);
|
||||
_tf_ssb_sql_append(&work->binds, &work->binds_count, &type, sizeof(type));
|
||||
_tf_ssb_sql_append(&work->binds, &work->binds_count, &number, sizeof(number));
|
||||
}
|
||||
else if (JS_IsBool(value))
|
||||
{
|
||||
uint8_t type = SQLITE_INTEGER;
|
||||
int64_t number = JS_ToBool(context, value) ? 1 : 0;
|
||||
_tf_ssb_sql_append(&work->binds, &work->binds_count, &type, sizeof(type));
|
||||
_tf_ssb_sql_append(&work->binds, &work->binds_count, &number, sizeof(number));
|
||||
}
|
||||
else if (JS_IsNumber(value))
|
||||
{
|
||||
uint8_t type = SQLITE_NULL;
|
||||
_tf_ssb_sql_append(&work->binds, &work->binds_count, &type, sizeof(type));
|
||||
}
|
||||
else
|
||||
{
|
||||
uint8_t type = SQLITE_TEXT;
|
||||
size_t length = 0;
|
||||
const char* string = JS_ToCStringLen(context, &length, value);
|
||||
if (!string)
|
||||
{
|
||||
string = "";
|
||||
}
|
||||
_tf_ssb_sql_append(&work->binds, &work->binds_count, &type, sizeof(type));
|
||||
_tf_ssb_sql_append(&work->binds, &work->binds_count, string, length);
|
||||
JS_FreeCString(context, string);
|
||||
}
|
||||
}
|
||||
int r = uv_queue_work(tf_ssb_get_loop(ssb), &work->request, _tf_ssb_sqlAsync_work, _tf_ssb_sqlAsync_after_work);
|
||||
if (r)
|
||||
{
|
||||
JSValue error = JS_ThrowInternalError(context, "uv_queue_work failed: %s", uv_strerror(r));
|
||||
JSValue result = JS_Call(context, work->promise[1], JS_UNDEFINED, 1, &error);
|
||||
tf_util_report_error(context, result);
|
||||
JS_FreeValue(context, work->promise[0]);
|
||||
JS_FreeValue(context, work->promise[1]);
|
||||
JS_FreeValue(context, error);
|
||||
tf_free(work->binds);
|
||||
tf_free(work);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
static JSValue _tf_ssb_storeMessage(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv)
|
||||
{
|
||||
tf_ssb_t* ssb = JS_GetOpaque(this_val, _tf_ssb_classId);
|
||||
@ -855,6 +1116,7 @@ void tf_ssb_register(JSContext* context, tf_ssb_t* ssb)
|
||||
JS_SetPropertyStr(context, object, "closeConnection", JS_NewCFunction(context, _tf_ssb_closeConnection, "closeConnection", 1));
|
||||
JS_SetPropertyStr(context, object, "forgetStoredConnection", JS_NewCFunction(context, _tf_ssb_forgetStoredConnection, "forgetStoredConnection", 1));
|
||||
JS_SetPropertyStr(context, object, "sqlStream", JS_NewCFunction(context, _tf_ssb_sqlStream, "sqlStream", 3));
|
||||
JS_SetPropertyStr(context, object, "sqlAsync", JS_NewCFunction(context, _tf_ssb_sqlAsync, "sqlAsync", 3));
|
||||
JS_SetPropertyStr(context, object, "storeMessage", JS_NewCFunction(context, _tf_ssb_storeMessage, "storeMessage", 1));
|
||||
JS_SetPropertyStr(context, object, "getBroadcasts", JS_NewCFunction(context, _tf_ssb_getBroadcasts, "getBroadcasts", 0));
|
||||
JS_SetPropertyStr(context, object, "connect", JS_NewCFunction(context, _tf_ssb_connect, "connect", 1));
|
||||
|
Reference in New Issue
Block a user