#include "ssb.rpc.h"

#include "log.h"
#include "mem.h"
#include "ssb.h"
#include "ssb.db.h"
#include "util.js.h"

#include "sqlite3.h"
#include "uv.h"

#include <inttypes.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

/* Fallback for toolchains that do not already define _countof: yields the
 * element count of a true array (not a pointer) as an int. */
#if !defined(_countof)
#define _countof(a) ((int)(sizeof((a)) / sizeof(*(a))))
#endif

/* Forward declarations of file-local functions. */
static void _tf_ssb_connection_send_history_stream(tf_ssb_connection_t* connection, int32_t request_number, const char* author, int64_t sequence, bool keys, bool live);
static void _tf_ssb_rpc_start_delete_blobs(tf_ssb_t* ssb, int delay_ms);

/**
 * Looks up an integer field in the JSON stored in the properties table row
 * with id = 'core' and key = 'settings'.
 * @param ssb The SSB instance providing the database reader.
 * @param name The JSON field name, appended to '$.' to form the path.
 * @param default_value Returned when the query cannot be prepared or bound,
 * when no row is produced, or when the extracted value is NULL.
 * @return The stored integer, or default_value.
 */
static int64_t _get_global_setting_int64(tf_ssb_t* ssb, const char* name, int64_t default_value)
{
	const char* query = "SELECT json_extract(value, '$.' || ?) FROM properties WHERE id = 'core' AND key = 'settings'";
	int64_t value = default_value;
	sqlite3* reader = tf_ssb_acquire_db_reader(ssb);
	sqlite3_stmt* stmt = NULL;
	if (sqlite3_prepare(reader, query, -1, &stmt, NULL) != SQLITE_OK)
	{
		tf_printf("prepare failed: %s\n", sqlite3_errmsg(reader));
	}
	else
	{
		/* Short-circuiting keeps the bind -> step -> column-type order. */
		if (sqlite3_bind_text(stmt, 1, name, -1, NULL) == SQLITE_OK && sqlite3_step(stmt) == SQLITE_ROW && sqlite3_column_type(stmt, 0) != SQLITE_NULL)
		{
			value = sqlite3_column_int64(stmt, 0);
		}
		sqlite3_finalize(stmt);
	}
	tf_ssb_release_db_reader(ssb, reader);
	return value;
}

/**
 * Handles a message on an open gossip.ping request by replying with the
 * current wall-clock time in milliseconds, formatted as decimal text.
 * The incoming flags are echoed on the reply; when they carry the end/error
 * bit the request is removed afterwards.
 */
static void _tf_ssb_rpc_gossip_ping_callback(
	tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	char reply[256];
	int64_t now_ms = (int64_t)time(NULL) * 1000;
	snprintf(reply, sizeof(reply), "%" PRId64, now_ms);
	tf_ssb_connection_rpc_send(connection, flags, -request_number, NULL, (const uint8_t*)reply, strlen(reply), NULL, NULL, NULL);
	if ((flags & k_ssb_rpc_flag_end_error) != 0)
	{
		tf_ssb_connection_remove_request(connection, request_number);
	}
}

/**
 * Entry point for an incoming gossip.ping call: registers the negated request
 * number so that later messages on it are routed to the ping callback.
 */
static void _tf_ssb_rpc_gossip_ping(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	int32_t response_number = -request_number;
	tf_ssb_connection_add_request(connection, response_number, "gossip.ping", _tf_ssb_rpc_gossip_ping_callback, NULL, NULL, NULL);
}

/**
 * Intentionally empty callback. It is the callback registered for the request
 * added by the blobs.get handler in this file; messages delivered to it are
 * ignored.
 */
static void _tf_ssb_rpc_blobs_get_callback(
	tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
}

/** State for serving one blob of an incoming blobs.get call via background work. */
typedef struct _blobs_get_work_t
{
	/** Request number of the incoming call; negated when sending replies. */
	int64_t request_number;
	/** Identifier of the requested blob. */
	char id[k_id_base64_len];
	/** Result of tf_ssb_db_blob_get(), set by the work function. */
	bool found;
	/** Blob contents produced by tf_ssb_db_blob_get(); released with tf_free() after sending. */
	uint8_t* blob;
	/** Size of blob in bytes. */
	size_t size;
} blobs_get_work_t;

/**
 * Work step for blobs.get: loads the requested blob from the database and
 * records whether it was found along with its contents and size.
 */
static void _tf_ssb_rpc_blobs_get_work(tf_ssb_connection_t* connection, void* user_data)
{
	blobs_get_work_t* request = user_data;
	request->found = tf_ssb_db_blob_get(tf_ssb_connection_get_ssb(connection), request->id, &request->blob, &request->size);
}

/**
 * Completion step for blobs.get. When the blob was found its bytes are sent
 * as binary stream messages of at most 8192 bytes each, then the stream is
 * terminated with a JSON "true" or "false" reflecting whether it was found.
 * Releases both the blob buffer and the work item.
 */
static void _tf_ssb_rpc_blobs_get_after_work(tf_ssb_connection_t* connection, int status, void* user_data)
{
	blobs_get_work_t* request = user_data;
	if (request->found)
	{
		const size_t k_chunk_size = 8192;
		size_t offset = 0;
		while (offset < request->size)
		{
			/* The final chunk carries whatever is left over. */
			size_t remaining = request->size - offset;
			size_t chunk = remaining < k_chunk_size ? remaining : k_chunk_size;
			tf_ssb_connection_rpc_send(
				connection, k_ssb_rpc_flag_binary | k_ssb_rpc_flag_stream, -request->request_number, NULL, request->blob + offset, chunk, NULL, NULL, NULL);
			offset += k_chunk_size;
		}
		tf_free(request->blob);
	}

	const char* outcome = request->found ? "true" : "false";
	tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json | k_ssb_rpc_flag_end_error | k_ssb_rpc_flag_stream, -request->request_number, NULL, (const uint8_t*)outcome,
		strlen(outcome), NULL, NULL, NULL);
	tf_free(request);
}

/**
 * Handles an incoming blobs.get call.
 *
 * Messages flagged end/error are ignored. When this instance is not a
 * replicator a "method not allowed" error is sent. Otherwise the request is
 * registered and, for every entry of the "args" array, a background lookup is
 * queued; each entry is either a blob id string or an object whose "key"
 * property holds the id.
 */
static void _tf_ssb_rpc_blobs_get(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	if (flags & k_ssb_rpc_flag_end_error)
	{
		return;
	}
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	if (!tf_ssb_is_replicator(ssb))
	{
		tf_ssb_connection_rpc_send_error_method_not_allowed(connection, flags, -request_number, "blobs.get");
		return;
	}

	tf_ssb_connection_add_request(connection, -request_number, "blobs.get", _tf_ssb_rpc_blobs_get_callback, NULL, NULL, NULL);
	JSContext* context = tf_ssb_connection_get_context(connection);
	JSValue ids = JS_GetPropertyStr(context, args, "args");
	int length = tf_util_get_length(context, ids);

	for (int i = 0; i < length; i++)
	{
		JSValue arg = JS_GetPropertyUint32(context, ids, i);
		const char* id = NULL;
		if (JS_IsString(arg))
		{
			id = JS_ToCString(context, arg);
		}
		else
		{
			JSValue key = JS_GetPropertyStr(context, arg, "key");
			id = JS_ToCString(context, key);
			JS_FreeValue(context, key);
		}

		blobs_get_work_t* work = tf_malloc(sizeof(blobs_get_work_t));
		*work = (blobs_get_work_t) {
			.request_number = request_number,
		};
		/*
		 * JS_ToCString() yields NULL when the conversion fails (for example
		 * when the peer sends a null entry, so the "key" lookup throws).
		 * Passing NULL to "%s" is undefined behavior, so fall back to an
		 * empty id. NOTE(review): an empty id is presumed not to match any
		 * stored blob, so the peer gets the usual "false" reply -- confirm
		 * against tf_ssb_db_blob_get().
		 */
		snprintf(work->id, sizeof(work->id), "%s", id ? id : "");
		tf_ssb_connection_run_work(connection, _tf_ssb_rpc_blobs_get_work, _tf_ssb_rpc_blobs_get_after_work, work);

		JS_FreeCString(context, id);
		JS_FreeValue(context, arg);
	}
	JS_FreeValue(context, ids);
}

/**
 * Handles an incoming blobs.has call: replies with JSON "true" or "false"
 * depending on whether the blob named by the first entry of "args" is stored.
 * Sends a "method not allowed" error when this instance is not a replicator.
 */
static void _tf_ssb_rpc_blobs_has(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	if (!tf_ssb_is_replicator(ssb))
	{
		tf_ssb_connection_rpc_send_error_method_not_allowed(connection, flags, -request_number, "blobs.has");
		return;
	}
	JSContext* context = tf_ssb_connection_get_context(connection);
	JSValue ids = JS_GetPropertyStr(context, args, "args");
	JSValue id = JS_GetPropertyUint32(context, ids, 0);
	const char* id_str = JS_ToCString(context, id);
	/*
	 * JS_ToCString() yields NULL when the conversion fails (e.g. "args" is
	 * missing so the element lookup throws). Treat that as "not present"
	 * rather than handing a NULL id to the database layer.
	 */
	bool has = id_str != NULL && tf_ssb_db_blob_has(ssb, id_str);
	JS_FreeCString(context, id_str);
	JS_FreeValue(context, id);
	JS_FreeValue(context, ids);

	const char* reply = has ? "true" : "false";
	tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json, -request_number, NULL, (const uint8_t*)reply, strlen(reply), NULL, NULL, NULL);
}

/**
 * Invoked when a blob want is added: sends a { <id>: -1 } message on the
 * connection's blob wants stream. user_data is the connection to notify.
 */
