#include "ssb.rpc.h"

#include "log.h"
#include "mem.h"
#include "ssb.db.h"
#include "ssb.ebt.h"
#include "ssb.h"
#include "util.js.h"

#include "sqlite3.h"
#include "uv.h"

#include <inttypes.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

static void _tf_ssb_connection_send_history_stream(
	tf_ssb_connection_t* connection, int32_t request_number, const char* author, int64_t sequence, bool keys, bool live, bool end_request);
static void _tf_ssb_rpc_send_peers_exchange(tf_ssb_connection_t* connection);
static void _tf_ssb_rpc_start_delete_blobs(tf_ssb_t* ssb, int delay_ms);
static void _tf_ssb_rpc_start_delete_feeds(tf_ssb_t* ssb, int delay_ms);
static void _tf_ssb_rpc_ebt_replicate_resend_clock(tf_ssb_connection_t* connection, bool skip, void* user_data);
static void _tf_ssb_rpc_connection_blobs_createWants_callback(
	tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data);

static void _tf_ssb_rpc_gossip_ping_callback(
	tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	char buffer[256];
	snprintf(buffer, sizeof(buffer), "%" PRId64, (int64_t)time(NULL) * 1000);
	tf_ssb_connection_rpc_send(connection, flags, -request_number, NULL, (const uint8_t*)buffer, strlen(buffer), NULL, NULL, NULL);
	if (flags & k_ssb_rpc_flag_end_error)
	{
		tf_ssb_connection_remove_request(connection, request_number);
	}
}

static void _tf_ssb_rpc_gossip_ping(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	tf_ssb_connection_add_request(connection, -request_number, "gossip.ping", _tf_ssb_rpc_gossip_ping_callback, NULL, NULL, NULL);
}

static void _tf_ssb_rpc_blobs_get_callback(
	tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
}

typedef struct _blobs_get_work_t
{
	int64_t request_number;
	char id[k_id_base64_len];
	bool found;
	uint8_t* blob;
	size_t size;
} blobs_get_work_t;

static void _tf_ssb_rpc_blobs_get_work(tf_ssb_connection_t* connection, void* user_data)
{
	blobs_get_work_t* work = user_data;
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	work->found = tf_ssb_db_blob_get(ssb, work->id, &work->blob, &work->size);
}

static void _tf_ssb_rpc_blobs_get_after_work(tf_ssb_connection_t* connection, int status, void* user_data)
{
	blobs_get_work_t* work = user_data;
	if (work->found)
	{
		const size_t k_send_max = 8192;
		for (size_t offset = 0; offset < work->size; offset += k_send_max)
		{
			if (!tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_binary | k_ssb_rpc_flag_stream, -work->request_number, NULL, work->blob + offset,
					offset + k_send_max <= work->size ? k_send_max : (work->size - offset), NULL, NULL, NULL))
			{
				break;
			}
		}
		tf_free(work->blob);
	}
	tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json | k_ssb_rpc_flag_end_error | k_ssb_rpc_flag_stream, -work->request_number, NULL,
		(const uint8_t*)(work->found ? "true" : "false"), strlen(work->found ? "true" : "false"), NULL, NULL, NULL);
	tf_free(work);
}

static void _tf_ssb_rpc_blobs_get(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	if (flags & k_ssb_rpc_flag_end_error)
	{
		tf_ssb_connection_remove_request(connection, -request_number);
		return;
	}
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	if (!tf_ssb_is_replicator(ssb))
	{
		tf_ssb_connection_rpc_send_error_method_not_allowed(connection, flags, -request_number, "blobs.get");
		return;
	}

	tf_ssb_connection_add_request(connection, -request_number, "blobs.get", _tf_ssb_rpc_blobs_get_callback, NULL, NULL, NULL);
	JSContext* context = tf_ssb_connection_get_context(connection);
	JSValue ids = JS_GetPropertyStr(context, args, "args");
	int length = tf_util_get_length(context, ids);

	for (int i = 0; i < length; i++)
	{
		JSValue arg = JS_GetPropertyUint32(context, ids, i);
		const char* id = NULL;
		if (JS_IsString(arg))
		{
			id = JS_ToCString(context, arg);
		}
		else
		{
			JSValue key = JS_GetPropertyStr(context, arg, "key");
			id = JS_ToCString(context, key);
			JS_FreeValue(context, key);
		}

		blobs_get_work_t* work = tf_malloc(sizeof(blobs_get_work_t));
		*work = (blobs_get_work_t) {
			.request_number = request_number,
		};
		snprintf(work->id, sizeof(work->id), "%s", id);
		tf_ssb_connection_run_work(connection, _tf_ssb_rpc_blobs_get_work, _tf_ssb_rpc_blobs_get_after_work, work);

		JS_FreeCString(context, id);
		JS_FreeValue(context, arg);
	}
	JS_FreeValue(context, ids);
}

typedef struct _blobs_has_work_t
{
	int64_t request_number;
	char id[k_id_base64_len];
	bool found;
} blobs_has_work_t;

static void _tf_ssb_rpc_blobs_has_work(tf_ssb_connection_t* connection, void* user_data)
{
	blobs_has_work_t* work = user_data;
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	sqlite3* db = tf_ssb_acquire_db_reader(ssb);
	work->found = tf_ssb_db_blob_has(db, work->id);
	tf_ssb_release_db_reader(ssb, db);
}

static void _tf_ssb_rpc_blobs_has_after_work(tf_ssb_connection_t* connection, int status, void* user_data)
{
	blobs_has_work_t* work = user_data;
	tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json | k_ssb_rpc_flag_end_error | k_ssb_rpc_flag_stream, -work->request_number, NULL,
		(const uint8_t*)(work->found ? "true" : "false"), strlen(work->found ? "true" : "false"), NULL, NULL, NULL);
	tf_free(work);
}

static void _tf_ssb_rpc_blobs_has(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	if (!tf_ssb_is_replicator(ssb))
	{
		tf_ssb_connection_rpc_send_error_method_not_allowed(connection, flags, -request_number, "blobs.has");
		return;
	}
	JSContext* context = tf_ssb_connection_get_context(connection);
	JSValue ids = JS_GetPropertyStr(context, args, "args");
	JSValue id = JS_GetPropertyUint32(context, ids, 0);
	const char* id_str = JS_ToCString(context, id);

	blobs_has_work_t* work = tf_malloc(sizeof(blobs_has_work_t));
	*work = (blobs_has_work_t) {
		.request_number = request_number,
	};
	snprintf(work->id, sizeof(work->id), "%s", id_str);
	tf_ssb_connection_run_work(connection, _tf_ssb_rpc_blobs_has_work, _tf_ssb_rpc_blobs_has_after_work, work);

	JS_FreeCString(context, id_str);
	JS_FreeValue(context, id);
	JS_FreeValue(context, ids);
}

static void _tf_ssb_rpc_blob_wants_added_callback(tf_ssb_t* ssb, const char* id, void* user_data)
{
	tf_ssb_connection_t* connection = user_data;
	tf_ssb_blob_wants_t* blob_wants = tf_ssb_connection_get_blob_wants_state(connection);
	JSContext* context = tf_ssb_get_context(ssb);
	JSValue message = JS_NewObject(context);
	JS_SetPropertyStr(context, message, id, JS_NewInt64(context, -1));
	tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream, -blob_wants->request_number, NULL, message, NULL, NULL, NULL);
	JS_FreeValue(context, message);
}

typedef struct _blob_wants_work_t
{
	char out_id[32][k_blob_id_len];
	int out_id_count;
} blob_wants_work_t;

static void _tf_ssb_request_blob_wants_work(tf_ssb_connection_t* connection, void* user_data)
{
	blob_wants_work_t* work = user_data;
	tf_ssb_blob_wants_t* blob_wants = tf_ssb_connection_get_blob_wants_state(connection);
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	int64_t age = -1;
	sqlite3* db = tf_ssb_acquire_db_reader(ssb);
	tf_ssb_db_get_global_setting_int64(db, "blob_fetch_age_seconds", &age);
	tf_ssb_release_db_reader(ssb, db);
	int64_t timestamp = -1;
	if (age == 0)
	{
		/* Don't fetch any blobs. */
		return;
	}
	else if (age > 0)
	{
		int64_t now = (int64_t)time(NULL) * 1000ULL;
		timestamp = now - age * 1000ULL;
	}

	db = tf_ssb_acquire_db_reader(ssb);
	sqlite3_stmt* statement;
	if (sqlite3_prepare(db, "SELECT id FROM blob_wants_view WHERE id > ? AND timestamp > ? ORDER BY id LIMIT ?", -1, &statement, NULL) == SQLITE_OK)
	{
		if (sqlite3_bind_text(statement, 1, blob_wants->last_id, -1, NULL) == SQLITE_OK && sqlite3_bind_int64(statement, 2, timestamp) == SQLITE_OK &&
			sqlite3_bind_int(statement, 3, tf_countof(work->out_id)) == SQLITE_OK)
		{
			while (sqlite3_step(statement) == SQLITE_ROW)
			{
				snprintf(work->out_id[work->out_id_count], sizeof(work->out_id[work->out_id_count]), "%s", (const char*)sqlite3_column_text(statement, 0));
				work->out_id_count++;
			}
		}
		sqlite3_finalize(statement);
	}
	else
	{
		tf_printf("prepare failed: %s\n", sqlite3_errmsg(db));
	}
	tf_ssb_release_db_reader(ssb, db);
}

static void _tf_ssb_request_blob_wants_after_work(tf_ssb_connection_t* connection, int result, void* user_data)
{
	blob_wants_work_t* work = user_data;
	if (!tf_ssb_is_shutting_down(tf_ssb_connection_get_ssb(connection)))
	{
		JSContext* context = tf_ssb_connection_get_context(connection);
		tf_ssb_blob_wants_t* blob_wants = tf_ssb_connection_get_blob_wants_state(connection);
		bool send_failed = false;
		for (int i = 0; i < work->out_id_count && !send_failed; i++)
		{
			JSValue message = JS_NewObject(context);
			JS_SetPropertyStr(context, message, work->out_id[i], JS_NewInt32(context, -1));
			send_failed = !tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream, -blob_wants->request_number, NULL, message, NULL, NULL, NULL);
			JS_FreeValue(context, message);
			blob_wants->wants_sent++;
		}
		if (work->out_id_count)
		{
			snprintf(blob_wants->last_id, sizeof(blob_wants->last_id), "%s", work->out_id[work->out_id_count - 1]);
		}
	}
	tf_free(work);
}

