"use strict";
// The blobs.createWants RPC request received from each peer, keyed by
// connection id. Entries are removed when the connection goes away.
var g_wants_requests = {};
// Database instance named 'core'; handed to following()/followingDeep(),
// which use it to cache per-author follow lists.
var g_database = new Database('core');
// When true, new connections replicate feeds with createHistoryStream
// requests; when false, client connections open an ebt.replicate stream.
const k_use_create_history_stream = false;
// Number of simultaneously outstanding blob wants per connection that the
// blob exchange code aims for. Also used as the LIMIT when reading blob_wants.
const k_blobs_concurrent_target = 8;

/**
 * Returns the ids that `id` follows, according to the 'contact' messages
 * stored for that author.
 *
 * The result is cached in `db` under the key `id + ":following"` as JSON of
 * the form {users, sequence, version}. Only messages with a sequence greater
 * than the cached `sequence` are examined on each call, and the cache is
 * rewritten only when its serialized form changed.
 *
 * @param {object} db Store exposing get(key) and set(key, value).
 * @param {string} id Author whose follow list is wanted.
 * @returns {Array} The contacts currently followed by `id`.
 */
function following(db, id) {
	const k_version = 5;
	var o = db.get(id + ":following");
	var f = o ? JSON.parse(o) : o;
	// Start over when there is no cache, the cache layout is outdated, or the
	// cached cursor is not a number. A non-numeric cursor (e.g. null) cannot be
	// compared meaningfully in the query below, so the cached follow list
	// cannot be trusted either.
	if (!f || f.version != k_version || typeof f.sequence != 'number') {
		f = {users: [], sequence: 0, version: k_version};
	}
	f.users = new Set(f.users);
	ssb.sqlStream(
		"SELECT "+
		"  sequence, "+
		"  json_extract(content, '$.contact') AS contact, "+
		"  json_extract(content, '$.following') AS following "+
		"FROM messages "+
		"WHERE "+
		"  author = ?1 AND "+
		"  sequence > ?2 AND "+
		"  json_extract(content, '$.type') = 'contact' "+
		"UNION SELECT MAX(sequence) AS sequence, NULL, NULL FROM messages WHERE author = ?1 "+
		"ORDER BY sequence",
		[id, f.sequence],
		function(row) {
			if (row.following) {
				f.users.add(row.contact);
			} else {
				f.users.delete(row.contact);
			}
			// The UNION row carries MAX(sequence) so the cursor advances past
			// non-contact messages. For an author without any messages that
			// value is NULL; keep the previous cursor in that case so a null
			// never ends up as the `sequence > ?2` bound on a later call.
			if (row.sequence !== null && row.sequence !== undefined) {
				f.sequence = row.sequence;
			}
		});
	f.users = Array.from(f.users);
	var j = JSON.stringify(f);
	if (o != j) {
		db.set(id + ":following", j);
	}
	return f.users;
}

/**
 * Expands a set of ids by walking follow relationships `depth` levels out.
 *
 * @param {object} db       Store passed through to following().
 * @param {Array}  seed_ids Ids to start from.
 * @param {number} depth    Number of follow hops to expand; at 0 or below the
 *                          seeds are returned untouched.
 * @returns {Array} For depth > 0, the sorted, de-duplicated union of the seeds
 *                  and every id reached within `depth` hops.
 */
function followingDeep(db, seed_ids, depth) {
	if (depth <= 0) {
		return seed_ids;
	}
	// Collect everyone directly followed by any seed, without duplicates.
	const followed = new Set();
	for (const seed of seed_ids) {
		for (const id of following(db, seed)) {
			followed.add(id);
		}
	}
	// Expand the next level from the followed ids, then fold the seeds back in.
	const reachable = followingDeep(db, Array.from(followed).sort(), depth - 1);
	const combined = new Set(reachable.concat(seed_ids));
	return Array.from(combined).sort();
}