static void _tf_ssb_rpc_blob_wants_added_callback(tf_ssb_t* ssb, const char* id, void* user_data)
{
	tf_ssb_connection_t* peer = user_data;
	tf_ssb_blob_wants_t* wants = tf_ssb_connection_get_blob_wants_state(peer);
	JSContext* context = tf_ssb_get_context(ssb);

	JSValue want = JS_NewObject(context);
	JS_SetPropertyStr(context, want, id, JS_NewInt64(context, -1));
	tf_ssb_connection_rpc_send_json(peer, k_ssb_rpc_flag_stream, -wants->request_number, NULL, want, NULL, NULL, NULL);
	JS_FreeValue(context, want);
}

/** Result buffer for one batch of wanted blob ids gathered by background work. */
typedef struct _blob_wants_work_t
{
	/** Blob ids read from blob_wants_view, at most 32 per batch. */
	char out_id[32][k_blob_id_len];
	/** Number of valid entries in out_id. */
	int out_id_count;
} blob_wants_work_t;

/**
 * Work step that collects the next batch of wanted blob ids.
 *
 * Reads the "blob_fetch_age_seconds" setting: 0 disables fetching entirely, a
 * positive value limits results to entries whose timestamp is newer than
 * (now - age), and any other value leaves the timestamp bound at -1. Ids
 * greater than the connection's last sent id are read in id order, up to the
 * capacity of the work item's out_id array.
 */
static void _tf_ssb_request_blob_wants_work(tf_ssb_connection_t* connection, void* user_data)
{
	blob_wants_work_t* batch = user_data;
	tf_ssb_blob_wants_t* wants = tf_ssb_connection_get_blob_wants_state(connection);
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	int64_t max_age_seconds = _get_global_setting_int64(ssb, "blob_fetch_age_seconds", -1);
	if (max_age_seconds == 0)
	{
		/* Don't fetch any blobs. */
		return;
	}

	int64_t oldest_timestamp = -1;
	if (max_age_seconds > 0)
	{
		int64_t now_ms = (int64_t)time(NULL) * 1000ULL;
		oldest_timestamp = now_ms - max_age_seconds * 1000ULL;
	}

	const char* query = "SELECT id FROM blob_wants_view WHERE id > ? AND timestamp > ? ORDER BY id LIMIT ?";
	sqlite3* reader = tf_ssb_acquire_db_reader(ssb);
	sqlite3_stmt* stmt;
	if (sqlite3_prepare(reader, query, -1, &stmt, NULL) != SQLITE_OK)
	{
		tf_printf("prepare failed: %s\n", sqlite3_errmsg(reader));
	}
	else
	{
		if (sqlite3_bind_text(stmt, 1, wants->last_id, -1, NULL) == SQLITE_OK && sqlite3_bind_int64(stmt, 2, oldest_timestamp) == SQLITE_OK &&
			sqlite3_bind_int(stmt, 3, _countof(batch->out_id)) == SQLITE_OK)
		{
			/* The LIMIT bound above keeps the row count within out_id's capacity. */
			while (sqlite3_step(stmt) == SQLITE_ROW)
			{
				char* slot = batch->out_id[batch->out_id_count];
				snprintf(slot, sizeof(batch->out_id[batch->out_id_count]), "%s", (const char*)sqlite3_column_text(stmt, 0));
				batch->out_id_count++;
			}
		}
		sqlite3_finalize(stmt);
	}
	tf_ssb_release_db_reader(ssb, reader);
}

/**
 * Completion step for a blob wants batch: sends one { <id>: -1 } message per
 * collected id on the blob wants stream, counts each as an outstanding want,
 * remembers the last id sent as the starting point for the next batch, and
 * frees the work item.
 */
static void _tf_ssb_request_blob_wants_after_work(tf_ssb_connection_t* connection, int result, void* user_data)
{
	blob_wants_work_t* batch = user_data;
	JSContext* context = tf_ssb_connection_get_context(connection);
	tf_ssb_blob_wants_t* wants = tf_ssb_connection_get_blob_wants_state(connection);
	int count = batch->out_id_count;

	for (int index = 0; index < count; index++)
	{
		const char* blob_id = batch->out_id[index];
		JSValue want = JS_NewObject(context);
		JS_SetPropertyStr(context, want, blob_id, JS_NewInt32(context, -1));
		tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream, -wants->request_number, NULL, want, NULL, NULL, NULL);
		JS_FreeValue(context, want);
		wants->wants_sent++;
	}

	if (count != 0)
	{
		snprintf(wants->last_id, sizeof(wants->last_id), "%s", batch->out_id[count - 1]);
	}
	tf_free(batch);
}

/**
 * Queues background work that gathers and sends the next batch of blob wants
 * for the connection. The zeroed work item is freed by the after-work step.
 */
static void _tf_ssb_rpc_request_more_blobs(tf_ssb_connection_t* connection)
{
	blob_wants_work_t* batch = tf_malloc(sizeof(*batch));
	memset(batch, 0, sizeof(blob_wants_work_t));
	tf_ssb_connection_run_work(connection, _tf_ssb_request_blob_wants_work, _tf_ssb_request_blob_wants_after_work, batch);
}

/**
 * Handles an incoming blobs.createWants call. On a replicator this subscribes
 * the connection to newly added blob wants, records the request number for
 * later replies, and starts sending the first batch of wants. Otherwise a
 * "method not allowed" error is sent.
 */
static void _tf_ssb_rpc_blobs_createWants(
	tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	if (tf_ssb_is_replicator(ssb))
	{
		tf_ssb_blob_wants_t* wants = tf_ssb_connection_get_blob_wants_state(connection);
		tf_ssb_add_blob_want_added_callback(ssb, _tf_ssb_rpc_blob_wants_added_callback, NULL, connection);
		wants->request_number = request_number;
		_tf_ssb_rpc_request_more_blobs(connection);
	}
	else
	{
		tf_ssb_connection_rpc_send_error_method_not_allowed(connection, flags, -request_number, "blobs.createWants");
	}
}

/** One direction of a tunnel: where messages received on a request are forwarded. */
typedef struct tunnel_t
{
	/** Connection that forwarded messages are sent to. */
	tf_ssb_connection_t* connection;
	/** Request number used on that connection when forwarding. */
	int32_t request_number;
} tunnel_t;

/**
 * Relays tunnel traffic. Ordinary messages are forwarded unchanged to the
 * other end described by user_data; a message flagged end/error removes the
 * request on the receiving connection and closes the other end.
 */
static void _tf_ssb_rpc_tunnel_callback(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	tunnel_t* other_end = user_data;
	if (!(flags & k_ssb_rpc_flag_end_error))
	{
		tf_ssb_connection_rpc_send(other_end->connection, flags, other_end->request_number, NULL, message, size, NULL, NULL, NULL);
		return;
	}

	tf_ssb_connection_remove_request(connection, request_number);
	tf_ssb_connection_close(other_end->connection);
}

/** Request cleanup hook for tunnels: releases the tunnel_t passed as user_data. */
static void _tf_ssb_rpc_tunnel_cleanup(tf_ssb_t* ssb, void* user_data)
{
	tunnel_t* tunnel = user_data;
	tf_free(tunnel);
}

/**
 * Handles an incoming tunnel.connect call (rooms only; otherwise a "method
 * not allowed" error is sent).
 *
 * Two shapes of the first argument are handled:
 * - { portal, target } without origin: the caller asks to be connected to
 *   target. If target is connected, a tunnel.connect request carrying the
 *   caller's id as origin is sent to it and the two requests are linked so
 *   traffic on one is forwarded to the other. Otherwise an error is sent.
 * - { origin, portal, target }: a tunnel is created via
 *   tf_ssb_connection_tunnel_create().
 *
 * Any other shape is ignored without a reply.
 */
