diff --git a/Makefile b/Makefile index f2f7b72e..1e9be818 100644 --- a/Makefile +++ b/Makefile @@ -2,6 +2,7 @@ PROJECT = tildefriends BUILD_DIR ?= out BUILD_DIR_DBG := $(BUILD_DIR)/debug BUILD_DIR_REL := $(BUILD_DIR)/release +UNAME_M := $(shell uname -s) CFLAGS += \ -Wall \ @@ -13,10 +14,14 @@ CFLAGS += \ -fdata-sections LDFLAGS += -Wl,-gc-sections -debug: CFLAGS += -Og -g -fsanitize=address -fsanitize=undefined -debug: LDFLAGS += -fsanitize=address -fsanitize=undefined +debug: CFLAGS += -Og -g release: CFLAGS += -DNDEBUG -O3 +ifeq ($(UNAME_M),x64_64) + debug: CFLAGS += -fsanitize=address -fsanitize=undefined + debug: LDFLAGS += -fsanitize=address -fsanitize=undefined +endif + APP_SOURCES = $(wildcard src/*.c) APP_OBJS_DBG = $(patsubst %.c,$(BUILD_DIR_DBG)/%.o,$(APP_SOURCES)) APP_OBJS_REL = $(patsubst %.c,$(BUILD_DIR_REL)/%.o,$(APP_SOURCES)) diff --git a/core/ssb.js b/core/ssb.js index 501cf1d7..b1789e58 100644 --- a/core/ssb.js +++ b/core/ssb.js @@ -202,9 +202,12 @@ ssb.addRpc(['tunnel', 'isRoom'], function(request) { function ebtReplicateSendClock(request, have) { var me = ssb.whoami(); var message = {}; + var last_sent = request.connection.sent_clock || {}; var ids = followingDeep(g_database, [me], 2).concat([request.connection.id]); - for (let id of ids) { - message[id] = get_latest_sequence_for_author(id); + if (!last_sent) { + for (let id of ids) { + message[id] = get_latest_sequence_for_author(id); + } } for (let id of Object.keys(have)) { if (message[id] === undefined) { @@ -213,7 +216,6 @@ function ebtReplicateSendClock(request, have) { } } - var last_sent = request.connection.sent_clock || {}; var to_send = {} for (let id of ids) { if (last_sent[id] === undefined || message[id] > last_sent[id]) { diff --git a/src/socket.js.c b/src/socket.js.c index af1bd87a..5c089483 100644 --- a/src/socket.js.c +++ b/src/socket.js.c @@ -204,7 +204,6 @@ void _socket_reportError(socket_t* socket, const char* error) printf("Socket error.\n"); js_std_dump_error(tf_task_get_context(socket->_task)); } - tf_task_run_jobs(socket->_task); JS_FreeValue(tf_task_get_context(socket->_task), exception); JS_FreeValue(tf_task_get_context(socket->_task), result); } @@ -500,7 +499,6 @@ void _socket_onNewConnection(uv_stream_t* server, int status) printf("Socket error on connection.\n"); js_std_dump_error(tf_task_get_context(socket->_task)); } - tf_task_run_jobs(socket->_task); } } @@ -640,7 +638,6 @@ void _socket_onRead(uv_stream_t* stream, ssize_t readSize, const uv_buf_t* buffe js_std_dump_error(tf_task_get_context(socket->_task)); } JS_FreeValue(tf_task_get_context(socket->_task), result); - tf_task_run_jobs(socket->_task); } _socket_close_internal(socket); } @@ -701,7 +698,6 @@ void _socket_onRead(uv_stream_t* stream, ssize_t readSize, const uv_buf_t* buffe js_std_dump_error(tf_task_get_context(socket->_task)); } JS_FreeValue(tf_task_get_context(socket->_task), result); - tf_task_run_jobs(socket->_task); } break; } @@ -751,7 +747,6 @@ void _socket_notifyDataRead(socket_t* socket, const char* data, size_t length) } JS_FreeValue(tf_task_get_context(socket->_task), typedArray); JS_FreeValue(tf_task_get_context(socket->_task), result); - tf_task_run_jobs(socket->_task); } } } diff --git a/src/ssb.c b/src/ssb.c index 0d56e2e5..995e5bee 100644 --- a/src/ssb.c +++ b/src/ssb.c @@ -174,6 +174,7 @@ typedef struct _tf_ssb_connection_t { tf_ssb_t* ssb; uv_tcp_t tcp; uv_connect_t connect; + uv_async_t async; JSValue object; @@ -1200,14 +1201,6 @@ void tf_ssb_append_message(tf_ssb_t* ssb, JSValue message) void tf_ssb_connection_destroy(tf_ssb_connection_t* connection) { - free(connection); -} - -static void _tf_ssb_connection_on_close(uv_handle_t* handle) -{ - tf_ssb_connection_t* connection = handle->data; - handle->data = NULL; - tf_ssb_t* ssb = connection->ssb; for (tf_ssb_connection_t** it = &connection->ssb->connections; *it; it = &(*it)->next) { @@ -1216,6 +1209,7 @@ static void _tf_ssb_connection_on_close(uv_handle_t* handle) *it = connection->next; connection->next = NULL; ssb->connections_count--; + _tf_ssb_notify_connections_changed(ssb, k_tf_ssb_change_remove, connection); break; } } @@ -1223,8 +1217,38 @@ static void _tf_ssb_connection_on_close(uv_handle_t* handle) { _tf_ssb_connection_remove_request(connection, connection->requests->request_number); } - _tf_ssb_notify_connections_changed(ssb, k_tf_ssb_change_remove, connection); - JS_FreeValue(ssb->context, connection->object); + if (!JS_IsUndefined(connection->object)) + { + JS_FreeValue(ssb->context, connection->object); + connection->object = JS_UNDEFINED; + } + if (connection->async.data && !uv_is_closing((uv_handle_t*)&connection->async)) + { + uv_close((uv_handle_t*)&connection->async, _tf_ssb_connection_on_close); + } + if (connection->tcp.data && !uv_is_closing((uv_handle_t*)&connection->tcp)) + { + uv_close((uv_handle_t*)&connection->tcp, _tf_ssb_connection_on_close); + } + if (connection->connect.data && !uv_is_closing((uv_handle_t*)&connection->connect)) + { + uv_close((uv_handle_t*)&connection->connect, _tf_ssb_connection_on_close); + } + + if (JS_IsUndefined(connection->object) && + !connection->async.data && + !connection->tcp.data && + !connection->connect.data) + { + free(connection); + } +} + +static void _tf_ssb_connection_on_close(uv_handle_t* handle) +{ + tf_ssb_connection_t* connection = handle->data; + handle->data = NULL; + tf_ssb_connection_destroy(connection); } static void _tf_ssb_connection_on_tcp_recv(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) @@ -1268,9 +1292,7 @@ static void _tf_ssb_connection_on_tcp_recv(uv_stream_t* stream, ssize_t nread, c } break; case k_tf_ssb_state_verified: - while (_tf_ssb_connection_box_stream_recv(connection)) - { - } + uv_async_send(&connection->async); break; case k_tf_ssb_state_server_wait_hello: { @@ -1303,9 +1325,7 @@ static void _tf_ssb_connection_on_tcp_recv(uv_stream_t* stream, ssize_t nread, c } break; case k_tf_ssb_state_server_verified: - while (_tf_ssb_connection_box_stream_recv(connection)) - { - } + uv_async_send(&connection->async); break; case k_tf_ssb_state_closing: break; @@ -1777,7 +1797,11 @@ void tf_ssb_run(tf_ssb_t* ssb) static void _tf_ssb_connection_finalizer(JSRuntime* runtime, JSValue value) { tf_ssb_connection_t* connection = JS_GetOpaque(value, _connection_class_id); - tf_ssb_connection_destroy(connection); + if (connection) + { + connection->object = JS_UNDEFINED; + tf_ssb_connection_destroy(connection); + } } static void _tf_ssb_connection_send_json_response(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) @@ -1818,6 +1842,15 @@ static JSValue _tf_ssb_connection_send_json(JSContext* context, JSValueConst thi return JS_UNDEFINED; } +static void _tf_ssb_connection_process_message_async(uv_async_t* async) +{ + tf_ssb_connection_t* connection = async->data; + if (_tf_ssb_connection_box_stream_recv(connection)) + { + uv_async_send(&connection->async); + } +} + tf_ssb_connection_t* tf_ssb_connection_create(tf_ssb_t* ssb, const char* host, const struct sockaddr_in* addr, const uint8_t* public_key) { for (tf_ssb_connection_t* connection = ssb->connections; connection; connection = connection->next) @@ -1847,6 +1880,8 @@ tf_ssb_connection_t* tf_ssb_connection_create(tf_ssb_t* ssb, const char* host, c connection->send_request_number = 1; snprintf(connection->host, sizeof(connection->host), "%s", host); connection->port = ntohs(addr->sin_port); + connection->async.data = connection; + uv_async_init(ssb->loop, &connection->async, _tf_ssb_connection_process_message_async); connection->object = JS_NewObjectClass(ssb->context, _connection_class_id); JS_SetPropertyStr(context, connection->object, "send_json", JS_NewCFunction(context, _tf_ssb_connection_send_json, "send_json", 2)); @@ -1866,9 +1901,7 @@ tf_ssb_connection_t* tf_ssb_connection_create(tf_ssb_t* ssb, const char* host, c if (result) { printf("uv_tcp_connect(%s): %s\n", host, uv_strerror(result)); - JS_SetOpaque(connection->object, NULL); JS_FreeValue(ssb->context, connection->object); - free(connection); } else { @@ -1938,6 +1971,8 @@ static void _tf_ssb_on_connection(uv_stream_t* stream, int status) connection->ssb = ssb; connection->tcp.data = connection; connection->send_request_number = 1; + connection->async.data = connection; + uv_async_init(ssb->loop, &connection->async, _tf_ssb_connection_process_message_async); connection->object = JS_NewObjectClass(ssb->context, _connection_class_id); JS_SetPropertyStr(ssb->context, connection->object, "send_json", JS_NewCFunction(ssb->context, _tf_ssb_connection_send_json, "send_json", 2)); @@ -1946,7 +1981,7 @@ static void _tf_ssb_on_connection(uv_stream_t* stream, int status) if (uv_tcp_init(ssb->loop, &connection->tcp) != 0) { printf("uv_tcp_init failed\n"); - free(connection); + JS_FreeValue(ssb->context, connection->object); return; } diff --git a/src/ssb.js.c b/src/ssb.js.c index 11983739..5d0250d8 100644 --- a/src/ssb.js.c +++ b/src/ssb.js.c @@ -173,32 +173,6 @@ static void _tf_ssb_sqlStream_callback(JSValue row, void* user_data) JSValue response = JS_Call(info->context, info->callback, JS_UNDEFINED, 1, &row); tf_util_report_error(info->context, response); JS_FreeValue(info->context, response); - if (tf_task_get(info->context)) - { - tf_task_run_jobs(tf_task_get(info->context)); - } - else - { - JSRuntime* runtime = JS_GetRuntime(info->context); - while (JS_IsJobPending(runtime)) - { - JSContext* context = NULL; - int r = JS_ExecutePendingJob(runtime, &context); - if (context) - { - JSValue result = JS_GetException(context); - tf_util_report_error(context, result); - } - if (r < 0) - { - js_std_dump_error(context); - } - else if (r == 0) - { - break; - } - } - } } static JSValue _tf_ssb_sqlStream(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) @@ -257,10 +231,6 @@ static JSValue _tf_ssb_storeMessage(JSContext* context, JSValueConst this_val, i { tf_ssb_notify_message_added(ssb, id); } - else - { - printf("failed to store message\n"); - } } else { diff --git a/src/task.c b/src/task.c index 83ad0954..6b59efd5 100644 --- a/src/task.c +++ b/src/task.c @@ -99,6 +99,9 @@ typedef struct _tf_task_t uint64_t last_hrtime; uint64_t last_idle_time; + uv_idle_t idle; + uv_prepare_t prepare; + export_record_t** _exports; int _export_count; exportid_t _nextExport; @@ -349,7 +352,6 @@ int tf_task_execute(tf_task_t* task, const char* fileName) executed = true; } JS_FreeValue(task->_context, result); - tf_task_run_jobs(task); free((void*)source); } else @@ -469,7 +471,6 @@ JSValue _task_invokeExport_internal(tf_taskstub_t* from, tf_task_t* to, exportid result = JS_Call(to->_context, function, this_val, length - 1, argument_array); tf_trace_end(to->_trace); tf_util_report_error(to->_context, result); - tf_task_run_jobs(to); JS_FreeValue(to->_context, this_val); for (int i = 0; i < length - 1; i++) @@ -535,8 +536,6 @@ static void _forward_promise(tf_task_t* from, tf_taskstub_t* to, promiseid_t pro JS_FreeValue(from->_context, promise_catch); JS_FreeValue(from->_context, then_handler); JS_FreeValue(from->_context, catch_handler); - - tf_task_run_jobs(from); } static void _tf_task_sendPromiseResolve(tf_task_t* from, tf_taskstub_t* to, promiseid_t promise, JSValue result) @@ -1000,7 +999,6 @@ JSValue _tf_task_require(JSContext* context, JSValueConst this_val, int argc, JS JS_SetPropertyStr(task->_context, global, "exports", JS_DupValue(task->_context, exports)); JSValue eval = JS_Eval(task->_context, source, strlen(source), path, 0); tf_util_report_error(task->_context, eval); - tf_task_run_jobs(task); if (JS_IsError(task->_context, eval) || JS_IsException(eval)) { @@ -1040,10 +1038,6 @@ static JSValue _tf_task_executeSource(tf_task_t* task, const char* source, const { snprintf(task->_scriptName, sizeof(task->_scriptName), "%s", name); } - if (!JS_IsException(result)) - { - tf_task_run_jobs(task); - } tf_trace_end(task->_trace); return result; } @@ -1079,7 +1073,6 @@ JSValue _tf_task_sandbox_require(JSContext* context, JSValueConst this_val, int tf_util_report_error(context, result); JS_SetPropertyStr(context, global, "exports", oldExports); JS_FreeValue(context, global); - tf_task_run_jobs(task); free(source); return exports; } @@ -1094,7 +1087,6 @@ JSValue _tf_task_sandbox_require(JSContext* context, JSValueConst this_val, int tf_util_report_error(context, result); JS_SetPropertyStr(context, global, "exports", oldExports); JS_FreeValue(context, global); - tf_task_run_jobs(task); return exports; } else @@ -1181,7 +1173,6 @@ void tf_task_resolve_promise(tf_task_t* task, promiseid_t promise, JSValue value JS_FreeValue(task->_context, it->values[2]); JS_FreeValue(task->_context, result); _tf_task_free_promise(task, promise); - tf_task_run_jobs(task); } else { @@ -1212,7 +1203,6 @@ void tf_task_reject_promise(tf_task_t* task, promiseid_t promise, JSValue value) JS_FreeValue(task->_context, it->values[2]); JS_FreeValue(task->_context, result); _tf_task_free_promise(task, promise); - tf_task_run_jobs(task); } } @@ -1314,6 +1304,54 @@ static void _tf_task_trace_timer(uv_timer_t* timer) tf_trace_counter(task->_trace, "task", sizeof(k_names) / sizeof(*k_names), k_names, values); } +static void _tf_task_run_jobs_idle(uv_idle_t* idle); +static void _tf_task_run_jobs_prepare(uv_prepare_t* prepare); + +static bool _tf_task_run_jobs(tf_task_t* task) +{ + if (JS_IsJobPending(task->_runtime)) + { + JSContext* context = NULL; + int r = JS_ExecutePendingJob(task->_runtime, &context); + JSValue result = JS_GetException(context); + if (context) + { + tf_util_report_error(context, result); + } + if (r < 0) + { + js_std_dump_error(context); + } + else + { + return r != 0; + } + } + return 0; +} + +static void _tf_task_run_jobs_idle(uv_idle_t* idle) +{ + tf_task_t* task = idle->data; + if (!_tf_task_run_jobs(task)) + { + /* No more jobs. Don't try again as actively. */ + uv_idle_stop(&task->idle); + uv_prepare_start(&task->prepare, _tf_task_run_jobs_prepare); + } +} + +static void _tf_task_run_jobs_prepare(uv_prepare_t* prepare) +{ + tf_task_t* task = prepare->data; + if (_tf_task_run_jobs(task)) + { + /* More jobs. We can run again immediately. */ + uv_idle_start(&task->idle, _tf_task_run_jobs_idle); + uv_prepare_stop(&task->prepare); + } +} + tf_task_t* tf_task_create() { tf_task_t* task = malloc(sizeof(tf_task_t)); @@ -1342,6 +1380,11 @@ tf_task_t* tf_task_create() uv_timer_init(&task->_loop, &task->trace_timer); uv_timer_start(&task->trace_timer, _tf_task_trace_timer, 100, 100); uv_unref((uv_handle_t*)&task->trace_timer); + task->idle.data = task; + uv_idle_init(&task->_loop, &task->idle); + task->prepare.data = task; + uv_prepare_init(&task->_loop, &task->prepare); + uv_idle_start(&task->idle, _tf_task_run_jobs_idle); return task; } @@ -1531,8 +1574,12 @@ void tf_task_destroy(tf_task_t* task) { uv_close((uv_handle_t*)&task->trace_timer, _tf_task_on_handle_close); } + uv_close((uv_handle_t*)&task->idle, _tf_task_on_handle_close); + uv_close((uv_handle_t*)&task->prepare, _tf_task_on_handle_close); - while (task->trace_timer.data) + while (task->trace_timer.data || + task->idle.data || + task->prepare.data) { uv_run(&task->_loop, UV_RUN_ONCE); } @@ -1587,28 +1634,6 @@ tf_task_t* tf_task_get(JSContext* context) return JS_GetContextOpaque(context); } -void tf_task_run_jobs(tf_task_t* task) -{ - while (JS_IsJobPending(task->_runtime)) - { - JSContext* context = NULL; - int r = JS_ExecutePendingJob(task->_runtime, &context); - JSValue result = JS_GetException(context); - if (context) - { - tf_util_report_error(context, result); - } - if (r < 0) - { - js_std_dump_error(context); - } - else if (r == 0) - { - break; - } - } -} - void tf_task_send_promise_message(tf_task_t* from, tf_taskstub_t* to, tf_task_message_t type, promiseid_t promise, JSValue payload) { _tf_task_sendPromiseMessage(from, to, type, promise, payload); diff --git a/src/task.h b/src/task.h index 4c03cce5..5364a85b 100644 --- a/src/task.h +++ b/src/task.h @@ -55,7 +55,6 @@ uv_loop_t* tf_task_get_loop(tf_task_t* task); tf_task_t* tf_task_get(JSContext* context); tf_trace_t* tf_task_get_trace(tf_task_t* task); tf_ssb_t* tf_task_get_ssb(tf_task_t* task); -void tf_task_run_jobs(tf_task_t* task); const char* tf_task_get_name(tf_task_t* task); JSValue tf_task_allocate_promise(tf_task_t* task, promiseid_t* out_promise); diff --git a/src/taskstub.js.c b/src/taskstub.js.c index 212a5261..6cd5e57d 100644 --- a/src/taskstub.js.c +++ b/src/taskstub.js.c @@ -272,7 +272,6 @@ static void _taskstub_on_process_exit(uv_process_t* process, int64_t status, int JSValue result = JS_Call(context, stub->_on_exit, JS_NULL, 2, argv); tf_util_report_error(context, result); JS_FreeValue(context, result); - tf_task_run_jobs(stub->_owner); JS_FreeValue(context, argv[0]); JS_FreeValue(context, argv[1]); } @@ -398,6 +397,5 @@ void tf_taskstub_on_error(tf_taskstub_t* stub, JSValue error) JSValue result = JS_Call(context, stub->_on_error, JS_NULL, 1, &error); tf_util_report_error(context, result); JS_FreeValue(context, result); - tf_task_run_jobs(stub->_owner); } } diff --git a/src/util.js.c b/src/util.js.c index 006cee24..7e246e69 100644 --- a/src/util.js.c +++ b/src/util.js.c @@ -153,7 +153,6 @@ static void _util_timeoutCallback(uv_timer_t* handle) NULL); tf_util_report_error(context, result); JS_FreeValue(context, result); - tf_task_run_jobs(timeout->_task); tf_trace_end(tf_task_get_trace(timeout->_task)); free(timeout); uv_close((uv_handle_t*)handle, _handle_closed);