#include "task.h"
#include "bcrypt.js.h"
#include "database.js.h"
#include "file.js.h"
#include "mem.h"
#include "packetstream.h"
#include "serialize.h"
#include "socket.js.h"
#include "ssb.h"
#include "ssb.js.h"
#include "taskstub.js.h"
#include "tlscontext.js.h"
#include "trace.h"
#include "util.js.h"
/* NOTE(review): the seven directives below carry no header name in this copy
 * of the file (the names appear to have been lost); restore them from the
 * authoritative source rather than guessing. */
#include
#include
#include
#include
#include
#include
#include
#include "quickjs.h"
#include "quickjs-libc.h"
#ifndef _WIN32
/* NOTE(review): header name missing here as well. */
#include
#endif

/* Version string handed to scripts. */
static const char* k_version = "1.0";
/* QuickJS class id used for imported-function objects. */
static JSClassID _import_class_id;
/* Incremented once per tf_task_create() call. */
static int _count;

typedef struct _export_record_t export_record_t;
typedef struct _import_record_t import_record_t;
typedef struct _task_child_node_t task_child_node_t;

/* Singly linked list node mapping a child task id to its stub. */
typedef struct _task_child_node_t {
    taskid_t id;
    tf_taskstub_t* stub;
    task_child_node_t* next;
} task_child_node_t;

typedef struct _promise_t promise_t;

/* A pending promise: its id plus the two resolving functions produced by
 * JS_NewPromiseCapability (values[0] is called to resolve, values[1] to
 * reject). */
typedef struct _promise_t {
    promiseid_t id;
    JSValue values[2];
} promise_t;

/* All state owned by one task: its JS runtime/context, event loop, child
 * list, and the promise/export/import tables. */
typedef struct _tf_task_t {
    taskid_t _nextTask;             /* next candidate id for a child task */
    task_child_node_t* _children;   /* linked list of child stubs */
    int _child_count;
    tf_taskstub_t* _stub;
    tf_taskstub_t* _parent;         /* stub for the parent task, or NULL */
    const char* _path;
    bool _activated;
    bool _trusted;                  /* when set, modules may be loaded from disk */
    bool _killed;
    int32_t _exitCode;
    char _scriptName[256];
    JSRuntime* _runtime;
    JSContext* _context;
    sqlite3* _db;
    tf_ssb_t* _ssb;
    tf_trace_t* _trace;
    promise_t* _promises;           /* array searched with bsearch() by id */
    int _promise_count;
    promiseid_t _nextPromise;
    uv_loop_t _loop;
    uv_timer_t gc_timer;
    uv_timer_t trace_timer;
    uint64_t last_hrtime;
    uint64_t last_idle_time;
    float idle_percent;
    uv_idle_t idle;
    uv_prepare_t prepare;
    export_record_t** _exports;     /* array searched with bsearch() by export id */
    int _export_count;
    exportid_t _nextExport;
    import_record_t** _imports;     /* array searched with bsearch() by (task, export) */
    int _import_count;
    JSValue _loadedFiles;           /* JS object: file name -> content */
    int _ssb_port;
    int _http_port;
    int _https_port;
    char _db_path[256];
    char _secrets_path[256];
    const char* _args;
} tf_task_t;

/* A local function exported to another task, with a use count. */
typedef struct _export_record_t {
    taskid_t _taskid;
    exportid_t _export_id;
    JSValue _function;
    int _useCount;
} export_record_t;

/* Adds one reference to an export record. */
static void _export_record_ref(export_record_t* export) {
    export->_useCount++;
}

static bool
_export_record_release(tf_task_t* task, export_record_t** export) {
    /*
     * Drops one reference from the record held in slot `export` of
     * task->_exports.  When the count reaches zero the JS function is freed,
     * the record is freed, and the slot is removed from the array by shifting
     * the following entries down.  Returns true if the record was destroyed.
     */
    if (--(*export)->_useCount == 0) {
        JS_FreeValue(task->_context, (*export)->_function);
        (*export)->_function = JS_UNDEFINED;
        tf_free(*export);
        /* `export` points into task->_exports, so the difference is the slot index. */
        int index = export - task->_exports;
        if (task->_export_count - index) {
            memmove(export, export + 1, sizeof(export_record_t*) * (task->_export_count - index - 1));
        }
        task->_export_count--;
        return true;
    }
    return false;
}

/* Forward declarations for functions defined later in this file. */
static JSValue _tf_task_version(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv);
static JSValue _tf_task_get_parent(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv);
static JSValue _tf_task_exit(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv);
static JSValue _tf_task_trace(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv);
static JSValue _tf_task_getStats(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv);
static JSValue _tf_task_getFile(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv);
static void _tf_task_sendPromiseResolve(tf_task_t* from, tf_taskstub_t* to, promiseid_t promise, JSValue result);
static void _tf_task_sendPromiseReject(tf_task_t* from, tf_taskstub_t* to, promiseid_t promise, JSValue result);
static void _tf_task_sendPromiseExportMessage(tf_task_t* from, tf_taskstub_t* to, tf_task_message_t messageType, promiseid_t promiseId, exportid_t exportId, JSValue result);
static JSValue _tf_task_executeSource(tf_task_t* task, const char* source, const char* name);
static tf_taskstub_t* _tf_task_get_stub(tf_task_t* task, taskid_t id);
static void _tf_task_release_export(tf_taskstub_t* stub, exportid_t exportId);
static bool _tf_task_run_jobs(tf_task_t* task);
static void _tf_task_run_jobs_idle(uv_idle_t* idle);
static void _tf_task_run_jobs_prepare(uv_prepare_t* prepare);

/* A function imported from another task: the owning task, the local JS
 * function object, the remote export id and task id, and a use count. */
typedef struct _import_record_t {
    tf_task_t* _owner;
    JSValue _function;
    exportid_t _export;
    taskid_t _task;
    int _useCount;
}
import_record_t;

/* bsearch() comparator for task->_imports.  `a` is the search key (a pointer
 * to an import_record_t), `b` is an array element (a pointer to an
 * import_record_t*).  Orders by task id first, then by export id. */
static int _import_compare(const void* a, const void* b) {
    const import_record_t* ia = a;
    const import_record_t* const* ib = b;
    if (ia->_task != (*ib)->_task) {
        return ia->_task < (*ib)->_task ? -1 : 1;
    } else if (ia->_export != (*ib)->_export) {
        return ia->_export < (*ib)->_export ? -1 : 1;
    } else {
        return 0;
    }
}

/* Looks up the import slot for (task_id, export_id) in task->_imports.
 * Returns a pointer to the array slot, or NULL when the array is unset or no
 * entry matches. */
static import_record_t** _tf_task_find_import(tf_task_t* task, taskid_t task_id, exportid_t export_id) {
    if (!task->_imports) {
        return NULL;
    }
    import_record_t search = {
        ._task = task_id,
        ._export = export_id,
    };
    import_record_t** it = bsearch(&search, task->_imports, task->_import_count, sizeof(import_record_t*), _import_compare);
    return it;
}