static void _tf_ssb_rpc_tunnel_connect(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	if (!tf_ssb_is_room(ssb))
	{
		tf_ssb_connection_rpc_send_error_method_not_allowed(connection, flags, -request_number, "tunnel.connect");
		return;
	}

	JSContext* context = tf_ssb_connection_get_context(connection);
	JSValue arg_array = JS_GetPropertyStr(context, args, "args");
	JSValue arg = JS_GetPropertyUint32(context, arg_array, 0);
	JSValue origin = JS_GetPropertyStr(context, arg, "origin");
	JSValue portal = JS_GetPropertyStr(context, arg, "portal");
	JSValue target = JS_GetPropertyStr(context, arg, "target");

	if (JS_IsUndefined(origin) && !JS_IsUndefined(portal) && !JS_IsUndefined(target))
	{
		const char* target_str = JS_ToCString(context, target);
		const char* portal_str = JS_ToCString(context, portal);

		/*
		 * JS_ToCString() yields NULL when conversion fails. Without both
		 * strings there is nothing to look up or forward, so treat it the
		 * same as an unknown target instead of passing NULL along.
		 */
		tf_ssb_connection_t* target_connection = (target_str && portal_str) ? tf_ssb_connection_get(ssb, target_str) : NULL;
		if (target_connection)
		{
			int32_t tunnel_request_number = tf_ssb_connection_next_request_number(target_connection);

			JSValue message = JS_NewObject(context);
			JSValue name = JS_NewArray(context);
			JS_SetPropertyUint32(context, name, 0, JS_NewString(context, "tunnel"));
			JS_SetPropertyUint32(context, name, 1, JS_NewString(context, "connect"));
			JS_SetPropertyStr(context, message, "name", name);
			JSValue arg_obj = JS_NewObject(context);
			char origin_str[k_id_base64_len] = "";
			tf_ssb_connection_get_id(connection, origin_str, sizeof(origin_str));
			JS_SetPropertyStr(context, arg_obj, "origin", JS_NewString(context, origin_str));
			JS_SetPropertyStr(context, arg_obj, "portal", JS_NewString(context, portal_str));
			JS_SetPropertyStr(context, arg_obj, "target", JS_NewString(context, target_str));
			/* Named distinctly so it does not shadow the incoming arg_array freed below. */
			JSValue forwarded_args = JS_NewArray(context);
			JS_SetPropertyUint32(context, forwarded_args, 0, arg_obj);
			JS_SetPropertyStr(context, message, "args", forwarded_args);
			JS_SetPropertyStr(context, message, "type", JS_NewString(context, "duplex"));

			tf_ssb_connection_rpc_send_json(
				target_connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, tunnel_request_number, "tunnel.connect", message, NULL, NULL, NULL);

			/* Each side's request forwards to the opposite connection/request pair. */
			tunnel_t* data0 = tf_malloc(sizeof(tunnel_t));
			*data0 = (tunnel_t) {
				.connection = target_connection,
				.request_number = tunnel_request_number,
			};
			tunnel_t* data1 = tf_malloc(sizeof(tunnel_t));
			*data1 = (tunnel_t) {
				.connection = connection,
				.request_number = -request_number,
			};
			tf_ssb_connection_add_request(connection, -request_number, "tunnel.connect", _tf_ssb_rpc_tunnel_callback, _tf_ssb_rpc_tunnel_cleanup, data0, target_connection);
			tf_ssb_connection_add_request(target_connection, tunnel_request_number, "tunnel.connect", _tf_ssb_rpc_tunnel_callback, _tf_ssb_rpc_tunnel_cleanup, data1, connection);

			JS_FreeValue(context, message);
		}
		else
		{
			tf_ssb_connection_rpc_send_error(connection, flags, -request_number, "Connection not found.");
		}
		JS_FreeCString(context, portal_str);
		JS_FreeCString(context, target_str);
	}
	else if (!JS_IsUndefined(origin) && !JS_IsUndefined(portal) && !JS_IsUndefined(target))
	{
		const char* origin_str = JS_ToCString(context, origin);
		const char* portal_str = JS_ToCString(context, portal);
		/*
		 * When "args" is empty every property read above is an exception
		 * rather than undefined, which lands here with NULL strings; only
		 * create the tunnel when both ids converted successfully.
		 */
		if (origin_str && portal_str)
		{
			tf_ssb_connection_tunnel_create(ssb, portal_str, -request_number, origin_str);
		}
		JS_FreeCString(context, origin_str);
		JS_FreeCString(context, portal_str);
	}

	JS_FreeValue(context, origin);
	JS_FreeValue(context, portal);
	JS_FreeValue(context, target);
	JS_FreeValue(context, arg);
	JS_FreeValue(context, arg_array);
}

/**
 * Handles an incoming room.metadata-style call: when this instance is a room,
 * replies with an object holding the room name, membership = false, and the
 * feature list ["tunnel", "room1", "room2"]; otherwise replies with false.
 */
static void _tf_ssb_rpc_room_meta(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	JSContext* context = tf_ssb_get_context(ssb);
	JSValue reply = JS_FALSE;
	if (tf_ssb_is_room(ssb))
	{
		const char* feature_names[] = { "tunnel", "room1", "room2" };
		reply = JS_NewObject(context);
		JS_SetPropertyStr(context, reply, "name", JS_NewString(context, tf_ssb_get_room_name(ssb)));
		JS_SetPropertyStr(context, reply, "membership", JS_FALSE);
		JSValue feature_list = JS_NewArray(context);
		for (uint32_t i = 0; i < sizeof(feature_names) / sizeof(feature_names[0]); i++)
		{
			JS_SetPropertyUint32(context, feature_list, i, JS_NewString(context, feature_names[i]));
		}
		JS_SetPropertyStr(context, reply, "features", feature_list);
	}
	tf_ssb_connection_rpc_send_json(connection, flags, -request_number, NULL, reply, NULL, NULL, NULL);
	JS_FreeValue(context, reply);
}

/**
 * Handles an incoming room.attendants call (rooms only; otherwise a "method
 * not allowed" error is sent).
 *
 * Every connection already marked as an attendant receives a "joined" event
 * naming the caller, and the caller receives a "state" event listing the ids
 * of those attendants. The caller is then marked as an attendant itself.
 */
static void _tf_ssb_rpc_room_attendants(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	if (!tf_ssb_is_room(ssb))
	{
		tf_ssb_connection_rpc_send_error_method_not_allowed(connection, flags, -request_number, "room.attendants");
		return;
	}

	JSContext* context = tf_ssb_get_context(ssb);

	/* Event broadcast to the existing attendants. */
	JSValue joined_event = JS_NewObject(context);
	JS_SetPropertyStr(context, joined_event, "type", JS_NewString(context, "joined"));
	char caller_id[k_id_base64_len] = "";
	if (tf_ssb_connection_get_id(connection, caller_id, sizeof(caller_id)))
	{
		JS_SetPropertyStr(context, joined_event, "id", JS_NewString(context, caller_id));
	}

	/* Event returned to the caller. */
	JSValue state_event = JS_NewObject(context);
	JS_SetPropertyStr(context, state_event, "type", JS_NewString(context, "state"));
	JSValue attendant_ids = JS_NewArray(context);
	int attendant_count = 0;

	tf_ssb_connection_t* connections[1024];
	int connection_count = tf_ssb_get_connections(ssb, connections, _countof(connections));
	for (int index = 0; index < connection_count; index++)
	{
		tf_ssb_connection_t* other = connections[index];
		char other_id[k_id_base64_len] = { 0 };
		if (!tf_ssb_connection_is_attendant(other) || !tf_ssb_connection_get_id(other, other_id, sizeof(other_id)))
		{
			continue;
		}

		JS_SetPropertyUint32(context, attendant_ids, attendant_count++, JS_NewString(context, other_id));

		tf_ssb_connection_rpc_send_json(other, flags, -tf_ssb_connection_get_attendant_request_number(other), NULL, joined_event, NULL, NULL, NULL);
	}

	JS_SetPropertyStr(context, state_event, "ids", attendant_ids);
	tf_ssb_connection_rpc_send_json(connection, flags, -request_number, NULL, state_event, NULL, NULL, NULL);
	JS_FreeValue(context, joined_event);
	JS_FreeValue(context, state_event);

	tf_ssb_connection_set_attendant(connection, true, request_number);
}

/** State of one outgoing blobs.get request: accumulates the blob as it arrives. */
typedef struct _blobs_get_t
{
	/** Identifier of the blob being fetched. */
	char id[k_blob_id_len];
	/** Number of bytes copied into buffer so far. */
	size_t received;
	/** Capacity of buffer; data that would exceed it is not copied. */
	size_t expected_size;
	/** Set by the request cleanup hook once the request is finished. */
	bool done;
	/** True while an asynchronous store of the buffer is in flight. */
	bool storing;
	tf_ssb_t* ssb;
	/** Connection the blob is being fetched from. */
	tf_ssb_connection_t* connection;
	/** Blob bytes (flexible array member, allocated with the struct). */
	uint8_t buffer[];
} blobs_get_t;

/**
 * Called when the asynchronous blob store finishes. Clears the storing flag,
 * releases the read backpressure taken when the store began, and frees the
 * transfer state if the request cleanup has already run.
 */
static void _tf_ssb_rpc_blob_store_callback(const char* id, bool is_new, void* user_data)
{
	blobs_get_t* transfer = user_data;
	transfer->storing = false;
	tf_ssb_connection_adjust_read_backpressure(transfer->connection, -1);
	if (!transfer->done)
	{
		/* The request cleanup hook will free the state later. */
		return;
	}
	tf_free(transfer);
}

/**
 * Receives the response stream of an outgoing blobs.get request.
 *
 * Non-empty binary messages are appended to the transfer buffer as long as
 * they fit within the expected size. A JSON message ends the transfer: when
 * its value is truthy the buffered bytes are stored asynchronously (with read
 * backpressure applied until the store completes), and in either case the
 * stream is closed with a JSON "true".
 */
static void _tf_ssb_rpc_connection_blobs_get_callback(
	tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	JSContext* context = tf_ssb_get_context(ssb);
	blobs_get_t* transfer = user_data;
	const int type = flags & k_ssb_rpc_mask_type;

	if (type == k_ssb_rpc_flag_binary && size > 0 && transfer->received + size <= transfer->expected_size)
	{
		memcpy(transfer->buffer + transfer->received, message, size);
		transfer->received += size;
	}
	else if (type == k_ssb_rpc_flag_json)
	{
		if (JS_ToBool(context, args))
		{
			transfer->storing = true;
			tf_ssb_connection_adjust_read_backpressure(connection, 1);
			tf_ssb_db_blob_store_async(ssb, transfer->buffer, transfer->received, _tf_ssb_rpc_blob_store_callback, transfer);
		}
		/* TODO: Should we send the response in the callback? */
		const char* reply = "true";
		tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json | k_ssb_rpc_flag_stream | k_ssb_rpc_flag_end_error, -request_number, NULL, (const uint8_t*)reply,
			strlen(reply), NULL, NULL, NULL);
	}
}

/**
 * Request cleanup hook for an outgoing blobs.get. Marks the transfer as done
 * and frees it unless an asynchronous store is still in flight, in which case
 * the store callback frees it.
 */
static void _tf_ssb_rpc_connection_blobs_get_cleanup(tf_ssb_t* ssb, void* user_data)
{
	blobs_get_t* transfer = user_data;
	transfer->done = true;
	if (transfer->storing)
	{
		return;
	}
	tf_free(transfer);
}

/**
 * Starts fetching a blob from the peer.
 * Allocates a zeroed receive buffer of the given size and sends a blobs.get
 * source request for blob_id; the response is handled by the blobs.get
 * response callback and the state is released by its cleanup hook.
 * @param connection The connection to request the blob from.
 * @param blob_id The blob identifier.
 * @param size The number of bytes to reserve for the blob.
 */