/**
 * Looks up the highest stored message sequence for an author.
 *
 * @param {string} author Feed id to look up.
 * @returns {number} The MAX(sequence) reported by the query, or 0 when the
 *                   query yields no truthy sequence.
 */
function get_latest_sequence_for_author(author) {
	let latest = 0;
	const remember = function(row) {
		// A falsy sequence leaves the default in place.
		latest = row.sequence || latest;
	};
	ssb.sqlStream('SELECT MAX(sequence) AS sequence FROM messages WHERE author = ?1', [author], remember);
	return latest;
}

/**
 * Stores a feed message received from a peer.
 *
 * Accepts both bare messages and messages wrapped as {value: ...}; the
 * wrapped form is unwrapped before storing. Bodies that are not objects,
 * including null or a missing body, are ignored instead of throwing.
 *
 * @param {object} message RPC message whose `message` property is the body.
 */
function storeMessage(message) {
	var body = message.message;
	var payload = body && body.value ? body.value : body;
	// typeof null is 'object', so null has to be excluded explicitly.
	if (payload && typeof(payload) == 'object') {
		ssb.storeMessage(payload);
	}
}

/**
 * Handles connection changes reported by ssb.
 *
 * 'add':    starts feed replication with the new peer (createHistoryStream
 *           requests when k_use_create_history_stream is set, otherwise an
 *           ebt.replicate duplex stream on client connections) and opens a
 *           blobs.createWants stream to exchange blob availability.
 * 'remove': forgets the createWants request stored for the peer.
 * Any other change is only printed.
 *
 * @param {string} change     Kind of change.
 * @param {object} connection The affected connection.
 */
ssb.addEventListener('connections', function(change, connection) {
	if (change == 'add') {
		var sequence = get_latest_sequence_for_author(connection.id);
		if (k_use_create_history_stream) {
			connection.send_json({'name': ['createHistoryStream'], 'type': 'source', 'args': [{'id': connection.id, 'seq': sequence, 'live': true, 'keys': false}]}, storeMessage);
			var me = ssb.whoami();
			// followingDeep() returns a plain array synchronously, so it is
			// iterated directly rather than treated as a promise.
			for (let id of followingDeep(g_database, [me], 2)) {
				if (id == me) {
					continue;
				}
				var feed_sequence = get_latest_sequence_for_author(id);
				connection.send_json({'name': ['createHistoryStream'], 'type': 'source', 'args': [{'id': id, 'seq': feed_sequence, 'live': true, 'keys': false}]}, storeMessage);
			}
		} else {
			if (connection.is_client) {
				connection.send_json({"name": ["ebt", "replicate"], "args": [{"version": 3, "format": "classic"}], "type": "duplex"}, ebtReplicateClient);
			}
		}

		connection.active_blob_wants = {};
		connection.send_json({'name': ['blobs', 'createWants'], 'type': 'source', 'args': []}, function(message) {
			Object.keys(message.message).forEach(function(id) {
				// The peer's own createWants request to us, if it has opened
				// one. It is the channel used to answer wants and to ask for
				// more blobs, and it may not exist yet.
				var wants_request = g_wants_requests[connection.id];
				if (message.message[id] < 0) {
					// Negative value: the peer is asking for this blob. Reply
					// with its size when we have it.
					delete connection.active_blob_wants[id];
					if (wants_request) {
						var blob = ssb.blobGet(id);
						if (blob) {
							var out_message = {};
							out_message[id] = blob.byteLength;
							wants_request.send_json(out_message);
						}
					}
				} else {
					// Non-negative value: treated as the blob's size in bytes.
					// Fetch the blob and store it once every byte has arrived.
					var received_bytes = 0;
					var expected_bytes = message.message[id];
					var buffer = new Uint8Array(expected_bytes);
					connection.send_json({'name': ['blobs', 'get'], 'type': 'source', 'args': [id]}, function(message) {
						if (message.flags & 0x4 /* end */) {
							delete connection.active_blob_wants[id];
						} else {
							buffer.set(new Uint8Array(message.message, 0, message.message.byteLength), received_bytes);
							received_bytes += message.message.byteLength;
							if (received_bytes == expected_bytes) {
								ssb.blobStore(buffer);
							}
						}
					});
					if (wants_request && Object.keys(connection.active_blob_wants).length < k_blobs_concurrent_target) {
						requestMoreBlobs(wants_request);
					}
				}
			});
		});
	} else if (change == 'remove') {
		print('REMOVE', connection.id);
		delete g_wants_requests[connection.id];
	} else {
		print('CHANGE', change);
	}
});