/* Drops one reference from the import record in slot `import`.  When the
 * count reaches zero: detaches and frees the JS function object (if still
 * set), calls _tf_task_release_export() for the exporting task's stub,
 * removes the slot from the owner's _imports array and frees the record.
 * Returns true if the record was destroyed.  `import` must not be NULL. */
static bool _import_record_release(import_record_t** import) {
    import_record_t* record = *import;
    if (--record->_useCount == 0) {
        tf_task_t* task = record->_owner;
        JSContext* context = task->_context;
        if (!JS_IsUndefined(record->_function)) {
            /* Clear the opaque pointer first so the object no longer refers to this record. */
            JS_SetOpaque(record->_function, NULL);
            JS_FreeValue(context, record->_function);
            record->_function = JS_UNDEFINED;
        }
        _tf_task_release_export(_tf_task_get_stub(task, record->_task), record->_export);
        int index = import - task->_imports;
        if (task->_import_count - index) {
            memmove(import, import + 1, sizeof(import_record_t*) * (task->_import_count - index - 1));
        }
        task->_import_count--;
        tf_free(record);
        return true;
    }
    return false;
}

/* Fully releases every import that came from task `task_id`. */
static void _import_record_release_for_task(tf_task_t* task, taskid_t task_id) {
    for (int i = 0; i < task->_import_count; i++) {
        if (task->_imports[i]->_task == task_id) {
            /* Keep releasing until the use count hits zero and the record is destroyed. */
            while (!_import_record_release(&task->_imports[i])) {
            }
            /* The array shifted down by one; revisit the same index. */
            --i;
        }
    }
}

/* bsearch() comparator for task->_exports.  `a` points to an exportid_t key,
 * `b` points to an array element (export_record_t*). */
static int _export_compare(const void* a, const void* b) {
    exportid_t ia = *(const exportid_t*)a;
    const export_record_t* const* ib = b;
    if (ia != (*ib)->_export_id) {
        return ia < (*ib)->_export_id ?
-1 : 1; } else { return 0; } } static void _export_record_release_for_task(tf_task_t* task, taskid_t task_id) { for (int i = 0; i < task->_export_count; i++) { if (task->_exports[i]->_taskid == task_id) { while (!_export_record_release(task, &task->_exports[i])) { } i--; } } } bool tf_task_send_error_to_parent(tf_task_t* task, JSValue error) { if (task && task->_parent) { void* buffer = NULL; size_t size = 0; tf_serialize_store(task, task->_parent, &buffer, &size, error); tf_packetstream_send(tf_taskstub_get_stream(task->_parent), kTaskError, buffer, size); tf_free(buffer); return true; } return false; } static const char* _task_loadFile(const char* fileName) { char* result = NULL; FILE* file = fopen(fileName, "rb"); if (file) { fseek(file, 0, SEEK_END); long fileSize = ftell(file); fseek(file, 0, SEEK_SET); result = tf_malloc(fileSize + 1); int bytes_read = fread(result, 1, fileSize, file); result[bytes_read] = '\0'; fclose(file); } return result; } JSValue _tf_task_exit(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) { tf_task_t* task = JS_GetContextOpaque(context); int exitCode = 0; JS_ToInt32(task->_context, &exitCode, argv[0]); exit(exitCode); return JS_UNDEFINED; } int tf_task_execute(tf_task_t* task, const char* fileName) { bool executed = false; tf_trace_begin(task->_trace, "tf_task_execute"); const char* source = _task_loadFile(fileName); printf("Running script %s\n", fileName); if (!*task->_scriptName) { strncpy(task->_scriptName, fileName, sizeof(task->_scriptName) - 1); } if (!task->_path) { char* path = tf_strdup(fileName); char* slash = strrchr(path, '/'); if (slash) { *slash = '\0'; task->_path = tf_strdup(path); } else { task->_path = tf_strdup("./"); } tf_free(path); } if (source) { JSValue result = JS_Eval(task->_context, source, strlen(source), fileName, JS_EVAL_TYPE_MODULE); if (tf_util_report_error(task->_context, result)) { printf("Reported an error.\n"); } if (!JS_IsError(task->_context, result) && 
!JS_IsException(result)) { executed = true; } JS_FreeValue(task->_context, result); tf_free((void*)source); } else { printf("Failed to load file: %s.\n", fileName); } tf_trace_end(task->_trace); return executed; } static tf_taskstub_t* _tf_task_get_stub(tf_task_t* task, taskid_t id) { if (id == k_task_parent_id) { return task->_parent; } for (task_child_node_t* node = task->_children; node; node = node->next) { if (node->id == id) { return node->stub; } } return NULL; } static JSValue _import_call(JSContext* context, JSValueConst func_obj, JSValueConst this_val, int argc, JSValueConst* argv, int flags) { import_record_t* import = JS_GetOpaque(func_obj, _import_class_id); if (!import) { return JS_ThrowInternalError(context, "Invoking a function that has been released."); } import->_useCount++; JSValue array = JS_NewArray(context); JS_SetPropertyUint32(context, array, 0, JS_UNDEFINED); for (int i = 0; i < argc; ++i) { JS_SetPropertyUint32(context, array, i + 1, JS_DupValue(context, argv[i])); } tf_task_t* sender = JS_GetContextOpaque(context); JSValue result; tf_taskstub_t* recipient = _tf_task_get_stub(sender, import->_task); if (recipient) { promiseid_t promise = -1; result = tf_task_allocate_promise(sender, &promise); _tf_task_sendPromiseExportMessage(sender, recipient, kInvokeExport, promise, import->_export, array); } else { result = JS_ThrowInternalError(context, "Invoking a function on a nonexistent task."); } JS_FreeValue(context, array); return result; } static export_record_t** _task_get_export(tf_task_t* task, exportid_t export_id) { if (!task->_export_count) { return NULL; } export_record_t** it = bsearch(&export_id, task->_exports, task->_export_count, sizeof(export_record_t*), _export_compare); return it; } JSValue _task_invokeExport_internal(tf_taskstub_t* from, tf_task_t* to, exportid_t exportId, const char* buffer, size_t size) { JSValue result = JS_NULL; export_record_t** it = _task_get_export(to, exportId); export_record_t* export = it ? 
*it : NULL; if (export) { JSValue arguments = tf_serialize_load(to, from, buffer, size); JSValue* argument_array = NULL; JSValue this_val = JS_NULL; int length = tf_util_get_length(to->_context, arguments); if (length > 0) { this_val = JS_GetPropertyUint32(to->_context, arguments, 0); argument_array = alloca(sizeof(JSValue) * (length - 1)); } for (int i = 1; i < length; ++i) { argument_array[i - 1] = JS_GetPropertyUint32(to->_context, arguments, i); } JSValue function = export->_function; JSPropertyDescriptor desc = { 0 }; const char* name = NULL; JSAtom atom = JS_NewAtom(to->_context, "name"); if (JS_GetOwnProperty(to->_context, &desc, function, atom) == 1) { name = JS_ToCString(to->_context, desc.value); JS_FreeValue(to->_context, desc.value); JS_FreeValue(to->_context, desc.setter); JS_FreeValue(to->_context, desc.getter); } JS_FreeAtom(to->_context, atom); tf_trace_begin(to->_trace, name ? name : "_task_invokeExport_internal"); if (name) { JS_FreeCString(to->_context, name); } result = JS_Call(to->_context, function, this_val, length - 1, argument_array); tf_trace_end(to->_trace); JS_FreeValue(to->_context, this_val); for (int i = 0; i < length - 1; i++) { JS_FreeValue(to->_context, argument_array[i]); } JS_FreeValue(to->_context, arguments); } else { printf("%s: That's not an export we have (exportId=%d, exports=%d)\n", to->_scriptName, exportId, to->_export_count); } tf_packetstream_send(tf_taskstub_get_stream(from), kReleaseImport, (void*)&exportId, sizeof(exportId)); return result; } static JSValue _invokeThen(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv, int magic, JSValue* func_data) { taskid_t taskid = 0; JS_ToInt32(context, &taskid, func_data[0]); tf_task_t* from = JS_GetContextOpaque(context); tf_taskstub_t* to = _tf_task_get_stub(from, taskid); promiseid_t promiseid = 0; JS_ToInt32(context, &promiseid, func_data[1]); tf_task_send_promise_message(from, to, kResolvePromise, promiseid, argv[0]); return JS_DupValue(context, 
this_val); } static JSValue _invokeCatch(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv, int magic, JSValue* func_data) { taskid_t taskid = 0; JS_ToInt32(context, &taskid, func_data[0]); tf_task_t* from = JS_GetContextOpaque(context); tf_taskstub_t* to = _tf_task_get_stub(from, taskid); promiseid_t promiseid = 0; JS_ToInt32(context, &promiseid, func_data[1]); tf_task_send_promise_message(from, to, kRejectPromise, promiseid, argv[0]); return JS_DupValue(context, this_val); } static void _forward_promise(tf_task_t* from, tf_taskstub_t* to, promiseid_t promise, JSValue result) { // We're not going to serialize/deserialize a promise... JSValue data[] = { JS_NewInt32(from->_context, tf_taskstub_get_id(to)), JS_NewInt32(from->_context, promise), }; JSValue promise_then = JS_GetPropertyStr(from->_context, result, "then"); JSValue promise_catch = JS_GetPropertyStr(from->_context, result, "catch"); JSValue then_handler = JS_NewCFunctionData(from->_context, _invokeThen, 0, 0, 2, data); JSValue catch_handler = JS_NewCFunctionData(from->_context, _invokeCatch, 0, 0, 2, data); JSValue error = JS_Call(from->_context, promise_then, result, 1, &then_handler); tf_util_report_error(from->_context, error); JS_FreeValue(from->_context, error); error = JS_Call(from->_context, promise_catch, result, 1, &catch_handler); tf_util_report_error(from->_context, error); JS_FreeValue(from->_context, error); JS_FreeValue(from->_context, promise_then); JS_FreeValue(from->_context, promise_catch); JS_FreeValue(from->_context, then_handler); JS_FreeValue(from->_context, catch_handler); } static void _tf_task_sendPromiseResolve(tf_task_t* from, tf_taskstub_t* to, promiseid_t promise, JSValue result) { JSValue global = JS_GetGlobalObject(from->_context); JSValue promiseType = JS_GetPropertyStr(from->_context, global, "Promise"); JS_FreeValue(from->_context, global); if (JS_IsInstanceOf(from->_context, result, promiseType)) { _forward_promise(from, to, promise, result); } else 
{
        tf_task_send_promise_message(from, to, kResolvePromise, promise, result);
    }
    JS_FreeValue(from->_context, promiseType);
}