static void _tf_ssb_rpc_connection_blobs_get(tf_ssb_connection_t* connection, const char* blob_id, size_t size)
{
	blobs_get_t* transfer = tf_malloc(sizeof(blobs_get_t) + size);
	*transfer = (blobs_get_t) {
		.ssb = tf_ssb_connection_get_ssb(connection),
		.connection = connection,
		.expected_size = size,
	};
	snprintf(transfer->id, sizeof(transfer->id), "%s", blob_id);
	memset(transfer->buffer, 0, size);

	JSContext* context = tf_ssb_get_context(tf_ssb_connection_get_ssb(connection));

	/* { name: ["blobs", "get"], type: "source", args: [blob_id] } */
	JSValue method = JS_NewArray(context);
	JS_SetPropertyUint32(context, method, 0, JS_NewString(context, "blobs"));
	JS_SetPropertyUint32(context, method, 1, JS_NewString(context, "get"));
	JSValue request = JS_NewObject(context);
	JS_SetPropertyStr(context, request, "name", method);
	JS_SetPropertyStr(context, request, "type", JS_NewString(context, "source"));
	JSValue request_args = JS_NewArray(context);
	JS_SetPropertyUint32(context, request_args, 0, JS_NewString(context, blob_id));
	JS_SetPropertyStr(context, request, "args", request_args);

	tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, tf_ssb_connection_next_request_number(connection), "blobs.get", request,
		_tf_ssb_rpc_connection_blobs_get_callback, _tf_ssb_rpc_connection_blobs_get_cleanup, transfer);

	JS_FreeValue(context, request);
}

/** Work item for answering a peer's want for a blob (negative size in its createWants message). */
typedef struct _blob_create_wants_work_t
{
	/** Connection the want was received on and the reply is sent to. */
	tf_ssb_connection_t* connection;
	/** Identifier of the wanted blob. */
	char blob_id[k_blob_id_len];
	/** Result of tf_ssb_db_blob_get(), set by the work function. */
	bool out_result;
	/** Value the peer sent for this blob id (negative when queued here). */
	int64_t size;
	/** Size reported by tf_ssb_db_blob_get(). */
	size_t out_size;
} blob_create_wants_work_t;

/**
 * Work step for a peer's blob want: asks the database whether the blob exists
 * and how large it is, without retrieving its contents.
 */
static void _tf_ssb_rpc_connection_blobs_create_wants_work(tf_ssb_connection_t* connection, void* user_data)
{
	blob_create_wants_work_t* want = user_data;
	want->out_result = tf_ssb_db_blob_get(tf_ssb_connection_get_ssb(want->connection), want->blob_id, NULL, &want->out_size);
}

/**
 * Completion step for a peer's blob want. If the blob is stored locally the
 * peer is told its size; if not and the peer's value was exactly -1, the want
 * is answered with -2. In all other cases nothing is sent. Frees the work item.
 */
static void _tf_ssb_rpc_connection_blobs_create_wants_after_work(tf_ssb_connection_t* connection, int result, void* user_data)
{
	blob_create_wants_work_t* want = user_data;
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(want->connection);
	tf_ssb_blob_wants_t* wants = tf_ssb_connection_get_blob_wants_state(connection);
	JSContext* context = tf_ssb_get_context(ssb);

	bool should_reply = false;
	int64_t reply_value = 0;
	if (want->out_result)
	{
		should_reply = true;
		reply_value = want->out_size;
	}
	else if (want->size == -1LL)
	{
		should_reply = true;
		reply_value = -2;
	}

	if (should_reply)
	{
		JSValue reply = JS_NewObject(context);
		JS_SetPropertyStr(context, reply, want->blob_id, JS_NewInt64(context, reply_value));
		tf_ssb_connection_rpc_send_json(want->connection, k_ssb_rpc_flag_stream, -wants->request_number, NULL, reply, NULL, NULL, NULL);
		JS_FreeValue(context, reply);
	}
	tf_free(want);
}

/**
 * Receives messages on the outgoing blobs.createWants stream.
 *
 * Messages carrying a "name" property are treated as errors and ignored.
 * Otherwise each own string-keyed property is a blob id mapped to a number:
 * a negative number means the peer wants that blob, so a background lookup
 * is queued to answer it; a non-negative number is the size of a blob the
 * peer has, so a fetch of that many bytes is started. Every entry also counts
 * down the outstanding wants, and reaching zero requests the next batch.
 */
static void _tf_ssb_rpc_connection_blobs_createWants_callback(
	tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	tf_ssb_blob_wants_t* wants = tf_ssb_connection_get_blob_wants_state(connection);
	if (!wants)
	{
		return;
	}

	JSContext* context = tf_ssb_connection_get_context(connection);

	JSValue error_name = JS_GetPropertyStr(context, args, "name");
	if (!JS_IsUndefined(error_name))
	{
		/* { name: "Error" } */
		JS_FreeValue(context, error_name);
		return;
	}

	JSPropertyEnum* properties = NULL;
	uint32_t property_count = 0;
	if (JS_GetOwnPropertyNames(context, &properties, &property_count, args, JS_GPN_STRING_MASK) != 0)
	{
		return;
	}

	for (uint32_t index = 0; index < property_count; ++index)
	{
		JSAtom atom = properties[index].atom;
		JSValue key = JS_AtomToString(context, atom);
		JSPropertyDescriptor descriptor;
		JSValue value = JS_NULL;
		if (JS_GetOwnProperty(context, &descriptor, args, atom) == 1)
		{
			value = descriptor.value;
			JS_FreeValue(context, descriptor.setter);
			JS_FreeValue(context, descriptor.getter);
		}
		const char* blob_id = JS_ToCString(context, key);
		int64_t blob_size = 0;
		JS_ToInt64(context, &blob_size, value);

		if (--wants->wants_sent == 0)
		{
			_tf_ssb_rpc_request_more_blobs(connection);
		}

		if (blob_size >= 0)
		{
			_tf_ssb_rpc_connection_blobs_get(connection, blob_id, blob_size);
		}
		else
		{
			blob_create_wants_work_t* work = tf_malloc(sizeof(blob_create_wants_work_t));
			*work = (blob_create_wants_work_t) {
				.connection = connection,
				.size = blob_size,
			};
			snprintf(work->blob_id, sizeof(work->blob_id), "%s", blob_id);
			tf_ssb_connection_run_work(connection, _tf_ssb_rpc_connection_blobs_create_wants_work, _tf_ssb_rpc_connection_blobs_create_wants_after_work, work);
		}

		JS_FreeCString(context, blob_id);
		JS_FreeValue(context, key);
		JS_FreeValue(context, value);
	}

	for (uint32_t index = 0; index < property_count; ++index)
	{
		JS_FreeAtom(context, properties[index].atom);
	}
	js_free(context, properties);
}

/**
 * Receives messages on the outgoing room.attendants stream and keeps the
 * connection's attendant list in sync:
 * - "state" replaces the list with the ids in "ids";
 * - "joined" adds the id in "id";
 * - "left" removes the id in "id".
 * A missing or unrecognized type is answered with an error.
 */
static void _tf_ssb_rpc_connection_room_attendants_callback(
	tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	JSContext* context = tf_ssb_get_context(ssb);
	JSValue type_value = JS_GetPropertyStr(context, args, "type");
	const char* event_type = JS_ToCString(context, type_value);

	if (!event_type)
	{
		tf_ssb_connection_rpc_send_error(connection, flags, -request_number, "Missing type.");
	}
	else if (strcmp(event_type, "state") == 0)
	{
		tf_ssb_connection_clear_room_attendants(connection);
		JSValue id_list = JS_GetPropertyStr(context, args, "ids");
		int id_count = tf_util_get_length(context, id_list);
		for (int index = 0; index < id_count; index++)
		{
			JSValue entry = JS_GetPropertyUint32(context, id_list, index);
			const char* attendant = JS_ToCString(context, entry);
			if (attendant)
			{
				tf_ssb_connection_add_room_attendant(connection, attendant);
			}
			JS_FreeCString(context, attendant);
			JS_FreeValue(context, entry);
		}
		JS_FreeValue(context, id_list);
	}
	else if (strcmp(event_type, "joined") == 0 || strcmp(event_type, "left") == 0)
	{
		/* Both events carry a single "id"; they differ only in add vs. remove. */
		bool is_join = strcmp(event_type, "joined") == 0;
		JSValue entry = JS_GetPropertyStr(context, args, "id");
		const char* attendant = JS_ToCString(context, entry);
		if (attendant)
		{
			if (is_join)
			{
				tf_ssb_connection_add_room_attendant(connection, attendant);
			}
			else
			{
				tf_ssb_connection_remove_room_attendant(connection, attendant);
			}
		}
		JS_FreeCString(context, attendant);
		JS_FreeValue(context, entry);
	}
	else
	{
		char error[256];
		snprintf(error, sizeof(error), "Unexpected room.attendants response type: '%s'.", event_type);
		tf_ssb_connection_rpc_send_error(connection, flags, -request_number, error);
	}

	JS_FreeCString(context, event_type);
	JS_FreeValue(context, type_value);
}

/**
 * Tests whether a message's "name" property converts to the string "Error".
 * @param context The JS context owning message.
 * @param message The value to inspect.
 * @return true if the name is exactly "Error".
 */
static bool _is_error(JSContext* context, JSValue message)
{
	JSValue name_value = JS_GetPropertyStr(context, message, "name");
	const char* name_text = JS_ToCString(context, name_value);
	bool matches = name_text != NULL && strcmp(name_text, "Error") == 0;
	JS_FreeCString(context, name_text);
	JS_FreeValue(context, name_value);
	return matches;
}

/**
 * Receives the reply to an outgoing tunnel.isRoom request. When the reply is
 * an object that is not an error, a room.attendants source request is opened
 * on the same connection.
 */