/**
 * Announces to a peer that we want blob `id`, unless the connection already
 * has k_blobs_concurrent_target wants outstanding.
 *
 * The want is sent as {[id]: -1} on the peer's createWants request and is
 * recorded in the connection's active_blob_wants.
 *
 * @param {object} request The peer's blobs.createWants request.
 * @param {string} id      Id of the wanted blob.
 */
function blob_want_discovered(request, id) {
	var active = request.connection.active_blob_wants;
	// Use >= so the number of outstanding wants never exceeds the target; this
	// matches the `< k_blobs_concurrent_target` check used when refilling.
	if (Object.keys(active).length >= k_blobs_concurrent_target) {
		return;
	}
	var message = {};
	message[id] = -1;
	request.send_json(message);
	active[id] = true;
}

/**
 * Reads up to k_blobs_concurrent_target ids from the blob_wants table and
 * passes each one to blob_want_discovered() for the given request.
 *
 * @param {object} request The peer's blobs.createWants request.
 */
function requestMoreBlobs(request) {
	const query = `SELECT id FROM blob_wants LIMIT ${k_blobs_concurrent_target}`;
	const announce = function(row) {
		return blob_want_discovered(request, row.id);
	};
	ssb.sqlStream(query, [], announce);
}

/**
 * RPC blobs.createWants: remembers the peer's request so blob wants can be
 * sent on it, subscribes it to newly added wants, and sends an initial batch.
 */
ssb.addRpc(['blobs', 'createWants'], function(request) {
	const peer_id = request.connection.id;
	g_wants_requests[peer_id] = request;
	// NOTE(review): this listener is never removed here, so it stays
	// registered for as long as ssb keeps it - confirm whether it should be
	// dropped when the connection closes.
	ssb.addEventListener('blob_want_added', function(blob_id) {
		return blob_want_discovered(request, blob_id);
	});
	requestMoreBlobs(request);
});

/**
 * RPC blobs.has: replies true when the blobs table contains a row whose id
 * equals the first argument, false otherwise.
 */
ssb.addRpc(['blobs', 'has'], function(request) {
	const blob_id = request.args[0];
	let is_stored = false;
	// Any returned row means the blob exists; the row content is irrelevant.
	const mark_found = function(_row) {
		is_stored = true;
	};
	ssb.sqlStream('SELECT 1 FROM blobs where id = ?1', [blob_id], mark_found);
	request.send_json(is_stored);
});

/**
 * RPC blobs.get: streams the requested blob(s) to the peer.
 *
 * Each argument is either a blob id or an object with a `key` property
 * holding the id. Blobs larger than k_send_max bytes are sent in chunks of at
 * most k_send_max bytes. Every argument is finished with an end message.
 */
ssb.addRpc(['blobs', 'get'], function(request) {
	const k_send_max = 8192;
	for (let arg of request.args) {
		var blob = ssb.blobGet(arg && arg.key ? arg.key : arg);
		// blobGet() yields a falsy value for a blob we do not have; send no
		// data in that case rather than failing on blob.byteLength.
		// NOTE(review): the stream is still ended with `true` for a missing
		// blob - confirm whether an explicit error reply is preferred.
		if (blob) {
			if (blob.byteLength > k_send_max) {
				for (var i = 0; i < blob.byteLength; i += k_send_max) {
					var buffer = new Uint8Array(blob, i, Math.min(blob.byteLength - i, k_send_max));
					request.send_binary(buffer);
				}
			} else {
				request.send_binary(blob);
			}
		}
		request.send_json_end(true);
	}
	request.more(function(request) {});
});

