diff --git a/GNUmakefile b/GNUmakefile index 3b6583d9..7c613c57 100644 --- a/GNUmakefile +++ b/GNUmakefile @@ -616,7 +616,7 @@ $(IOS_TARGETS) $(IOSSIM_TARGETS): LDFLAGS += \ unix: debug release win: windebug winrelease -all: $(BUILD_TYPES) +all: $(BUILD_TYPES) default.nix .PHONY: all win unix ALL_APP_OBJS := \ @@ -673,6 +673,10 @@ src/android/AndroidManifest.xml : $(firstword $(MAKEFILE_LIST)) -e 's/android:targetSdkVersion="[[:digit:]]*"/android:targetSdkVersion="$(ANDROID_TARGET_SDK_VERSION)"/' \ $@ +default.nix : $(firstword $(MAKEFILE_LIST)) + @echo "[version] $@" + @sed -i -e 's/version = ".*";/version = "$(VERSION_NUMBER)";/' $@ + # Android support. out/res/layout_activity_main.xml.flat: src/android/res/layout/activity_main.xml @mkdir -p $(dir $@) @@ -858,7 +862,7 @@ clean: rm -rf $(BUILD_DIR) .PHONY: clean -dist: release-apk iosrelease-ipa $(if $(HAVE_WIN), out/winrelease/tildefriends.standalone.exe) +dist: release-apk iosrelease-ipa $(if $(HAVE_WIN), out/winrelease/tildefriends.standalone.exe) default.nix @echo [archive] dist/tildefriends-$(VERSION_NUMBER).tar.xz @rm -rf out/tildefriends-$(VERSION_NUMBER) @mkdir -p dist/ out/tildefriends-$(VERSION_NUMBER) diff --git a/apps/issues/app.js b/apps/issues/app.js index 77fafa11..01b37cf8 100644 --- a/apps/issues/app.js +++ b/apps/issues/app.js @@ -67,9 +67,6 @@ tfrpc.register(function getHash(id, message) { tfrpc.register(function setHash(hash) { return app.setHash(hash); }); -ssb.addEventListener('message', async function (id) { - await tfrpc.rpc.notifyNewMessage(id); -}); tfrpc.register(async function store_blob(blob) { if (Array.isArray(blob)) { blob = Uint8Array.from(blob); @@ -91,10 +88,12 @@ tfrpc.register(function getActiveIdentity() { tfrpc.register(async function try_decrypt(id, content) { return await ssb.privateMessageDecrypt(id, content); }); -ssb.addEventListener('broadcasts', async function () { +core.register('onMessage', async function (id) { + await tfrpc.rpc.notifyNewMessage(id); +}); +core.register('onBroadcastsChanged', async function () { await tfrpc.rpc.set('broadcasts', await ssb.getBroadcasts()); }); - core.register('onConnectionsChanged', async function () { await tfrpc.rpc.set('connections', await ssb.connections()); }); diff --git a/apps/journal/app.js b/apps/journal/app.js index 44a9f354..4437c18e 100644 --- a/apps/journal/app.js +++ b/apps/journal/app.js @@ -55,7 +55,7 @@ function new_message() { return g_new_message_promise; } -ssb.addEventListener('message', function (id) { +core.register('onMessage', function (id) { let resolve = g_new_message_resolve; g_new_message_promise = new Promise(function (resolve, reject) { g_new_message_resolve = resolve; diff --git a/apps/ssb.json b/apps/ssb.json index f03eb167..f50e3fdd 100644 --- a/apps/ssb.json +++ b/apps/ssb.json @@ -1,5 +1,5 @@ { "type": "tildefriends-app", "emoji": "🐌", - "previous": "&wA6sLaDxtYeFdVCCu8jyhPsGYtGZEjbWQHeGOn0Yifg=.sha256" + "previous": "&h0sTvkhc3zEJw/sH612fy5i554Gr1AKzCBbLkm0KH28=.sha256" } diff --git a/apps/ssb/app.js b/apps/ssb/app.js index c0f7a8cc..3868b534 100644 --- a/apps/ssb/app.js +++ b/apps/ssb/app.js @@ -76,7 +76,7 @@ tfrpc.register(function getHash(id, message) { tfrpc.register(function setHash(hash) { return app.setHash(hash); }); -ssb.addEventListener('message', async function (id) { +core.register('onMessage', async function (id) { await tfrpc.rpc.notifyNewMessage(id); }); tfrpc.register(async function store_blob(blob) { @@ -103,7 +103,7 @@ tfrpc.register(async function encrypt(id, recipients, content) { tfrpc.register(async function getActiveIdentity() { return await ssb.getActiveIdentity(); }); -ssb.addEventListener('broadcasts', async function () { +core.register('onBroadcastsChanged', async function () { await tfrpc.rpc.set('broadcasts', await ssb.getBroadcasts()); }); diff --git a/apps/ssb/tf-tab-news.js b/apps/ssb/tf-tab-news.js index b2d50b59..ee3b8789 100644 --- a/apps/ssb/tf-tab-news.js +++ b/apps/ssb/tf-tab-news.js @@ -136,7 +136,7 @@ class TfTabNewsElement extends LitElement { ${this.new_messages_text()}