static void _tf_ssb_rpc_connection_tunnel_isRoom_callback(
	tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	JSContext* context = tf_ssb_get_context(ssb);
	if (_is_error(context, args) || !JS_IsObject(args))
	{
		return;
	}

	/* { name: ["room", "attendants"], type: "source", args: [] } */
	JSValue request = JS_NewObject(context);
	JSValue method = JS_NewArray(context);
	JS_SetPropertyUint32(context, method, 0, JS_NewString(context, "room"));
	JS_SetPropertyUint32(context, method, 1, JS_NewString(context, "attendants"));
	JS_SetPropertyStr(context, request, "name", method);
	JS_SetPropertyStr(context, request, "type", JS_NewString(context, "source"));
	JS_SetPropertyStr(context, request, "args", JS_NewArray(context));
	tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, tf_ssb_connection_next_request_number(connection), "room.attendants",
		request, _tf_ssb_rpc_connection_room_attendants_callback, NULL, NULL);
	JS_FreeValue(context, request);
}

/** One batch of a createHistoryStream response: inputs plus outputs filled in by background work. */
typedef struct _tf_ssb_connection_send_history_stream_t
{
	/** Request number used when sending the messages. */
	int32_t request_number;
	/** Feed whose messages are read. */
	char author[k_id_base64_len];
	/** Only messages with a sequence greater than this are read. */
	int64_t sequence;
	/** When true each message is wrapped as { key, value, timestamp }. */
	bool keys;
	/** When true no terminating "false" is sent after the last batch. */
	bool live;

	/** Set by the work step when no further batch is needed. */
	bool out_finished;
	/** Sequence of the last row read by the work step. */
	int64_t out_max_sequence_seen;
	/** JSON text of each message, each allocated with tf_malloc(). */
	char** out_messages;
	/** Number of entries in out_messages. */
	int out_messages_count;
} tf_ssb_connection_send_history_stream_t;

/**
 * Work step for createHistoryStream: reads one batch of the author's messages
 * from the database and serializes each to JSON text in request->out_messages.
 *
 * The batch covers sequences strictly between request->sequence and
 * request->sequence + k_max. out_finished is set unless the last row read had
 * sequence == request->sequence + k_max - 1 (i.e. the batch was full up to
 * its upper bound). Does nothing if the connection is no longer connected.
 */
static void _tf_ssb_connection_send_history_stream_work(tf_ssb_connection_t* connection, void* user_data)
{
	tf_ssb_connection_send_history_stream_t* request = user_data;
	if (!tf_ssb_connection_is_connected(connection))
	{
		return;
	}
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	sqlite3* db = tf_ssb_acquire_db_reader(ssb);
	sqlite3_stmt* statement;
	const int k_max = 32;
	if (sqlite3_prepare(db,
			"SELECT previous, author, id, sequence, timestamp, hash, json(content), signature, flags FROM messages WHERE author = ?1 AND sequence > ?2 AND "
			"sequence "
			"< ?3 ORDER BY sequence",
			-1, &statement, NULL) == SQLITE_OK)
	{
		if (sqlite3_bind_text(statement, 1, request->author, -1, NULL) == SQLITE_OK && sqlite3_bind_int64(statement, 2, request->sequence) == SQLITE_OK &&
			sqlite3_bind_int64(statement, 3, request->sequence + k_max) == SQLITE_OK)
		{
			/* A private JS runtime/context is created just for formatting this
			 * batch (presumably because this runs off the main thread -- TODO confirm). */
			JSMallocFunctions funcs = { 0 };
			tf_get_js_malloc_functions(&funcs);
			JSRuntime* runtime = JS_NewRuntime2(&funcs, NULL);
			JSContext* context = JS_NewContext(runtime);

			while (sqlite3_step(statement) == SQLITE_ROW)
			{
				JSValue message = JS_UNDEFINED;
				request->out_max_sequence_seen = sqlite3_column_int64(statement, 3);

				JSValue formatted = tf_ssb_format_message(context, (const char*)sqlite3_column_text(statement, 0), (const char*)sqlite3_column_text(statement, 1),
					sqlite3_column_int64(statement, 3), sqlite3_column_double(statement, 4), (const char*)sqlite3_column_text(statement, 5),
					(const char*)sqlite3_column_text(statement, 6), (const char*)sqlite3_column_text(statement, 7), sqlite3_column_int(statement, 8));
				if (request->keys)
				{
					/* Wrap as { key, value, timestamp }; note the timestamp is
					 * emitted as the column's text, i.e. a JSON string. */
					message = JS_NewObject(context);
					JS_SetPropertyStr(context, message, "key", JS_NewString(context, (const char*)sqlite3_column_text(statement, 2)));
					JS_SetPropertyStr(context, message, "value", formatted);
					JS_SetPropertyStr(context, message, "timestamp", JS_NewString(context, (const char*)sqlite3_column_text(statement, 4)));
				}
				else
				{
					message = formatted;
				}
				JSValue json = JS_JSONStringify(context, message, JS_NULL, JS_NULL);
				size_t size = 0;
				/* NOTE(review): string is not checked for NULL before the memcpy below. */
				const char* string = JS_ToCStringLen(context, &size, json);

				/* Copy the JSON out of the temporary context so it outlives it. */
				request->out_messages = tf_resize_vec(request->out_messages, sizeof(char*) * (request->out_messages_count + 1));
				char* copy = tf_malloc(size + 1);
				memcpy(copy, string, size + 1);
				JS_FreeCString(context, string);
				request->out_messages[request->out_messages_count++] = copy;

				JS_FreeValue(context, json);
				JS_FreeValue(context, message);
			}

			JS_FreeContext(context);
			JS_FreeRuntime(runtime);
		}
		sqlite3_finalize(statement);
	}
	else
	{
		tf_printf("prepare failed: %s\n", sqlite3_errmsg(db));
	}
	tf_ssb_release_db_reader(ssb, db);
	/* Another batch is needed only if the last row sat at the batch's upper bound. */
	request->out_finished = request->out_max_sequence_seen != request->sequence + k_max - 1;
}

/**
 * Completion step for a createHistoryStream batch. Releases the write count
 * taken when the batch was scheduled and, if the connection is still up,
 * sends the serialized messages. Then either schedules the next batch
 * (starting after the last sequence seen) or, when finished and not live,
 * sends a closing JSON "false". Always frees the batch.
 */
static void _tf_ssb_connection_send_history_stream_after_work(tf_ssb_connection_t* connection, int result, void* user_data)
{
	tf_ssb_connection_send_history_stream_t* batch = user_data;
	tf_ssb_connection_adjust_write_count(connection, -1);

	if (tf_ssb_connection_is_connected(connection))
	{
		for (int index = 0; index < batch->out_messages_count; index++)
		{
			const char* json = batch->out_messages[index];
			tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_json, batch->request_number, NULL, (const uint8_t*)json, strlen(json), NULL, NULL, NULL);
		}

		if (batch->out_finished)
		{
			if (!batch->live)
			{
				tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json, batch->request_number, NULL, (const uint8_t*)"false", strlen("false"), NULL, NULL, NULL);
			}
		}
		else
		{
			_tf_ssb_connection_send_history_stream(connection, batch->request_number, batch->author, batch->out_max_sequence_seen, batch->keys, batch->live);
		}
	}

	for (int index = 0; index < batch->out_messages_count; index++)
	{
		tf_free(batch->out_messages[index]);
	}
	tf_free(batch->out_messages);
	tf_free(batch);
}

/**
 * Idle callback that launches a history stream batch. Takes a write count,
 * then runs the database work in the background; if the connection has
 * already dropped, the completion step is invoked directly (with status -1)
 * so the write count and the batch are still released.
 */
static void _tf_ssb_connection_send_history_stream_callback(tf_ssb_connection_t* connection, void* user_data)
{
	tf_ssb_connection_adjust_write_count(connection, 1);
	if (!tf_ssb_connection_is_connected(connection))
	{
		_tf_ssb_connection_send_history_stream_after_work(connection, -1, user_data);
		return;
	}
	tf_ssb_connection_run_work(connection, _tf_ssb_connection_send_history_stream_work, _tf_ssb_connection_send_history_stream_after_work, user_data);
}

/*
** Allocate a history stream request and schedule it on the connection's
** idle queue.
**
** @param connection The connection to send on.
** @param request_number The request number used for the outgoing messages.
** @param author The author identity; copied (and truncated if necessary)
**     into the request.
** @param sequence The sequence stored in the request for the work step.
** @param keys Stored in the request for the work step.
** @param live Whether the request is live; a non-live request gets a final
**     "false" from the after-work step once finished.
*/
static void _tf_ssb_connection_send_history_stream(tf_ssb_connection_t* connection, int32_t request_number, const char* author, int64_t sequence, bool keys, bool live)
{
	tf_ssb_connection_send_history_stream_t* request = tf_malloc(sizeof(*request));

	/* Start from an all-zero request so the out_* fields begin empty. */
	memset(request, 0, sizeof(*request));
	request->request_number = request_number;
	request->sequence = sequence;
	request->keys = keys;
	request->live = live;
	snprintf(request->author, sizeof(request->author), "%s", author);

	tf_ssb_connection_schedule_idle(connection, _tf_ssb_connection_send_history_stream_callback, request);
}