/* Sends `result` to task `to` as the rejection of `promise`.  If `result` is
 * a Promise instance its outcome is forwarded instead. */
static void _tf_task_sendPromiseReject(tf_task_t* from, tf_taskstub_t* to, promiseid_t promise, JSValue result) {
    JSValue global = JS_GetGlobalObject(from->_context);
    JSValue promiseType = JS_GetPropertyStr(from->_context, global, "Promise");
    JS_FreeValue(from->_context, global);
    if (JS_IsInstanceOf(from->_context, result, promiseType)) {
        _forward_promise(from, to, promise, result);
    } else {
        tf_task_send_promise_message(from, to, kRejectPromise, promise, result);
    }
    JS_FreeValue(from->_context, promiseType);
}

/* Sends a packet of the given `type` to `to`.  The packet body is the
 * promise id followed by the serialized `payload`.  Prints a message and does
 * nothing else when `to` is NULL. */
void tf_task_send_promise_message(tf_task_t* from, tf_taskstub_t* to, tf_task_message_t type, promiseid_t promise, JSValue payload) {
    if (to) {
        void* buffer;
        size_t size;
        tf_serialize_store(from, to, &buffer, &size, payload);
        /* Prefix the serialized payload with the promise id. */
        char* copy = tf_malloc(sizeof(promise) + size);
        memcpy(copy, &promise, sizeof(promise));
        memcpy(copy + sizeof(promise), buffer, size);
        tf_packetstream_send(tf_taskstub_get_stream(to), type, copy, size + sizeof(promise));
        tf_free(buffer);
        tf_free(copy);
    } else {
        printf("Sending to a NULL task.\n");
    }
}

/* Sends a packet of `messageType` to `to`.  The packet body is the promise
 * id, then the export id, then the serialized `result`. */
static void _tf_task_sendPromiseExportMessage(tf_task_t* from, tf_taskstub_t* to, tf_task_message_t messageType, promiseid_t promise, exportid_t exportId, JSValue result) {
    void* buffer;
    size_t size;
    tf_serialize_store(from, to, &buffer, &size, result);
    char* copy = tf_malloc(sizeof(promise) + sizeof(exportId) + size);
    memcpy(copy, &promise, sizeof(promise));
    memcpy(copy + sizeof(promise), &exportId, sizeof(exportId));
    memcpy(copy + sizeof(promise) + sizeof(exportId), buffer, size);
    tf_packetstream_send(tf_taskstub_get_stream(to), messageType, copy, sizeof(promise) + sizeof(exportId) + size);
    tf_free(buffer);
    tf_free(copy);
}

/* Getter for the script-visible `parent`: a new reference to the parent
 * stub's task object, or undefined when the task has no parent. */
JSValue _tf_task_get_parent(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) {
    tf_task_t* task = JS_GetContextOpaque(context);
    return task->_parent ?
JS_DupValue(context, tf_taskstub_get_task_object(task->_parent)) : JS_UNDEFINED;
}