-
+
Welcome, ! ${edit_profile}
diff --git a/apps/wiki/utils.js b/apps/wiki/utils.js index fff4fdcc..f8718264 100644 --- a/apps/wiki/utils.js +++ b/apps/wiki/utils.js @@ -50,7 +50,7 @@ function new_message() { return g_new_message_promise; } -ssb.addEventListener('message', function (id) { +core.register('onMessage', function (id) { let resolve = g_new_message_resolve; g_new_message_promise = new Promise(function (resolve, reject) { g_new_message_resolve = resolve; diff --git a/core/core.js b/core/core.js index 641de855..b90f9cb0 100644 --- a/core/core.js +++ b/core/core.js @@ -678,6 +678,8 @@ async function getProcessBlob(blobId, key, options) { ); } }; + imports.ssb.addEventListener = undefined; + imports.ssb.removeEventListener = undefined; imports.ssb.getIdentityInfo = undefined; imports.fetch = function (url, options) { return http.fetch(url, options, gGlobalSettings.fetch_hosts); @@ -1228,6 +1230,10 @@ async function blobHandler(request, response, blobId, uri) { } } +ssb.addEventListener('message', function () { + broadcastEvent('onMessage', [...arguments]); +}); + ssb.addEventListener('broadcasts', function () { broadcastEvent('onBroadcastsChanged', []); }); diff --git a/default.nix b/default.nix new file mode 100644 index 00000000..7f794766 --- /dev/null +++ b/default.nix @@ -0,0 +1,68 @@ +# How to upgrade to a newer version +# - Comment `src.hash` +# - Change `version` +# - Run `$ nix build` +# This will fetch the source code +# Since `hash` is not provided, nix will stop building and throw an error: +# +# error: hash mismatch in fixed-output derivation '/nix/store/fghi3ljs6fhz8pwm3dh73j5fwjpq5wbz-source.drv': +# specified: sha256-AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA= +# got: sha256-+uthA1w8CmZfW+WOK9wYGl2fUl/k10ufOc8W+Pwa9iQ= +# error: 1 dependencies of derivation '/nix/store/imcwsw5r74vkd8r0qa2k7cys2xfgraaz-tildefriends-0.0.18.drv' failed to build +# +# - Change `src.hash` to the new one, ie `sha256-+uthA1w8CmZfW+WOK9wYGl2fUl/k10ufOc8W+Pwa9iQ=` +# - Uncomment `src.hash` +# - Build again, this time it should work. +# - Check the release notes, if there's a new dependency or a change to `GNUMakefile`, this file might need to be changed too. +# For more details, contact tasiaiso @ https://tilde.club/~tasiaiso/ +# +# WARNING: currently it is pinned to `47838d5e482cb4aac40190fa0414f08b8cf94d40`. I couldn't get v0.0.18 to work for some reason. +# I'll change this in the next release - tasiaiso +{ + pkgs ? import {}, + lib ? import , +}: +pkgs.stdenv.mkDerivation rec { + pname = "tildefriends"; + version = "0.0.19-wip"; + + src = pkgs.fetchFromGitea { + domain = "dev.tildefriends.net"; + owner = "cory"; + repo = "tildefriends"; + # rev = "v${version}"; + rev = "47838d5e482cb4aac40190fa0414f08b8cf94d40"; + hash = "sha256-mb5KYvWPIqgV64FOaXKHm2ownBJiiSRtdH8+YWiXwvE="; # 47838d5e482cb4aac40190fa0414f08b8cf94d40 + fetchSubmodules = true; + }; + + nativeBuildInputs = with pkgs; [ + gnumake + openssl + which + ]; + + buildInputs = with pkgs; [ + openssl + which + ]; + + buildPhase = '' + make -j $NIX_BUILD_CORES release + ''; + + installPhase = '' + mkdir -p $out/bin + cp -r out/release/tildefriends $out/bin + ''; + + doCheck = false; + + meta = with pkgs; { + homepage = "https://tildefriends.net"; + description = "Make apps and friends from the comfort of your web browser."; + mainProgram = "tildefriends"; + license = with lib.licenses; [mit]; + platforms = lib.platforms.all; + }; +} diff --git a/flake.lock b/flake.lock new file mode 100644 index 00000000..e532421d --- /dev/null +++ b/flake.lock @@ -0,0 +1,61 @@ +{ + "nodes": { + "flake-utils": { + "inputs": { + "systems": "systems" + }, + "locked": { + "lastModified": 1710146030, + "narHash": "sha256-SZ5L6eA7HJ/nmkzGG7/ISclqe6oZdOZTNoesiInkXPQ=", + "owner": "numtide", + "repo": "flake-utils", + "rev": "b1d9ab70662946ef0850d488da1c9019f3a9752a", + "type": "github" + }, + "original": { + "owner": "numtide", + "repo": "flake-utils", + "type": "github" + } + }, + "nixpkgs": { + "locked": { + "lastModified": 1715395895, + "narHash": "sha256-DreMqi6+qa21ffLQqhMQL2XRUkAGt3N7iVB5FhJKie4=", + "owner": "NixOS", + "repo": "nixpkgs", + "rev": "71bae31b7dbc335528ca7e96f479ec93462323ff", + "type": "github" + }, + "original": { + "owner": "NixOS", + "ref": "nixos-23.11", + "repo": "nixpkgs", + "type": "github" + } + }, + "root": { + "inputs": { + "flake-utils": "flake-utils", + "nixpkgs": "nixpkgs" + } + }, + "systems": { + "locked": { + "lastModified": 1681028828, + "narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=", + "owner": "nix-systems", + "repo": "default", + "rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e", + "type": "github" + }, + "original": { + "owner": "nix-systems", + "repo": "default", + "type": "github" + } + } + }, + "root": "root", + "version": 7 +} diff --git a/flake.nix b/flake.nix new file mode 100644 index 00000000..2a5a072b --- /dev/null +++ b/flake.nix @@ -0,0 +1,37 @@ +{ + description = "Tilde Friends is a platform for making, running, and sharing web applications."; + + inputs = { + nixpkgs.url = "github:NixOS/nixpkgs/nixos-23.11"; + flake-utils.url = "github:numtide/flake-utils"; + }; + + outputs = { + self, + nixpkgs, + flake-utils, + }: + flake-utils.lib.eachDefaultSystem (system: let + pkgs = import nixpkgs { + inherit system; + }; + in rec + { + # Nix formatter, run using `$ nix fmt` + formatter = pkgs.alejandra; + + # Exports the tildefriends package + # Build with `$ nix build` + packages.default = pkgs.callPackage ./default.nix {}; + + # Creates a shell with the necessary dependencies + # Enter using `$ nix develop` + devShell = pkgs.mkShell { + buildInputs = with pkgs; [ + openssl + llvmPackages_17.clang-unwrapped + unzip + ]; + }; + }); +} diff --git a/src/ssb.c b/src/ssb.c index 38f2e22f..ddd4ec50 100644 --- a/src/ssb.c +++ b/src/ssb.c @@ -342,6 +342,8 @@ typedef struct _tf_ssb_connection_t tf_ssb_debug_message_t* debug_messages[k_debug_close_message_count]; int ref_count; + + int read_back_pressure; } tf_ssb_connection_t; static JSClassID _connection_class_id; @@ -1618,6 +1620,7 @@ static void _tf_ssb_connection_rpc_recv(tf_ssb_connection_t* connection, uint8_t } if (!found && !_tf_ssb_name_equals(context, val, (const char*[]) { "Error", NULL })) { + tf_ssb_connection_add_request(connection, -request_number, namebuf, NULL, NULL, NULL, NULL); char buffer[256]; _tf_ssb_name_to_string(context, val, buffer, sizeof(buffer)); tf_ssb_connection_rpc_send_error_method_not_allowed(connection, flags, -request_number, buffer); @@ -2061,6 +2064,30 @@ static void _tf_ssb_connection_client_send_hello(tf_ssb_connection_t* connection connection->state = k_tf_ssb_state_sent_hello; } +static bool _tf_ssb_connection_read_start(tf_ssb_connection_t* connection) +{ + int result = uv_read_start((uv_stream_t*)&connection->tcp, _tf_ssb_connection_on_tcp_alloc, _tf_ssb_connection_on_tcp_recv); + if (result && result != UV_EALREADY) + { + tf_printf("uv_read_start => %s\n", uv_strerror(result)); + _tf_ssb_connection_close(connection, "uv_read_start failed"); + return false; + } + return true; +} + +static bool _tf_ssb_connection_read_stop(tf_ssb_connection_t* connection) +{ + int result = uv_read_stop((uv_stream_t*)&connection->tcp); + if (result && result != UV_EALREADY) + { + tf_printf("uv_read_stop => %s\n", uv_strerror(result)); + _tf_ssb_connection_close(connection, "uv_read_stop failed"); + return false; + } + return true; +} + static void _tf_ssb_connection_on_connect(uv_connect_t* connect, int status) { tf_ssb_connection_t* connection = connect->data; @@ -2068,13 +2095,7 @@ static void _tf_ssb_connection_on_connect(uv_connect_t* connect, int status) if (status == 0) { connection->state = k_tf_ssb_state_connected; - int result = uv_read_start(connect->handle, _tf_ssb_connection_on_tcp_alloc, _tf_ssb_connection_on_tcp_recv); - if (result) - { - tf_printf("uv_read_start => %s\n", uv_strerror(status)); - _tf_ssb_connection_close(connection, "uv_read_start failed"); - } - else + if (_tf_ssb_connection_read_start(connection)) { _tf_ssb_connection_client_send_hello(connection); } @@ -2825,7 +2846,7 @@ static void _tf_ssb_on_connection(uv_stream_t* stream, int status) _tf_ssb_notify_connections_changed(ssb, k_tf_ssb_change_create, connection); connection->state = k_tf_ssb_state_server_wait_hello; - uv_read_start((uv_stream_t*)&connection->tcp, _tf_ssb_connection_on_tcp_alloc, _tf_ssb_connection_on_tcp_recv); + _tf_ssb_connection_read_start(connection); } static void _tf_ssb_send_broadcast(tf_ssb_t* ssb, struct sockaddr_in* address, struct sockaddr_in* netmask) @@ -4064,3 +4085,26 @@ JSValue tf_ssb_connection_requests_to_object(tf_ssb_connection_t* connection) } return object; } + +void tf_ssb_connection_adjust_read_backpressure(tf_ssb_connection_t* connection, int delta) +{ + const int k_threshold = 256; + int old_pressure = connection->read_back_pressure; + connection->read_back_pressure += delta; + if (!connection->closing) + { + if (old_pressure < k_threshold && connection->read_back_pressure >= k_threshold) + { + _tf_ssb_connection_read_stop(connection); + } + else if (old_pressure >= k_threshold && connection->read_back_pressure < k_threshold) + { + _tf_ssb_connection_read_start(connection); + } + } + connection->ref_count += delta; + if (connection->ref_count == 0 && connection->closing) + { + _tf_ssb_connection_destroy(connection, "backpressure released"); + } +} diff --git a/src/ssb.h b/src/ssb.h index 4e24980b..e0f6c7bd 100644 --- a/src/ssb.h +++ b/src/ssb.h @@ -996,4 +996,13 @@ void tf_ssb_schedule_work(tf_ssb_t* ssb, int delay_ms, void (*callback)(tf_ssb_t */ bool tf_ssb_hmacsha256_verify(const char* public_key, const void* payload, size_t payload_length, const char* signature, bool signature_is_urlb64); +/** +** Adjust read backpressure. If it gets too high, TCP receive will be paused +** until it lowers. +** @param connection The connection on which to affect backpressure. +** @param delta The change in backpressure. Higher will eventually pause +** receive. Lower will resume it. +*/ +void tf_ssb_connection_adjust_read_backpressure(tf_ssb_connection_t* connection, int delta); + /** @} */ diff --git a/src/ssb.js.c b/src/ssb.js.c index d7808c4f..22953f2c 100644 --- a/src/ssb.js.c +++ b/src/ssb.js.c @@ -1889,7 +1889,7 @@ void tf_ssb_register(JSContext* context, tf_ssb_t* ssb) JS_SetPropertyStr(context, object, "storeMessage", JS_NewCFunction(context, _tf_ssb_storeMessage, "storeMessage", 1)); JS_SetPropertyStr(context, object, "blobStore", JS_NewCFunction(context, _tf_ssb_blobStore, "blobStore", 1)); - /* Should be trusted only. */ + /* Trusted only. */ JS_SetPropertyStr(context, object, "addEventListener", JS_NewCFunction(context, _tf_ssb_add_event_listener, "addEventListener", 2)); JS_SetPropertyStr(context, object, "removeEventListener", JS_NewCFunction(context, _tf_ssb_remove_event_listener, "removeEventListener", 2)); diff --git a/src/ssb.rpc.c b/src/ssb.rpc.c index 9fa6cf8b..1f36568f 100644 --- a/src/ssb.rpc.c +++ b/src/ssb.rpc.c @@ -404,6 +404,7 @@ typedef struct _blobs_get_t bool done; bool storing; tf_ssb_t* ssb; + tf_ssb_connection_t* connection; uint8_t buffer[]; } blobs_get_t; @@ -411,6 +412,7 @@ static void _tf_ssb_rpc_blob_store_callback(const char* id, bool is_new, void* u { blobs_get_t* get = user_data; get->storing = false; + tf_ssb_connection_adjust_read_backpressure(get->connection, -1); if (get->done) { tf_free(get); @@ -433,6 +435,7 @@ static void _tf_ssb_rpc_connection_blobs_get_callback( if (JS_ToBool(context, args)) { get->storing = true; + tf_ssb_connection_adjust_read_backpressure(connection, 1); tf_ssb_db_blob_store_async(ssb, get->buffer, get->received, _tf_ssb_rpc_blob_store_callback, get); } /* TODO: Should we send the response in the callback? */ @@ -455,7 +458,7 @@ static void _tf_ssb_rpc_connection_blobs_get_cleanup(tf_ssb_t* ssb, void* user_d static void _tf_ssb_rpc_connection_blobs_get(tf_ssb_connection_t* connection, const char* blob_id, size_t size) { blobs_get_t* get = tf_malloc(sizeof(blobs_get_t) + size); - *get = (blobs_get_t) { .ssb = tf_ssb_connection_get_ssb(connection), .expected_size = size }; + *get = (blobs_get_t) { .ssb = tf_ssb_connection_get_ssb(connection), .connection = connection, .expected_size = size }; snprintf(get->id, sizeof(get->id), "%s", blob_id); memset(get->buffer, 0, size); @@ -1000,6 +1003,12 @@ static void _tf_ssb_rpc_ebt_replicate_send_messages(tf_ssb_connection_t* connect } } +static void _tf_ssb_rpc_ebt_replicate_store_callback(const char* id, bool verified, bool is_new, void* user_data) +{ + tf_ssb_connection_t* connection = user_data; + tf_ssb_connection_adjust_read_backpressure(connection, -1); +} + static void _tf_ssb_rpc_ebt_replicate(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) { tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); @@ -1022,7 +1031,8 @@ static void _tf_ssb_rpc_ebt_replicate(tf_ssb_connection_t* connection, uint8_t f if (!JS_IsUndefined(author)) { /* Looks like a message. */ - tf_ssb_verify_strip_and_store_message(ssb, args, NULL, NULL); + tf_ssb_connection_adjust_read_backpressure(connection, 1); + tf_ssb_verify_strip_and_store_message(ssb, args, _tf_ssb_rpc_ebt_replicate_store_callback, connection); } else {