/*
** RPC handler for createHistoryStream.
**
** Rejects the request if this instance is not a replicator.  Otherwise reads
** the first element of the request's "args" array, takes "id", "seq",
** "keys" and "live" from it, and starts sending the author's history.  When
** "live" is set, the connection is also registered for new messages from
** that author.
**
** "keys" defaults to true when absent.
**
** @param connection The connection the request arrived on.
** @param flags The RPC flags of the request.
** @param request_number The incoming request number; responses use its
**     negation.
** @param args The parsed request.
** @param message Unused.
** @param size Unused.
** @param user_data Unused.
*/
static void _tf_ssb_rpc_createHistoryStream(
	tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	if (!tf_ssb_is_replicator(ssb))
	{
		tf_ssb_connection_rpc_send_error_method_not_allowed(connection, flags, -request_number, "createHistoryStream");
		return;
	}
	JSContext* context = tf_ssb_get_context(ssb);
	JSValue arg_array = JS_GetPropertyStr(context, args, "args");
	JSValue arg = JS_GetPropertyUint32(context, arg_array, 0);
	if (JS_IsUndefined(arg) || JS_IsException(arg))
	{
		/*
		** Without a usable first argument there is nothing to stream.  Report
		** the error and stop instead of continuing with an invalid value.
		*/
		tf_ssb_connection_rpc_send_error(connection, flags, -request_number, "Missing request.args in createHistoryStream.");
		JS_FreeValue(context, arg);
		JS_FreeValue(context, arg_array);
		return;
	}
	JSValue id = JS_GetPropertyStr(context, arg, "id");
	JSValue seq = JS_GetPropertyStr(context, arg, "seq");
	JSValue keys = JS_GetPropertyStr(context, arg, "keys");
	JSValue live = JS_GetPropertyStr(context, arg, "live");
	bool is_keys = JS_IsUndefined(keys) || JS_ToBool(context, keys) > 0;
	bool is_live = JS_ToBool(context, live) > 0;
	int64_t sequence = 0;
	JS_ToInt64(context, &sequence, seq);
	const char* author = JS_ToCString(context, id);

	/* JS_ToCString returns NULL on failure; never hand that to "%s". */
	if (author)
	{
		_tf_ssb_connection_send_history_stream(connection, -request_number, author, sequence, is_keys, is_live);

		if (is_live)
		{
			tf_ssb_connection_add_new_message_request(connection, author, -request_number, is_keys);
		}

		JS_FreeCString(context, author);
	}
	else
	{
		tf_ssb_connection_rpc_send_error(connection, flags, -request_number, "Invalid id in createHistoryStream.");
	}

	JS_FreeValue(context, id);
	JS_FreeValue(context, seq);
	JS_FreeValue(context, keys);
	JS_FreeValue(context, live);
	JS_FreeValue(context, arg);
	JS_FreeValue(context, arg_array);
}

/* One entry of an incoming EBT clock, copied out of the JS message. */
typedef struct _ebt_clock_row_t
{
	/* The property name from the clock object (truncated to fit). */
	char id[k_id_base64_len];
	/* The property value converted with JS_ToInt64. */
	int64_t value;
} ebt_clock_row_t;

/* State carried from scheduling an EBT clock response through work and after-work. */
typedef struct _ebt_replicate_send_clock_t
{
	/* The incoming request number; the response is sent on its negation. */
	int64_t request_number;
	/* Rows copied from the incoming clock; allocated with tf_malloc, NULL if none. */
	ebt_clock_row_t* clock;
	/* The number of rows in clock. */
	int clock_count;

	/* JSON text of the clock to send, produced by the work step and freed after sending. */
	char* out_clock;
} ebt_replicate_send_clock_t;

/*
** Work step that builds the JSON text of the EBT clock to send.
**
** Uses a private JS runtime and context to assemble an object mapping
** identity to an encoded sequence: the latest stored sequence shifted left
** by one bit, or -1 when the lookup leaves the sequence at -1.  Entries are
** added, in order, for all visible identities, for the remote end of this
** connection, and for every identity in the incoming clock; an identity
** that is already present is left untouched.  The stringified object is
** stored in out_clock as a tf_malloc'd copy.
**
** @param connection The connection the clock is for.
** @param user_data The ebt_replicate_send_clock_t being processed.
*/
static void _tf_ssb_rpc_ebt_replicate_send_clock_work(tf_ssb_connection_t* connection, void* user_data)
{
	ebt_replicate_send_clock_t* request = user_data;
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);

	JSMallocFunctions malloc_functions = { 0 };
	tf_get_js_malloc_functions(&malloc_functions);
	JSRuntime* runtime = JS_NewRuntime2(&malloc_functions, NULL);
	JSContext* context = JS_NewContext(runtime);
	JSValue clock = JS_NewObject(context);

	/* Ask for every identity we know is being followed from local accounts. */
	const char** identities = tf_ssb_db_get_all_visible_identities(ssb, 2);
	for (const char** identity = identities; *identity; identity++)
	{
		int64_t latest = 0;
		tf_ssb_db_get_latest_message_by_author(ssb, *identity, &latest, NULL, 0);
		int64_t encoded = latest == -1 ? -1 : (latest << 1);
		JS_SetPropertyStr(context, clock, *identity, JS_NewInt64(context, encoded));
	}
	tf_free(identities);

	/* Ask about the incoming connection, too. */
	char remote_id[k_id_base64_len] = "";
	if (tf_ssb_connection_get_id(connection, remote_id, sizeof(remote_id)))
	{
		JSValue existing = JS_GetPropertyStr(context, clock, remote_id);
		if (JS_IsUndefined(existing))
		{
			int64_t latest = 0;
			tf_ssb_db_get_latest_message_by_author(ssb, remote_id, &latest, NULL, 0);
			int64_t encoded = latest == -1 ? -1 : (latest << 1);
			JS_SetPropertyStr(context, clock, remote_id, JS_NewInt64(context, encoded));
		}
		JS_FreeValue(context, existing);
	}

	/* Also respond with what we know about all requested identities. */
	for (int row = 0; row < request->clock_count; row++)
	{
		const char* requested_id = request->clock[row].id;
		JSValue existing = JS_GetPropertyStr(context, clock, requested_id);
		if (JS_IsUndefined(existing))
		{
			int64_t latest = -1;
			tf_ssb_db_get_latest_message_by_author(ssb, requested_id, &latest, NULL, 0);
			int64_t encoded = latest == -1 ? -1 : (latest << 1);
			JS_SetPropertyStr(context, clock, requested_id, JS_NewInt64(context, encoded));
		}
		JS_FreeValue(context, existing);
	}

	/* Serialize and copy the text out before the context goes away. */
	JSValue json = JS_JSONStringify(context, clock, JS_NULL, JS_NULL);
	size_t length = 0;
	const char* text = JS_ToCStringLen(context, &length, json);
	request->out_clock = tf_malloc(length + 1);
	memcpy(request->out_clock, text, length + 1);
	JS_FreeCString(context, text);
	JS_FreeValue(context, json);
	JS_FreeValue(context, clock);

	JS_FreeContext(context);
	JS_FreeRuntime(runtime);
}

/*
** After-work step for sending an EBT clock.
**
** Frees the copied incoming clock rows, sends the JSON clock produced by the
** work step (if any) as a stream message on the negated request number, and
** releases the request.
**
** @param connection The connection to send on.
** @param result Unused.
** @param user_data The ebt_replicate_send_clock_t being completed.
*/
static void _tf_ssb_rpc_ebt_replicate_send_clock_after_work(tf_ssb_connection_t* connection, int result, void* user_data)
{
	ebt_replicate_send_clock_t* request = user_data;
	tf_free(request->clock);

	char* clock_json = request->out_clock;
	if (clock_json)
	{
		tf_ssb_connection_rpc_send(
			connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_json, -request->request_number, NULL, (const uint8_t*)clock_json, strlen(clock_json), NULL, NULL, NULL);
		tf_free(clock_json);
	}

	tf_free(request);
}

/*
** Schedule work to build and send our EBT clock.
**
** If message is not undefined, its own string-keyed properties are copied
** into an array of rows (name and value converted to int64) so the work
** step can use them without touching the JS value.  The work is then run
** through tf_ssb_connection_run_work.
**
** @param connection The connection to respond on.
** @param request_number The incoming request number.
** @param message The incoming clock object, or JS_UNDEFINED.
*/
static void _tf_ssb_rpc_ebt_replicate_send_clock(tf_ssb_connection_t* connection, int32_t request_number, JSValue message)
{
	ebt_replicate_send_clock_t* request = tf_malloc(sizeof(*request));
	memset(request, 0, sizeof(*request));
	request->request_number = request_number;

	JSContext* context = tf_ssb_connection_get_context(connection);
	JSPropertyEnum* properties = NULL;
	uint32_t count = 0;

	if (!JS_IsUndefined(message) && JS_GetOwnPropertyNames(context, &properties, &count, message, JS_GPN_STRING_MASK) == 0)
	{
		size_t bytes = sizeof(ebt_clock_row_t) * count;
		request->clock_count = (int)count;
		request->clock = tf_malloc(bytes);
		memset(request->clock, 0, bytes);

		for (uint32_t index = 0; index < count; ++index)
		{
			const JSPropertyEnum* property = &properties[index];
			ebt_clock_row_t* row = &request->clock[index];

			const char* name = JS_AtomToCString(context, property->atom);
			snprintf(row->id, sizeof(row->id), "%s", name);
			JS_FreeCString(context, name);

			/* A property that cannot be read is converted as undefined. */
			JSValue value = JS_UNDEFINED;
			JSPropertyDescriptor descriptor = { 0 };
			if (JS_GetOwnProperty(context, &descriptor, message, property->atom) == 1)
			{
				value = descriptor.value;
				JS_FreeValue(context, descriptor.setter);
				JS_FreeValue(context, descriptor.getter);
			}
			JS_ToInt64(context, &row->value, value);
			JS_FreeValue(context, value);
			JS_FreeAtom(context, property->atom);
		}
		js_free(context, properties);
	}

	tf_ssb_connection_run_work(connection, _tf_ssb_rpc_ebt_replicate_send_clock_work, _tf_ssb_rpc_ebt_replicate_send_clock_after_work, request);
}