/* Script-callable: returns the version string k_version. */
static JSValue _tf_task_version(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) {
    tf_task_t* task = JS_GetContextOpaque(context);
    return JS_NewString(task->_context, k_version);
}

/* Registers `function` as an export destined for task `to` and returns its
 * export id.  If the same function object is already exported, the existing
 * record is reused.  Either way the record's use count is incremented. */
exportid_t tf_task_export_function(tf_task_t* task, tf_taskstub_t* to, JSValue function) {
    export_record_t* export = NULL;
    /* TODO: _exports_by_function */
    /* Linear scan for an existing record wrapping the same function object. */
    for (int i = 0; i < task->_export_count; i++) {
        if (JS_VALUE_GET_PTR(task->_exports[i]->_function) == JS_VALUE_GET_PTR(function)) {
            export = task->_exports[i];
            break;
        }
    }
    if (!export) {
        /* Pick the next export id that is not currently in use. */
        int id = -1;
        do {
            id = task->_nextExport++;
        } while (_task_get_export(task, id));
        export = tf_malloc(sizeof(export_record_t));
        *export = (export_record_t) {
            ._export_id = id,
            ._taskid = tf_taskstub_get_id(to),
            ._function = JS_DupValue(task->_context, function),
        };
        /* Insert at the position given by tf_util_insert_index() so the array
         * stays usable with bsearch(). */
        int index = tf_util_insert_index(&id, task->_exports, task->_export_count, sizeof(export_record_t*), _export_compare);
        task->_exports = tf_resize_vec(task->_exports, sizeof(export_record_t*) * (task->_export_count + 1));
        if (task->_export_count - index) {
            memmove(task->_exports + index + 1, task->_exports + index, sizeof(export_record_t*) * (task->_export_count - index));
        }
        task->_exports[index] = export;
        task->_export_count++;
    }
    if (export) {
        _export_record_ref(export);
    }
    return export ?
export->_export_id : -1;
}

/* Sends a kReleaseExport packet carrying `exportId` to `stub`, if the stub
 * and its stream exist. */
static void _tf_task_release_export(tf_taskstub_t* stub, exportid_t exportId) {
    if (stub) {
        tf_packetstream_t* stream = tf_taskstub_get_stream(stub);
        if (stream) {
            tf_packetstream_send(stream, kReleaseExport, (void*)&exportId, sizeof(exportId));
        }
    }
}

/* Script-callable: returns the exported trace as a string, or undefined when
 * the task has no trace. */
static JSValue _tf_task_trace(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) {
    tf_task_t* task = JS_GetContextOpaque(context);
    if (!task->_trace) {
        return JS_UNDEFINED;
    }
    char* trace = tf_trace_export(task->_trace);
    JSValue result = JS_NewString(context, trace);
    tf_free(trace);
    return result;
}

/* Script-callable: returns an object of counters for this task (children,
 * imports, exports, promises, idle percentage), memory figures expressed as
 * a percentage of uv_get_total_memory(), socket counts and, when an SSB
 * instance is attached, its statistics. */