/**
 * RPC gossip.ping: answers each message passed to the request.more()
 * callback with the current time from Date.now().
 */
ssb.addRpc(['gossip', 'ping'], function(request) {
	const reply_with_time = function(ping) {
		ping.send_json(Date.now());
	};
	request.more(reply_with_time);
});

/**
 * RPC tunnel.isRoom: always replies false.
 */
ssb.addRpc(['tunnel', 'isRoom'], function(request) {
	const is_room = false;
	request.send_json(is_room);
});

/**
 * Sends our replication clock to the peer on an ebt.replicate stream.
 *
 * The clock covers every feed within two follow hops of us, the peer's own
 * feed, and every feed the peer mentioned in `have`. A feed from `have` that
 * we hold no messages for is reported as -1; every other feed is reported as
 * its latest stored sequence shifted left by one bit.
 *
 * Only entries that are new or larger than what was last sent on this
 * connection (tracked in connection.sent_clock) are transmitted, in batches
 * of at most 32 feeds per message.
 *
 * @param {object} request The ebt.replicate request to send on.
 * @param {object} have    The clock received from the peer, keyed by feed id.
 */
function ebtReplicateSendClock(request, have) {
	var me = ssb.whoami();
	var message = {};
	var ids = followingDeep(g_database, [me], 2).concat([request.connection.id]);
	for (let id of ids) {
		message[id] = get_latest_sequence_for_author(id);
	}
	for (let id of Object.keys(have || {})) {
		if (message[id] === undefined) {
			var sequence = get_latest_sequence_for_author(id);
			message[id] = sequence ? sequence : -1;
		}
	}

	var last_sent = request.connection.sent_clock || {};
	var to_send = {};
	// Walk every entry of the clock, including the feeds that only appear in
	// `have`, not just the followed ids.
	for (let id of Object.keys(message)) {
		// Encode first so the comparison below is between two encoded values;
		// sent_clock stores what was put on the wire.
		var note = message[id] === -1 ? -1 : message[id] << 1;
		if (last_sent[id] === undefined || note > last_sent[id]) {
			last_sent[id] = to_send[id] = note;
			if (Object.keys(to_send).length >= 32) {
				request.send_json(to_send);
				to_send = {};
			}
		}
	}
	request.connection.sent_clock = last_sent;

	if (Object.keys(to_send).length) {
		request.send_json(to_send);
	}
}

/**
 * Builds the message object sent to peers from a row of the messages table.
 *
 * @param {object} row Row with previous, author, sequence, timestamp, hash,
 *                     content (a JSON string) and signature.
 * @returns {object} The same fields in that order, with content parsed.
 */
function formatMessage(row) {
	const {previous, author, sequence, timestamp, hash, content, signature} = row;
	// Property order is kept exactly as listed so the serialized form is stable.
	return {
		previous,
		author,
		sequence,
		timestamp,
		hash,
		content: JSON.parse(content),
		signature,
	};
}

/**
 * Subscribes to ssb 'message' events and forwards each announced message
 * that was authored by us to the peer on `request`.
 *
 * Our id is read once, when the listener is registered.
 *
 * @param {object} request The ebt.replicate request to send messages on.
 */
function ebtReplicateRegisterMessageCallback(request) {
	const self_id = ssb.whoami();
	const forward_row = function(row) {
		request.send_json(formatMessage(row));
	};
	const on_message = function(message_id) {
		// The author filter restricts forwarding to our own feed.
		ssb.sqlStream(
			'SELECT previous, author, id, sequence, timestamp, hash, content, signature FROM messages WHERE id = ?1 AND author = ?2',
			[message_id, self_id],
			forward_row);
	};
	ssb.addEventListener('message', on_message);
}