static void _tf_ssb_rpc_request_more_blobs(tf_ssb_connection_t* connection)
{
	blob_wants_work_t* work = tf_malloc(sizeof(blob_wants_work_t));
	memset(work, 0, sizeof(*work));
	tf_ssb_connection_run_work(connection, _tf_ssb_request_blob_wants_work, _tf_ssb_request_blob_wants_after_work, work);
}

static void _tf_ssb_rpc_blobs_createWants(
	tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	if (!tf_ssb_is_replicator(ssb))
	{
		tf_ssb_connection_rpc_send_error_method_not_allowed(connection, flags, -request_number, "blobs.createWants");
		return;
	}
	tf_ssb_connection_add_request(connection, -request_number, "blobs.createWants", _tf_ssb_rpc_connection_blobs_createWants_callback, NULL, NULL, NULL);
	tf_ssb_blob_wants_t* blob_wants = tf_ssb_connection_get_blob_wants_state(connection);
	tf_ssb_add_blob_want_added_callback(ssb, _tf_ssb_rpc_blob_wants_added_callback, NULL, connection);
	blob_wants->request_number = request_number;
	_tf_ssb_rpc_request_more_blobs(connection);
}

typedef struct tunnel_t
{
	tf_ssb_connection_t* connection;
	int32_t request_number;
} tunnel_t;

static void _tf_ssb_rpc_tunnel_callback(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	tunnel_t* tun = user_data;
	if (flags & k_ssb_rpc_flag_end_error)
	{
		tf_ssb_connection_remove_request(connection, request_number);

		JSContext* context = tf_ssb_connection_get_context(connection);
		JSValue message_val = JS_GetPropertyStr(context, args, "message");
		JSValue stack_val = JS_GetPropertyStr(context, args, "stack");

		char buffer[1024];
		if (!JS_IsUndefined(message_val))
		{
			const char* message_string = JS_ToCString(context, message_val);
			const char* stack_string = JS_ToCString(context, stack_val);
			snprintf(buffer, sizeof(buffer), "Error from tunnel: %s\n%s", message_string, stack_string);
			JS_FreeCString(context, message_string);
			JS_FreeCString(context, stack_string);
		}
		else
		{
			snprintf(buffer, sizeof(buffer), "Error from tunnel: %.*s", (int)size, message);
		}

		JS_FreeValue(context, stack_val);
		JS_FreeValue(context, message_val);

		tf_ssb_connection_close(tun->connection, buffer);
	}
	else
	{
		tf_ssb_connection_rpc_send(tun->connection, flags, tun->request_number, NULL, message, size, NULL, NULL, NULL);
	}
}

static void _tf_ssb_rpc_tunnel_cleanup(tf_ssb_t* ssb, void* user_data)
{
	tf_free(user_data);
}

static void _tf_ssb_rpc_tunnel_connect(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	if (!tf_ssb_is_room(ssb))
	{
		tf_ssb_connection_rpc_send_error_method_not_allowed(connection, flags, -request_number, "tunnel.connect");
		return;
	}

	JSContext* context = tf_ssb_connection_get_context(connection);
	JSValue arg_array = JS_GetPropertyStr(context, args, "args");
	JSValue arg = JS_GetPropertyUint32(context, arg_array, 0);
	JSValue origin = JS_GetPropertyStr(context, arg, "origin");
	JSValue portal = JS_GetPropertyStr(context, arg, "portal");
	JSValue target = JS_GetPropertyStr(context, arg, "target");

	if (JS_IsUndefined(origin) && !JS_IsUndefined(portal) && !JS_IsUndefined(target))
	{
		const char* target_str = JS_ToCString(context, target);

		tf_ssb_connection_t* target_connection = tf_ssb_connection_get(ssb, target_str);
		if (target_connection)
		{
			int32_t tunnel_request_number = tf_ssb_connection_next_request_number(target_connection);
			const char* portal_str = JS_ToCString(context, portal);

			JSValue message = JS_NewObject(context);
			JSValue name = JS_NewArray(context);
			JS_SetPropertyUint32(context, name, 0, JS_NewString(context, "tunnel"));
			JS_SetPropertyUint32(context, name, 1, JS_NewString(context, "connect"));
			JS_SetPropertyStr(context, message, "name", name);
			JSValue arg_obj = JS_NewObject(context);
			char origin_str[k_id_base64_len] = "";
			tf_ssb_connection_get_id(connection, origin_str, sizeof(origin_str));
			JS_SetPropertyStr(context, arg_obj, "origin", JS_NewString(context, origin_str));
			JS_SetPropertyStr(context, arg_obj, "portal", JS_NewString(context, portal_str));
			JS_SetPropertyStr(context, arg_obj, "target", JS_NewString(context, target_str));
			JSValue arg_array = JS_NewArray(context);
			JS_SetPropertyUint32(context, arg_array, 0, arg_obj);
			JS_SetPropertyStr(context, message, "args", arg_array);
			JS_SetPropertyStr(context, message, "type", JS_NewString(context, "duplex"));

			tf_ssb_connection_rpc_send_json(
				target_connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, tunnel_request_number, "tunnel.connect", message, NULL, NULL, NULL);

			tunnel_t* data0 = tf_malloc(sizeof(tunnel_t));
			*data0 = (tunnel_t) {
				.connection = target_connection,
				.request_number = tunnel_request_number,
			};
			tunnel_t* data1 = tf_malloc(sizeof(tunnel_t));
			*data1 = (tunnel_t) {
				.connection = connection,
				.request_number = -request_number,
			};
			tf_ssb_connection_add_request(connection, -request_number, "tunnel.connect", _tf_ssb_rpc_tunnel_callback, _tf_ssb_rpc_tunnel_cleanup, data0, target_connection);
			tf_ssb_connection_add_request(target_connection, tunnel_request_number, "tunnel.connect", _tf_ssb_rpc_tunnel_callback, _tf_ssb_rpc_tunnel_cleanup, data1, connection);

			JS_FreeValue(context, message);
			JS_FreeCString(context, portal_str);
		}
		else
		{
			tf_ssb_connection_rpc_send_error(connection, flags, -request_number, "Connection not found.");
		}
		JS_FreeCString(context, target_str);
	}
	else if (!JS_IsUndefined(origin) && !JS_IsUndefined(portal) && !JS_IsUndefined(target))
	{
		const char* origin_str = JS_ToCString(context, origin);
		const char* portal_str = JS_ToCString(context, portal);
		const char* target_str = JS_ToCString(context, target);
		tf_ssb_connection_tunnel_create(ssb, portal_str, -request_number, origin_str, 0);
		JS_FreeCString(context, origin_str);
		JS_FreeCString(context, portal_str);
		JS_FreeCString(context, target_str);
	}

	JS_FreeValue(context, origin);
	JS_FreeValue(context, portal);
	JS_FreeValue(context, target);
	JS_FreeValue(context, arg);
	JS_FreeValue(context, arg_array);
}

static void _tf_ssb_rpc_room_meta(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	JSContext* context = tf_ssb_get_context(ssb);
	JSValue response = JS_FALSE;
	if (tf_ssb_is_room(ssb))
	{
		response = JS_NewObject(context);
		JS_SetPropertyStr(context, response, "name", JS_NewString(context, tf_ssb_get_room_name(ssb)));
		JS_SetPropertyStr(context, response, "membership", JS_FALSE);
		JSValue features = JS_NewArray(context);
		JS_SetPropertyUint32(context, features, 0, JS_NewString(context, "tunnel"));
		JS_SetPropertyUint32(context, features, 1, JS_NewString(context, "room1"));
		JS_SetPropertyUint32(context, features, 2, JS_NewString(context, "room2"));
		JS_SetPropertyStr(context, response, "features", features);
	}
	tf_ssb_connection_rpc_send_json(connection, flags, -request_number, NULL, response, NULL, NULL, NULL);
	JS_FreeValue(context, response);
}

static void _tf_ssb_rpc_send_endpoints(tf_ssb_t* ssb)
{
	JSContext* context = tf_ssb_get_context(ssb);
	JSValue endpoints = JS_NewArray(context);

	tf_ssb_connection_t* connections[1024];
	int count = tf_ssb_get_connections(ssb, connections, tf_countof(connections));
	int id_count = 0;
	for (int i = 0; i < count; i++)
	{
		char id[k_id_base64_len] = { 0 };
		if ((tf_ssb_connection_is_attendant(connections[i]) || tf_ssb_connection_is_endpoint(connections[i])) && tf_ssb_connection_is_connected(connections[i]) &&
			tf_ssb_connection_get_id(connections[i], id, sizeof(id)))
		{
			JS_SetPropertyUint32(context, endpoints, id_count++, JS_NewString(context, id));
		}
	}

	for (int i = 0; i < count; i++)
	{
		if (tf_ssb_connection_is_endpoint(connections[i]) && tf_ssb_connection_is_connected(connections[i]))
		{
			int32_t request_number = tf_ssb_connection_get_endpoint_request_number(connections[i]);
			tf_ssb_connection_rpc_send_json(connections[i], k_ssb_rpc_flag_json | k_ssb_rpc_flag_stream, -request_number, NULL, endpoints, NULL, NULL, NULL);
		}
	}
	JS_FreeValue(context, endpoints);
}