static JSValue _tf_task_getStats(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) {
    tf_task_t* task = JS_GetContextOpaque(context);
    JSValue result = JS_NewObject(context);
    JS_SetPropertyStr(context, result, "child_count", JS_NewInt32(context, task->_child_count));
    JS_SetPropertyStr(context, result, "import_count", JS_NewInt32(context, task->_import_count));
    JS_SetPropertyStr(context, result, "export_count", JS_NewInt32(context, task->_export_count));
    JS_SetPropertyStr(context, result, "promise_count", JS_NewInt32(context, task->_promise_count));
    JS_SetPropertyStr(context, result, "idle_percent", JS_NewFloat64(context, task->idle_percent));

    uint64_t total_memory = uv_get_total_memory();
    size_t rss;
    /* memory_percent is only reported when the resident set size is available. */
    if (uv_resident_set_memory(&rss) == 0) {
        JS_SetPropertyStr(context, result, "memory_percent", JS_NewFloat64(context, 100.0f * rss / total_memory));
    }
    JS_SetPropertyStr(context, result, "sqlite3_memory_percent", JS_NewFloat64(context, 100.0f * sqlite3_memory_used() / total_memory));
    JS_SetPropertyStr(context, result, "js_malloc_percent", JS_NewFloat64(context, 100.0f * tf_mem_get_js_malloc_size() / total_memory));
    JS_SetPropertyStr(context, result, "uv_malloc_percent", JS_NewFloat64(context, 100.0f * tf_mem_get_uv_malloc_size() / total_memory));
    JS_SetPropertyStr(context, result, "tls_malloc_percent", JS_NewFloat64(context, 100.0f *
tf_mem_get_tls_malloc_size() / total_memory)); JS_SetPropertyStr(context, result, "tf_malloc_percent", JS_NewFloat64(context, 100.0f * tf_mem_get_tf_malloc_size() / total_memory)); JS_SetPropertyStr(context, result, "socket_count", JS_NewInt32(context, tf_socket_get_count())); JS_SetPropertyStr(context, result, "socket_open_count", JS_NewInt32(context, tf_socket_get_open_count())); if (task->_ssb) { tf_ssb_stats_t ssb_stats = { 0 }; tf_ssb_get_stats(task->_ssb, &ssb_stats); JS_SetPropertyStr(context, result, "messages_stored", JS_NewInt32(context, ssb_stats.messages_stored)); JS_SetPropertyStr(context, result, "rpc_in", JS_NewInt32(context, ssb_stats.rpc_in)); JS_SetPropertyStr(context, result, "rpc_out", JS_NewInt32(context, ssb_stats.rpc_out)); JS_SetPropertyStr(context, result, "requests", JS_NewInt32(context, ssb_stats.request_count)); } return result; } static JSValue _tf_task_getFile(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) { tf_task_t* task = JS_GetContextOpaque(context); const char* name = JS_ToCString(context, argv[0]); JSValue result = JS_GetPropertyStr(context, task->_loadedFiles, name); JS_FreeCString(context, name); return result; } static JSValue _tf_task_promiseTest(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) { tf_task_t* task = JS_GetContextOpaque(context); promiseid_t promise_id = -1; JSValue promise = tf_task_allocate_promise(task, &promise_id); tf_task_resolve_promise(task, promise_id, argv[0]); return promise; } const char* _tf_task_get_message_type(tf_task_message_t type) { switch (type) { case kResolvePromise: return "kResolvePromise"; case kRejectPromise: return "kRejectPromise"; case kInvokeExport: return "kInvokeExport"; case kReleaseExport: return "kReleaseExport"; case kReleaseImport: return "kReleaseImport"; case kSetRequires: return "kSetRequires"; case kActivate: return "kActivate"; case kExecute: return "kExecute"; case kKill: return "kKill"; case kSetImports: return 
"kSetImports"; case kGetExports: return "kGetExports"; case kLoadFile: return "kLoadFile"; case kTaskError: return "kTaskError"; case kTaskTrace: return "kTaskTrace"; case kPrint: return "kPrint"; } return "unknown"; } void tf_task_on_receive_packet(int packetType, const char* begin, size_t length, void* userData) { tf_taskstub_t* stub = userData; tf_taskstub_t* from = stub; tf_task_t* to = tf_taskstub_get_owner(stub); tf_trace_begin(to->_trace, _tf_task_get_message_type(packetType)); switch (packetType) { case kInvokeExport: { promiseid_t promise; exportid_t exportId; memcpy(&promise, begin, sizeof(promise)); memcpy(&exportId, begin + sizeof(promise), sizeof(exportId)); JSValue result = _task_invokeExport_internal(from, to, exportId, begin + sizeof(promiseid_t) + sizeof(exportid_t), length - sizeof(promiseid_t) - sizeof(exportid_t)); if (JS_IsException(result)) { _tf_task_sendPromiseReject(to, from, promise, result); } else { _tf_task_sendPromiseResolve(to, from, promise, result); } JS_FreeValue(to->_context, result); } break; case kResolvePromise: case kRejectPromise: { JSValue arg = JS_UNDEFINED; promiseid_t promise; memcpy(&promise, begin, sizeof(promiseid_t)); if (length > sizeof(promiseid_t)) { arg = tf_serialize_load(to, from, begin + sizeof(promiseid_t), length - sizeof(promiseid_t)); } if (packetType == kResolvePromise) { tf_task_resolve_promise(to, promise, arg); } else { tf_task_reject_promise(to, promise, arg); } JS_FreeValue(to->_context, arg); } break; case kReleaseExport: { assert(length == sizeof(exportid_t)); exportid_t exportId; memcpy(&exportId, begin, sizeof(exportId)); export_record_t** it = _task_get_export(to, exportId); if (it) { _export_record_release(to, it); } } break; case kReleaseImport: { assert(length == sizeof(exportid_t)); exportid_t exportId; memcpy(&exportId, begin, sizeof(exportId)); import_record_t** it = _tf_task_find_import(to, tf_taskstub_get_id(from), exportId); if (it) { _import_record_release(it); } } break; case 
kLoadFile: {
        /* Payload is an array: [0] = file name, [1] = content.  Stored in _loadedFiles. */
        JSValue args = tf_serialize_load(to, from, begin, length);
        JSValue key = JS_GetPropertyUint32(to->_context, args, 0);
        JSValue content = JS_GetPropertyUint32(to->_context, args, 1);
        const char* name = JS_ToCString(to->_context, key);
        JS_SetPropertyStr(to->_context, to->_loadedFiles, name, JS_DupValue(to->_context, content));
        JS_FreeCString(to->_context, name);
        JS_FreeValue(to->_context, key);
        JS_FreeValue(to->_context, content);
        JS_FreeValue(to->_context, args);
    } break;
    case kActivate:
        tf_task_activate(to);
        break;
    case kExecute: {
        /* Body: promise id, then a serialized object with "source" and "name". */
        assert(length >= sizeof(promiseid_t));
        promiseid_t promise;
        memcpy(&promise, begin, sizeof(promiseid_t));
        JSValue value = tf_serialize_load(to, from, begin + sizeof(promiseid_t), length - sizeof(promiseid_t));
        JSValue source = JS_GetPropertyStr(to->_context, value, "source");
        JSValue name_val = JS_GetPropertyStr(to->_context, value, "name");
        const char* name = JS_ToCString(to->_context, name_val);
        JS_FreeValue(to->_context, name_val);
        JS_FreeValue(to->_context, value);
        /* The source is passed through tf_util_utf8_decode() before being read as a C string. */
        JSValue utf8 = tf_util_utf8_decode(to->_context, source);
        JS_FreeValue(to->_context, source);
        const char* source_str = JS_ToCString(to->_context, utf8);
        JS_FreeValue(to->_context, utf8);
        JSValue result = _tf_task_executeSource(to, source_str, name);
        if (JS_IsException(result)) {
            /* Reject the sender's promise with the actual exception object. */
            JSValue exception = JS_GetException(to->_context);
            _tf_task_sendPromiseReject(to, from, promise, exception);
            JS_FreeValue(to->_context, exception);
        } else {
            _tf_task_sendPromiseResolve(to, from, promise, result);
        }
        JS_FreeValue(to->_context, result);
        JS_FreeCString(to->_context, source_str);
        JS_FreeCString(to->_context, name);
    } break;
    case kKill:
        exit(1);
        break;
    case kSetImports: {
        /* Copy every own string-keyed property of the payload onto the global object. */
        JSValue global = JS_GetGlobalObject(to->_context);
        JSValue imports = tf_serialize_load(to, from, begin, length);
        JSPropertyEnum* ptab;
        uint32_t plen;
        JS_GetOwnPropertyNames(to->_context, &ptab, &plen, imports, JS_GPN_STRING_MASK);
        for (uint32_t i = 0; i < plen; ++i) {
            JSPropertyDescriptor desc;
            if
(JS_GetOwnProperty(to->_context, &desc, imports, ptab[i].atom) == 1) {
                /* desc.value is handed to JS_SetProperty(); only the accessor values are freed here. */
                JS_SetProperty(to->_context, global, ptab[i].atom, desc.value);
                JS_FreeValue(to->_context, desc.setter);
                JS_FreeValue(to->_context, desc.getter);
            }
            JS_FreeAtom(to->_context, ptab[i].atom);
        }
        js_free(to->_context, ptab);
        JS_FreeValue(to->_context, imports);
        JS_FreeValue(to->_context, global);
    } break;
    case kGetExports: {
        /* Body: promise id.  Resolve it with the global `exports` object. */
        promiseid_t promise;
        assert(length >= sizeof(promise));
        memcpy(&promise, begin, sizeof(promiseid_t));
        JSValue global = JS_GetGlobalObject(to->_context);
        JSValue exportObject = JS_GetPropertyStr(to->_context, global, "exports");
        _tf_task_sendPromiseResolve(to, from, promise, exportObject);
        JS_FreeValue(to->_context, exportObject);
        JS_FreeValue(to->_context, global);
    } break;
    case kTaskError: {
        /* Deserialize the error and hand it to the sending stub's error handler. */
        JSValue error = tf_serialize_load(to, from, begin, length);
        tf_taskstub_on_error(from, error);
        JS_FreeValue(to->_context, error);
    } break;
    case kPrint: {
        /* Deserialize the print arguments and hand them to the sending stub's print handler. */
        JSValue arguments = tf_serialize_load(to, from, begin, length);
        tf_taskstub_on_print(from, arguments);
        JS_FreeValue(to->_context, arguments);
    } break;
    case kTaskTrace:
        /* Pass the raw trace bytes through to this task's trace. */
        tf_trace_raw(to->_trace, begin, length);
        break;
    }
    tf_trace_end(to->_trace);
}