/*
** Act on each entry of an incoming EBT clock.
**
** For every own string-keyed property of message, the value is converted to
** an int64.  A non-negative even value starts a live history stream for
** that author from (value >> 1) on the connection's EBT request number and
** registers a new message request; any other value removes the new message
** request for that author.
**
** @param connection The connection the clock arrived on.
** @param message The clock object, or JS_UNDEFINED to do nothing.
*/
static void _tf_ssb_rpc_ebt_replicate_send_messages(tf_ssb_connection_t* connection, JSValue message)
{
	if (JS_IsUndefined(message))
	{
		return;
	}

	JSContext* context = tf_ssb_get_context(tf_ssb_connection_get_ssb(connection));
	JSPropertyEnum* properties = NULL;
	uint32_t count = 0;
	if (JS_GetOwnPropertyNames(context, &properties, &count, message, JS_GPN_STRING_MASK) != 0)
	{
		return;
	}

	for (uint32_t index = 0; index < count; ++index)
	{
		JSValue clock_value = JS_UNDEFINED;
		JSPropertyDescriptor descriptor = { 0 };
		if (JS_GetOwnProperty(context, &descriptor, message, properties[index].atom) == 1)
		{
			clock_value = descriptor.value;
			JS_FreeValue(context, descriptor.setter);
			JS_FreeValue(context, descriptor.getter);
		}

		if (!JS_IsUndefined(clock_value))
		{
			JSValue name = JS_AtomToString(context, properties[index].atom);
			int64_t encoded = -1;
			JS_ToInt64(context, &encoded, clock_value);
			const char* author = JS_ToCString(context, name);

			bool wants_messages = encoded >= 0 && (encoded & 1) == 0;
			if (wants_messages)
			{
				int32_t ebt_request_number = tf_ssb_connection_get_ebt_request_number(connection);
				_tf_ssb_connection_send_history_stream(connection, ebt_request_number, author, encoded >> 1, false, true);
				tf_ssb_connection_add_new_message_request(connection, author, ebt_request_number, false);
			}
			else
			{
				tf_ssb_connection_remove_new_message_request(connection, author);
			}

			JS_FreeCString(context, author);
			JS_FreeValue(context, name);
		}
		JS_FreeValue(context, clock_value);
	}

	for (uint32_t index = 0; index < count; ++index)
	{
		JS_FreeAtom(context, properties[index].atom);
	}
	js_free(context, properties);
}

static void _tf_ssb_rpc_ebt_replicate_store_callback(const char* id, bool verified, bool is_new, void* user_data)
{
	tf_ssb_connection_t* connection = user_data;
	tf_ssb_connection_adjust_read_backpressure(connection, -1);
}

/*
** Handle one message on an ebt.replicate stream.
**
** Error messages are ignored.  The connection's EBT request number is set
** to the negated request number if it has not been set yet.  A message with
** an "author" property is verified and stored, with read backpressure held
** until the store callback runs.  Anything else is treated as an EBT clock:
** our own clock is sent once per connection, and the incoming clock (only
** when args has no "name" property) is used to start or stop sending
** messages.
**
** @param connection The connection the message arrived on.
** @param flags Unused.
** @param request_number The request number of the message.
** @param args The parsed message.
** @param message Unused.
** @param size Unused.
** @param user_data Unused.
*/
static void _tf_ssb_rpc_ebt_replicate(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	JSContext* context = tf_ssb_get_context(ssb);
	if (_is_error(context, args))
	{
		/* TODO: Send createHistoryStream. */
		return;
	}

	if (tf_ssb_connection_get_ebt_request_number(connection) == 0)
	{
		tf_ssb_connection_set_ebt_request_number(connection, -request_number);
	}

	JSValue author = JS_GetPropertyStr(context, args, "author");
	JSValue name = JS_GetPropertyStr(context, args, "name");

	/* The initial request carries a name; only nameless payloads are clocks. */
	JSValue in_clock = JS_UNDEFINED;
	if (JS_IsUndefined(name))
	{
		in_clock = args;
	}

	if (JS_IsUndefined(author))
	{
		/* EBT clock. */
		bool already_sent = tf_ssb_connection_get_sent_clock(connection);
		if (!already_sent)
		{
			_tf_ssb_rpc_ebt_replicate_send_clock(connection, request_number, in_clock);
			tf_ssb_connection_set_sent_clock(connection, true);
		}
		_tf_ssb_rpc_ebt_replicate_send_messages(connection, in_clock);
	}
	else
	{
		/* Looks like a message. */
		tf_ssb_connection_adjust_read_backpressure(connection, 1);
		tf_ssb_verify_strip_and_store_message(ssb, args, _tf_ssb_rpc_ebt_replicate_store_callback, connection);
	}

	JS_FreeValue(context, name);
	JS_FreeValue(context, author);
}

/*
** Response callback for the ebt.replicate request we send as a client.
** Forwards every argument unchanged to _tf_ssb_rpc_ebt_replicate.
*/
static void _tf_ssb_rpc_ebt_replicate_client(
	tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	_tf_ssb_rpc_ebt_replicate(connection, flags, request_number, args, message, size, user_data);
}

/*
** Send an ebt.replicate request on the connection.
**
** The request has name ["ebt", "replicate"], a single argument object
** { version: 3, format: "classic" } and type "duplex".  Responses are routed
** to _tf_ssb_rpc_ebt_replicate_client.  If the connection has no EBT request
** number yet, the new request number is recorded as such.
**
** @param connection The connection to send the request on.
*/
static void _tf_ssb_rpc_send_ebt_replicate(tf_ssb_connection_t* connection)
{
	JSContext* context = tf_ssb_get_context(tf_ssb_connection_get_ssb(connection));

	JSValue method = JS_NewArray(context);
	JS_SetPropertyUint32(context, method, 0, JS_NewString(context, "ebt"));
	JS_SetPropertyUint32(context, method, 1, JS_NewString(context, "replicate"));

	JSValue options = JS_NewObject(context);
	JS_SetPropertyStr(context, options, "version", JS_NewInt32(context, 3));
	JS_SetPropertyStr(context, options, "format", JS_NewString(context, "classic"));
	JSValue arguments = JS_NewArray(context);
	JS_SetPropertyUint32(context, arguments, 0, options);

	/* Property order on the request matches name, args, type. */
	JSValue request = JS_NewObject(context);
	JS_SetPropertyStr(context, request, "name", method);
	JS_SetPropertyStr(context, request, "args", arguments);
	JS_SetPropertyStr(context, request, "type", JS_NewString(context, "duplex"));

	int32_t request_number = tf_ssb_connection_next_request_number(connection);
	tf_ssb_connection_rpc_send_json(
		connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, request_number, "ebt.replicate", request, _tf_ssb_rpc_ebt_replicate_client, NULL, NULL);
	if (tf_ssb_connection_get_ebt_request_number(connection) == 0)
	{
		tf_ssb_connection_set_ebt_request_number(connection, request_number);
	}

	JS_FreeValue(context, request);
}

/*
** RPC handler for an incoming ebt.replicate request.
**
** Does nothing for messages flagged end/error.  Rejects the request when
** this instance is not a replicator.  Otherwise processes the initial
** message and registers _tf_ssb_rpc_ebt_replicate to receive the rest of
** the stream on the negated request number.
**
** @param connection The connection the request arrived on.
** @param flags The RPC flags of the request.
** @param request_number The incoming request number.
** @param args The parsed request.
** @param message The raw message, passed through.
** @param size The raw message size, passed through.
** @param user_data Passed through.
*/
static void _tf_ssb_rpc_ebt_replicate_server(
	tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	if ((flags & k_ssb_rpc_flag_end_error) != 0)
	{
		return;
	}

	if (!tf_ssb_is_replicator(tf_ssb_connection_get_ssb(connection)))
	{
		tf_ssb_connection_rpc_send_error_method_not_allowed(connection, flags, -request_number, "ebt.replicate");
		return;
	}

	_tf_ssb_rpc_ebt_replicate(connection, flags, request_number, args, message, size, user_data);
	tf_ssb_connection_add_request(connection, -request_number, "ebt.replicate", _tf_ssb_rpc_ebt_replicate, NULL, NULL, NULL);
}

/*
** React to connections being established or removed.
**
** On connect: always sends blobs.createWants; when we are the client side,
** also sends tunnel.isRoom and, if this instance is a replicator, an
** ebt.replicate request.
**
** On remove: unregisters the blob want callback for the connection and, if
** the connection has an id, sends a { type: "left", id } message to every
** connection that is an attendant.
**
** Other changes are ignored.
**
** @param ssb The SSB instance.
** @param change The kind of change.
** @param connection The connection that changed.
** @param user_data Unused.
*/
static void _tf_ssb_rpc_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_change_t change, tf_ssb_connection_t* connection, void* user_data)
{
	JSContext* context = tf_ssb_get_context(ssb);

	if (change == k_tf_ssb_change_connect)
	{
		JSValue wants_name = JS_NewArray(context);
		JSValue wants = JS_NewObject(context);
		JS_SetPropertyUint32(context, wants_name, 0, JS_NewString(context, "blobs"));
		JS_SetPropertyUint32(context, wants_name, 1, JS_NewString(context, "createWants"));
		JS_SetPropertyStr(context, wants, "name", wants_name);
		JS_SetPropertyStr(context, wants, "type", JS_NewString(context, "source"));
		JS_SetPropertyStr(context, wants, "args", JS_NewArray(context));
		tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, tf_ssb_connection_next_request_number(connection), "blobs.createWants",
			wants, _tf_ssb_rpc_connection_blobs_createWants_callback, NULL, NULL);
		JS_FreeValue(context, wants);

		if (!tf_ssb_connection_is_client(connection))
		{
			return;
		}

		JSValue is_room_name = JS_NewArray(context);
		JSValue is_room = JS_NewObject(context);
		JS_SetPropertyUint32(context, is_room_name, 0, JS_NewString(context, "tunnel"));
		JS_SetPropertyUint32(context, is_room_name, 1, JS_NewString(context, "isRoom"));
		JS_SetPropertyStr(context, is_room, "name", is_room_name);
		JS_SetPropertyStr(context, is_room, "args", JS_NewArray(context));
		tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_new_request, tf_ssb_connection_next_request_number(connection), "tunnel.isRoom", is_room,
			_tf_ssb_rpc_connection_tunnel_isRoom_callback, NULL, NULL);
		JS_FreeValue(context, is_room);

		if (tf_ssb_is_replicator(ssb))
		{
			_tf_ssb_rpc_send_ebt_replicate(connection);
		}
		return;
	}

	if (change != k_tf_ssb_change_remove)
	{
		return;
	}

	tf_ssb_remove_blob_want_added_callback(ssb, _tf_ssb_rpc_blob_wants_added_callback, connection);

	char departed_id[k_id_base64_len] = "";
	if (!tf_ssb_connection_get_id(connection, departed_id, sizeof(departed_id)))
	{
		return;
	}

	/* Tell every attendant that this identity has left. */
	JSValue left = JS_NewObject(context);
	JS_SetPropertyStr(context, left, "type", JS_NewString(context, "left"));
	JS_SetPropertyStr(context, left, "id", JS_NewString(context, departed_id));

	tf_ssb_connection_t* connections[1024];
	int connection_count = tf_ssb_get_connections(ssb, connections, _countof(connections));
	for (int index = 0; index < connection_count; index++)
	{
		tf_ssb_connection_t* peer = connections[index];
		if (tf_ssb_connection_is_attendant(peer))
		{
			tf_ssb_connection_rpc_send_json(peer, k_ssb_rpc_flag_stream, -tf_ssb_connection_get_attendant_request_number(peer), NULL, left, NULL, NULL, NULL);
		}
	}
	JS_FreeValue(context, left);
}