static void _tf_ssb_rpc_room_attendants(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	if (!tf_ssb_is_room(ssb))
	{
		tf_ssb_connection_rpc_send_error_method_not_allowed(connection, flags, -request_number, "room.attendants");
		return;
	}

	JSContext* context = tf_ssb_get_context(ssb);
	JSValue joined = JS_NewObject(context);
	JS_SetPropertyStr(context, joined, "type", JS_NewString(context, "joined"));
	char my_id[k_id_base64_len] = "";
	if (tf_ssb_connection_get_id(connection, my_id, sizeof(my_id)))
	{
		JS_SetPropertyStr(context, joined, "id", JS_NewString(context, my_id));
	}

	JSValue state = JS_NewObject(context);
	JS_SetPropertyStr(context, state, "type", JS_NewString(context, "state"));
	JSValue ids = JS_NewArray(context);
	int id_count = 0;
	tf_ssb_connection_t* connections[1024];
	int count = tf_ssb_get_connections(ssb, connections, tf_countof(connections));

	bool have_endpoints = false;
	for (int i = 0; i < count; i++)
	{
		char id[k_id_base64_len] = { 0 };
		if (tf_ssb_connection_is_attendant(connections[i]) && tf_ssb_connection_get_id(connections[i], id, sizeof(id)))
		{
			JS_SetPropertyUint32(context, ids, id_count++, JS_NewString(context, id));

			tf_ssb_connection_rpc_send_json(connections[i], flags, -tf_ssb_connection_get_attendant_request_number(connections[i]), NULL, joined, NULL, NULL, NULL);
		}
		if (tf_ssb_connection_is_endpoint(connections[i]))
		{
			have_endpoints = true;
		}
	}
	JS_SetPropertyStr(context, state, "ids", ids);
	tf_ssb_connection_rpc_send_json(connection, flags, -request_number, NULL, state, NULL, NULL, NULL);
	JS_FreeValue(context, joined);
	JS_FreeValue(context, state);

	if (have_endpoints)
	{
		_tf_ssb_rpc_send_endpoints(ssb);
	}

	tf_ssb_connection_set_attendant(connection, true, request_number);
}

static void _tf_ssb_rpc_tunnel_endpoints(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	if (!tf_ssb_is_room(ssb))
	{
		tf_ssb_connection_rpc_send_error_method_not_allowed(connection, flags, -request_number, "tunnel.endpoints");
		return;
	}

	tf_ssb_connection_add_request(connection, -request_number, "tunnel.endpoints", NULL, NULL, NULL, NULL);
	tf_ssb_connection_set_endpoint(connection, true, request_number);
	_tf_ssb_rpc_send_endpoints(ssb);
}

typedef struct _blobs_get_t
{
	char id[k_blob_id_len];
	size_t received;
	size_t expected_size;
	bool done;
	bool storing;
	tf_ssb_t* ssb;
	tf_ssb_connection_t* connection;
	uint8_t buffer[];
} blobs_get_t;

static void _tf_ssb_rpc_blob_store_callback(const char* id, bool is_new, void* user_data)
{
	blobs_get_t* get = user_data;
	get->storing = false;
	tf_ssb_connection_adjust_read_backpressure(get->connection, -1);
	if (get->done)
	{
		tf_free(get);
	}
}

static void _tf_ssb_rpc_connection_blobs_get_callback(
	tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	JSContext* context = tf_ssb_get_context(ssb);
	blobs_get_t* get = user_data;
	if ((flags & k_ssb_rpc_mask_type) == k_ssb_rpc_flag_binary && size > 0 && get->received + size <= get->expected_size)
	{
		memcpy(get->buffer + get->received, message, size);
		get->received += size;
	}
	else if ((flags & k_ssb_rpc_mask_type) == k_ssb_rpc_flag_json)
	{
		if (JS_ToBool(context, args))
		{
			get->storing = true;
			tf_ssb_connection_adjust_read_backpressure(connection, 1);
			tf_ssb_db_blob_store_async(ssb, get->buffer, get->received, _tf_ssb_rpc_blob_store_callback, get);
		}
		/* TODO: Should we send the response in the callback? */
		bool stored = true;
		tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json | k_ssb_rpc_flag_stream | k_ssb_rpc_flag_end_error, -request_number, NULL,
			(const uint8_t*)(stored ? "true" : "false"), strlen(stored ? "true" : "false"), NULL, NULL, NULL);
	}
}

static void _tf_ssb_rpc_connection_blobs_get_cleanup(tf_ssb_t* ssb, void* user_data)
{
	blobs_get_t* get = user_data;
	get->done = true;
	if (!get->storing)
	{
		tf_free(get);
	}
}

static void _tf_ssb_rpc_connection_blobs_get(tf_ssb_connection_t* connection, const char* blob_id, size_t size)
{
	blobs_get_t* get = tf_malloc(sizeof(blobs_get_t) + size);
	*get = (blobs_get_t) { .ssb = tf_ssb_connection_get_ssb(connection), .connection = connection, .expected_size = size };
	snprintf(get->id, sizeof(get->id), "%s", blob_id);
	memset(get->buffer, 0, size);

	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	JSContext* context = tf_ssb_get_context(ssb);
	JSValue message = JS_NewObject(context);
	JSValue name = JS_NewArray(context);
	JS_SetPropertyUint32(context, name, 0, JS_NewString(context, "blobs"));
	JS_SetPropertyUint32(context, name, 1, JS_NewString(context, "get"));
	JS_SetPropertyStr(context, message, "name", name);
	JS_SetPropertyStr(context, message, "type", JS_NewString(context, "source"));
	JSValue args = JS_NewArray(context);
	JS_SetPropertyUint32(context, args, 0, JS_NewString(context, blob_id));
	JS_SetPropertyStr(context, message, "args", args);

	tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, tf_ssb_connection_next_request_number(connection), "blobs.get", message,
		_tf_ssb_rpc_connection_blobs_get_callback, _tf_ssb_rpc_connection_blobs_get_cleanup, get);

	JS_FreeValue(context, message);
}

typedef struct _blob_create_wants_work_t
{
	tf_ssb_connection_t* connection;
	char blob_id[k_blob_id_len];
	bool out_result;
	int64_t size;
	size_t out_size;
} blob_create_wants_work_t;

static void _tf_ssb_rpc_connection_blobs_create_wants_work(tf_ssb_connection_t* connection, void* user_data)
{
	blob_create_wants_work_t* work = user_data;
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(work->connection);
	work->out_result = tf_ssb_db_blob_get(ssb, work->blob_id, NULL, &work->out_size);
}

static void _tf_ssb_rpc_connection_blobs_create_wants_after_work(tf_ssb_connection_t* connection, int result, void* user_data)
{
	blob_create_wants_work_t* work = user_data;
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(work->connection);
	tf_ssb_blob_wants_t* blob_wants = tf_ssb_connection_get_blob_wants_state(connection);
	JSContext* context = tf_ssb_get_context(ssb);
	if (work->out_result)
	{
		JSValue message = JS_NewObject(context);
		JS_SetPropertyStr(context, message, work->blob_id, JS_NewInt64(context, work->out_size));
		tf_ssb_connection_rpc_send_json(work->connection, k_ssb_rpc_flag_stream, -blob_wants->request_number, NULL, message, NULL, NULL, NULL);
		JS_FreeValue(context, message);
	}
	else if (work->size == -1LL)
	{
		JSValue message = JS_NewObject(context);
		JS_SetPropertyStr(context, message, work->blob_id, JS_NewInt64(context, -2));
		tf_ssb_connection_rpc_send_json(work->connection, k_ssb_rpc_flag_stream, -blob_wants->request_number, NULL, message, NULL, NULL, NULL);
		JS_FreeValue(context, message);
	}
	tf_free(work);
}

static void _tf_ssb_rpc_connection_blobs_createWants_callback(
	tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	tf_ssb_blob_wants_t* blob_wants = tf_ssb_connection_get_blob_wants_state(connection);
	if (!blob_wants)
	{
		return;
	}

	JSContext* context = tf_ssb_connection_get_context(connection);

	JSValue name = JS_GetPropertyStr(context, args, "name");
	if (!JS_IsUndefined(name))
	{
		/* { name: "Error" } */
		tf_ssb_connection_remove_request(connection, -request_number);
		JS_FreeValue(context, name);
		return;
	}

	JSPropertyEnum* ptab = NULL;
	uint32_t plen = 0;
	if (JS_GetOwnPropertyNames(context, &ptab, &plen, args, JS_GPN_STRING_MASK) == 0)
	{
		for (uint32_t i = 0; i < plen; ++i)
		{
			JSValue key = JS_AtomToString(context, ptab[i].atom);
			JSPropertyDescriptor desc;
			JSValue key_value = JS_NULL;
			if (JS_GetOwnProperty(context, &desc, args, ptab[i].atom) == 1)
			{
				key_value = desc.value;
				JS_FreeValue(context, desc.setter);
				JS_FreeValue(context, desc.getter);
			}
			const char* blob_id = JS_ToCString(context, key);
			int64_t size = 0;
			JS_ToInt64(context, &size, key_value);
			if (--blob_wants->wants_sent == 0)
			{
				_tf_ssb_rpc_request_more_blobs(connection);
			}
			if (size < 0)
			{
				blob_create_wants_work_t* work = tf_malloc(sizeof(blob_create_wants_work_t));
				*work = (blob_create_wants_work_t) {
					.connection = connection,
					.size = size,
				};
				snprintf(work->blob_id, sizeof(work->blob_id), "%s", blob_id);
				tf_ssb_connection_run_work(connection, _tf_ssb_rpc_connection_blobs_create_wants_work, _tf_ssb_rpc_connection_blobs_create_wants_after_work, work);
			}
			else
			{
				_tf_ssb_rpc_connection_blobs_get(connection, blob_id, size);
			}
			JS_FreeCString(context, blob_id);
			JS_FreeValue(context, key);
			JS_FreeValue(context, key_value);
		}
		for (uint32_t i = 0; i < plen; ++i)
		{
			JS_FreeAtom(context, ptab[i].atom);
		}
		js_free(context, ptab);
	}
}

