Compare commits

...

15 Commits

Author SHA1 Message Date
f31ec0338b
Merge commit '580688381e08d2b6df67f146118f9c4e38b37f78' into tasiaiso-documentation 2024-05-23 13:13:43 +02:00
580688381e prettier 2024-05-22 20:52:10 -04:00
e63d69a440 Missing generated semicolon. Sigh. 2024-05-22 20:44:28 -04:00
be64fe04fb Auto-update all the versions. 2024-05-22 20:35:48 -04:00
801ab20723 Merge pull request 'Add Nix support' (#62) from tasiaiso/tildefriends:tasiaiso-nix into main
Reviewed-on: cory/tildefriends#62
2024-05-22 20:31:11 -04:00
d974a5e044 An experiment in controlling memory usage when syncing. uv_read_stop when we have too active message/blob writes to the database and uv_read_start when we're back under control. #64 2024-05-22 19:53:33 -04:00
1be94ae0be Removed ssb.addEventListener and ssb.removeEventListener from the public API. Can do the same thing with core.register. 2024-05-22 18:51:21 -04:00
b883e6a485 Fix username/id extending off the screen in the welcome line. 2024-05-22 12:33:18 -04:00
a0210379ae Avoid confusing log output when responding with a method not found error. 2024-05-20 12:39:21 -04:00
3b36496dac
chore: a bit more doc 2024-05-12 21:17:38 +02:00
4ebd6c24a9
chore: missing period in description 2024-05-12 21:15:30 +02:00
05451d98b3
Merge branch 'tasiaiso-nix' of https://dev.tildefriends.net/tasiaiso/tildefriends into tasiaiso-nix 2024-05-12 21:13:43 +02:00
22a4bce3c8
docs(nix): add documentation in default.nix 2024-05-12 21:13:22 +02:00
76d499f00b Merge branch 'main' into tasiaiso-nix 2024-05-12 14:56:12 -04:00
f0772f9b99
build(nix): add Nix support 2024-05-12 20:34:03 +02:00
15 changed files with 262 additions and 24 deletions

View File

@ -616,7 +616,7 @@ $(IOS_TARGETS) $(IOSSIM_TARGETS): LDFLAGS += \
unix: debug release unix: debug release
win: windebug winrelease win: windebug winrelease
all: $(BUILD_TYPES) all: $(BUILD_TYPES) default.nix
.PHONY: all win unix .PHONY: all win unix
ALL_APP_OBJS := \ ALL_APP_OBJS := \
@ -673,6 +673,10 @@ src/android/AndroidManifest.xml : $(firstword $(MAKEFILE_LIST))
-e 's/android:targetSdkVersion="[[:digit:]]*"/android:targetSdkVersion="$(ANDROID_TARGET_SDK_VERSION)"/' \ -e 's/android:targetSdkVersion="[[:digit:]]*"/android:targetSdkVersion="$(ANDROID_TARGET_SDK_VERSION)"/' \
$@ $@
default.nix : $(firstword $(MAKEFILE_LIST))
@echo "[version] $@"
@sed -i -e 's/version = ".*";/version = "$(VERSION_NUMBER)";/' $@
# Android support. # Android support.
out/res/layout_activity_main.xml.flat: src/android/res/layout/activity_main.xml out/res/layout_activity_main.xml.flat: src/android/res/layout/activity_main.xml
@mkdir -p $(dir $@) @mkdir -p $(dir $@)
@ -858,7 +862,7 @@ clean:
rm -rf $(BUILD_DIR) rm -rf $(BUILD_DIR)
.PHONY: clean .PHONY: clean
dist: release-apk iosrelease-ipa $(if $(HAVE_WIN), out/winrelease/tildefriends.standalone.exe) dist: release-apk iosrelease-ipa $(if $(HAVE_WIN), out/winrelease/tildefriends.standalone.exe) default.nix
@echo [archive] dist/tildefriends-$(VERSION_NUMBER).tar.xz @echo [archive] dist/tildefriends-$(VERSION_NUMBER).tar.xz
@rm -rf out/tildefriends-$(VERSION_NUMBER) @rm -rf out/tildefriends-$(VERSION_NUMBER)
@mkdir -p dist/ out/tildefriends-$(VERSION_NUMBER) @mkdir -p dist/ out/tildefriends-$(VERSION_NUMBER)

View File

@ -67,9 +67,6 @@ tfrpc.register(function getHash(id, message) {
tfrpc.register(function setHash(hash) { tfrpc.register(function setHash(hash) {
return app.setHash(hash); return app.setHash(hash);
}); });
ssb.addEventListener('message', async function (id) {
await tfrpc.rpc.notifyNewMessage(id);
});
tfrpc.register(async function store_blob(blob) { tfrpc.register(async function store_blob(blob) {
if (Array.isArray(blob)) { if (Array.isArray(blob)) {
blob = Uint8Array.from(blob); blob = Uint8Array.from(blob);
@ -91,10 +88,12 @@ tfrpc.register(function getActiveIdentity() {
tfrpc.register(async function try_decrypt(id, content) { tfrpc.register(async function try_decrypt(id, content) {
return await ssb.privateMessageDecrypt(id, content); return await ssb.privateMessageDecrypt(id, content);
}); });
ssb.addEventListener('broadcasts', async function () { core.register('onMessage', async function (id) {
await tfrpc.rpc.notifyNewMessage(id);
});
core.register('onBroadcastsChanged', async function () {
await tfrpc.rpc.set('broadcasts', await ssb.getBroadcasts()); await tfrpc.rpc.set('broadcasts', await ssb.getBroadcasts());
}); });
core.register('onConnectionsChanged', async function () { core.register('onConnectionsChanged', async function () {
await tfrpc.rpc.set('connections', await ssb.connections()); await tfrpc.rpc.set('connections', await ssb.connections());
}); });

View File

@ -55,7 +55,7 @@ function new_message() {
return g_new_message_promise; return g_new_message_promise;
} }
ssb.addEventListener('message', function (id) { core.register('onMessage', function (id) {
let resolve = g_new_message_resolve; let resolve = g_new_message_resolve;
g_new_message_promise = new Promise(function (resolve, reject) { g_new_message_promise = new Promise(function (resolve, reject) {
g_new_message_resolve = resolve; g_new_message_resolve = resolve;

View File

@ -1,5 +1,5 @@
{ {
"type": "tildefriends-app", "type": "tildefriends-app",
"emoji": "🐌", "emoji": "🐌",
"previous": "&wA6sLaDxtYeFdVCCu8jyhPsGYtGZEjbWQHeGOn0Yifg=.sha256" "previous": "&h0sTvkhc3zEJw/sH612fy5i554Gr1AKzCBbLkm0KH28=.sha256"
} }

View File

@ -76,7 +76,7 @@ tfrpc.register(function getHash(id, message) {
tfrpc.register(function setHash(hash) { tfrpc.register(function setHash(hash) {
return app.setHash(hash); return app.setHash(hash);
}); });
ssb.addEventListener('message', async function (id) { core.register('onMessage', async function (id) {
await tfrpc.rpc.notifyNewMessage(id); await tfrpc.rpc.notifyNewMessage(id);
}); });
tfrpc.register(async function store_blob(blob) { tfrpc.register(async function store_blob(blob) {
@ -103,7 +103,7 @@ tfrpc.register(async function encrypt(id, recipients, content) {
tfrpc.register(async function getActiveIdentity() { tfrpc.register(async function getActiveIdentity() {
return await ssb.getActiveIdentity(); return await ssb.getActiveIdentity();
}); });
ssb.addEventListener('broadcasts', async function () { core.register('onBroadcastsChanged', async function () {
await tfrpc.rpc.set('broadcasts', await ssb.getBroadcasts()); await tfrpc.rpc.set('broadcasts', await ssb.getBroadcasts());
}); });

View File

@ -136,7 +136,7 @@ class TfTabNewsElement extends LitElement {
${this.new_messages_text()} ${this.new_messages_text()}
</button> </button>
</p> </p>
<div> <div class="w3-bar">
Welcome, <tf-user id=${this.whoami} .users=${this.users}></tf-user>! Welcome, <tf-user id=${this.whoami} .users=${this.users}></tf-user>!
${edit_profile} ${edit_profile}
</div> </div>

View File

@ -50,7 +50,7 @@ function new_message() {
return g_new_message_promise; return g_new_message_promise;
} }
ssb.addEventListener('message', function (id) { core.register('onMessage', function (id) {
let resolve = g_new_message_resolve; let resolve = g_new_message_resolve;
g_new_message_promise = new Promise(function (resolve, reject) { g_new_message_promise = new Promise(function (resolve, reject) {
g_new_message_resolve = resolve; g_new_message_resolve = resolve;

View File

@ -678,6 +678,8 @@ async function getProcessBlob(blobId, key, options) {
); );
} }
}; };
imports.ssb.addEventListener = undefined;
imports.ssb.removeEventListener = undefined;
imports.ssb.getIdentityInfo = undefined; imports.ssb.getIdentityInfo = undefined;
imports.fetch = function (url, options) { imports.fetch = function (url, options) {
return http.fetch(url, options, gGlobalSettings.fetch_hosts); return http.fetch(url, options, gGlobalSettings.fetch_hosts);
@ -1228,6 +1230,10 @@ async function blobHandler(request, response, blobId, uri) {
} }
} }
ssb.addEventListener('message', function () {
broadcastEvent('onMessage', [...arguments]);
});
ssb.addEventListener('broadcasts', function () { ssb.addEventListener('broadcasts', function () {
broadcastEvent('onBroadcastsChanged', []); broadcastEvent('onBroadcastsChanged', []);
}); });

68
default.nix Normal file
View File

@ -0,0 +1,68 @@
# How to upgrade to a newer version
# - Comment `src.hash`
# - Change `version`
# - Run `$ nix build`
# This will fetch the source code
# Since `hash` is not provided, nix will stop building and throw an error:
#
# error: hash mismatch in fixed-output derivation '/nix/store/fghi3ljs6fhz8pwm3dh73j5fwjpq5wbz-source.drv':
# specified: sha256-AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=
# got: sha256-+uthA1w8CmZfW+WOK9wYGl2fUl/k10ufOc8W+Pwa9iQ=
# error: 1 dependencies of derivation '/nix/store/imcwsw5r74vkd8r0qa2k7cys2xfgraaz-tildefriends-0.0.18.drv' failed to build
#
# - Change `src.hash` to the new one, ie `sha256-+uthA1w8CmZfW+WOK9wYGl2fUl/k10ufOc8W+Pwa9iQ=`
# - Uncomment `src.hash`
# - Build again, this time it should work.
# - Check the release notes, if there's a new dependency or a change to `GNUMakefile`, this file might need to be changed too.
# For more details, contact tasiaiso @ https://tilde.club/~tasiaiso/
#
# WARNING: currently it is pinned to `47838d5e482cb4aac40190fa0414f08b8cf94d40`. I couldn't get v0.0.18 to work for some reason.
# I'll change this in the next release - tasiaiso
{
pkgs ? import <nixpkgs> {},
lib ? import <nixpkgs/lib>,
}:
pkgs.stdenv.mkDerivation rec {
pname = "tildefriends";
version = "0.0.19-wip";
src = pkgs.fetchFromGitea {
domain = "dev.tildefriends.net";
owner = "cory";
repo = "tildefriends";
# rev = "v${version}";
rev = "47838d5e482cb4aac40190fa0414f08b8cf94d40";
hash = "sha256-mb5KYvWPIqgV64FOaXKHm2ownBJiiSRtdH8+YWiXwvE="; # 47838d5e482cb4aac40190fa0414f08b8cf94d40
fetchSubmodules = true;
};
nativeBuildInputs = with pkgs; [
gnumake
openssl
which
];
buildInputs = with pkgs; [
openssl
which
];
buildPhase = ''
make -j $NIX_BUILD_CORES release
'';
installPhase = ''
mkdir -p $out/bin
cp -r out/release/tildefriends $out/bin
'';
doCheck = false;
meta = with pkgs; {
homepage = "https://tildefriends.net";
description = "Make apps and friends from the comfort of your web browser.";
mainProgram = "tildefriends";
license = with lib.licenses; [mit];
platforms = lib.platforms.all;
};
}

61
flake.lock Normal file
View File

@ -0,0 +1,61 @@
{
"nodes": {
"flake-utils": {
"inputs": {
"systems": "systems"
},
"locked": {
"lastModified": 1710146030,
"narHash": "sha256-SZ5L6eA7HJ/nmkzGG7/ISclqe6oZdOZTNoesiInkXPQ=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "b1d9ab70662946ef0850d488da1c9019f3a9752a",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "flake-utils",
"type": "github"
}
},
"nixpkgs": {
"locked": {
"lastModified": 1715395895,
"narHash": "sha256-DreMqi6+qa21ffLQqhMQL2XRUkAGt3N7iVB5FhJKie4=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "71bae31b7dbc335528ca7e96f479ec93462323ff",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-23.11",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"flake-utils": "flake-utils",
"nixpkgs": "nixpkgs"
}
},
"systems": {
"locked": {
"lastModified": 1681028828,
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
"owner": "nix-systems",
"repo": "default",
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
"type": "github"
},
"original": {
"owner": "nix-systems",
"repo": "default",
"type": "github"
}
}
},
"root": "root",
"version": 7
}

37
flake.nix Normal file
View File

@ -0,0 +1,37 @@
{
description = "Tilde Friends is a platform for making, running, and sharing web applications.";
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-23.11";
flake-utils.url = "github:numtide/flake-utils";
};
outputs = {
self,
nixpkgs,
flake-utils,
}:
flake-utils.lib.eachDefaultSystem (system: let
pkgs = import nixpkgs {
inherit system;
};
in rec
{
# Nix formatter, run using `$ nix fmt`
formatter = pkgs.alejandra;
# Exports the tildefriends package
# Build with `$ nix build`
packages.default = pkgs.callPackage ./default.nix {};
# Creates a shell with the necessary dependencies
# Enter using `$ nix develop`
devShell = pkgs.mkShell {
buildInputs = with pkgs; [
openssl
llvmPackages_17.clang-unwrapped
unzip
];
};
});
}

View File

@ -342,6 +342,8 @@ typedef struct _tf_ssb_connection_t
tf_ssb_debug_message_t* debug_messages[k_debug_close_message_count]; tf_ssb_debug_message_t* debug_messages[k_debug_close_message_count];
int ref_count; int ref_count;
int read_back_pressure;
} tf_ssb_connection_t; } tf_ssb_connection_t;
static JSClassID _connection_class_id; static JSClassID _connection_class_id;
@ -1618,6 +1620,7 @@ static void _tf_ssb_connection_rpc_recv(tf_ssb_connection_t* connection, uint8_t
} }
if (!found && !_tf_ssb_name_equals(context, val, (const char*[]) { "Error", NULL })) if (!found && !_tf_ssb_name_equals(context, val, (const char*[]) { "Error", NULL }))
{ {
tf_ssb_connection_add_request(connection, -request_number, namebuf, NULL, NULL, NULL, NULL);
char buffer[256]; char buffer[256];
_tf_ssb_name_to_string(context, val, buffer, sizeof(buffer)); _tf_ssb_name_to_string(context, val, buffer, sizeof(buffer));
tf_ssb_connection_rpc_send_error_method_not_allowed(connection, flags, -request_number, buffer); tf_ssb_connection_rpc_send_error_method_not_allowed(connection, flags, -request_number, buffer);
@ -2061,6 +2064,30 @@ static void _tf_ssb_connection_client_send_hello(tf_ssb_connection_t* connection
connection->state = k_tf_ssb_state_sent_hello; connection->state = k_tf_ssb_state_sent_hello;
} }
static bool _tf_ssb_connection_read_start(tf_ssb_connection_t* connection)
{
int result = uv_read_start((uv_stream_t*)&connection->tcp, _tf_ssb_connection_on_tcp_alloc, _tf_ssb_connection_on_tcp_recv);
if (result && result != UV_EALREADY)
{
tf_printf("uv_read_start => %s\n", uv_strerror(result));
_tf_ssb_connection_close(connection, "uv_read_start failed");
return false;
}
return true;
}
static bool _tf_ssb_connection_read_stop(tf_ssb_connection_t* connection)
{
int result = uv_read_stop((uv_stream_t*)&connection->tcp);
if (result && result != UV_EALREADY)
{
tf_printf("uv_read_stop => %s\n", uv_strerror(result));
_tf_ssb_connection_close(connection, "uv_read_stop failed");
return false;
}
return true;
}
static void _tf_ssb_connection_on_connect(uv_connect_t* connect, int status) static void _tf_ssb_connection_on_connect(uv_connect_t* connect, int status)
{ {
tf_ssb_connection_t* connection = connect->data; tf_ssb_connection_t* connection = connect->data;
@ -2068,13 +2095,7 @@ static void _tf_ssb_connection_on_connect(uv_connect_t* connect, int status)
if (status == 0) if (status == 0)
{ {
connection->state = k_tf_ssb_state_connected; connection->state = k_tf_ssb_state_connected;
int result = uv_read_start(connect->handle, _tf_ssb_connection_on_tcp_alloc, _tf_ssb_connection_on_tcp_recv); if (_tf_ssb_connection_read_start(connection))
if (result)
{
tf_printf("uv_read_start => %s\n", uv_strerror(status));
_tf_ssb_connection_close(connection, "uv_read_start failed");
}
else
{ {
_tf_ssb_connection_client_send_hello(connection); _tf_ssb_connection_client_send_hello(connection);
} }
@ -2825,7 +2846,7 @@ static void _tf_ssb_on_connection(uv_stream_t* stream, int status)
_tf_ssb_notify_connections_changed(ssb, k_tf_ssb_change_create, connection); _tf_ssb_notify_connections_changed(ssb, k_tf_ssb_change_create, connection);
connection->state = k_tf_ssb_state_server_wait_hello; connection->state = k_tf_ssb_state_server_wait_hello;
uv_read_start((uv_stream_t*)&connection->tcp, _tf_ssb_connection_on_tcp_alloc, _tf_ssb_connection_on_tcp_recv); _tf_ssb_connection_read_start(connection);
} }
static void _tf_ssb_send_broadcast(tf_ssb_t* ssb, struct sockaddr_in* address, struct sockaddr_in* netmask) static void _tf_ssb_send_broadcast(tf_ssb_t* ssb, struct sockaddr_in* address, struct sockaddr_in* netmask)
@ -4064,3 +4085,26 @@ JSValue tf_ssb_connection_requests_to_object(tf_ssb_connection_t* connection)
} }
return object; return object;
} }
void tf_ssb_connection_adjust_read_backpressure(tf_ssb_connection_t* connection, int delta)
{
const int k_threshold = 256;
int old_pressure = connection->read_back_pressure;
connection->read_back_pressure += delta;
if (!connection->closing)
{
if (old_pressure < k_threshold && connection->read_back_pressure >= k_threshold)
{
_tf_ssb_connection_read_stop(connection);
}
else if (old_pressure >= k_threshold && connection->read_back_pressure < k_threshold)
{
_tf_ssb_connection_read_start(connection);
}
}
connection->ref_count += delta;
if (connection->ref_count == 0 && connection->closing)
{
_tf_ssb_connection_destroy(connection, "backpressure released");
}
}

View File

@ -996,4 +996,13 @@ void tf_ssb_schedule_work(tf_ssb_t* ssb, int delay_ms, void (*callback)(tf_ssb_t
*/ */
bool tf_ssb_hmacsha256_verify(const char* public_key, const void* payload, size_t payload_length, const char* signature, bool signature_is_urlb64); bool tf_ssb_hmacsha256_verify(const char* public_key, const void* payload, size_t payload_length, const char* signature, bool signature_is_urlb64);
/**
** Adjust read backpressure. If it gets too high, TCP receive will be paused
** until it lowers.
** @param connection The connection on which to affect backpressure.
** @param delta The change in backpressure. Higher will eventually pause
** receive. Lower will resume it.
*/
void tf_ssb_connection_adjust_read_backpressure(tf_ssb_connection_t* connection, int delta);
/** @} */ /** @} */

View File

@ -1889,7 +1889,7 @@ void tf_ssb_register(JSContext* context, tf_ssb_t* ssb)
JS_SetPropertyStr(context, object, "storeMessage", JS_NewCFunction(context, _tf_ssb_storeMessage, "storeMessage", 1)); JS_SetPropertyStr(context, object, "storeMessage", JS_NewCFunction(context, _tf_ssb_storeMessage, "storeMessage", 1));
JS_SetPropertyStr(context, object, "blobStore", JS_NewCFunction(context, _tf_ssb_blobStore, "blobStore", 1)); JS_SetPropertyStr(context, object, "blobStore", JS_NewCFunction(context, _tf_ssb_blobStore, "blobStore", 1));
/* Should be trusted only. */ /* Trusted only. */
JS_SetPropertyStr(context, object, "addEventListener", JS_NewCFunction(context, _tf_ssb_add_event_listener, "addEventListener", 2)); JS_SetPropertyStr(context, object, "addEventListener", JS_NewCFunction(context, _tf_ssb_add_event_listener, "addEventListener", 2));
JS_SetPropertyStr(context, object, "removeEventListener", JS_NewCFunction(context, _tf_ssb_remove_event_listener, "removeEventListener", 2)); JS_SetPropertyStr(context, object, "removeEventListener", JS_NewCFunction(context, _tf_ssb_remove_event_listener, "removeEventListener", 2));

View File

@ -404,6 +404,7 @@ typedef struct _blobs_get_t
bool done; bool done;
bool storing; bool storing;
tf_ssb_t* ssb; tf_ssb_t* ssb;
tf_ssb_connection_t* connection;
uint8_t buffer[]; uint8_t buffer[];
} blobs_get_t; } blobs_get_t;
@ -411,6 +412,7 @@ static void _tf_ssb_rpc_blob_store_callback(const char* id, bool is_new, void* u
{ {
blobs_get_t* get = user_data; blobs_get_t* get = user_data;
get->storing = false; get->storing = false;
tf_ssb_connection_adjust_read_backpressure(get->connection, -1);
if (get->done) if (get->done)
{ {
tf_free(get); tf_free(get);
@ -433,6 +435,7 @@ static void _tf_ssb_rpc_connection_blobs_get_callback(
if (JS_ToBool(context, args)) if (JS_ToBool(context, args))
{ {
get->storing = true; get->storing = true;
tf_ssb_connection_adjust_read_backpressure(connection, 1);
tf_ssb_db_blob_store_async(ssb, get->buffer, get->received, _tf_ssb_rpc_blob_store_callback, get); tf_ssb_db_blob_store_async(ssb, get->buffer, get->received, _tf_ssb_rpc_blob_store_callback, get);
} }
/* TODO: Should we send the response in the callback? */ /* TODO: Should we send the response in the callback? */
@ -455,7 +458,7 @@ static void _tf_ssb_rpc_connection_blobs_get_cleanup(tf_ssb_t* ssb, void* user_d
static void _tf_ssb_rpc_connection_blobs_get(tf_ssb_connection_t* connection, const char* blob_id, size_t size) static void _tf_ssb_rpc_connection_blobs_get(tf_ssb_connection_t* connection, const char* blob_id, size_t size)
{ {
blobs_get_t* get = tf_malloc(sizeof(blobs_get_t) + size); blobs_get_t* get = tf_malloc(sizeof(blobs_get_t) + size);
*get = (blobs_get_t) { .ssb = tf_ssb_connection_get_ssb(connection), .expected_size = size }; *get = (blobs_get_t) { .ssb = tf_ssb_connection_get_ssb(connection), .connection = connection, .expected_size = size };
snprintf(get->id, sizeof(get->id), "%s", blob_id); snprintf(get->id, sizeof(get->id), "%s", blob_id);
memset(get->buffer, 0, size); memset(get->buffer, 0, size);
@ -1000,6 +1003,12 @@ static void _tf_ssb_rpc_ebt_replicate_send_messages(tf_ssb_connection_t* connect
} }
} }
static void _tf_ssb_rpc_ebt_replicate_store_callback(const char* id, bool verified, bool is_new, void* user_data)
{
tf_ssb_connection_t* connection = user_data;
tf_ssb_connection_adjust_read_backpressure(connection, -1);
}
static void _tf_ssb_rpc_ebt_replicate(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data) static void _tf_ssb_rpc_ebt_replicate(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{ {
tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection); tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
@ -1022,7 +1031,8 @@ static void _tf_ssb_rpc_ebt_replicate(tf_ssb_connection_t* connection, uint8_t f
if (!JS_IsUndefined(author)) if (!JS_IsUndefined(author))
{ {
/* Looks like a message. */ /* Looks like a message. */
tf_ssb_verify_strip_and_store_message(ssb, args, NULL, NULL); tf_ssb_connection_adjust_read_backpressure(connection, 1);
tf_ssb_verify_strip_and_store_message(ssb, args, _tf_ssb_rpc_ebt_replicate_store_callback, connection);
} }
else else
{ {