/*
** Run a truncating WAL checkpoint on the database and log the outcome.
**
** On success, logs the number of frames checkpointed, the elapsed time in
** milliseconds and the size of the log in frames, as reported by
** sqlite3_wal_checkpoint_v2.  On failure, logs the SQLite error message.
**
** @param ssb The SSB instance whose database writer is used.
*/
static void _tf_ssb_rpc_checkpoint(tf_ssb_t* ssb)
{
	/* uv_hrtime() reports nanoseconds. */
	uint64_t checkpoint_start_ns = uv_hrtime();
	sqlite3* db = tf_ssb_acquire_db_writer(ssb);
	int log = 0;
	int checkpointed = 0;
	if (sqlite3_wal_checkpoint_v2(db, NULL, SQLITE_CHECKPOINT_TRUNCATE, &log, &checkpointed) == SQLITE_OK)
	{
		int duration_ms = (int)((uv_hrtime() - checkpoint_start_ns) / 1000000ULL);
		/* Argument order must match the message: frames, then ms, then log size. */
		tf_printf("Checkpointed %d frames in %d ms.  Log is now %d frames.\n", checkpointed, duration_ms, log);
	}
	else
	{
		tf_printf("Checkpoint: %s.\n", sqlite3_errmsg(db));
	}
	tf_ssb_release_db_writer(ssb, db);
}

/*
** Delete one batch of expired blobs, checkpoint, and schedule the next pass.
**
** The expiry age comes from the "blob_expire_age_seconds" global setting.
** When it is not positive, only a checkpoint is run and no further pass is
** scheduled from here.  Otherwise up to k_limit blobs are deleted whose
** creation time and most recent referencing message are both older than the
** cutoff.  If anything was deleted, the next pass is scheduled after a delay
** equal to how long this pass took; otherwise after 15 minutes.
**
** @param ssb The SSB instance.
** @param user_data Unused.
*/
static void _tf_ssb_rpc_delete_blobs_work(tf_ssb_t* ssb, void* user_data)
{
	int64_t age = _get_global_setting_int64(ssb, "blob_expire_age_seconds", -1);
	if (age <= 0)
	{
		_tf_ssb_rpc_checkpoint(ssb);
		return;
	}
	int64_t start_ns = uv_hrtime();
	sqlite3* db = tf_ssb_acquire_db_writer(ssb);
	sqlite3_stmt* statement = NULL;
	int64_t now = (int64_t)time(NULL) * 1000ULL;
	int64_t timestamp = now - age * 1000ULL;
	const int k_limit = 128;
	int deleted = 0;
	if (sqlite3_prepare(db,
			"DELETE FROM blobs WHERE blobs.id IN ("
			"  SELECT blobs.id FROM blobs "
			"  JOIN messages_refs ON blobs.id = messages_refs.ref "
			"  JOIN messages ON messages.id = messages_refs.message "
			"  WHERE blobs.created < ?1 / 1000 "
			"  GROUP BY blobs.id HAVING MAX(messages.timestamp) < ?1 LIMIT ?2)",
			-1, &statement, NULL) == SQLITE_OK)
	{
		if (sqlite3_bind_int64(statement, 1, timestamp) == SQLITE_OK && sqlite3_bind_int(statement, 2, k_limit) == SQLITE_OK)
		{
			int r = sqlite3_step(statement);
			if (r != SQLITE_DONE)
			{
				tf_printf("_tf_ssb_rpc_delete_blobs_work: %s\n", sqlite3_errmsg(db));
			}
			else
			{
				/* Only trust the change count when this statement completed. */
				deleted = sqlite3_changes(db);
				tf_printf("_tf_ssb_rpc_delete_blobs_work: %d rows\n", deleted);
			}
		}
		/* Every successfully prepared statement must be finalized. */
		sqlite3_finalize(statement);
	}
	else
	{
		tf_printf("prepare failed: %s\n", sqlite3_errmsg(db));
	}
	tf_ssb_release_db_writer(ssb, db);
	int64_t duration_ms = (uv_hrtime() - start_ns) / 1000000LL;
	tf_printf("Deleted %d blobs in %d ms.\n", deleted, (int)duration_ms);
	_tf_ssb_rpc_checkpoint(ssb);
	_tf_ssb_rpc_start_delete_blobs(ssb, deleted ? (int)duration_ms : (15 * 60 * 1000));
}

/* After-work step for blob deletion.  Intentionally does nothing. */
static void _tf_ssb_rpc_delete_blobs_after_work(tf_ssb_t* ssb, int status, void* user_data)
{
}

/* Scheduled callback that hands the blob deletion pass to tf_ssb_run_work. */
static void _tf_ssb_rpc_start_delete_callback(tf_ssb_t* ssb, void* user_data)
{
	tf_ssb_run_work(ssb, _tf_ssb_rpc_delete_blobs_work, _tf_ssb_rpc_delete_blobs_after_work, NULL);
}

/*
** Log and schedule the next blob deletion pass.
**
** @param ssb The SSB instance.
** @param delay_ms The delay passed to tf_ssb_schedule_work, in milliseconds.
*/
static void _tf_ssb_rpc_start_delete_blobs(tf_ssb_t* ssb, int delay_ms)
{
	tf_printf("will delete more blobs in %d ms\n", delay_ms);
	tf_ssb_schedule_work(ssb, delay_ms, _tf_ssb_rpc_start_delete_callback, NULL);
}

/*
** Start periodic background work.  Currently this schedules the first blob
** deletion pass 30 seconds from now.
**
** @param ssb The SSB instance.
*/
void tf_ssb_rpc_start_periodic(tf_ssb_t* ssb)
{
	const int first_delete_delay_ms = 30 * 1000;
	_tf_ssb_rpc_start_delete_blobs(ssb, first_delete_delay_ms);
}

/*
** Register the connection change callback and all RPC handlers defined in
** this file.  The trailing comment on each line is the handler's RPC type.
** Note that tunnel.isRoom and room.metadata share one handler.
**
** @param ssb The SSB instance to register with.
*/
void tf_ssb_rpc_register(tf_ssb_t* ssb)
{
	tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_rpc_connections_changed_callback, NULL, NULL);
	tf_ssb_add_rpc_callback(ssb, (const char*[]) { "gossip", "ping", NULL }, _tf_ssb_rpc_gossip_ping, NULL, NULL); /* DUPLEX */
	tf_ssb_add_rpc_callback(ssb, (const char*[]) { "blobs", "get", NULL }, _tf_ssb_rpc_blobs_get, NULL, NULL); /* SOURCE */
	tf_ssb_add_rpc_callback(ssb, (const char*[]) { "blobs", "has", NULL }, _tf_ssb_rpc_blobs_has, NULL, NULL); /* ASYNC */
	tf_ssb_add_rpc_callback(ssb, (const char*[]) { "blobs", "createWants", NULL }, _tf_ssb_rpc_blobs_createWants, NULL, NULL); /* SOURCE */
	tf_ssb_add_rpc_callback(ssb, (const char*[]) { "tunnel", "connect", NULL }, _tf_ssb_rpc_tunnel_connect, NULL, NULL); /* DUPLEX */
	tf_ssb_add_rpc_callback(ssb, (const char*[]) { "tunnel", "isRoom", NULL }, _tf_ssb_rpc_room_meta, NULL, NULL); /* FAKE-ASYNC */
	tf_ssb_add_rpc_callback(ssb, (const char*[]) { "room", "metadata", NULL }, _tf_ssb_rpc_room_meta, NULL, NULL); /* ASYNC */
	tf_ssb_add_rpc_callback(ssb, (const char*[]) { "room", "attendants", NULL }, _tf_ssb_rpc_room_attendants, NULL, NULL); /* SOURCE */
	tf_ssb_add_rpc_callback(ssb, (const char*[]) { "createHistoryStream", NULL }, _tf_ssb_rpc_createHistoryStream, NULL, NULL); /* SOURCE */
	tf_ssb_add_rpc_callback(ssb, (const char*[]) { "ebt", "replicate", NULL }, _tf_ssb_rpc_ebt_replicate_server, NULL, NULL); /* DUPLEX */
}