static void _tf_ssb_rpc_connection_room_attendants_callback(
	tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	JSContext* context = tf_ssb_get_context(ssb);
	JSValue type = JS_GetPropertyStr(context, args, "type");
	const char* type_string = JS_ToCString(context, type);
	if (!type_string)
	{
		tf_ssb_connection_rpc_send_error(connection, flags, -request_number, "Missing type.");
	}
	else if (strcmp(type_string, "state") == 0)
	{
		tf_ssb_connection_clear_room_attendants(connection);
		JSValue ids = JS_GetPropertyStr(context, args, "ids");
		int length = tf_util_get_length(context, ids);
		for (int i = 0; i < length; i++)
		{
			JSValue id = JS_GetPropertyUint32(context, ids, i);
			const char* id_string = JS_ToCString(context, id);
			if (id_string)
			{
				tf_ssb_connection_add_room_attendant(connection, id_string);
			}
			JS_FreeCString(context, id_string);
			JS_FreeValue(context, id);
		}
		JS_FreeValue(context, ids);
	}
	else if (strcmp(type_string, "joined") == 0)
	{
		JSValue id = JS_GetPropertyStr(context, args, "id");
		const char* id_string = JS_ToCString(context, id);
		if (id_string)
		{
			tf_ssb_connection_add_room_attendant(connection, id_string);
		}
		JS_FreeCString(context, id_string);
		JS_FreeValue(context, id);
	}
	else if (strcmp(type_string, "left") == 0)
	{
		JSValue id = JS_GetPropertyStr(context, args, "id");
		const char* id_string = JS_ToCString(context, id);
		if (id_string)
		{
			tf_ssb_connection_remove_room_attendant(connection, id_string);
		}
		JS_FreeCString(context, id_string);
		JS_FreeValue(context, id);
	}
	else
	{
		char buffer[256];
		snprintf(buffer, sizeof(buffer), "Unexpected room.attendants response type: '%s'.", type_string);
		tf_ssb_connection_rpc_send_error(connection, flags, -request_number, buffer);
	}
	JS_FreeCString(context, type_string);
	JS_FreeValue(context, type);
}

static bool _is_error(JSContext* context, JSValue message)
{
	JSValue name = JS_GetPropertyStr(context, message, "name");
	const char* name_string = JS_ToCString(context, name);
	bool is_error = false;
	if (name_string && strcmp(name_string, "Error") == 0)
	{
		is_error = true;
	}
	JS_FreeCString(context, name_string);
	JS_FreeValue(context, name);
	return is_error;
}

static void _tf_ssb_rpc_connection_tunnel_isRoom_callback(
	tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	JSContext* context = tf_ssb_get_context(ssb);
	if (_is_error(context, args))
	{
		return;
	}

	if (JS_IsObject(args))
	{
		JSValue message = JS_NewObject(context);
		JSValue name = JS_NewArray(context);
		JS_SetPropertyUint32(context, name, 0, JS_NewString(context, "room"));
		JS_SetPropertyUint32(context, name, 1, JS_NewString(context, "attendants"));
		JS_SetPropertyStr(context, message, "name", name);
		JS_SetPropertyStr(context, message, "type", JS_NewString(context, "source"));
		JS_SetPropertyStr(context, message, "args", JS_NewArray(context));
		tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, tf_ssb_connection_next_request_number(connection), "room.attendants",
			message, _tf_ssb_rpc_connection_room_attendants_callback, NULL, NULL);
		JS_FreeValue(context, message);
	}
}

typedef struct _tf_ssb_connection_send_history_stream_t
{
	int32_t request_number;
	char author[k_id_base64_len];
	int64_t sequence;
	bool keys;
	bool live;
	bool end_request;

	bool out_finished;
	int64_t out_max_sequence_seen;
	char** out_messages;
	int out_messages_count;
} tf_ssb_connection_send_history_stream_t;

static void _tf_ssb_connection_send_history_stream_work(tf_ssb_connection_t* connection, void* user_data)
{
	tf_ssb_connection_send_history_stream_t* request = user_data;
	if (!tf_ssb_connection_is_connected(connection))
	{
		return;
	}
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	sqlite3* db = tf_ssb_acquire_db_reader(ssb);
	sqlite3_stmt* statement;
	const int k_max = 32;
	if (sqlite3_prepare(db,
			"SELECT previous, author, id, sequence, timestamp, hash, json(content), signature, flags FROM messages WHERE author = ?1 AND sequence > ?2 AND "
			"sequence < ?3 ORDER BY sequence",
			-1, &statement, NULL) == SQLITE_OK)
	{
		if (sqlite3_bind_text(statement, 1, request->author, -1, NULL) == SQLITE_OK && sqlite3_bind_int64(statement, 2, request->sequence) == SQLITE_OK &&
			sqlite3_bind_int64(statement, 3, request->sequence + k_max) == SQLITE_OK)
		{
			JSMallocFunctions funcs = { 0 };
			tf_get_js_malloc_functions(&funcs);
			JSRuntime* runtime = JS_NewRuntime2(&funcs, NULL);
			JSContext* context = JS_NewContext(runtime);

			int r = SQLITE_OK;
			while ((r = sqlite3_step(statement)) == SQLITE_ROW)
			{
				JSValue message = JS_UNDEFINED;
				request->out_max_sequence_seen = sqlite3_column_int64(statement, 3);

				JSValue formatted = tf_ssb_format_message(context, (const char*)sqlite3_column_text(statement, 0), (const char*)sqlite3_column_text(statement, 1),
					sqlite3_column_int64(statement, 3), sqlite3_column_double(statement, 4), (const char*)sqlite3_column_text(statement, 5),
					(const char*)sqlite3_column_text(statement, 6), (const char*)sqlite3_column_text(statement, 7), sqlite3_column_int(statement, 8));
				if (request->keys)
				{
					message = JS_NewObject(context);
					JS_SetPropertyStr(context, message, "key", JS_NewString(context, (const char*)sqlite3_column_text(statement, 2)));
					JS_SetPropertyStr(context, message, "value", formatted);
					JS_SetPropertyStr(context, message, "timestamp", JS_NewString(context, (const char*)sqlite3_column_text(statement, 4)));
				}
				else
				{
					message = formatted;
				}
				JSValue json = JS_JSONStringify(context, message, JS_NULL, JS_NULL);
				size_t size = 0;
				const char* string = JS_ToCStringLen(context, &size, json);

				request->out_messages = tf_resize_vec(request->out_messages, sizeof(char*) * (request->out_messages_count + 1));
				char* copy = tf_malloc(size + 1);
				memcpy(copy, string, size + 1);
				JS_FreeCString(context, string);
				request->out_messages[request->out_messages_count++] = copy;

				JS_FreeValue(context, json);
				JS_FreeValue(context, message);
			}

			JS_FreeContext(context);
			JS_FreeRuntime(runtime);
		}
		sqlite3_finalize(statement);
	}
	else
	{
		tf_printf("prepare failed: %s\n", sqlite3_errmsg(db));
	}
	tf_ssb_release_db_reader(ssb, db);
	request->out_finished = request->out_max_sequence_seen != request->sequence + k_max - 1;
}

static void _tf_ssb_connection_send_history_stream_destroy(tf_ssb_connection_send_history_stream_t* request)
{
	for (int i = 0; i < request->out_messages_count; i++)
	{
		tf_free(request->out_messages[i]);
	}
	tf_free(request->out_messages);
	tf_free(request);
}

static void _tf_ssb_connection_send_history_stream_after_work(tf_ssb_connection_t* connection, int result, void* user_data)
{
	tf_ssb_connection_send_history_stream_t* request = user_data;
	tf_ssb_connection_adjust_write_count(connection, -1);
	if (tf_ssb_connection_is_connected(connection) && !tf_ssb_is_shutting_down(tf_ssb_connection_get_ssb(connection)))
	{
		for (int i = 0; i < request->out_messages_count; i++)
		{
			if (!tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_json, request->request_number, NULL, (const uint8_t*)request->out_messages[i],
					strlen(request->out_messages[i]), NULL, NULL, NULL))
			{
				break;
			}
		}
		tf_ssb_ebt_set_messages_sent(tf_ssb_connection_get_ebt(connection), request->author, request->out_max_sequence_seen);
		if (!request->out_finished)
		{
			_tf_ssb_connection_send_history_stream(
				connection, request->request_number, request->author, request->out_max_sequence_seen, request->keys, request->live, request->end_request);
		}
		else if (!request->live && request->end_request)
		{
			tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json, request->request_number, NULL, (const uint8_t*)"false", strlen("false"), NULL, NULL, NULL);
		}
	}
	_tf_ssb_connection_send_history_stream_destroy(request);
}

static void _tf_ssb_connection_send_history_stream_callback(tf_ssb_connection_t* connection, bool skip, void* user_data)
{
	tf_ssb_connection_adjust_write_count(connection, 1);
	if (!skip && tf_ssb_connection_is_connected(connection) && !tf_ssb_is_shutting_down(tf_ssb_connection_get_ssb(connection)))
	{
		tf_ssb_connection_run_work(connection, _tf_ssb_connection_send_history_stream_work, _tf_ssb_connection_send_history_stream_after_work, user_data);
	}
	else
	{
		_tf_ssb_connection_send_history_stream_destroy(user_data);
	}
}

static void _tf_ssb_connection_send_history_stream(
	tf_ssb_connection_t* connection, int32_t request_number, const char* author, int64_t sequence, bool keys, bool live, bool end_request)
{
	if (tf_ssb_connection_is_connected(connection) && !tf_ssb_is_shutting_down(tf_ssb_connection_get_ssb(connection)) && !tf_ssb_connection_is_closing(connection))
	{
		tf_ssb_connection_send_history_stream_t* async = tf_malloc(sizeof(tf_ssb_connection_send_history_stream_t));
		*async = (tf_ssb_connection_send_history_stream_t) {
			.request_number = request_number,
			.sequence = sequence,
			.keys = keys,
			.live = live,
			.end_request = end_request,
		};
		snprintf(async->author, sizeof(async->author), "%s", author);
		char key[128];
		snprintf(key, sizeof(key), "%s:%" PRId64, author, sequence);
		tf_ssb_connection_schedule_idle(connection, key, _tf_ssb_connection_send_history_stream_callback, async);
	}
}

static void _tf_ssb_rpc_createHistoryStream(
	tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	if (!tf_ssb_is_replicator(ssb))
	{
		tf_ssb_connection_rpc_send_error_method_not_allowed(connection, flags, -request_number, "createHistoryStream");
		return;
	}
	JSContext* context = tf_ssb_get_context(ssb);
	JSValue arg_array = JS_GetPropertyStr(context, args, "args");
	JSValue arg = JS_GetPropertyUint32(context, arg_array, 0);
	if (JS_IsUndefined(arg))
	{
		tf_ssb_connection_rpc_send_error(connection, flags, -request_number, "Missing request.args in createHistoryStream.");
	}
	JSValue id = JS_GetPropertyStr(context, arg, "id");
	JSValue seq = JS_GetPropertyStr(context, arg, "seq");
	JSValue keys = JS_GetPropertyStr(context, arg, "keys");
	JSValue live = JS_GetPropertyStr(context, arg, "live");
	bool is_keys = JS_IsUndefined(keys) || JS_ToBool(context, keys) > 0;
	bool is_live = JS_ToBool(context, live) > 0 && (tf_ssb_connection_get_flags(connection) & k_tf_ssb_connect_flag_one_shot) == 0;
	int64_t sequence = 0;
	JS_ToInt64(context, &sequence, seq);
	const char* author = JS_ToCString(context, id);

	_tf_ssb_connection_send_history_stream(connection, -request_number, author, sequence, is_keys, is_live, true);

	if (is_live)
	{
		tf_ssb_connection_add_new_message_request(connection, author, -request_number, is_keys);
	}

	JS_FreeCString(context, author);
	JS_FreeValue(context, id);
	JS_FreeValue(context, seq);
	JS_FreeValue(context, keys);
	JS_FreeValue(context, live);
	JS_FreeValue(context, arg);
	JS_FreeValue(context, arg_array);
}

