#include "socket.js.h" #include "log.h" #include "mem.h" #include "task.h" #include "tls.h" #include "tlscontext.js.h" #include "util.js.h" #include "uv.h" #include typedef int promiseid_t; typedef struct _socket_t socket_t; static JSClassID _classId; static int _count; static int _open_count; static tf_tls_context_t* _defaultTlsContext; static socket_t** _sockets; static int _sockets_count; typedef enum _socket_direction_t { kUndetermined, kAccept, kConnect, } socket_direction_t; typedef struct _socket_t { tf_task_t* _task; uv_tcp_t _socket; uv_timer_t _timer; tf_tls_session_t* _tls; promiseid_t _startTlsPromise; promiseid_t _closePromise; bool _connected; bool _noDelay; bool _reading; bool _listening; int _active; char _peerName[256]; socket_direction_t _direction; JSValue _object; JSValue _onConnect; JSValue _onRead; JSValue _onError; uint64_t created_ms; uint64_t timeout_ms; } socket_t; static JSValue _socket_create(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv); static void _socket_finalizer(JSRuntime* runtime, JSValue value); static JSValue _socket_startTls(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv); static JSValue _socket_stopTls(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv); static JSValue _socket_bind(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv); static JSValue _socket_connect(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv); static JSValue _socket_listen(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv); static JSValue _socket_accept(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv); static JSValue _socket_close(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv); static JSValue _socket_shutdown(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv); static JSValue _socket_read(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv); static JSValue _socket_onError(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv); static JSValue _socket_write(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv); static JSValue _socket_isConnected(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv); static JSValue _socket_getPeerName(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv); static JSValue _socket_getPeerCertificate(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv); static JSValue _socket_getNoDelay(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv); static JSValue _socket_setNoDelay(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv); static JSValue _sockets_get(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv); static JSValue _socket_setActivityTimeout(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv); static void _socket_onClose(uv_handle_t* handle); static void _socket_onShutdown(uv_shutdown_t* request, int status); static void _socket_onResolvedForBind(uv_getaddrinfo_t* resolver, int status, struct addrinfo* result); static void _socket_onResolvedForConnect(uv_getaddrinfo_t* resolver, int status, struct addrinfo* result); static void _socket_onConnect(uv_connect_t* request, int status); static void _socket_onNewConnection(uv_stream_t* server, int status); static void _socket_allocateBuffer(uv_handle_t* handle, size_t suggestedSize, uv_buf_t* buffer); static void _socket_onRead(uv_stream_t* stream, ssize_t readSize, const uv_buf_t* buffer); static void _socket_onWrite(uv_write_t* request, int status); static void _socket_onTlsShutdown(uv_write_t* request, int status); static void _socket_resetTimeout(socket_t* socket); static void _socket_pauseTimeout(socket_t* socket); static void _socket_resumeTimeout(socket_t* socket); static void _socket_notifyDataRead(socket_t* socket, const char* data, size_t length); static int _socket_writeBytes(socket_t* socket, promiseid_t promise, int (*callback)(socket_t* socket, promiseid_t promise, const char*, size_t), JSValue value, int* outLength); static int _socket_writeInternal(socket_t* socket, promiseid_t promise, const char* data, size_t length); static void _socket_processTlsShutdown(socket_t* socket, promiseid_t promise); static void _socket_shutdownInternal(socket_t* socket, promiseid_t promise); static bool _socket_processSomeOutgoingTls(socket_t* socket, promiseid_t promise, uv_write_cb callback); static void _socket_processOutgoingTls(socket_t* socket); static void _socket_reportTlsErrors(socket_t* socket); static void _socket_reportError(socket_t* socket, const char* error); static void _socket_set_handler(socket_t* socket, JSValue* handler, JSValue new_value) { JSContext* context = tf_task_get_context(socket->_task); JSValue old_handler = *handler; if (JS_IsUndefined(old_handler) && !JS_IsUndefined(new_value)) { JS_DupValue(context, socket->_object); } *handler = JS_DupValue(context, new_value); JS_FreeValue(context, old_handler); if (!JS_IsUndefined(old_handler) && JS_IsUndefined(new_value)) { JS_FreeValue(context, socket->_object); } } static void _socket_gc_mark(JSRuntime* runtime, JSValueConst value, JS_MarkFunc mark_func) { socket_t* socket = JS_GetOpaque(value, _classId); if (socket) { JS_MarkValue(runtime, socket->_onConnect, mark_func); JS_MarkValue(runtime, socket->_onRead, mark_func); JS_MarkValue(runtime, socket->_onError, mark_func); } } JSValue tf_socket_register(JSContext* context) { JS_NewClassID(&_classId); JSClassDef def = { .class_name = "Socket", .finalizer = &_socket_finalizer, .gc_mark = _socket_gc_mark, }; if (JS_NewClass(JS_GetRuntime(context), _classId, &def) != 0) { fprintf(stderr, "Failed to register Socket.\n"); } JSValue global = JS_GetGlobalObject(context); JS_SetPropertyStr(context, global, "getSockets", JS_NewCFunction(context, _sockets_get, "getSockets", 0)); JS_FreeValue(context, global); return JS_NewCFunction2(context, _socket_create, "Socket", 0, JS_CFUNC_constructor, 0); } int tf_socket_get_count() { return _count; } int tf_socket_get_open_count() { return _open_count; } typedef struct _socket_resolve_data_t { uv_getaddrinfo_t resolver; socket_t* socket; promiseid_t promise; } socket_resolve_data_t; static socket_t* _socket_create_internal(JSContext* context) { socket_t* socket = tf_malloc(sizeof(socket_t)); memset(socket, 0, sizeof(*socket)); _sockets = tf_resize_vec(_sockets, sizeof(socket_t*) * (_sockets_count + 1)); _sockets[_sockets_count++] = socket; socket->_closePromise = -1; socket->_startTlsPromise = -1; ++_count; JSValue object = JS_NewObjectClass(context, _classId); socket->_task = tf_task_get(context); socket->_object = object; JS_SetOpaque(object, socket); socket->created_ms = uv_now(tf_task_get_loop(socket->_task)); socket->_onRead = JS_UNDEFINED; socket->_onError = JS_UNDEFINED; socket->_onConnect = JS_UNDEFINED; JS_SetPropertyStr(context, object, "bind", JS_NewCFunction(context, _socket_bind, "bind", 2)); JS_SetPropertyStr(context, object, "connect", JS_NewCFunction(context, _socket_connect, "connect", 2)); JS_SetPropertyStr(context, object, "listen", JS_NewCFunction(context, _socket_listen, "listen", 2)); JS_SetPropertyStr(context, object, "accept", JS_NewCFunction(context, _socket_accept, "accept", 0)); JS_SetPropertyStr(context, object, "startTls", JS_NewCFunction(context, _socket_startTls, "startTls", 0)); JS_SetPropertyStr(context, object, "stopTls", JS_NewCFunction(context, _socket_stopTls, "stopTls", 0)); JS_SetPropertyStr(context, object, "shutdown", JS_NewCFunction(context, _socket_shutdown, "shutdown", 0)); JS_SetPropertyStr(context, object, "close", JS_NewCFunction(context, _socket_close, "close", 0)); JS_SetPropertyStr(context, object, "read", JS_NewCFunction(context, _socket_read, "read", 0)); JS_SetPropertyStr(context, object, "onError", JS_NewCFunction(context, _socket_onError, "onError", 1)); JS_SetPropertyStr(context, object, "write", JS_NewCFunction(context, _socket_write, "write", 1)); JS_SetPropertyStr(context, object, "setActivityTimeout", JS_NewCFunction(context, _socket_setActivityTimeout, "setActivityTimeout", 1)); JSAtom atom = JS_NewAtom(context, "isConnected"); JS_DefinePropertyGetSet(context, object, atom, JS_NewCFunction(context, _socket_isConnected, "isConnected", 0), JS_NULL, 0); JS_FreeAtom(context, atom); atom = JS_NewAtom(context, "peerName"); JS_DefinePropertyGetSet(context, object, atom, JS_NewCFunction(context, _socket_getPeerName, "peerName", 0), JS_NULL, 0); JS_FreeAtom(context, atom); atom = JS_NewAtom(context, "peerCertificate"); JS_DefinePropertyGetSet(context, object, atom, JS_NewCFunction(context, _socket_getPeerCertificate, "peerCertificate", 0), JS_NULL, 0); JS_FreeAtom(context, atom); atom = JS_NewAtom(context, "noDelay"); JSValue get_no_delay = JS_NewCFunction(context, _socket_getNoDelay, "getNoDelay", 0); JSValue set_no_delay = JS_NewCFunction(context, _socket_setNoDelay, "setNoDelay", 1); JS_DefinePropertyGetSet(context, object, atom, get_no_delay, set_no_delay, 0); JS_FreeAtom(context, atom); ++_open_count; uv_tcp_init(tf_task_get_loop(socket->_task), &socket->_socket); socket->_socket.data = socket; uv_timer_init(tf_task_get_loop(socket->_task), &socket->_timer); socket->_timer.data = socket; return socket; } static JSValue _socket_create(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) { return _socket_create_internal(context)->_object; } static void _socket_close_internal(socket_t* socket) { _socket_set_handler(socket, &socket->_onRead, JS_UNDEFINED); _socket_set_handler(socket, &socket->_onError, JS_UNDEFINED); _socket_set_handler(socket, &socket->_onConnect, JS_UNDEFINED); if (socket->_tls) { tf_tls_session_destroy(socket->_tls); socket->_tls = NULL; } if (socket->_socket.data && !uv_is_closing((uv_handle_t*)&socket->_socket)) { uv_close((uv_handle_t*)&socket->_socket, _socket_onClose); } if (socket->_timer.data && !uv_is_closing((uv_handle_t*)&socket->_timer)) { uv_close((uv_handle_t*)&socket->_timer, _socket_onClose); } if (!socket->_socket.data && !socket->_timer.data && JS_IsUndefined(socket->_object)) { --_count; for (int i = 0; i < _sockets_count; i++) { if (_sockets[i] == socket) { _sockets[i] = _sockets[_sockets_count - 1]; --_sockets_count; _sockets = tf_resize_vec(_sockets, sizeof(socket_t*) * _sockets_count); break; } } tf_free(socket); } } static void _socket_finalizer(JSRuntime* runtime, JSValue value) { socket_t* socket = JS_GetOpaque(value, _classId); socket->_object = JS_UNDEFINED; _socket_close_internal(socket); } static void _socket_reportError(socket_t* socket, const char* error) { JSContext* context = tf_task_get_context(socket->_task); JSValue ref = JS_DupValue(context, socket->_object); if (JS_IsFunction(context, socket->_onError)) { JSValue exception = JS_ThrowInternalError(context, "%s", error); JSValue cb_ref = JS_DupValue(context, socket->_onError); JSValue result = JS_Call(context, socket->_onError, socket->_object, 1, &exception); JS_FreeValue(context, cb_ref); tf_util_report_error(context, result); JS_FreeValue(context, exception); JS_FreeValue(context, result); } else { fprintf(stderr, "Socket::reportError: %s\n", error); } JS_FreeValue(context, ref); } static void _socket_reportTlsErrors(socket_t* socket) { char buffer[4096]; while (socket->_tls && tf_tls_session_get_error(socket->_tls, buffer, sizeof(buffer))) { _socket_reportError(socket, buffer); } } static JSValue _socket_startTls(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) { socket_t* socket = JS_GetOpaque(this_val, _classId); if (!socket->_tls) { tf_tls_context_t* context = 0; if (argc > 0 && JS_IsObject(argv[0])) { context = tf_tls_context_get(argv[0]); } else { if (!_defaultTlsContext) { _defaultTlsContext = tf_tls_context_create(); } context = _defaultTlsContext; } if (context) { socket->_tls = tf_tls_context_create_session(context); } if (socket->_tls) { tf_tls_session_set_hostname(socket->_tls, socket->_peerName); if (socket->_direction == kAccept) { tf_tls_session_start_accept(socket->_tls); } else if (socket->_direction == kConnect) { tf_tls_session_start_connect(socket->_tls); } JSValue result = tf_task_allocate_promise(socket->_task, &socket->_startTlsPromise); _socket_processOutgoingTls(socket); return result; } else { return JS_ThrowInternalError(tf_task_get_context(socket->_task), "Failed to get TLS context"); } } else { return JS_ThrowInternalError(tf_task_get_context(socket->_task), "startTls with TLS already started"); } } static JSValue _socket_stopTls(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) { socket_t* socket = JS_GetOpaque(this_val, _classId); if (socket->_tls) { _socket_processOutgoingTls(socket); tf_tls_session_destroy(socket->_tls); socket->_tls = NULL; } else { JS_ThrowInternalError(tf_task_get_context(socket->_task), "stopTls with TLS already stopped"); } return JS_NULL; } static bool _socket_processSomeOutgoingTls(socket_t* socket, promiseid_t promise, uv_write_cb callback) { if (!socket->_socket.data) { return false; } char buffer[65536]; int result = tf_tls_session_read_encrypted(socket->_tls, buffer, sizeof(buffer)); if (result > 0) { char* request_buffer = tf_malloc(sizeof(uv_write_t) + result); uv_write_t* request = (uv_write_t*)request_buffer; memset(request, 0, sizeof(*request)); request->data = (void*)(intptr_t)(promise); char* rawBuffer = request_buffer + sizeof(uv_write_t); memcpy(rawBuffer, buffer, result); uv_buf_t writeBuffer = { .base = rawBuffer, .len = result, }; _socket_pauseTimeout(socket); int writeResult = uv_write(request, (uv_stream_t*)&socket->_socket, &writeBuffer, 1, callback); if (writeResult != 0) { tf_free(request_buffer); char error[256]; snprintf(error, sizeof(error), "uv_write: %s", uv_strerror(writeResult)); _socket_reportError(socket, error); } } else { _socket_reportTlsErrors(socket); } return result > 0; } static void _socket_processOutgoingTls(socket_t* socket) { while (_socket_processSomeOutgoingTls(socket, -1, _socket_onWrite)) { } } static JSValue _socket_bind(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) { socket_t* socket = JS_GetOpaque(this_val, _classId); const char* node = JS_ToCString(tf_task_get_context(socket->_task), argv[0]); const char* port = JS_ToCString(tf_task_get_context(socket->_task), argv[1]); socket_resolve_data_t* data = tf_malloc(sizeof(socket_resolve_data_t)); memset(data, 0, sizeof(*data)); struct addrinfo hints = { .ai_family = AF_UNSPEC, .ai_socktype = SOCK_STREAM, .ai_protocol = IPPROTO_TCP, .ai_flags = 0, }; data->resolver.data = data; data->socket = socket; JSValue promise = tf_task_allocate_promise(socket->_task, &data->promise); int result = uv_getaddrinfo(tf_task_get_loop(socket->_task), &data->resolver, _socket_onResolvedForBind, node, port, &hints); if (result != 0) { tf_task_reject_promise(socket->_task, data->promise, JS_ThrowInternalError(tf_task_get_context(socket->_task), "uv_getaddrinfo: %s", uv_strerror(result))); tf_free(data); } return promise; } static void _socket_onResolvedForBind(uv_getaddrinfo_t* resolver, int status, struct addrinfo* result) { socket_resolve_data_t* data = (socket_resolve_data_t*)resolver->data; if (status != 0) { tf_task_reject_promise( data->socket->_task, data->promise, JS_ThrowInternalError(tf_task_get_context(data->socket->_task), "uv_getaddrinfo: %s", uv_strerror(status))); } else { int bindResult = uv_tcp_bind(&data->socket->_socket, result->ai_addr, 0); if (bindResult != 0) { tf_task_reject_promise( data->socket->_task, data->promise, JS_ThrowInternalError(tf_task_get_context(data->socket->_task), "uv_tcp_bind: %s", uv_strerror(bindResult))); } else { struct sockaddr_storage addr = { 0 }; int port = 0; int size = (int)sizeof(addr); if (uv_tcp_getsockname(&data->socket->_socket, (struct sockaddr*)&addr, &size) == 0) { if (addr.ss_family == AF_INET) { port = ntohs(((struct sockaddr_in*)&addr)->sin_port); } else if (addr.ss_family == AF_INET6) { port = ntohs(((struct sockaddr_in6*)&addr)->sin6_port); } } tf_task_resolve_promise(data->socket->_task, data->promise, JS_NewInt32(tf_task_get_context(data->socket->_task), port)); } } tf_free(data); } static JSValue _socket_connect(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) { socket_t* socket = JS_GetOpaque(this_val, _classId); socket->_direction = kConnect; const char* node = JS_ToCString(context, argv[0]); const char* port = JS_ToCString(context, argv[1]); snprintf(socket->_peerName, sizeof(socket->_peerName), "%s", node); socket_resolve_data_t* data = tf_malloc(sizeof(socket_resolve_data_t)); memset(data, 0, sizeof(*data)); struct addrinfo hints = { .ai_family = PF_INET, .ai_socktype = SOCK_STREAM, .ai_protocol = IPPROTO_TCP, }; data->resolver.data = data; data->socket = socket; JSValue promise = tf_task_allocate_promise(socket->_task, &data->promise); int result = uv_getaddrinfo(tf_task_get_loop(socket->_task), &data->resolver, _socket_onResolvedForConnect, node, port, &hints); if (result != 0) { char error[256]; snprintf(error, sizeof(error), "uv_getaddrinfo: %s", uv_strerror(result)); tf_task_reject_promise(socket->_task, data->promise, JS_ThrowInternalError(context, "%s", error)); _socket_close_internal(socket); tf_free(data); } JS_FreeCString(context, node); JS_FreeCString(context, port); return promise; } static void _socket_onResolvedForConnect(uv_getaddrinfo_t* resolver, int status, struct addrinfo* result) { socket_resolve_data_t* data = resolver->data; if (status != 0) { char error[256]; snprintf(error, sizeof(error), "uv_getaddrinfo: %s", uv_strerror(status)); tf_task_reject_promise(data->socket->_task, data->promise, JS_ThrowInternalError(tf_task_get_context(data->socket->_task), "%s", error)); _socket_close_internal(data->socket); } else { uv_connect_t* request = tf_malloc(sizeof(uv_connect_t)); memset(request, 0, sizeof(*request)); request->data = (void*)(intptr_t)data->promise; int connectResult = uv_tcp_connect(request, &data->socket->_socket, result->ai_addr, _socket_onConnect); if (connectResult != 0) { char error[256]; snprintf(error, sizeof(error), "uv_tcp_connect: %s", uv_strerror(connectResult)); tf_task_reject_promise(data->socket->_task, data->promise, JS_ThrowInternalError(tf_task_get_context(data->socket->_task), "%s", error)); _socket_close_internal(data->socket); tf_free(request); } } uv_freeaddrinfo(result); tf_free(data); } static void _socket_onConnect(uv_connect_t* request, int status) { promiseid_t promise = (intptr_t)request->data; if (promise != -1) { socket_t* socket = request->handle->data; if (status == 0) { socket->_connected = true; tf_task_resolve_promise(socket->_task, promise, JS_NewInt32(tf_task_get_context(socket->_task), status)); } else { char error[256]; snprintf(error, sizeof(error), "uv_tcp_connect: %s", uv_strerror(status)); tf_task_reject_promise(socket->_task, promise, JS_ThrowInternalError(tf_task_get_context(socket->_task), "%s", error)); _socket_close_internal(socket); } } tf_free(request); } static JSValue _socket_listen(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) { socket_t* socket = JS_GetOpaque(this_val, _classId); socket->_listening = true; int backlog = 16; JS_ToInt32(context, &backlog, argv[0]); if (JS_IsUndefined(socket->_onConnect)) { _socket_set_handler(socket, &socket->_onConnect, argv[1]); int result = uv_listen((uv_stream_t*)&socket->_socket, backlog, _socket_onNewConnection); if (result != 0) { return JS_ThrowInternalError(context, "uv_listen: %s", uv_strerror(result)); } return JS_NewInt32(context, result); } else { return JS_ThrowInternalError(context, "listen: Already listening."); } } static void _socket_onNewConnection(uv_stream_t* server, int status) { socket_t* socket = server->data; JSContext* context = tf_task_get_context(socket->_task); JSValue ref = JS_DupValue(context, socket->_object); if (!JS_IsUndefined(socket->_onConnect)) { JSValue cb_ref = JS_DupValue(context, socket->_onConnect); JSValue result = JS_Call(context, socket->_onConnect, socket->_object, 0, NULL); JS_FreeValue(context, cb_ref); tf_util_report_error(context, result); JS_FreeValue(context, result); } JS_FreeValue(context, ref); } static JSValue _socket_accept(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) { socket_t* socket = JS_GetOpaque(this_val, _classId); socket_t* client = _socket_create_internal(context); client->_direction = kAccept; promiseid_t promise; JSValue ref = JS_DupValue(context, client->_object); JSValue result = tf_task_allocate_promise(socket->_task, &promise); int status = uv_accept((uv_stream_t*)&socket->_socket, (uv_stream_t*)&client->_socket); if (status == 0) { struct sockaddr_storage name = { 0 }; int namelen = (int)sizeof(name); if (uv_tcp_getpeername(&client->_socket, (struct sockaddr*)&name, &namelen) == 0) { uv_ip_name((const struct sockaddr*)&name, client->_peerName, sizeof(client->_peerName)); } client->_connected = true; tf_task_resolve_promise(socket->_task, promise, client->_object); JS_FreeValue(context, client->_object); } else { tf_task_reject_promise(socket->_task, promise, JS_ThrowInternalError(context, "uv_accept: %s", uv_strerror(status))); } JS_FreeValue(context, ref); return result; } static JSValue _socket_close(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) { socket_t* socket = JS_GetOpaque(this_val, _classId); if (socket->_closePromise == -1 && socket->_socket.data && !uv_is_closing((uv_handle_t*)&socket->_socket)) { JSValue result = tf_task_allocate_promise(socket->_task, &socket->_closePromise); _socket_close_internal(socket); return result; } return JS_UNDEFINED; } static JSValue _socket_shutdown(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) { socket_t* socket = JS_GetOpaque(this_val, _classId); promiseid_t promise = -1; JSValue result = tf_task_allocate_promise(socket->_task, &promise); if (socket->_tls) { _socket_processTlsShutdown(socket, promise); } else { _socket_shutdownInternal(socket, promise); } return result; } static void _socket_shutdownInternal(socket_t* socket, promiseid_t promise) { uv_shutdown_t* request = tf_malloc(sizeof(uv_shutdown_t)); memset(request, 0, sizeof(*request)); request->data = (void*)(intptr_t)promise; int result = uv_shutdown(request, (uv_stream_t*)&socket->_socket, _socket_onShutdown); if (result != 0) { char error[256]; snprintf(error, sizeof(error), "uv_shutdown: %s", uv_strerror(result)); tf_task_reject_promise(socket->_task, promise, JS_ThrowInternalError(tf_task_get_context(socket->_task), "%s", error)); tf_free(request); } } static void _socket_processTlsShutdown(socket_t* socket, promiseid_t promise) { if (!socket->_tls) { _socket_shutdownInternal(socket, promise); } else { tf_tls_session_shutdown(socket->_tls); if (!_socket_processSomeOutgoingTls(socket, promise, _socket_onTlsShutdown)) { _socket_shutdownInternal(socket, promise); } } } static void _socket_onTlsShutdown(uv_write_t* request, int status) { socket_t* socket = request->handle->data; promiseid_t promise = (intptr_t)request->data; _socket_processTlsShutdown(socket, promise); tf_free(request); } static JSValue _socket_onError(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) { socket_t* socket = JS_GetOpaque(this_val, _classId); _socket_set_handler(socket, &socket->_onError, argv[0]); return JS_NULL; } static JSValue _socket_read(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) { socket_t* socket = JS_GetOpaque(this_val, _classId); JSValue ref = JS_DupValue(context, socket->_object); _socket_set_handler(socket, &socket->_onRead, argv[0]); promiseid_t promise = -1; JSValue read_result = tf_task_allocate_promise(socket->_task, &promise); if (!socket->_reading && socket->_socket.data) { _socket_resetTimeout(socket); int result = uv_read_start((uv_stream_t*)&socket->_socket, _socket_allocateBuffer, _socket_onRead); if (result != 0) { tf_task_reject_promise(socket->_task, promise, JS_ThrowInternalError(context, "uv_read_start: %s", uv_strerror(result))); } else { socket->_reading = true; tf_task_resolve_promise(socket->_task, promise, JS_UNDEFINED); } } else { tf_task_resolve_promise(socket->_task, promise, JS_UNDEFINED); } JS_FreeValue(context, ref); return read_result; } static void _socket_allocateBuffer(uv_handle_t* handle, size_t suggestedSize, uv_buf_t* buf) { *buf = uv_buf_init(tf_malloc(suggestedSize), suggestedSize); } static void _socket_onRead(uv_stream_t* stream, ssize_t readSize, const uv_buf_t* buffer) { socket_t* socket = stream->data; _socket_resetTimeout(socket); JSContext* context = tf_task_get_context(socket->_task); JSValue ref = JS_DupValue(context, socket->_object); if (readSize <= 0) { socket->_connected = false; if (!JS_IsUndefined(socket->_onRead)) { JSValue args[] = { JS_UNDEFINED }; JSValue cb_ref = JS_DupValue(context, socket->_onRead); JSValue result = JS_Call(context, socket->_onRead, socket->_object, 1, args); JS_FreeValue(context, cb_ref); tf_util_report_error(context, result); JS_FreeValue(context, result); } _socket_close_internal(socket); } else { if (socket->_tls) { _socket_reportTlsErrors(socket); tf_tls_session_write_encrypted(socket->_tls, buffer->base, readSize); if (socket->_startTlsPromise != -1) { tf_tls_handshake_t result = tf_tls_session_handshake(socket->_tls); if (result == k_tls_handshake_done) { promiseid_t promise = socket->_startTlsPromise; socket->_startTlsPromise = -1; tf_task_resolve_promise(socket->_task, promise, JS_UNDEFINED); } else if (result == k_tls_handshake_failed) { promiseid_t promise = socket->_startTlsPromise; socket->_startTlsPromise = -1; char buffer[8192]; if (tf_tls_session_get_error(socket->_tls, buffer, sizeof(buffer))) { tf_task_reject_promise(socket->_task, promise, JS_ThrowInternalError(context, "%s", buffer)); } else { tf_task_reject_promise(socket->_task, promise, JS_UNDEFINED); } } } while (socket->_tls) { char plain[8192]; int result = tf_tls_session_read_plain(socket->_tls, plain, sizeof(plain)); if (result > 0) { _socket_notifyDataRead(socket, plain, result); } else if (result == k_tls_read_failed) { _socket_reportTlsErrors(socket); _socket_close_internal(socket); break; } else if (result == k_tls_read_zero) { if (!JS_IsUndefined(socket->_onRead)) { JSValue args[] = { JS_UNDEFINED }; JSValue cb_ref = JS_DupValue(context, socket->_onRead); JSValue result = JS_Call(context, socket->_onRead, socket->_object, 1, args); JS_FreeValue(context, cb_ref); tf_util_report_error(context, result); JS_FreeValue(context, result); } _socket_close_internal(socket); break; } else { break; } } if (socket->_tls) { _socket_processOutgoingTls(socket); } } else { _socket_notifyDataRead(socket, buffer->base, readSize); } } tf_free(buffer->base); JS_FreeValue(context, ref); } static void _socket_notifyDataRead(socket_t* socket, const char* data, size_t length) { if (data && length > 0) { JSContext* context = tf_task_get_context(socket->_task); JSValue ref = JS_DupValue(context, socket->_object); JSValue typedArray = tf_util_new_uint8_array(context, (const uint8_t*)data, length); JSValue args[] = { typedArray }; if (!JS_IsUndefined(socket->_onRead)) { JSValue cb_ref = JS_DupValue(context, socket->_onRead); JSValue result = JS_Call(context, socket->_onRead, socket->_object, 1, args); JS_FreeValue(context, cb_ref); tf_util_report_error(context, result); JS_FreeValue(context, result); } JS_FreeValue(context, typedArray); JS_FreeValue(context, ref); } } static int _socket_writeBytes(socket_t* socket, promiseid_t promise, int (*callback)(socket_t* socket, promiseid_t promise, const char*, size_t), JSValue value, int* outLength) { int result = -1; size_t length; uint8_t* array = NULL; JSContext* context = tf_task_get_context(socket->_task); if (JS_IsString(value)) { const char* stringValue = JS_ToCStringLen(context, &length, value); result = callback(socket, promise, stringValue, length); JS_FreeCString(context, stringValue); } else if ((array = tf_util_try_get_array_buffer(context, &length, value)) != 0) { result = callback(socket, promise, (const char*)array, length); } else { size_t offset; size_t element_size; JSValue buffer = tf_util_try_get_typed_array_buffer(context, value, &offset, &length, &element_size); if (!JS_IsException(buffer)) { size_t size; if ((array = tf_util_try_get_array_buffer(context, &size, buffer)) != 0) { result = callback(socket, promise, (const char*)array, length); } } JS_FreeValue(context, buffer); } if (outLength) { *outLength = (int)length; } return result; } static int _socket_writeInternal(socket_t* socket, promiseid_t promise, const char* data, size_t length) { if (!socket->_socket.data) { return UV_ENOTCONN; } char* rawBuffer = tf_malloc(sizeof(uv_write_t) + length); uv_write_t* request = (uv_write_t*)rawBuffer; memcpy(rawBuffer + sizeof(uv_write_t), data, length); uv_buf_t buffer = { .base = rawBuffer + sizeof(uv_write_t), .len = length, }; request->data = (void*)(intptr_t)promise; _socket_pauseTimeout(socket); int result = uv_write(request, (uv_stream_t*)&socket->_socket, &buffer, 1, _socket_onWrite); if (result != 0) { tf_free(rawBuffer); } return result; } static int _socket_write_tls(socket_t* socket, promiseid_t promise, const char* data, size_t size) { return tf_tls_session_write_plain(socket->_tls, data, size); } static JSValue _socket_write(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) { socket_t* socket = JS_GetOpaque(this_val, _classId); promiseid_t promise = -1; JSValue write_result = tf_task_allocate_promise(socket->_task, &promise); JSValue ref = JS_DupValue(context, socket->_object); if (!JS_IsUndefined(argv[0])) { if (socket->_tls) { _socket_reportTlsErrors(socket); int length = 0; int result = _socket_writeBytes(socket, -1, _socket_write_tls, argv[0], &length); char buffer[8192]; if (result <= 0 && tf_tls_session_get_error(socket->_tls, buffer, sizeof(buffer))) { tf_task_reject_promise(socket->_task, promise, JS_ThrowInternalError(context, "%s", buffer)); } else if (result < length) { tf_task_reject_promise(socket->_task, promise, JS_NewInt32(context, result)); } else { tf_task_resolve_promise(socket->_task, promise, JS_NewInt32(context, result)); } _socket_processOutgoingTls(socket); } else { int length; int result = _socket_writeBytes(socket, promise, _socket_writeInternal, argv[0], &length); if (result != 0) { tf_task_reject_promise(socket->_task, promise, JS_ThrowInternalError(context, "uv_write: %s", uv_strerror(result))); } } } else { tf_task_reject_promise(socket->_task, promise, JS_NewInt32(context, -2)); } JS_FreeValue(context, ref); return write_result; } static void _socket_onWrite(uv_write_t* request, int status) { socket_t* socket = request->handle->data; _socket_resumeTimeout(socket); promiseid_t promise = (intptr_t)request->data; if (promise != -1) { if (status == 0) { tf_task_resolve_promise(socket->_task, promise, JS_NewInt32(tf_task_get_context(socket->_task), status)); } else { tf_task_reject_promise(socket->_task, promise, JS_ThrowInternalError(tf_task_get_context(socket->_task), "uv_write: %s", uv_strerror(status))); } } tf_free(request); } static void _socket_timeout(uv_timer_t* timer) { socket_t* socket = timer->data; _socket_close_internal(socket); } static JSValue _socket_setActivityTimeout(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) { socket_t* socket = JS_GetOpaque(this_val, _classId); int64_t timeout = 0; if (JS_ToInt64(context, &timeout, argv[0]) == 0 && socket->timeout_ms > 0) { socket->timeout_ms = timeout; uv_timer_start(&socket->_timer, _socket_timeout, socket->timeout_ms, 0); } else { uv_timer_stop(&socket->_timer); } return JS_UNDEFINED; } static JSValue _socket_isConnected(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) { socket_t* socket = JS_GetOpaque(this_val, _classId); return socket->_connected ? JS_TRUE : JS_FALSE; } static void _socket_onClose(uv_handle_t* handle) { --_open_count; socket_t* socket = handle->data; handle->data = NULL; if (socket->_closePromise != -1) { promiseid_t promise = socket->_closePromise; socket->_closePromise = -1; socket->_connected = false; tf_task_resolve_promise(socket->_task, promise, JS_UNDEFINED); } if (socket->_startTlsPromise != -1) { promiseid_t promise = socket->_startTlsPromise; socket->_startTlsPromise = -1; socket->_connected = false; tf_task_resolve_promise(socket->_task, promise, JS_UNDEFINED); } _socket_close_internal(socket); } static void _socket_onShutdown(uv_shutdown_t* request, int status) { socket_t* socket = request->handle->data; _socket_resetTimeout(socket); promiseid_t promise = (intptr_t)request->data; if (status == 0) { tf_task_resolve_promise(socket->_task, promise, JS_UNDEFINED); } else { char error[256]; snprintf(error, sizeof(error), "uv_shutdown: %s", uv_strerror(status)); tf_task_reject_promise(socket->_task, promise, JS_ThrowInternalError(tf_task_get_context(socket->_task), "%s", error)); } tf_free(request); } static JSValue _socket_getPeerName(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) { socket_t* socket = JS_GetOpaque(this_val, _classId); struct sockaddr_in6 addr; int nameLength = sizeof(addr); if (uv_tcp_getpeername(&socket->_socket, (struct sockaddr*)&addr, &nameLength) == 0) { char name[1024]; if ((size_t)nameLength > sizeof(struct sockaddr_in)) { if (uv_ip6_name(&addr, name, sizeof(name)) == 0) { return JS_NewString(context, name); } } else { if (uv_ip4_name((struct sockaddr_in*)&addr, name, sizeof(name)) == 0) { return JS_NewString(context, name); } } } return JS_UNDEFINED; } static JSValue _socket_getPeerCertificate(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) { socket_t* socket = JS_GetOpaque(this_val, _classId); if (socket->_tls) { char buffer[128 * 1024]; int result = tf_tls_session_get_peer_certificate(socket->_tls, buffer, sizeof(buffer)); if (result > 0) { return tf_util_new_uint8_array(tf_task_get_context(socket->_task), (const uint8_t*)buffer, sizeof(buffer)); } } return JS_UNDEFINED; } static JSValue _socket_getNoDelay(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) { socket_t* socket = JS_GetOpaque(this_val, _classId); return JS_NewBool(context, socket->_noDelay); } static JSValue _socket_setNoDelay(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) { socket_t* socket = JS_GetOpaque(this_val, _classId); int result = JS_ToBool(context, argv[0]); socket->_noDelay = result > 0; uv_tcp_nodelay(&socket->_socket, result > 0 ? 1 : 0); return JS_UNDEFINED; } static JSValue _sockets_get(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) { JSValue array = JS_NewArray(context); for (int i = 0; i < _sockets_count; i++) { socket_t* s = _sockets[i]; JSValue entry = JS_NewObject(context); JS_SetPropertyStr(context, entry, "peer", JS_NewString(context, s->_peerName)); JS_SetPropertyStr(context, entry, "listening", JS_NewBool(context, s->_listening)); JS_SetPropertyStr(context, entry, "connected", JS_NewBool(context, s->_connected)); JS_SetPropertyStr(context, entry, "tls", JS_NewBool(context, s->_tls != NULL)); JS_SetPropertyStr(context, entry, "age_seconds", JS_NewFloat64(context, (uv_now(tf_task_get_loop(s->_task)) - s->created_ms) / 1000.0)); JS_SetPropertyUint32(context, array, i, entry); } return array; } static void _socket_resetTimeout(socket_t* socket) { if (socket->timeout_ms && socket->_active == 0) { uv_timer_start(&socket->_timer, _socket_timeout, socket->timeout_ms, 0); } } static void _socket_pauseTimeout(socket_t* socket) { if (socket->_active++ == 1) { uv_timer_stop(&socket->_timer); } } static void _socket_resumeTimeout(socket_t* socket) { if (--socket->_active == 0 && socket->timeout_ms > 0) { uv_timer_start(&socket->_timer, _socket_timeout, socket->timeout_ms, 0); } }