/* Evaluates `source` as a module named `name` in the task's context and
 * returns the evaluation result.  Records `name` as the script name if none
 * has been set yet. */
static JSValue _tf_task_executeSource(tf_task_t* task, const char* source, const char* name) {
    tf_trace_begin(task->_trace, "_tf_task_executeSource");
    JSValue result = JS_Eval(task->_context, source, strlen(source), name, JS_EVAL_TYPE_MODULE);
    if (!*task->_scriptName) {
        snprintf(task->_scriptName, sizeof(task->_scriptName), "%s", name);
    }
    tf_trace_end(task->_trace);
    return result;
}

/* Returns the task's embedded libuv loop. */
uv_loop_t* tf_task_get_loop(tf_task_t* task) {
    return &task->_loop;
}

/* Returns the task's trace object (may be NULL). */
tf_trace_t* tf_task_get_trace(tf_task_t* task) {
    return task->_trace;
}

/* bsearch() comparator for task->_promises.  The key `a` is not a real
 * pointer: it is the promise id cast through intptr_t.  `b` points to an
 * array element. */
static int _promise_compare(const void* a, const void* b) {
    promiseid_t ida = (promiseid_t)(intptr_t)a;
    const promise_t* pb = b;
    return ida < pb->id ? -1 : pb->id < ida ?
1 : 0;
}

/* Returns the pending promise entry with the given id, or NULL if the
 * promise array is unset or has no such entry. */
static promise_t* _tf_task_find_promise(tf_task_t* task, promiseid_t id) {
    if (!task->_promises) {
        return NULL;
    }
    promise_t* it = bsearch((void*)(intptr_t)id, task->_promises, task->_promise_count, sizeof(promise_t), _promise_compare);
    return it ? it : NULL;
}

/* Removes the entry for `id` from task->_promises by shifting later entries
 * down.  Does not free the entry's JS values; callers do that first. */
static void _tf_task_free_promise(tf_task_t* task, promiseid_t id) {
    promise_t* it = bsearch((void*)(intptr_t)id, task->_promises, task->_promise_count, sizeof(promise_t), _promise_compare);
    if (it) {
        int index = it - task->_promises;
        memmove(it, it + 1, sizeof(promise_t) * (task->_promise_count - index - 1));
        task->_promise_count--;
    }
}

/* Creates a new JS promise, records its resolving functions under a fresh
 * non-zero id, stores that id in *out_promise and returns the promise
 * object. */
JSValue tf_task_allocate_promise(tf_task_t* task, promiseid_t* out_promise) {
    /* Skip id 0 and any id that is still pending. */
    promiseid_t promiseId;
    do {
        promiseId = task->_nextPromise++;
    } while (_tf_task_find_promise(task, promiseId) || !promiseId);

    promise_t promise = {
        .id = promiseId,
        .values = { JS_NULL, JS_NULL },
    };
    JSValue result = JS_NewPromiseCapability(task->_context, promise.values);
    /* Insert at the position given by tf_util_insert_index() so the array
     * stays usable with bsearch(). */
    int index = tf_util_insert_index((void*)(intptr_t)promiseId, task->_promises, task->_promise_count, sizeof(promise_t), _promise_compare);
    task->_promises = tf_resize_vec(task->_promises, sizeof(promise_t) * (task->_promise_count + 1));
    if (task->_promise_count - index) {
        memmove(task->_promises + index + 1, task->_promises + index, sizeof(promise_t) * (task->_promise_count - index));
    }
    task->_promises[index] = promise;
    task->_promise_count++;
    *out_promise = promiseId;
    return result;
}

/* Resolves pending promise `promise` with `value`, then frees its resolving
 * functions and removes it from the table.  Aborts the process if the
 * promise id is unknown. */
void tf_task_resolve_promise(tf_task_t* task, promiseid_t promise, JSValue value) {
    promise_t* it = _tf_task_find_promise(task, promise);
    if (it) {
        /* values[0] is the resolve function. */
        JSValue result = JS_Call(task->_context, it->values[0], JS_UNDEFINED, 1, &value);
        tf_util_report_error(task->_context, result);
        JS_FreeValue(task->_context, it->values[0]);
        JS_FreeValue(task->_context, it->values[1]);
        JS_FreeValue(task->_context, result);
        _tf_task_free_promise(task, promise);
    } else {
        printf("Didn't find promise %d to resolve.\n", promise);
        abort();
    }
}