static void _tf_ssb_rpc_ebt_replicate_send_messages(tf_ssb_connection_t* connection)
{
	tf_ssb_ebt_t* ebt = tf_ssb_connection_get_ebt(connection);
	tf_ssb_ebt_clock_t* clock = tf_ssb_ebt_get_messages_to_send(ebt);
	if (clock)
	{
		for (int i = 0; i < clock->count; i++)
		{
			tf_ssb_ebt_clock_entry_t* entry = &clock->entries[i];
			int32_t request_number = tf_ssb_connection_get_ebt_request_number(connection);
			bool live = (tf_ssb_connection_get_flags(connection) & k_tf_ssb_connect_flag_one_shot) == 0;
			_tf_ssb_connection_send_history_stream(connection, request_number, entry->id, entry->value, false, live, false);
		}
		tf_free(clock);
	}
}

static void _tf_ssb_rpc_ebt_replicate_store_callback(const char* id, bool verified, bool is_new, void* user_data)
{
	tf_ssb_connection_t* connection = user_data;
	tf_ssb_connection_adjust_read_backpressure(connection, -1);
}

typedef struct _resend_clock_t
{
	tf_ssb_connection_t* connection;
	int32_t request_number;
	int pending;
} resend_clock_t;

static void _tf_ssb_rpc_ebt_send_clock_callback(const tf_ssb_ebt_clock_t* clock, int32_t request_number, void* user_data)
{
	resend_clock_t* resend = user_data;
	tf_ssb_connection_t* connection = resend->connection;

	if (clock && clock->count && tf_ssb_connection_is_connected(connection) && !tf_ssb_connection_is_closing(connection))
	{
		JSContext* context = tf_ssb_connection_get_context(connection);
		JSValue message = JS_NewObject(context);
		for (int i = 0; i < clock->count; i++)
		{
			JS_SetPropertyStr(context, message, clock->entries[i].id, JS_NewInt64(context, clock->entries[i].value));
		}
		tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_json, -request_number, NULL, message, NULL, NULL, NULL);
		JS_FreeValue(context, message);
	}

	tf_ssb_ebt_t* ebt = tf_ssb_connection_get_ebt(connection);
	if (resend->pending != tf_ssb_ebt_get_send_clock_pending(ebt) && tf_ssb_connection_is_connected(connection) &&
		!tf_ssb_is_shutting_down(tf_ssb_connection_get_ssb(connection)) && !tf_ssb_connection_is_closing(connection))
	{
		resend->pending = tf_ssb_ebt_get_send_clock_pending(ebt);
		tf_ssb_connection_schedule_idle(connection, "ebt.clock", _tf_ssb_rpc_ebt_replicate_resend_clock, resend);
	}
	else
	{
		tf_ssb_ebt_set_send_clock_pending(ebt, 0);
		tf_free(resend);
	}

	_tf_ssb_rpc_ebt_replicate_send_messages(connection);
}

static void _tf_ssb_rpc_ebt_replicate_resend_clock(tf_ssb_connection_t* connection, bool skip, void* user_data)
{
	resend_clock_t* resend = user_data;
	if (!skip)
	{
		tf_ssb_ebt_t* ebt = tf_ssb_connection_get_ebt(connection);
		tf_ssb_ebt_get_send_clock(ebt, resend->request_number, _tf_ssb_rpc_ebt_send_clock_callback, resend);
	}
	else
	{
		tf_free(resend);
	}
}

static void _tf_ssb_rpc_ebt_schedule_send_clock(tf_ssb_connection_t* connection)
{
	tf_ssb_ebt_t* ebt = tf_ssb_connection_get_ebt(connection);
	int pending = tf_ssb_ebt_get_send_clock_pending(ebt) + 1;
	tf_ssb_ebt_set_send_clock_pending(ebt, pending);
	if (pending == 1)
	{
		resend_clock_t* resend = tf_malloc(sizeof(resend_clock_t));
		*resend = (resend_clock_t) {
			.connection = connection,
			.request_number = -tf_ssb_connection_get_ebt_request_number(connection),
			.pending = pending,
		};
		tf_ssb_connection_schedule_idle(connection, "ebt.clock", _tf_ssb_rpc_ebt_replicate_resend_clock, resend);
	}
}

static void _tf_ssb_rpc_ebt_replicate(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	JSContext* context = tf_ssb_get_context(ssb);
	if (_is_error(context, args))
	{
		/* TODO: Send createHistoryStream. */
		tf_ssb_connection_remove_request(connection, -request_number);
		return;
	}

	if (!tf_ssb_connection_get_ebt_request_number(connection))
	{
		tf_ssb_connection_set_ebt_request_number(connection, -request_number);
	}

	JSValue author = JS_GetPropertyStr(context, args, "author");
	JSValue name = JS_GetPropertyStr(context, args, "name");
	JSValue in_clock = JS_IsUndefined(name) ? args : JS_UNDEFINED;

	bool resend_clock = false;

	tf_ssb_ebt_t* ebt = tf_ssb_connection_get_ebt(connection);
	if (!JS_IsUndefined(author))
	{
		/* Looks like a message. */
		tf_ssb_connection_adjust_read_backpressure(connection, 1);
		tf_ssb_verify_strip_and_store_message(ssb, args, _tf_ssb_rpc_ebt_replicate_store_callback, connection);

		resend_clock = !tf_ssb_is_shutting_down(ssb) && !tf_ssb_connection_is_closing(connection);
	}
	else
	{
		tf_ssb_ebt_receive_clock(ebt, context, in_clock);
		resend_clock = true;
	}

	if (resend_clock && tf_ssb_connection_is_connected(connection) && !tf_ssb_is_shutting_down(tf_ssb_connection_get_ssb(connection)) && !tf_ssb_connection_is_closing(connection))
	{
		_tf_ssb_rpc_ebt_schedule_send_clock(connection);
	}
	JS_FreeValue(context, name);
	JS_FreeValue(context, author);
}

static void _tf_ssb_rpc_ebt_replicate_client(
	tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	_tf_ssb_rpc_ebt_replicate(connection, flags, request_number, args, message, size, user_data);
}

static void _tf_ssb_rpc_send_ebt_replicate(tf_ssb_connection_t* connection)
{
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	JSContext* context = tf_ssb_get_context(ssb);
	JSValue message = JS_NewObject(context);
	JSValue name = JS_NewArray(context);
	JS_SetPropertyUint32(context, name, 0, JS_NewString(context, "ebt"));
	JS_SetPropertyUint32(context, name, 1, JS_NewString(context, "replicate"));
	JS_SetPropertyStr(context, message, "name", name);
	JSValue arg = JS_NewObject(context);
	JS_SetPropertyStr(context, arg, "version", JS_NewInt32(context, 3));
	JS_SetPropertyStr(context, arg, "format", JS_NewString(context, "classic"));
	JSValue args = JS_NewArray(context);
	JS_SetPropertyUint32(context, args, 0, arg);
	JS_SetPropertyStr(context, message, "args", args);
	JS_SetPropertyStr(context, message, "type", JS_NewString(context, "duplex"));
	int32_t request_number = tf_ssb_connection_next_request_number(connection);
	tf_ssb_connection_rpc_send_json(
		connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, request_number, "ebt.replicate", message, _tf_ssb_rpc_ebt_replicate_client, NULL, NULL);
	if (!tf_ssb_connection_get_ebt_request_number(connection))
	{
		tf_ssb_connection_set_ebt_request_number(connection, request_number);
	}
	JS_FreeValue(context, message);
}

static void _tf_ssb_rpc_ebt_replicate_server(
	tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	if (flags & k_ssb_rpc_flag_end_error)
	{
		return;
	}
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	if (!tf_ssb_is_replicator(ssb))
	{
		tf_ssb_connection_rpc_send_error_method_not_allowed(connection, flags, -request_number, "ebt.replicate");
		return;
	}
	_tf_ssb_rpc_ebt_replicate(connection, flags, request_number, args, message, size, user_data);
	tf_ssb_connection_add_request(connection, -request_number, "ebt.replicate", _tf_ssb_rpc_ebt_replicate, NULL, NULL, NULL);
}

typedef struct _invite_use_t
{
	tf_ssb_t* ssb;
	char author[k_id_base64_len];
	;
	uint8_t private_key[512];
	char previous_id[64];
	int64_t previous_sequence;

	char host[256];
	int port;
	char pub[k_id_base64_len];
} invite_use_t;

static void _tf_ssb_rpc_get_previous_work(tf_ssb_connection_t* connection, void* user_data)
{
	invite_use_t* work = user_data;
	tf_ssb_t* ssb = work->ssb;
	tf_ssb_db_get_latest_message_by_author(ssb, work->author, &work->previous_sequence, work->previous_id, sizeof(work->previous_id));
}

static void _tf_ssb_invite_use_pub_message_store_callback(const char* id, bool verified, bool is_new, void* user_data)
{
	invite_use_t* work = user_data;
	tf_free(work);
}

