Needs more work, but several experiments that make things more responsive under load.

git-svn-id: https://www.unprompted.com/svn/projects/tildefriends/trunk@3783 ed5197a5-7fde-0310-b194-c3ffbd925b24
This commit is contained in:
Cory McWilliams 2022-01-22 20:13:14 +00:00
parent 0f03701043
commit b2a552b3e0
9 changed files with 128 additions and 100 deletions

View File

@ -2,6 +2,7 @@ PROJECT = tildefriends
BUILD_DIR ?= out
BUILD_DIR_DBG := $(BUILD_DIR)/debug
BUILD_DIR_REL := $(BUILD_DIR)/release
UNAME_M := $(shell uname -s)
CFLAGS += \
-Wall \
@ -13,10 +14,14 @@ CFLAGS += \
-fdata-sections
LDFLAGS += -Wl,-gc-sections
debug: CFLAGS += -Og -g -fsanitize=address -fsanitize=undefined
debug: LDFLAGS += -fsanitize=address -fsanitize=undefined
debug: CFLAGS += -Og -g
release: CFLAGS += -DNDEBUG -O3
ifeq ($(UNAME_M),x64_64)
debug: CFLAGS += -fsanitize=address -fsanitize=undefined
debug: LDFLAGS += -fsanitize=address -fsanitize=undefined
endif
APP_SOURCES = $(wildcard src/*.c)
APP_OBJS_DBG = $(patsubst %.c,$(BUILD_DIR_DBG)/%.o,$(APP_SOURCES))
APP_OBJS_REL = $(patsubst %.c,$(BUILD_DIR_REL)/%.o,$(APP_SOURCES))

View File

@ -202,9 +202,12 @@ ssb.addRpc(['tunnel', 'isRoom'], function(request) {
function ebtReplicateSendClock(request, have) {
var me = ssb.whoami();
var message = {};
var last_sent = request.connection.sent_clock || {};
var ids = followingDeep(g_database, [me], 2).concat([request.connection.id]);
for (let id of ids) {
message[id] = get_latest_sequence_for_author(id);
if (!last_sent) {
for (let id of ids) {
message[id] = get_latest_sequence_for_author(id);
}
}
for (let id of Object.keys(have)) {
if (message[id] === undefined) {
@ -213,7 +216,6 @@ function ebtReplicateSendClock(request, have) {
}
}
var last_sent = request.connection.sent_clock || {};
var to_send = {}
for (let id of ids) {
if (last_sent[id] === undefined || message[id] > last_sent[id]) {

View File

@ -204,7 +204,6 @@ void _socket_reportError(socket_t* socket, const char* error)
printf("Socket error.\n");
js_std_dump_error(tf_task_get_context(socket->_task));
}
tf_task_run_jobs(socket->_task);
JS_FreeValue(tf_task_get_context(socket->_task), exception);
JS_FreeValue(tf_task_get_context(socket->_task), result);
}
@ -500,7 +499,6 @@ void _socket_onNewConnection(uv_stream_t* server, int status)
printf("Socket error on connection.\n");
js_std_dump_error(tf_task_get_context(socket->_task));
}
tf_task_run_jobs(socket->_task);
}
}
@ -640,7 +638,6 @@ void _socket_onRead(uv_stream_t* stream, ssize_t readSize, const uv_buf_t* buffe
js_std_dump_error(tf_task_get_context(socket->_task));
}
JS_FreeValue(tf_task_get_context(socket->_task), result);
tf_task_run_jobs(socket->_task);
}
_socket_close_internal(socket);
}
@ -701,7 +698,6 @@ void _socket_onRead(uv_stream_t* stream, ssize_t readSize, const uv_buf_t* buffe
js_std_dump_error(tf_task_get_context(socket->_task));
}
JS_FreeValue(tf_task_get_context(socket->_task), result);
tf_task_run_jobs(socket->_task);
}
break;
}
@ -751,7 +747,6 @@ void _socket_notifyDataRead(socket_t* socket, const char* data, size_t length)
}
JS_FreeValue(tf_task_get_context(socket->_task), typedArray);
JS_FreeValue(tf_task_get_context(socket->_task), result);
tf_task_run_jobs(socket->_task);
}
}
}

View File

@ -174,6 +174,7 @@ typedef struct _tf_ssb_connection_t {
tf_ssb_t* ssb;
uv_tcp_t tcp;
uv_connect_t connect;
uv_async_t async;
JSValue object;
@ -1200,14 +1201,6 @@ void tf_ssb_append_message(tf_ssb_t* ssb, JSValue message)
void tf_ssb_connection_destroy(tf_ssb_connection_t* connection)
{
free(connection);
}
static void _tf_ssb_connection_on_close(uv_handle_t* handle)
{
tf_ssb_connection_t* connection = handle->data;
handle->data = NULL;
tf_ssb_t* ssb = connection->ssb;
for (tf_ssb_connection_t** it = &connection->ssb->connections; *it; it = &(*it)->next)
{
@ -1216,6 +1209,7 @@ static void _tf_ssb_connection_on_close(uv_handle_t* handle)
*it = connection->next;
connection->next = NULL;
ssb->connections_count--;
_tf_ssb_notify_connections_changed(ssb, k_tf_ssb_change_remove, connection);
break;
}
}
@ -1223,8 +1217,38 @@ static void _tf_ssb_connection_on_close(uv_handle_t* handle)
{
_tf_ssb_connection_remove_request(connection, connection->requests->request_number);
}
_tf_ssb_notify_connections_changed(ssb, k_tf_ssb_change_remove, connection);
JS_FreeValue(ssb->context, connection->object);
if (!JS_IsUndefined(connection->object))
{
JS_FreeValue(ssb->context, connection->object);
connection->object = JS_UNDEFINED;
}
if (connection->async.data && !uv_is_closing((uv_handle_t*)&connection->async))
{
uv_close((uv_handle_t*)&connection->async, _tf_ssb_connection_on_close);
}
if (connection->tcp.data && !uv_is_closing((uv_handle_t*)&connection->tcp))
{
uv_close((uv_handle_t*)&connection->tcp, _tf_ssb_connection_on_close);
}
if (connection->connect.data && !uv_is_closing((uv_handle_t*)&connection->connect))
{
uv_close((uv_handle_t*)&connection->connect, _tf_ssb_connection_on_close);
}
if (JS_IsUndefined(connection->object) &&
!connection->async.data &&
!connection->tcp.data &&
!connection->connect.data)
{
free(connection);
}
}
static void _tf_ssb_connection_on_close(uv_handle_t* handle)
{
tf_ssb_connection_t* connection = handle->data;
handle->data = NULL;
tf_ssb_connection_destroy(connection);
}
static void _tf_ssb_connection_on_tcp_recv(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf)
@ -1268,9 +1292,7 @@ static void _tf_ssb_connection_on_tcp_recv(uv_stream_t* stream, ssize_t nread, c
}
break;
case k_tf_ssb_state_verified:
while (_tf_ssb_connection_box_stream_recv(connection))
{
}
uv_async_send(&connection->async);
break;
case k_tf_ssb_state_server_wait_hello:
{
@ -1303,9 +1325,7 @@ static void _tf_ssb_connection_on_tcp_recv(uv_stream_t* stream, ssize_t nread, c
}
break;
case k_tf_ssb_state_server_verified:
while (_tf_ssb_connection_box_stream_recv(connection))
{
}
uv_async_send(&connection->async);
break;
case k_tf_ssb_state_closing:
break;
@ -1777,7 +1797,11 @@ void tf_ssb_run(tf_ssb_t* ssb)
static void _tf_ssb_connection_finalizer(JSRuntime* runtime, JSValue value)
{
tf_ssb_connection_t* connection = JS_GetOpaque(value, _connection_class_id);
tf_ssb_connection_destroy(connection);
if (connection)
{
connection->object = JS_UNDEFINED;
tf_ssb_connection_destroy(connection);
}
}
static void _tf_ssb_connection_send_json_response(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
@ -1818,6 +1842,15 @@ static JSValue _tf_ssb_connection_send_json(JSContext* context, JSValueConst thi
return JS_UNDEFINED;
}
static void _tf_ssb_connection_process_message_async(uv_async_t* async)
{
tf_ssb_connection_t* connection = async->data;
if (_tf_ssb_connection_box_stream_recv(connection))
{
uv_async_send(&connection->async);
}
}
tf_ssb_connection_t* tf_ssb_connection_create(tf_ssb_t* ssb, const char* host, const struct sockaddr_in* addr, const uint8_t* public_key)
{
for (tf_ssb_connection_t* connection = ssb->connections; connection; connection = connection->next)
@ -1847,6 +1880,8 @@ tf_ssb_connection_t* tf_ssb_connection_create(tf_ssb_t* ssb, const char* host, c
connection->send_request_number = 1;
snprintf(connection->host, sizeof(connection->host), "%s", host);
connection->port = ntohs(addr->sin_port);
connection->async.data = connection;
uv_async_init(ssb->loop, &connection->async, _tf_ssb_connection_process_message_async);
connection->object = JS_NewObjectClass(ssb->context, _connection_class_id);
JS_SetPropertyStr(context, connection->object, "send_json", JS_NewCFunction(context, _tf_ssb_connection_send_json, "send_json", 2));
@ -1866,9 +1901,7 @@ tf_ssb_connection_t* tf_ssb_connection_create(tf_ssb_t* ssb, const char* host, c
if (result)
{
printf("uv_tcp_connect(%s): %s\n", host, uv_strerror(result));
JS_SetOpaque(connection->object, NULL);
JS_FreeValue(ssb->context, connection->object);
free(connection);
}
else
{
@ -1938,6 +1971,8 @@ static void _tf_ssb_on_connection(uv_stream_t* stream, int status)
connection->ssb = ssb;
connection->tcp.data = connection;
connection->send_request_number = 1;
connection->async.data = connection;
uv_async_init(ssb->loop, &connection->async, _tf_ssb_connection_process_message_async);
connection->object = JS_NewObjectClass(ssb->context, _connection_class_id);
JS_SetPropertyStr(ssb->context, connection->object, "send_json", JS_NewCFunction(ssb->context, _tf_ssb_connection_send_json, "send_json", 2));
@ -1946,7 +1981,7 @@ static void _tf_ssb_on_connection(uv_stream_t* stream, int status)
if (uv_tcp_init(ssb->loop, &connection->tcp) != 0)
{
printf("uv_tcp_init failed\n");
free(connection);
JS_FreeValue(ssb->context, connection->object);
return;
}

View File

@ -173,32 +173,6 @@ static void _tf_ssb_sqlStream_callback(JSValue row, void* user_data)
JSValue response = JS_Call(info->context, info->callback, JS_UNDEFINED, 1, &row);
tf_util_report_error(info->context, response);
JS_FreeValue(info->context, response);
if (tf_task_get(info->context))
{
tf_task_run_jobs(tf_task_get(info->context));
}
else
{
JSRuntime* runtime = JS_GetRuntime(info->context);
while (JS_IsJobPending(runtime))
{
JSContext* context = NULL;
int r = JS_ExecutePendingJob(runtime, &context);
if (context)
{
JSValue result = JS_GetException(context);
tf_util_report_error(context, result);
}
if (r < 0)
{
js_std_dump_error(context);
}
else if (r == 0)
{
break;
}
}
}
}
static JSValue _tf_ssb_sqlStream(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv)
@ -257,10 +231,6 @@ static JSValue _tf_ssb_storeMessage(JSContext* context, JSValueConst this_val, i
{
tf_ssb_notify_message_added(ssb, id);
}
else
{
printf("failed to store message\n");
}
}
else
{

View File

@ -99,6 +99,9 @@ typedef struct _tf_task_t
uint64_t last_hrtime;
uint64_t last_idle_time;
uv_idle_t idle;
uv_prepare_t prepare;
export_record_t** _exports;
int _export_count;
exportid_t _nextExport;
@ -349,7 +352,6 @@ int tf_task_execute(tf_task_t* task, const char* fileName)
executed = true;
}
JS_FreeValue(task->_context, result);
tf_task_run_jobs(task);
free((void*)source);
}
else
@ -469,7 +471,6 @@ JSValue _task_invokeExport_internal(tf_taskstub_t* from, tf_task_t* to, exportid
result = JS_Call(to->_context, function, this_val, length - 1, argument_array);
tf_trace_end(to->_trace);
tf_util_report_error(to->_context, result);
tf_task_run_jobs(to);
JS_FreeValue(to->_context, this_val);
for (int i = 0; i < length - 1; i++)
@ -535,8 +536,6 @@ static void _forward_promise(tf_task_t* from, tf_taskstub_t* to, promiseid_t pro
JS_FreeValue(from->_context, promise_catch);
JS_FreeValue(from->_context, then_handler);
JS_FreeValue(from->_context, catch_handler);
tf_task_run_jobs(from);
}
static void _tf_task_sendPromiseResolve(tf_task_t* from, tf_taskstub_t* to, promiseid_t promise, JSValue result)
@ -1000,7 +999,6 @@ JSValue _tf_task_require(JSContext* context, JSValueConst this_val, int argc, JS
JS_SetPropertyStr(task->_context, global, "exports", JS_DupValue(task->_context, exports));
JSValue eval = JS_Eval(task->_context, source, strlen(source), path, 0);
tf_util_report_error(task->_context, eval);
tf_task_run_jobs(task);
if (JS_IsError(task->_context, eval) ||
JS_IsException(eval))
{
@ -1040,10 +1038,6 @@ static JSValue _tf_task_executeSource(tf_task_t* task, const char* source, const
{
snprintf(task->_scriptName, sizeof(task->_scriptName), "%s", name);
}
if (!JS_IsException(result))
{
tf_task_run_jobs(task);
}
tf_trace_end(task->_trace);
return result;
}
@ -1079,7 +1073,6 @@ JSValue _tf_task_sandbox_require(JSContext* context, JSValueConst this_val, int
tf_util_report_error(context, result);
JS_SetPropertyStr(context, global, "exports", oldExports);
JS_FreeValue(context, global);
tf_task_run_jobs(task);
free(source);
return exports;
}
@ -1094,7 +1087,6 @@ JSValue _tf_task_sandbox_require(JSContext* context, JSValueConst this_val, int
tf_util_report_error(context, result);
JS_SetPropertyStr(context, global, "exports", oldExports);
JS_FreeValue(context, global);
tf_task_run_jobs(task);
return exports;
}
else
@ -1181,7 +1173,6 @@ void tf_task_resolve_promise(tf_task_t* task, promiseid_t promise, JSValue value
JS_FreeValue(task->_context, it->values[2]);
JS_FreeValue(task->_context, result);
_tf_task_free_promise(task, promise);
tf_task_run_jobs(task);
}
else
{
@ -1212,7 +1203,6 @@ void tf_task_reject_promise(tf_task_t* task, promiseid_t promise, JSValue value)
JS_FreeValue(task->_context, it->values[2]);
JS_FreeValue(task->_context, result);
_tf_task_free_promise(task, promise);
tf_task_run_jobs(task);
}
}
@ -1314,6 +1304,54 @@ static void _tf_task_trace_timer(uv_timer_t* timer)
tf_trace_counter(task->_trace, "task", sizeof(k_names) / sizeof(*k_names), k_names, values);
}
static void _tf_task_run_jobs_idle(uv_idle_t* idle);
static void _tf_task_run_jobs_prepare(uv_prepare_t* prepare);
static bool _tf_task_run_jobs(tf_task_t* task)
{
if (JS_IsJobPending(task->_runtime))
{
JSContext* context = NULL;
int r = JS_ExecutePendingJob(task->_runtime, &context);
JSValue result = JS_GetException(context);
if (context)
{
tf_util_report_error(context, result);
}
if (r < 0)
{
js_std_dump_error(context);
}
else
{
return r != 0;
}
}
return 0;
}
static void _tf_task_run_jobs_idle(uv_idle_t* idle)
{
tf_task_t* task = idle->data;
if (!_tf_task_run_jobs(task))
{
/* No more jobs. Don't try again as actively. */
uv_idle_stop(&task->idle);
uv_prepare_start(&task->prepare, _tf_task_run_jobs_prepare);
}
}
static void _tf_task_run_jobs_prepare(uv_prepare_t* prepare)
{
tf_task_t* task = prepare->data;
if (_tf_task_run_jobs(task))
{
/* More jobs. We can run again immediately. */
uv_idle_start(&task->idle, _tf_task_run_jobs_idle);
uv_prepare_stop(&task->prepare);
}
}
tf_task_t* tf_task_create()
{
tf_task_t* task = malloc(sizeof(tf_task_t));
@ -1342,6 +1380,11 @@ tf_task_t* tf_task_create()
uv_timer_init(&task->_loop, &task->trace_timer);
uv_timer_start(&task->trace_timer, _tf_task_trace_timer, 100, 100);
uv_unref((uv_handle_t*)&task->trace_timer);
task->idle.data = task;
uv_idle_init(&task->_loop, &task->idle);
task->prepare.data = task;
uv_prepare_init(&task->_loop, &task->prepare);
uv_idle_start(&task->idle, _tf_task_run_jobs_idle);
return task;
}
@ -1531,8 +1574,12 @@ void tf_task_destroy(tf_task_t* task)
{
uv_close((uv_handle_t*)&task->trace_timer, _tf_task_on_handle_close);
}
uv_close((uv_handle_t*)&task->idle, _tf_task_on_handle_close);
uv_close((uv_handle_t*)&task->prepare, _tf_task_on_handle_close);
while (task->trace_timer.data)
while (task->trace_timer.data ||
task->idle.data ||
task->prepare.data)
{
uv_run(&task->_loop, UV_RUN_ONCE);
}
@ -1587,28 +1634,6 @@ tf_task_t* tf_task_get(JSContext* context)
return JS_GetContextOpaque(context);
}
void tf_task_run_jobs(tf_task_t* task)
{
while (JS_IsJobPending(task->_runtime))
{
JSContext* context = NULL;
int r = JS_ExecutePendingJob(task->_runtime, &context);
JSValue result = JS_GetException(context);
if (context)
{
tf_util_report_error(context, result);
}
if (r < 0)
{
js_std_dump_error(context);
}
else if (r == 0)
{
break;
}
}
}
void tf_task_send_promise_message(tf_task_t* from, tf_taskstub_t* to, tf_task_message_t type, promiseid_t promise, JSValue payload)
{
_tf_task_sendPromiseMessage(from, to, type, promise, payload);

View File

@ -55,7 +55,6 @@ uv_loop_t* tf_task_get_loop(tf_task_t* task);
tf_task_t* tf_task_get(JSContext* context);
tf_trace_t* tf_task_get_trace(tf_task_t* task);
tf_ssb_t* tf_task_get_ssb(tf_task_t* task);
void tf_task_run_jobs(tf_task_t* task);
const char* tf_task_get_name(tf_task_t* task);
JSValue tf_task_allocate_promise(tf_task_t* task, promiseid_t* out_promise);

View File

@ -272,7 +272,6 @@ static void _taskstub_on_process_exit(uv_process_t* process, int64_t status, int
JSValue result = JS_Call(context, stub->_on_exit, JS_NULL, 2, argv);
tf_util_report_error(context, result);
JS_FreeValue(context, result);
tf_task_run_jobs(stub->_owner);
JS_FreeValue(context, argv[0]);
JS_FreeValue(context, argv[1]);
}
@ -398,6 +397,5 @@ void tf_taskstub_on_error(tf_taskstub_t* stub, JSValue error)
JSValue result = JS_Call(context, stub->_on_error, JS_NULL, 1, &error);
tf_util_report_error(context, result);
JS_FreeValue(context, result);
tf_task_run_jobs(stub->_owner);
}
}

View File

@ -153,7 +153,6 @@ static void _util_timeoutCallback(uv_timer_t* handle)
NULL);
tf_util_report_error(context, result);
JS_FreeValue(context, result);
tf_task_run_jobs(timeout->_task);
tf_trace_end(tf_task_get_trace(timeout->_task));
free(timeout);
uv_close((uv_handle_t*)handle, _handle_closed);