/**
 * Handles one message received on an ebt.replicate stream.
 *
 * A body with an `author` property is stored as a feed message. Any other
 * body is treated as the peer's clock: our clock is sent in response, and for
 * every feed whose value is non-negative with the lowest bit clear, the
 * stored messages from sequence (value >> 1) onwards are sent to the peer.
 *
 * @param {object} request Incoming message; request.message is its body.
 */
function ebtReplicateCommon(request) {
	// NOTE(review): the result of whoami() is not used in this function.
	const me = ssb.whoami();
	const incoming = request.message;
	if (incoming.author) {
		storeMessage(request);
		return;
	}

	ebtReplicateSendClock(request, incoming);

	const send_row = function(row) {
		request.send_json(formatMessage(row));
	};
	for (const [id, note] of Object.entries(incoming)) {
		const is_requested = note >= 0 && (note & 1) == 0;
		if (!is_requested) {
			continue;
		}
		ssb.sqlStream(
			'SELECT previous, author, id, sequence, timestamp, hash, content, signature FROM messages WHERE author = ?1 AND sequence >= ?2 ORDER BY sequence',
			[id, note >> 1],
			send_row);
	}
}

/**
 * Callback for messages on an ebt.replicate stream that we opened.
 *
 * The first message seen on a connection registers the forwarding of our own
 * new messages; the connection's message_registered flag prevents doing so
 * again. Every message is then processed by ebtReplicateCommon().
 *
 * @param {object} request Incoming message on the stream.
 */
function ebtReplicateClient(request) {
	const connection = request.connection;
	const already_registered = Boolean(connection.message_registered);
	if (!already_registered) {
		ebtReplicateRegisterMessageCallback(request);
		connection.message_registered = true;
	}
	ebtReplicateCommon(request);
}

/**
 * Handles an ebt.replicate request opened by a peer.
 *
 * @param {object} request The incoming ebt.replicate request.
 */
function ebtReplicateServer(request) {
	// Forward our own newly announced messages to the peer on this request.
	ebtReplicateRegisterMessageCallback(request);
	// Send our clock right away; the peer has not told us what it has yet,
	// hence the empty `have`.
	ebtReplicateSendClock(request, {});
	// Presumably request.more() delivers the following messages of this
	// stream to the callback - verify against the ssb runtime.
	request.more(ebtReplicateCommon);
}

// Expose the handler above as the ebt.replicate RPC.
ssb.addRpc(['ebt', 'replicate'], ebtReplicateServer);

/**
 * RPC createHistoryStream: sends the stored messages of one feed, then keeps
 * sending that feed's messages as they are announced.
 *
 * Options are read from the first argument:
 *   id   - feed to stream.
 *   seq  - first sequence to send (inclusive); defaults to 0.
 *   keys - when true or omitted, each message is wrapped as
 *          {key, value, timestamp}; when false, the bare message is sent.
 *
 * NOTE(review): the 'message' listener is registered regardless of any
 * `live` option and is never removed here - confirm this is intended.
 */
ssb.addRpc(['createHistoryStream'], function(request) {
	var id = request.args[0].id;
	var seq = request.args[0].seq;
	var keys = request.args[0].keys || request.args[0].keys === undefined;
	function sendMessage(row) {
		// formatMessage() yields the same fields, in the same order, that both
		// output shapes need.
		var value = formatMessage(row);
		if (keys) {
			request.send_json({
				key: row.id,
				value: value,
				timestamp: row.timestamp,
			});
		} else {
			request.send_json(value);
		}
	}
	ssb.sqlStream(
		'SELECT previous, author, id, sequence, timestamp, hash, content, signature FROM messages WHERE author = ?1 AND sequence >= ?2 ORDER BY sequence',
		[id, seq ?? 0],
		sendMessage);
	ssb.addEventListener('message', function(message_id) {
		// The author filter keeps messages of other feeds out of this stream.
		ssb.sqlStream(
			'SELECT previous, author, id, sequence, timestamp, hash, content, signature FROM messages WHERE id = ?1 AND author = ?2',
			[message_id, id],
			sendMessage);
	});
});