static void _tf_ssb_invite_use_contact_message_store_callback(const char* id, bool verified, bool is_new, void* user_data)
{
	/*
	** 2. Post a pub message:
	** { "type": "pub", "address": { "host": "one.butt.nz", "port": 8008, "key": "@VJM7w1W19ZsKmG2KnfaoKIM66BRoreEkzaVm/J//wl8=.ed25519" } }
	*/
	invite_use_t* work = user_data;
	tf_ssb_t* ssb = work->ssb;
	JSContext* context = tf_ssb_get_context(ssb);

	JSValue content = JS_NewObject(context);
	JS_SetPropertyStr(context, content, "type", JS_NewString(context, "pub"));
	JSValue address = JS_NewObject(context);
	JS_SetPropertyStr(context, address, "host", JS_NewString(context, work->host));
	JS_SetPropertyStr(context, address, "port", JS_NewInt32(context, work->port));
	JS_SetPropertyStr(context, address, "key", JS_NewString(context, work->pub));
	JS_SetPropertyStr(context, content, "address", address);

	JSValue message = tf_ssb_sign_message(ssb, work->author, work->private_key, content, id, work->previous_sequence + 1);
	JSValue json = JS_JSONStringify(context, message, JS_NULL, JS_NULL);
	JS_FreeValue(context, json);
	tf_ssb_verify_strip_and_store_message(ssb, message, _tf_ssb_invite_use_pub_message_store_callback, work);
	JS_FreeValue(context, message);

	JS_FreeValue(context, content);
}

static void _tf_ssb_rpc_get_previous_after_work(tf_ssb_connection_t* connection, int status, void* user_data)
{
	invite_use_t* work = user_data;
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);

	JSContext* context = tf_ssb_connection_get_context(connection);
	JSValue content = JS_NewObject(context);
	JS_SetPropertyStr(context, content, "type", JS_NewString(context, "contact"));
	JS_SetPropertyStr(context, content, "contact", JS_NewString(context, work->pub));
	JS_SetPropertyStr(context, content, "following", JS_TRUE);

	JSValue message = tf_ssb_sign_message(ssb, work->author, work->private_key, content, work->previous_id, work->previous_sequence);
	JSValue json = JS_JSONStringify(context, message, JS_NULL, JS_NULL);
	JS_FreeValue(context, json);
	tf_ssb_verify_strip_and_store_message(ssb, message, _tf_ssb_invite_use_contact_message_store_callback, work);
	JS_FreeValue(context, message);

	JS_FreeValue(context, content);
}

static void _tf_ssb_rpc_invite_use_callback(
	tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	/*
	** 1. Follow the pub back:
	** { "type": "contact", "contact": "@VJM7w1W19ZsKmG2KnfaoKIM66BRoreEkzaVm/J//wl8=.ed25519", "following": true }
	*/
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	invite_use_t* work = tf_malloc(sizeof(invite_use_t));
	*work = (invite_use_t) {
		.ssb = ssb,
		.port = tf_ssb_connection_get_port(connection),
	};
	snprintf(work->host, sizeof(work->host), "%s", tf_ssb_connection_get_host(connection));
	tf_ssb_whoami(ssb, work->author, sizeof(work->author));
	tf_ssb_get_private_key(ssb, work->private_key, sizeof(work->private_key));
	tf_ssb_connection_get_id(connection, work->pub, sizeof(work->pub));
	tf_ssb_connection_run_work(connection, _tf_ssb_rpc_get_previous_work, _tf_ssb_rpc_get_previous_after_work, work);
}

static void _tf_ssb_rpc_send_invite_use(tf_ssb_connection_t* connection)
{
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	JSContext* context = tf_ssb_get_context(ssb);
	JSValue message = JS_NewObject(context);
	JSValue name = JS_NewArray(context);
	JS_SetPropertyUint32(context, name, 0, JS_NewString(context, "invite"));
	JS_SetPropertyUint32(context, name, 1, JS_NewString(context, "use"));
	JS_SetPropertyStr(context, message, "name", name);
	JS_SetPropertyStr(context, message, "type", JS_NewString(context, "async"));
	JSValue args = JS_NewArray(context);
	JSValue object = JS_NewObject(context);
	char id[k_id_base64_len] = { 0 };
	tf_ssb_whoami(ssb, id, sizeof(id));
	JS_SetPropertyStr(context, object, "feed", JS_NewString(context, id));
	JS_SetPropertyUint32(context, args, 0, object);
	JS_SetPropertyStr(context, message, "args", args);
	tf_ssb_connection_rpc_send_json(
		connection, k_ssb_rpc_flag_new_request, tf_ssb_connection_next_request_number(connection), "invite.use", message, _tf_ssb_rpc_invite_use_callback, NULL, NULL);
	JS_FreeValue(context, message);
}

static void _tf_ssb_rpc_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_change_t change, tf_ssb_connection_t* connection, void* user_data)
{
	JSContext* context = tf_ssb_get_context(ssb);
	if (change == k_tf_ssb_change_connect)
	{
		if (tf_ssb_is_replicator(ssb))
		{
			JSValue message = JS_NewObject(context);
			JSValue name = JS_NewArray(context);
			JS_SetPropertyUint32(context, name, 0, JS_NewString(context, "blobs"));
			JS_SetPropertyUint32(context, name, 1, JS_NewString(context, "createWants"));
			JS_SetPropertyStr(context, message, "name", name);
			JS_SetPropertyStr(context, message, "type", JS_NewString(context, "source"));
			JS_SetPropertyStr(context, message, "args", JS_NewArray(context));
			tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, tf_ssb_connection_next_request_number(connection), "blobs.createWants",
				message, _tf_ssb_rpc_connection_blobs_createWants_callback, NULL, NULL);
			JS_FreeValue(context, message);
		}

		if (tf_ssb_connection_is_client(connection))
		{
			if (tf_ssb_connection_get_flags(connection) & k_tf_ssb_connect_flag_use_invite)
			{
				_tf_ssb_rpc_send_invite_use(connection);
			}

			JSValue message = JS_NewObject(context);
			JSValue name = JS_NewArray(context);
			JS_SetPropertyUint32(context, name, 0, JS_NewString(context, "tunnel"));
			JS_SetPropertyUint32(context, name, 1, JS_NewString(context, "isRoom"));
			JS_SetPropertyStr(context, message, "name", name);
			JS_SetPropertyStr(context, message, "args", JS_NewArray(context));
			tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_new_request, tf_ssb_connection_next_request_number(connection), "tunnel.isRoom", message,
				_tf_ssb_rpc_connection_tunnel_isRoom_callback, NULL, NULL);
			JS_FreeValue(context, message);

			if (tf_ssb_is_peer_exchange(ssb))
			{
				_tf_ssb_rpc_send_peers_exchange(connection);
			}

			if (tf_ssb_is_replicator(ssb))
			{
				_tf_ssb_rpc_send_ebt_replicate(connection);
			}
		}
	}
	else if (change == k_tf_ssb_change_remove)
	{
		tf_ssb_remove_blob_want_added_callback(ssb, _tf_ssb_rpc_blob_wants_added_callback, connection);

		char id[k_id_base64_len] = "";
		if (tf_ssb_connection_get_id(connection, id, sizeof(id)))
		{
			JSValue left = JS_NewObject(context);
			JS_SetPropertyStr(context, left, "type", JS_NewString(context, "left"));
			JS_SetPropertyStr(context, left, "id", JS_NewString(context, id));
			tf_ssb_connection_t* connections[1024];
			int count = tf_ssb_get_connections(ssb, connections, tf_countof(connections));
			for (int i = 0; i < count; i++)
			{
				if (tf_ssb_connection_is_attendant(connections[i]))
				{
					tf_ssb_connection_rpc_send_json(
						connections[i], k_ssb_rpc_flag_stream, -tf_ssb_connection_get_attendant_request_number(connections[i]), NULL, left, NULL, NULL, NULL);
				}
			}
			JS_FreeValue(context, left);

			if (tf_ssb_connection_is_endpoint(connection) || tf_ssb_connection_is_attendant(connection))
			{
				_tf_ssb_rpc_send_endpoints(ssb);
			}
		}
	}
}

static void _tf_ssb_rpc_broadcasts_changed_visit(
	const char* host, const struct sockaddr_in* addr, tf_ssb_broadcast_origin_t origin, tf_ssb_connection_t* tunnel, const uint8_t* pub, void* user_data)
{
	tf_ssb_t* ssb = user_data;
	if (tunnel && (tf_ssb_connection_get_flags(tunnel) & k_tf_ssb_connect_flag_one_shot) != 0 && !tf_ssb_connection_get_tunnel(tunnel))
	{
		char target_id[k_id_base64_len] = { 0 };
		char portal_id[k_id_base64_len] = { 0 };
		if (tf_ssb_id_bin_to_str(target_id, sizeof(target_id), pub) && tf_ssb_connection_get_id(tunnel, portal_id, sizeof(portal_id)))
		{
			tf_ssb_tunnel_create(ssb, portal_id, target_id, k_tf_ssb_connect_flag_one_shot);
		}
	}
}

static void _tf_ssb_rpc_broadcasts_changed_callback(tf_ssb_t* ssb, void* user_data)
{
	tf_ssb_visit_broadcasts(ssb, _tf_ssb_rpc_broadcasts_changed_visit, ssb);
}

static void _tf_ssb_rpc_checkpoint(tf_ssb_t* ssb)
{
	int64_t checkpoint_start_ms = uv_hrtime();
	sqlite3* db = tf_ssb_acquire_db_writer(ssb);
	int log = 0;
	int checkpointed = 0;
	if (sqlite3_wal_checkpoint_v2(db, NULL, SQLITE_CHECKPOINT_TRUNCATE, &log, &checkpointed) == SQLITE_OK)
	{
		tf_printf("Checkpointed %d frames in %d ms.  Log is now %d frames.\n", (int)((uv_hrtime() - checkpoint_start_ms) / 1000000LL), checkpointed, log);
	}
	else
	{
		tf_printf("Checkpoint: %s.\n", sqlite3_errmsg(db));
	}
	tf_ssb_release_db_writer(ssb, db);
}

typedef struct _delete_t
{
	int deleted;
	int64_t duration_ms;
} delete_t;