void
tf_task_reject_promise(tf_task_t* task, promiseid_t promise, JSValue value) { promise_t* it = _tf_task_find_promise(task, promise); if (it) { JSValue arg = value; bool free_arg = false; if (JS_IsException(value)) { arg = JS_GetException(task->_context); free_arg = true; } JSValue result = JS_Call(task->_context, it->values[1], JS_UNDEFINED, 1, &arg); tf_util_report_error(task->_context, result); if (free_arg) { JS_FreeValue(task->_context, arg); } JS_FreeValue(task->_context, it->values[0]); JS_FreeValue(task->_context, it->values[1]); JS_FreeValue(task->_context, result); _tf_task_free_promise(task, promise); } else { printf("Didn't find promise %d to reject.\n", promise); abort(); } } taskid_t tf_task_allocate_task_id(tf_task_t* task, tf_taskstub_t* stub) { taskid_t id = 0; do { id = task->_nextTask++; } while (id == k_task_parent_id || _tf_task_get_stub(task, id)); task_child_node_t* node = tf_malloc(sizeof(task_child_node_t)); *node = (task_child_node_t) { .id = id, .stub = stub, .next = task->_children, }; task->_children = node; task->_child_count++; return id; } void tf_task_remove_child(tf_task_t* task, tf_taskstub_t* child) { _import_record_release_for_task(task, tf_taskstub_get_id(child)); _export_record_release_for_task(task, tf_taskstub_get_id(child)); for (task_child_node_t** it = &task->_children; *it; it = &(*it)->next) { if ((*it)->stub == child) { task_child_node_t* node = *it; *it = node->next; tf_free(node); task->_child_count--; break; } } } static void _import_finalizer(JSRuntime* runtime, JSValue value) { import_record_t* import = JS_GetOpaque(value, _import_class_id); if (import) { import->_function = JS_UNDEFINED; import_record_t** it = _tf_task_find_import(import->_owner, import->_task, import->_export); _import_record_release(it); } } static void _import_mark_func(JSRuntime* runtime, JSValueConst value, JS_MarkFunc mark_func) { import_record_t* import = JS_GetOpaque(value, _import_class_id); if (import && import->_useCount > 0) { 
JS_MarkValue(runtime, import->_function, mark_func); } } static void _tf_task_promise_rejection_tracker(JSContext* context, JSValueConst promise, JSValueConst reason, JS_BOOL is_handled, void* user_data) { if (!is_handled) { tf_util_report_error(context, reason); } } static void _tf_task_gc_timer(uv_timer_t* timer) { tf_task_t* task = timer->data; JS_RunGC(task->_runtime); } static void _tf_task_trace_timer(uv_timer_t* timer) { tf_task_t* task = timer->data; uint64_t hrtime = uv_hrtime(); uint64_t idle_time = uv_metrics_idle_time(&task->_loop); task->idle_percent = (hrtime - task->last_hrtime) ? 100.0f * (idle_time - task->last_idle_time) / (hrtime - task->last_hrtime) : 0.0f; task->last_hrtime = hrtime; task->last_idle_time = idle_time; const char* k_names[] = { "child_tasks", "imports", "exports", "promises", "idle_percent", }; int64_t values[] = { task->_child_count, task->_import_count, task->_export_count, task->_promise_count, (int64_t)task->idle_percent, }; tf_trace_counter(task->_trace, "task", sizeof(k_names) / sizeof(*k_names), k_names, values); } static bool _tf_task_run_jobs(tf_task_t* task) { if (JS_IsJobPending(task->_runtime)) { JSContext* context = NULL; int r = JS_ExecutePendingJob(task->_runtime, &context); JSValue result = JS_GetException(context); if (context) { tf_util_report_error(context, result); JS_FreeValue(context, result); } if (r < 0) { js_std_dump_error(context); } else { return r != 0; } } return 0; } static void _tf_task_run_jobs_idle(uv_idle_t* idle) { tf_task_t* task = idle->data; if (!_tf_task_run_jobs(task)) { /* No more jobs. Don't try again as actively. */ uv_idle_stop(&task->idle); uv_prepare_start(&task->prepare, _tf_task_run_jobs_prepare); } } static void _tf_task_run_jobs_prepare(uv_prepare_t* prepare) { tf_task_t* task = prepare->data; if (_tf_task_run_jobs(task)) { /* More jobs. We can run again immediately. 
*/ uv_idle_start(&task->idle, _tf_task_run_jobs_idle); uv_prepare_stop(&task->prepare); } } JSModuleDef* _tf_task_module_loader(JSContext* context, const char* module_name, void* opaque) { tf_task_t* task = opaque; JSValue source_value = JS_GetPropertyStr(context, task->_loadedFiles, module_name); char* source = NULL; size_t length = 0; if (!JS_IsUndefined(source_value)) { uint8_t* array = tf_util_try_get_array_buffer(context, &length, source_value); if (array) { source = tf_malloc(length + 1); memcpy(source, array, length); source[length] = '\0'; } JS_FreeValue(context, source_value); } if (!source && task->_trusted) { source = (char*)_task_loadFile(module_name); length = source ? strlen(source) : 0; } if (!source) { JS_ThrowReferenceError(context, "Could not load '%s'.", module_name); return NULL; } JSValue result = JS_Eval(context, source, length, module_name, JS_EVAL_TYPE_MODULE | JS_EVAL_FLAG_COMPILE_ONLY); tf_free(source); if (tf_util_report_error(task->_context, result)) { return NULL; } JSModuleDef* module = JS_VALUE_GET_PTR(result); JS_FreeValue(context, result); return module; } tf_task_t* tf_task_create() { tf_task_t* task = tf_malloc(sizeof(tf_task_t)); *task = (tf_task_t) { 0 }; ++_count; JSMallocFunctions funcs = { 0 }; tf_get_js_malloc_functions(&funcs); task->_runtime = JS_NewRuntime2(&funcs, NULL); task->_context = JS_NewContext(task->_runtime); JS_SetContextOpaque(task->_context, task); JS_SetHostPromiseRejectionTracker(task->_runtime, _tf_task_promise_rejection_tracker, task); JS_SetModuleLoaderFunc(task->_runtime, NULL, _tf_task_module_loader, task); JS_NewClassID(&_import_class_id); JSClassDef def = { .class_name = "imported_function", .finalizer = _import_finalizer, .gc_mark = _import_mark_func, .call = _import_call, }; JS_NewClass(task->_runtime, _import_class_id, &def); task->_loadedFiles = JS_NewObject(task->_context); uv_loop_init(&task->_loop); uv_loop_configure(&task->_loop, UV_METRICS_IDLE_TIME); task->_loop.data = task; 
task->trace_timer.data = task; uv_timer_init(&task->_loop, &task->trace_timer); uv_timer_start(&task->trace_timer, _tf_task_trace_timer, 100, 100); uv_unref((uv_handle_t*)&task->trace_timer); task->gc_timer.data = task; uv_timer_init(&task->_loop, &task->gc_timer); uv_timer_start(&task->gc_timer, _tf_task_gc_timer, 10000, 10000); uv_unref((uv_handle_t*)&task->gc_timer); task->idle.data = task; uv_idle_init(&task->_loop, &task->idle); uv_unref((uv_handle_t*)&task->idle); task->prepare.data = task; uv_prepare_init(&task->_loop, &task->prepare); uv_unref((uv_handle_t*)&task->prepare); uv_idle_start(&task->idle, _tf_task_run_jobs_idle); return task; } void tf_task_configure_from_stdin(tf_task_t* task) { task->_parent = tf_taskstub_create_parent(task, STDIN_FILENO); } static void _tf_task_trace_to_parent(tf_trace_t* trace, const char* buffer, size_t size, void* user_data) { tf_task_t* task = user_data; tf_packetstream_send(tf_taskstub_get_stream(task->_parent), kTaskTrace, buffer, size); } void tf_task_activate(tf_task_t* task) { assert(!task->_activated); task->_activated = true; JSContext* context = task->_context; JSValue global = JS_GetGlobalObject(context); JSValue e = JS_NewObject(context); JS_SetPropertyStr(context, global, "exports", e); JSAtom atom = JS_NewAtom(context, "parent"); JS_DefinePropertyGetSet(context, global, atom, JS_NewCFunction(context, _tf_task_get_parent, "parent", 0), JS_UNDEFINED, 0); JS_FreeAtom(context, atom); JSValue tildefriends = JS_NewObject(context); JS_SetPropertyStr(context, tildefriends, "ssb_port", JS_NewInt32(context, task->_ssb_port)); JS_SetPropertyStr(context, tildefriends, "http_port", JS_NewInt32(context, task->_http_port)); JS_SetPropertyStr(context, tildefriends, "https_port", JS_NewInt32(context, task->_https_port)); JSValue args = JS_NewObject(context); JS_SetPropertyStr(context, tildefriends, "args", args); if (task->_args) { char* saveptr = NULL; char* copy = tf_strdup(task->_args); char* start = copy; while (true) { 
char* assignment = strtok_r(start, ",", &saveptr); start = NULL; if (!assignment) { break; } char* equals = strchr(assignment, '='); if (equals) { *equals = '\0'; JS_SetPropertyStr(context, args, assignment, JS_NewString(context, equals + 1)); } else { printf("Assignment missing '=': %s.\n", assignment); exit(1); } } tf_free(copy); } JS_SetPropertyStr(context, global, "tildefriends", tildefriends); task->_trace = tf_trace_create(); if (task->_trusted) { sqlite3_open(*task->_db_path ? task->_db_path : "db.sqlite", &task->_db); JS_SetPropertyStr(context, global, "Task", tf_taskstub_register(context)); JS_SetPropertyStr(context, global, "Socket", tf_socket_register(context)); JS_SetPropertyStr(context, global, "TlsContext", tf_tls_context_register(context)); tf_file_register(context); task->_ssb = tf_ssb_create(&task->_loop, task->_context, task->_db, *task->_secrets_path ? task->_secrets_path : NULL); tf_ssb_set_trace(task->_ssb, task->_trace); tf_ssb_register(context, task->_ssb); if (task->_ssb_port) { tf_ssb_broadcast_listener_start(task->_ssb, false); tf_ssb_server_open(task->_ssb, task->_ssb_port); } JS_SetPropertyStr(context, global, "trace", JS_NewCFunction(context, _tf_task_trace, "trace", 1)); JS_SetPropertyStr(context, global, "getStats", JS_NewCFunction(context, _tf_task_getStats, "getStats", 0)); } else { tf_trace_set_write_callback(task->_trace, _tf_task_trace_to_parent, task); } tf_bcrypt_register(context); tf_util_register(context); JS_SetPropertyStr(context, global, "exit", JS_NewCFunction(context, _tf_task_exit, "exit", 1)); JS_SetPropertyStr(context, global, "version", JS_NewCFunction(context, _tf_task_version, "version", 0)); JS_SetPropertyStr(context, global, "getFile", JS_NewCFunction(context, _tf_task_getFile, "getFile", 1)); JS_SetPropertyStr(context, global, "promiseTest", JS_NewCFunction(context, _tf_task_promiseTest, "promiseTest", 1)); JS_FreeValue(context, global); } void tf_task_run(tf_task_t* task) { do { uv_run(&task->_loop, 
UV_RUN_DEFAULT); } while (_tf_task_run_jobs(task)); } void tf_task_set_trusted(tf_task_t* task, bool trusted) { task->_trusted = trusted; } JSContext* tf_task_get_context(tf_task_t* task) { return task->_context; } tf_ssb_t* tf_task_get_ssb(tf_task_t* task) { return task->_ssb; } static void _tf_task_on_handle_close(uv_handle_t* handle) { handle->data = NULL; } void tf_task_destroy(tf_task_t* task) { for (int i = 0; i < task->_import_count; i++) { JS_FreeValue(task->_context, task->_imports[i]->_function); tf_free(task->_imports[i]); } tf_free(task->_imports); task->_imports = NULL; task->_import_count = 0; for (int i = 0; i < task->_export_count; i++) { JS_FreeValue(task->_context, task->_exports[i]->_function); tf_free(task->_exports[i]); } tf_free(task->_exports); task->_exports = NULL; task->_export_count = 0; while (task->_children) { task_child_node_t* node = task->_children; tf_taskstub_destroy(node->stub); task->_children = node->next; tf_free(node); } if (task->_parent) { JS_FreeValue(task->_context, tf_taskstub_get_task_object(task->_parent)); } while (task->_promise_count) { tf_task_reject_promise(task, task->_promises[task->_promise_count - 1].id, JS_NULL); } tf_free(task->_promises); task->_promises = NULL; JS_FreeValue(task->_context, task->_loadedFiles); if (task->_ssb) { tf_ssb_destroy(task->_ssb); } JS_FreeContext(task->_context); JS_FreeRuntime(task->_runtime); if (task->_db) { sqlite3_close(task->_db); } if (task->trace_timer.data && !uv_is_closing((uv_handle_t*)&task->trace_timer)) { uv_close((uv_handle_t*)&task->trace_timer, _tf_task_on_handle_close); } if (task->gc_timer.data && !uv_is_closing((uv_handle_t*)&task->gc_timer)) { uv_close((uv_handle_t*)&task->gc_timer, _tf_task_on_handle_close); } uv_close((uv_handle_t*)&task->idle, _tf_task_on_handle_close); uv_close((uv_handle_t*)&task->prepare, _tf_task_on_handle_close); while (task->trace_timer.data || task->gc_timer.data || task->idle.data || task->prepare.data) { uv_run(&task->_loop, 
UV_RUN_ONCE); } if (uv_loop_close(&task->_loop) != 0) { uv_print_all_handles(&task->_loop, stdout); } if (task->_trace) { tf_trace_destroy(task->_trace); } --_count; tf_free((void*)task->_path); tf_free(task); } JSValue tf_task_add_import(tf_task_t* task, taskid_t stub_id, exportid_t export_id) { import_record_t** it = _tf_task_find_import(task, stub_id, export_id); if (it) { (*it)->_useCount++; return JS_DupValue(task->_context, (*it)->_function); } JSValue function = JS_NewObjectClass(task->_context, _import_class_id); import_record_t* import = tf_malloc(sizeof(import_record_t)); JS_SetOpaque(function, import); *import = (import_record_t) { ._function = function, ._export = export_id, ._owner = task, ._task = stub_id, ._useCount = 1, }; int index = tf_util_insert_index(import, task->_imports, task->_import_count, sizeof(import_record_t*), _import_compare); task->_imports = tf_resize_vec(task->_imports, sizeof(import_record_t*) * (task->_import_count + 1)); if (task->_import_count - index) { memmove(task->_imports + index + 1, task->_imports + index, sizeof(import_record_t*) * (task->_import_count - index)); } task->_imports[index] = import; task->_import_count++; return JS_DupValue(task->_context, function); } void tf_task_print(tf_task_t* task, int argc, JSValueConst* argv) { if (task->_parent) { JSValue array = JS_NewArray(task->_context); for (int i = 0; i < argc; i++) { JS_SetPropertyUint32(task->_context, array, i, JS_DupValue(task->_context, argv[i])); } void* buffer; size_t size; tf_serialize_store(task, task->_parent, &buffer, &size, array); tf_packetstream_send(tf_taskstub_get_stream(task->_parent), kPrint, buffer, size); tf_free(buffer); JS_FreeValue(task->_context, array); } } tf_task_t* tf_task_get(JSContext* context) { return JS_GetContextOpaque(context); } void tf_task_set_ssb_port(tf_task_t* task, int port) { task->_ssb_port = port; } void tf_task_set_http_port(tf_task_t* task, int port) { task->_http_port = port; } void 
tf_task_set_https_port(tf_task_t* task, int port) { task->_https_port = port; } void tf_task_set_db_path(tf_task_t* task, const char* db_path) { snprintf(task->_db_path, sizeof(task->_db_path), "%s", db_path); } void tf_task_set_secrets_path(tf_task_t* task, const char* secrets_path) { snprintf(task->_secrets_path, sizeof(task->_secrets_path), "%s", secrets_path); } void tf_task_set_args(tf_task_t* task, const char* args) { task->_args = args; } const char* tf_task_get_name(tf_task_t* task) { return task->_scriptName; }