static void _tf_ssb_rpc_delete_blobs_work(tf_ssb_t* ssb, void* user_data)
{
	delete_t* delete = user_data;
	int64_t age = -1;
	sqlite3* db = tf_ssb_acquire_db_reader(ssb);
	tf_ssb_db_get_global_setting_int64(db, "blob_expire_age_seconds", &age);
	tf_ssb_release_db_reader(ssb, db);
	if (age <= 0)
	{
		_tf_ssb_rpc_checkpoint(ssb);
		return;
	}
	int64_t start_ns = uv_hrtime();
	db = tf_ssb_acquire_db_writer(ssb);
	sqlite3_stmt* statement;
	int64_t now = (int64_t)time(NULL) * 1000ULL;
	int64_t timestamp = now - age * 1000ULL;
	const int k_limit = 128;
	int deleted = 0;
	if (sqlite3_prepare(db,
			"DELETE FROM blobs WHERE blobs.id IN ("
			"  SELECT blobs.id FROM blobs "
			"  JOIN messages_refs ON blobs.id = messages_refs.ref "
			"  JOIN messages ON messages.id = messages_refs.message "
			"  WHERE blobs.created < ?1 / 1000 "
			"  GROUP BY blobs.id HAVING MAX(messages.timestamp) < ?1 LIMIT ?2)",
			-1, &statement, NULL) == SQLITE_OK)
	{
		if (sqlite3_bind_int64(statement, 1, timestamp) == SQLITE_OK && sqlite3_bind_int(statement, 2, k_limit) == SQLITE_OK)
		{
			int r = sqlite3_step(statement);
			if (r != SQLITE_DONE)
			{
				tf_printf("_tf_ssb_rpc_delete_blobs_work: %s\n", sqlite3_errmsg(db));
			}
			else
			{
				tf_printf("_tf_ssb_rpc_delete_blobs_work: %d rows\n", sqlite3_changes(db));
			}
			deleted = sqlite3_changes(db);
		}
	}
	else
	{
		tf_printf("prepare failed: %s\n", sqlite3_errmsg(db));
	}
	tf_ssb_release_db_writer(ssb, db);
	delete->duration_ms = (uv_hrtime() - start_ns) / 1000000LL;
	tf_printf("Deleted %d blobs in %d ms.\n", deleted, (int)delete->duration_ms);
	_tf_ssb_rpc_checkpoint(ssb);
}

static void _tf_ssb_rpc_delete_blobs_after_work(tf_ssb_t* ssb, int status, void* user_data)
{
	delete_t* delete = user_data;
	_tf_ssb_rpc_start_delete_blobs(ssb, delete->deleted ? (int)delete->duration_ms : (15 * 60 * 1000));
	tf_free(delete);
}

static void _tf_ssb_rpc_start_delete_blobs_callback(tf_ssb_t* ssb, void* user_data)
{
	delete_t* delete = tf_malloc(sizeof(delete_t));
	*delete = (delete_t) { 0 };
	tf_ssb_run_work(ssb, _tf_ssb_rpc_delete_blobs_work, _tf_ssb_rpc_delete_blobs_after_work, delete);
}

static void _tf_ssb_rpc_start_delete_blobs(tf_ssb_t* ssb, int delay_ms)
{
	tf_printf("will delete more blobs in %d ms\n", delay_ms);
	tf_ssb_schedule_work(ssb, delay_ms, _tf_ssb_rpc_start_delete_blobs_callback, NULL);
}

static void _tf_ssb_rpc_delete_feeds_work(tf_ssb_t* ssb, void* user_data)
{
	delete_t* delete = user_data;
	sqlite3* db = tf_ssb_acquire_db_reader(ssb);
	bool delete_stale_feeds = false;
	tf_ssb_db_get_global_setting_bool(db, "delete_stale_feeds", &delete_stale_feeds);
	if (!delete_stale_feeds)
	{
		tf_ssb_release_db_reader(ssb, db);
		return;
	}
	int64_t start_ns = uv_hrtime();
	int64_t replication_hops = 2;
	tf_ssb_db_get_global_setting_int64(db, "replication_hops", &replication_hops);
	tf_ssb_release_db_reader(ssb, db);
	const char** identities = tf_ssb_db_get_all_visible_identities(ssb, replication_hops);

	JSMallocFunctions funcs = { 0 };
	tf_get_js_malloc_functions(&funcs);
	JSRuntime* runtime = JS_NewRuntime2(&funcs, NULL);
	JSContext* context = JS_NewContext(runtime);
	JSValue array = JS_NewArray(context);
	for (int i = 0; identities[i]; i++)
	{
		JS_SetPropertyUint32(context, array, i, JS_NewString(context, identities[i]));
	}
	tf_free(identities);

	JSValue json = JS_JSONStringify(context, array, JS_NULL, JS_NULL);
	const char* arg = JS_ToCString(context, json);
	JS_FreeValue(context, json);
	JS_FreeValue(context, array);

	db = tf_ssb_acquire_db_writer(ssb);
	sqlite3_stmt* statement;
	if (sqlite3_prepare(db,
			"DELETE FROM messages WHERE author IN ("
			"  SELECT author FROM messages WHERE author NOT IN (SELECT value FROM json_each(?)) GROUP BY author LIMIT 1"
			") RETURNING author",
			-1, &statement, NULL) == SQLITE_OK)
	{
		int status = SQLITE_OK;
		bool printed = false;
		if (sqlite3_bind_text(statement, 1, arg, -1, NULL) == SQLITE_OK)
		{
			while ((status = sqlite3_step(statement)) == SQLITE_ROW)
			{
				if (!printed)
				{
					tf_printf("deleting %s\n", sqlite3_column_text(statement, 0));
					printed = true;
					delete->deleted++;
				}
			}
			if (status != SQLITE_DONE)
			{
				tf_printf("deleting feeds: %s\n", sqlite3_errmsg(db));
			}
		}
		sqlite3_finalize(statement);
	}
	tf_ssb_release_db_writer(ssb, db);

	JS_FreeCString(context, arg);

	JS_FreeContext(context);
	JS_FreeRuntime(runtime);

	delete->duration_ms = (uv_hrtime() - start_ns) / 1000000LL;
	tf_printf("Deleted %d feeds in %d ms.\n", delete->deleted, (int)delete->duration_ms);
	_tf_ssb_rpc_checkpoint(ssb);
}

static void _tf_ssb_rpc_delete_feeds_after_work(tf_ssb_t* ssb, int status, void* user_data)
{
	delete_t* delete = user_data;
	_tf_ssb_rpc_start_delete_feeds(ssb, delete->deleted ? (int)delete->duration_ms : (15 * 60 * 1000));
	tf_free(delete);
}

static void _tf_ssb_rpc_start_delete_feeds_callback(tf_ssb_t* ssb, void* user_data)
{
	delete_t* delete = tf_malloc(sizeof(delete_t));
	*delete = (delete_t) { 0 };
	tf_ssb_run_work(ssb, _tf_ssb_rpc_delete_feeds_work, _tf_ssb_rpc_delete_feeds_after_work, delete);
}

static void _tf_ssb_rpc_start_delete_feeds(tf_ssb_t* ssb, int delay_ms)
{
	tf_printf("will delete more feeds in %d ms\n", delay_ms);
	tf_ssb_schedule_work(ssb, delay_ms, _tf_ssb_rpc_start_delete_feeds_callback, NULL);
}

void tf_ssb_rpc_start_periodic(tf_ssb_t* ssb)
{
	_tf_ssb_rpc_start_delete_blobs(ssb, 30 * 1000);
	_tf_ssb_rpc_start_delete_feeds(ssb, 25 * 1000);
}

typedef struct _peers_exchange_t
{
	tf_ssb_t* ssb;
	JSValue peers;
} peers_exchange_t;

static void _tf_ssb_get_peers_exhange_callback(
	const char* host, const struct sockaddr_in* addr, tf_ssb_broadcast_origin_t origin, tf_ssb_connection_t* tunnel, const uint8_t* pub, void* user_data)
{
	peers_exchange_t* data = user_data;
	if (origin == k_tf_ssb_broadcast_origin_peer_exchange)
	{
		char fullid[256] = { 0 };
		tf_base64_encode(pub, k_id_bin_len, fullid, sizeof(fullid));
		char* dot = strchr(fullid, '.');
		if (dot)
		{
			*dot = '\0';
		}

		char connection[1024] = { 0 };
		snprintf(connection, sizeof(connection), "net:%s:%d~shs:%s", host, ntohs(addr->sin_port), fullid);

		JSContext* context = tf_ssb_get_context(data->ssb);
		JS_SetPropertyStr(context, data->peers, connection, JS_NewInt32(context, 0));
	}
}

static JSValue _tf_ssb_get_peers_exchange(tf_ssb_t* ssb)
{
	JSContext* context = tf_ssb_get_context(ssb);
	JSValue peers = JS_NewObject(context);
	tf_ssb_visit_broadcasts(ssb, _tf_ssb_get_peers_exhange_callback, &(peers_exchange_t) { .ssb = ssb, .peers = peers });
	return peers;
}

static void _tf_ssb_rpc_peers_exchange_internal(
	tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	JSContext* context = tf_ssb_connection_get_context(connection);
	if (_is_error(context, args))
	{
		return;
	}

	/* The peer that participated in the exchange is now a peer exchange entry, too. */
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	if (*tf_ssb_connection_get_host(connection))
	{
		char fullid[k_id_base64_len] = { 0 };
		tf_ssb_connection_get_id(connection, fullid, sizeof(fullid));
		char* dot = strchr(fullid, '.');
		if (dot)
		{
			*dot = '\0';
		}

		int port = tf_ssb_connection_get_port(connection);
		JSValue port_value = JS_GetPropertyStr(context, args, "port");
		JS_ToInt32(context, &port, port_value);
		JS_FreeValue(context, port_value);

		char connection_string[1024] = { 0 };
		snprintf(connection_string, sizeof(connection_string), "net:%s:%d~shs:%s", tf_ssb_connection_get_host(connection), port, fullid + 1);
		tf_ssb_add_broadcast(ssb, connection_string, k_tf_ssb_broadcast_origin_peer_exchange, k_ssb_peer_exchange_expires_seconds);
	}

	JSValue in_peers = JS_GetPropertyStr(context, args, "peers");

	JSPropertyEnum* ptab = NULL;
	uint32_t plen = 0;
	if (JS_GetOwnPropertyNames(context, &ptab, &plen, in_peers, JS_GPN_STRING_MASK) == 0)
	{
		for (uint32_t i = 0; i < plen; ++i)
		{
			JSValue key = JS_AtomToString(context, ptab[i].atom);
			JSPropertyDescriptor desc;
			JSValue key_value = JS_NULL;
			if (JS_GetOwnProperty(context, &desc, args, ptab[i].atom) == 1)
			{
				key_value = desc.value;
				JS_FreeValue(context, desc.setter);
				JS_FreeValue(context, desc.getter);
			}
			const char* connection = JS_ToCString(context, key);
			int64_t timestamp = 0;
			JS_ToInt64(context, &timestamp, key_value);
			/* ADD BROADCAST connection: timestamp */
			JS_FreeCString(context, connection);
			JS_FreeValue(context, key);
			JS_FreeValue(context, key_value);
		}
		for (uint32_t i = 0; i < plen; ++i)
		{
			JS_FreeAtom(context, ptab[i].atom);
		}
		js_free(context, ptab);
	}
	JS_FreeValue(context, in_peers);
}

static void _tf_ssb_rpc_send_peers_exchange(tf_ssb_connection_t* connection)
{
	int32_t request_number = tf_ssb_connection_next_request_number(connection);
	JSContext* context = tf_ssb_connection_get_context(connection);
	JSValue message = JS_NewObject(context);
	JSValue name = JS_NewArray(context);
	JS_SetPropertyUint32(context, name, 0, JS_NewString(context, "peers"));
	JS_SetPropertyUint32(context, name, 1, JS_NewString(context, "exchange"));
	JS_SetPropertyStr(context, message, "name", name);
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	JS_SetPropertyStr(context, message, "port", JS_NewInt32(context, tf_ssb_server_get_port(ssb)));
	JS_SetPropertyStr(context, message, "peers", _tf_ssb_get_peers_exchange(ssb));
	tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_new_request, request_number, "peers.exchange", message, _tf_ssb_rpc_peers_exchange_internal, NULL, NULL);
	JS_FreeValue(context, message);
}

static void _tf_ssb_rpc_peers_exchange(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);

	_tf_ssb_rpc_peers_exchange_internal(connection, flags, request_number, args, message, size, user_data);

	JSContext* context = tf_ssb_connection_get_context(connection);
	JSValue out_message = JS_NewObject(context);
	JS_SetPropertyStr(context, out_message, "port", JS_NewInt32(context, tf_ssb_server_get_port(ssb)));
	JS_SetPropertyStr(context, out_message, "peers", _tf_ssb_get_peers_exchange(ssb));
	tf_ssb_connection_rpc_send_json(connection, flags, -request_number, NULL, out_message, NULL, NULL, NULL);
	JS_FreeValue(context, out_message);
}

typedef struct _invite_t
{
	tf_ssb_connection_t* connection;
	char pub[k_id_base64_len];
	char invite_public_key[k_id_base64_len];
	char id[k_id_base64_len];
	int32_t request_number;
	bool accepted;
	char previous_id[256];
	int64_t previous_sequence;
	char* message;
} invite_t;

static void _tf_ssb_rpc_invite_use_work(tf_ssb_connection_t* connection, void* user_data)
{
	invite_t* work = user_data;
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	sqlite3* db = tf_ssb_acquire_db_writer(ssb);
	work->accepted = tf_ssb_db_use_invite(db, work->invite_public_key);
	tf_ssb_release_db_writer(ssb, db);

	if (work->accepted)
	{
		tf_ssb_db_get_latest_message_by_author(ssb, work->pub, &work->previous_sequence, work->previous_id, sizeof(work->previous_id));
	}
}

static void _tf_ssb_invite_use_message_store_callback(const char* id, bool verified, bool is_new, void* user_data)
{
	invite_t* work = user_data;
	tf_ssb_connection_t* connection = work->connection;
	if (verified && is_new && tf_ssb_connection_is_connected(connection) && !tf_ssb_connection_is_closing(connection))
	{
		tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
		JSContext* context = tf_ssb_get_context(ssb);
		JSValue payload = JS_NewObject(context);
		JSValue message = JS_ParseJSON(context, work->message, strlen(work->message), NULL);
		JS_SetPropertyStr(context, payload, "key", JS_NewString(context, id));
		JS_SetPropertyStr(context, payload, "value", message);
		tf_ssb_connection_rpc_send_json(connection, 0, -work->request_number, NULL, payload, NULL, NULL, NULL);
		JS_FreeValue(context, payload);
	}
	else
	{
		tf_ssb_connection_rpc_send(
			connection, k_ssb_rpc_flag_json | k_ssb_rpc_flag_end_error, -work->request_number, NULL, (const uint8_t*)"false", strlen("false"), NULL, NULL, NULL);
	}
	if (work->message)
	{
		tf_free(work->message);
	}
	tf_free(work);
}

static void _tf_ssb_rpc_invite_use_after_work(tf_ssb_connection_t* connection, int status, void* user_data)
{
	invite_t* work = user_data;
	if (work->accepted)
	{
		tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
		JSContext* context = tf_ssb_connection_get_context(connection);
		JSValue content = JS_NewObject(context);
		JS_SetPropertyStr(context, content, "type", JS_NewString(context, "contact"));
		JS_SetPropertyStr(context, content, "contact", JS_NewString(context, work->id));
		JS_SetPropertyStr(context, content, "following", JS_TRUE);
		JS_SetPropertyStr(context, content, "pub", JS_TRUE);

		uint8_t private_key[512] = { 0 };
		tf_ssb_get_private_key(ssb, private_key, sizeof(private_key));

		JSValue message = tf_ssb_sign_message(ssb, work->pub, private_key, content, work->previous_id, work->previous_sequence);
		JSValue json = JS_JSONStringify(context, message, JS_NULL, JS_NULL);
		const char* string = JS_ToCString(context, json);
		work->message = tf_strdup(string);
		JS_FreeCString(context, string);
		JS_FreeValue(context, json);
		tf_ssb_verify_strip_and_store_message(ssb, message, _tf_ssb_invite_use_message_store_callback, work);
		JS_FreeValue(context, message);
		JS_FreeValue(context, content);
	}
	else
	{
		tf_ssb_connection_rpc_send(
			connection, k_ssb_rpc_flag_json | k_ssb_rpc_flag_end_error, -work->request_number, NULL, (const uint8_t*)"false", strlen("false"), NULL, NULL, NULL);
		tf_ssb_connection_close(connection, "Invite not accepted.");
	}
}

static void _tf_ssb_rpc_invite_use(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	invite_t* work = tf_malloc(sizeof(invite_t));
	*work = (invite_t) {
		.connection = connection,
		.request_number = request_number,
	};
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	tf_ssb_whoami(ssb, work->pub, sizeof(work->pub));
	JSContext* context = tf_ssb_connection_get_context(connection);
	JSValue array = JS_GetPropertyStr(context, args, "args");
	JSValue object = JS_GetPropertyUint32(context, array, 0);
	JSValue feed = JS_GetPropertyStr(context, object, "feed");
	tf_ssb_connection_get_id(connection, work->invite_public_key, sizeof(work->invite_public_key));
	const char* id = JS_ToCString(context, feed);
	snprintf(work->id, sizeof(work->id), "%s", id);
	JS_FreeCString(context, id);
	JS_FreeValue(context, feed);
	JS_FreeValue(context, object);
	JS_FreeValue(context, array);
	tf_ssb_connection_run_work(connection, _tf_ssb_rpc_invite_use_work, _tf_ssb_rpc_invite_use_after_work, work);
}

static void _tf_ssb_rpc_message_added_callback(tf_ssb_t* ssb, const char* id, void* user_data)
{
	tf_ssb_connection_t* connections[256];
	int count = tf_ssb_get_connections(ssb, connections, tf_countof(connections));
	for (int i = 0; i < count; i++)
	{
		tf_ssb_connection_t* connection = connections[i];
		if (tf_ssb_connection_is_connected(connection) && !tf_ssb_is_shutting_down(tf_ssb_connection_get_ssb(connection)) && !tf_ssb_connection_is_closing(connection))
		{
			_tf_ssb_rpc_ebt_schedule_send_clock(connections[i]);
		}
	}
}

void tf_ssb_rpc_register(tf_ssb_t* ssb)
{
	tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_rpc_connections_changed_callback, NULL, NULL);
	tf_ssb_add_broadcasts_changed_callback(ssb, _tf_ssb_rpc_broadcasts_changed_callback, NULL, NULL);
	tf_ssb_add_message_added_callback(ssb, _tf_ssb_rpc_message_added_callback, NULL, NULL);
	tf_ssb_add_rpc_callback(ssb, "gossip.ping", _tf_ssb_rpc_gossip_ping, NULL, NULL); /* DUPLEX */
	tf_ssb_add_rpc_callback(ssb, "blobs.get", _tf_ssb_rpc_blobs_get, NULL, NULL); /* SOURCE */
	tf_ssb_add_rpc_callback(ssb, "blobs.has", _tf_ssb_rpc_blobs_has, NULL, NULL); /* ASYNC */
	tf_ssb_add_rpc_callback(ssb, "blobs.createWants", _tf_ssb_rpc_blobs_createWants, NULL, NULL); /* SOURCE */
	tf_ssb_add_rpc_callback(ssb, "tunnel.connect", _tf_ssb_rpc_tunnel_connect, NULL, NULL); /* DUPLEX */
	tf_ssb_add_rpc_callback(ssb, "tunnel.isRoom", _tf_ssb_rpc_room_meta, NULL, NULL); /* FAKE-ASYNC */
	tf_ssb_add_rpc_callback(ssb, "tunnel.endpoints", _tf_ssb_rpc_tunnel_endpoints, NULL, NULL); /* SOURCE */
	tf_ssb_add_rpc_callback(ssb, "room.metadata", _tf_ssb_rpc_room_meta, NULL, NULL); /* ASYNC */
	tf_ssb_add_rpc_callback(ssb, "room.attendants", _tf_ssb_rpc_room_attendants, NULL, NULL); /* SOURCE */
	tf_ssb_add_rpc_callback(ssb, "createHistoryStream", _tf_ssb_rpc_createHistoryStream, NULL, NULL); /* SOURCE */
	tf_ssb_add_rpc_callback(ssb, "ebt.replicate", _tf_ssb_rpc_ebt_replicate_server, NULL, NULL); /* DUPLEX */
	tf_ssb_add_rpc_callback(ssb, "peers.exchange", _tf_ssb_rpc_peers_exchange, NULL, NULL); /* ASYNC */
	tf_ssb_add_rpc_callback(ssb, "invite.use", _tf_ssb_rpc_invite_use, NULL, NULL); /* ASYNC */
}