tildefriends/apps/cory/ssb/app.js

623 lines
17 KiB
JavaScript
Raw Normal View History

import * as tfrpc from '/tfrpc.js';
// Limit passed to getRelatedPostIds() when a single message ('%...') is selected.
const k_posts_max = 40;
// Maximum number of votes queried and retained per author by getVotes().
const k_votes_max = 20;
// Set to true by the `ready` RPC; refresh_internal() returns immediately until then.
var g_ready = false;
// Selection most recently passed to refresh_internal() (second ':'-separated part of the URL hash).
var g_selected = null;
// Identity most recently passed to refresh_internal() (first ':'-separated part of the URL hash);
// also the identity used when appending messages.
let g_whoami = null;
// id -> Set of blocked contacts, filled by blocking().
var g_blocking_cache = {};
// id -> Set of followed contacts, filled by following(); reset at the start of each refresh.
var g_following_cache = {};
// JSON-serialized followingDeep() arguments -> resulting id list; reset at the start of each refresh.
var g_following_deep_cache = {};
// author id -> highest message sequence seen for that author; loaded from and saved to
// the 'sequence' database key by refresh_internal().
var g_sequence = {};
/**
 * Returns the Set of contacts that the feed `id` follows.
 *
 * Previously computed state is loaded from the database key
 * `<id>:following`; only 'contact' messages with a sequence greater than the
 * stored one are replayed on top of it.  The updated state is written back
 * when its serialized form changed, the newest sequence is recorded in
 * g_sequence, and the resulting Set is memoized in g_following_cache.
 *
 * @param db  Key/value store exposing async get(key) / set(key, value).
 * @param id  Feed id whose contact messages are read.
 * @returns   The Set of followed contacts.
 */
async function following(db, id) {
	const cached = g_following_cache[id];
	if (cached) {
		return cached;
	}

	const k_version = 5;
	const storage_key = id + ":following";
	const stored = await db.get(storage_key);
	let state = stored ? JSON.parse(stored) : stored;
	if (!state || state.version != k_version) {
		/* Missing or outdated state: rebuild from the first message. */
		state = {users: [], sequence: 0, version: k_version};
	}
	const users = new Set(state.users);
	state.users = users;

	/* The UNION row carries only the author's highest sequence so that the
	 * stored sequence advances even when no new contact messages exist. */
	const query =
		"SELECT "+
		" sequence, "+
		" json_extract(content, '$.contact') AS contact, "+
		" json_extract(content, '$.following') AS following "+
		"FROM messages "+
		"WHERE "+
		" author = ?1 AND "+
		" sequence > ?2 AND "+
		" json_extract(content, '$.type') = 'contact' "+
		"UNION SELECT MAX(sequence) AS sequence, NULL, NULL FROM messages WHERE author = ?1 "+
		"ORDER BY sequence";
	await ssb.sqlStream(query, [id, state.sequence], (row) => {
		if (row.following) {
			users.add(row.contact);
		} else {
			users.delete(row.contact);
		}
		if (row.sequence) {
			state.sequence = row.sequence;
		}
	});
	g_sequence[id] = state.sequence;

	/* Serialize with a sorted array so unchanged state compares equal. */
	state.users = [...users].sort();
	const serialized = JSON.stringify(state);
	if (stored != serialized) {
		await db.set(storage_key, serialized);
	}
	state.users = users;

	g_following_cache[id] = users;
	return users;
}
/**
 * Expands a list of feed ids by repeatedly adding the feeds they follow.
 *
 * @param db        Database handle forwarded to following().
 * @param seed_ids  Array of feed ids to start from.
 * @param depth     Number of follow hops to expand; <= 0 returns seed_ids as is.
 * @param blocked   Optional Set of ids to drop from each expansion step.
 * @returns         Sorted, de-duplicated array containing the seeds and every
 *                  non-blocked id reached within `depth` hops.
 */
async function followingDeep(db, seed_ids, depth, blocked) {
	if (depth <= 0) {
		return seed_ids;
	}
	/* JSON.stringify() turns any Set into "{}", which made every blocked set
	 * share one cache entry.  Serialize its sorted contents instead. */
	const blocked_key = blocked ? [...blocked].sort() : blocked;
	const key = JSON.stringify([seed_ids, depth, blocked_key]);
	if (g_following_deep_cache[key]) {
		return g_following_deep_cache[key];
	}
	const followed = await Promise.all(seed_ids.map(x => following(db, x).then(set => [...set])));
	let ids = followed.flat();
	if (blocked) {
		ids = ids.filter(x => !blocked.has(x));
	}
	const deeper = await followingDeep(db, [...new Set(ids)].sort(), depth - 1, blocked);
	/* The seeds themselves are always part of the result. */
	const result = [...new Set([...deeper, ...seed_ids])].sort();
	g_following_deep_cache[key] = result;
	return result;
}
/**
 * Returns the Set of contacts that the feed `id` blocks.
 *
 * State is loaded from the database key `<id>:blocking`.  New 'contact'
 * messages are only queried when g_sequence has no (truthy) entry for `id`
 * or reports a sequence newer than the stored one.  The state is written
 * back when its serialized form changed and the resulting Set is memoized
 * in g_blocking_cache.
 *
 * @param db  Key/value store exposing async get(key) / set(key, value).
 * @param id  Feed id whose contact messages are read.
 * @returns   The Set of blocked contacts.
 */
async function blocking(db, id) {
	const cached = g_blocking_cache[id];
	if (cached) {
		return cached;
	}

	const k_version = 5;
	const storage_key = id + ":blocking";
	const stored = await db.get(storage_key);
	let state = stored ? JSON.parse(stored) : stored;
	if (!state || state.version != k_version) {
		state = {users: [], sequence: 0, version: k_version};
	}
	const users = new Set(state.users);
	state.users = users;

	const known_sequence = g_sequence[id];
	if (!known_sequence || known_sequence > state.sequence) {
		/* The UNION row only carries the author's highest sequence. */
		const query =
			"SELECT "+
			" sequence, "+
			" json_extract(content, '$.contact') AS contact, "+
			" json_extract(content, '$.blocking') AS blocking "+
			"FROM messages "+
			"WHERE "+
			" author = ?1 AND "+
			" sequence > ?2 AND "+
			" json_extract(content, '$.type') = 'contact' "+
			"UNION SELECT MAX(sequence) AS sequence, NULL, NULL FROM messages WHERE author = ?1 "+
			"ORDER BY sequence";
		await ssb.sqlStream(query, [id, state.sequence], (row) => {
			if (row.blocking) {
				users.add(row.contact);
			} else {
				users.delete(row.contact);
			}
			if (row.sequence) {
				state.sequence = row.sequence;
			}
		});
		g_sequence[id] = state.sequence;
	}

	/* Serialize with a sorted array so unchanged state compares equal. */
	state.users = [...users].sort();
	const serialized = JSON.stringify(state);
	if (stored != serialized) {
		await db.set(storage_key, serialized);
	}
	state.users = users;

	g_blocking_cache[id] = users;
	return users;
}
/**
 * Returns the merged 'about' fields a feed has published about itself.
 *
 * State is loaded from the database key `<id>:about`.  When g_sequence has
 * no entry for `id`, or reports a newer sequence than the stored one, newer
 * 'about' messages whose `about` field equals `id` are merged in (later
 * messages overwrite earlier fields) and the state is written back if it
 * changed.
 *
 * @param db  Key/value store exposing async get(key) / set(key, value).
 * @param id  Feed id to look up.
 * @returns   Object holding the merged about fields, minus `about` and `type`.
 */
async function getAbout(db, id) {
	const k_version = 5;
	const storage_key = id + ":about";
	const stored = await db.get(storage_key);
	let state = stored ? JSON.parse(stored) : stored;
	if (!state || state.version != k_version) {
		state = {about: {}, sequence: 0, version: k_version};
	}

	const known_sequence = g_sequence[id];
	if (known_sequence !== undefined && !(known_sequence > state.sequence)) {
		/* Stored state already covers everything we know about. */
		return state.about;
	}

	const query =
		"SELECT "+
		" sequence, "+
		" content "+
		"FROM messages "+
		"WHERE "+
		" author = ?1 AND "+
		" sequence > ?2 AND "+
		" json_extract(content, '$.type') = 'about' AND "+
		" json_extract(content, '$.about') = ?1 "+
		"UNION SELECT MAX(sequence) as sequence, NULL FROM messages WHERE author = ?1 "+
		"ORDER BY sequence";
	await ssb.sqlStream(query, [id, state.sequence], (row) => {
		if (row.content) {
			let fields = {};
			try {
				fields = JSON.parse(row.content);
			} catch {
				/* Unparseable content contributes nothing. */
			}
			delete fields.about;
			delete fields.type;
			state.about = Object.assign(state.about, fields);
		}
		if (row.sequence) {
			state.sequence = Math.max(state.sequence, row.sequence);
		}
	});
	g_sequence[id] = state.sequence;

	const serialized = JSON.stringify(state);
	if (stored != serialized) {
		await db.set(storage_key, serialized);
	}
	return state.about;
}
/**
 * Computes the 32-bit FNV-1a hash of a string's UTF-16 code units.
 *
 * @param value  String to hash.
 * @returns      Unsigned 32-bit hash value.
 */
function fnv32a(value) {
	const k_offset_basis = 0x811c9dc5;
	const k_prime = 0x01000193;
	let hash = k_offset_basis;
	for (let index = 0; index < value.length; index++) {
		/* XOR in the code unit, then multiply by the FNV prime modulo 2^32. */
		hash = Math.imul(hash ^ value.charCodeAt(index), k_prime);
	}
	return hash >>> 0;
}
/**
 * Returns the ids of the most recent 'post' messages of a single author.
 *
 * The query selects up to `limit` posts by descending sequence; the ids are
 * then returned ordered by descending timestamp.
 *
 * @param db     Unused by this function.
 * @param id     Author whose posts are queried.
 * @param limit  Maximum number of rows requested from the database.
 * @returns      Array of message ids, newest timestamp first.
 */
async function getRecentPostsSingleId(db, id, limit) {
	const found = [];
	const query =
		"SELECT "+
		" rowid, "+
		" id, "+
		" timestamp "+
		"FROM messages "+
		"WHERE "+
		" author = ? AND "+
		" json_extract(content, '$.type') = 'post' "+
		"ORDER BY sequence DESC LIMIT ?";
	await ssb.sqlStream(query, [id, limit], (row) => {
		if (!row.id) {
			return;
		}
		found.push({id: row.id, timestamp: row.timestamp});
	});
	return found
		.sort((a, b) => b.timestamp - a.timestamp)
		.map(entry => entry.id);
}
/**
 * Returns the ids of the most recent messages written by any of `ids`,
 * maintaining an incremental result under the database key
 * `<id>:recent_posts`.
 *
 * A single-element `ids` is delegated to getRecentPostsSingleId().  Otherwise
 * only rows added since the stored rowid are queried (in batches of authors),
 * merged with the stored list, de-duplicated, sorted by descending timestamp
 * and truncated to `limit`.  Stored state is discarded when its version or
 * the hash of `ids` differs.
 *
 * @param db     Key/value store exposing async get(key) / set(key, value).
 * @param id     Id used to namespace the stored state.
 * @param ids    Array of author ids to include.
 * @param limit  Maximum number of ids to return.
 * @returns      Array of message ids, newest timestamp first.
 */
async function getRecentPostIds(db, id, ids, limit) {
	if (ids.length == 1) {
		return await getRecentPostsSingleId(db, ids[0], limit);
	}

	const k_version = 11;
	const k_batch_max = 32;
	const storage_key = id + ':recent_posts';
	const stored = await db.get(storage_key);
	let state = stored ? JSON.parse(stored) : stored;
	const ids_hash = fnv32a(JSON.stringify(ids));
	if (!state || state.version != k_version || state.ids_hash != ids_hash) {
		state = {recent: [], rowid: 0, version: k_version, ids_hash: ids_hash};
	}

	/* Pin the upper rowid bound so every batch sees the same snapshot. */
	let row_id_max = 0;
	await ssb.sqlStream(
		"SELECT MAX(rowid) as rowid FROM messages",
		[],
		(row) => {
			row_id_max = row.rowid;
		});

	const fresh = [];
	for (let start = 0; start < ids.length; start += k_batch_max) {
		const batch = ids.slice(start, start + k_batch_max);
		const placeholders = batch.map(() => '?').join(", ");
		await ssb.sqlStream(
			"SELECT "+
			" rowid, "+
			" id, "+
			" timestamp "+
			"FROM messages "+
			"WHERE "+
			" rowid > ? AND "+
			" rowid <= ? AND "+
			" author IN (" + placeholders + ") "+
			"ORDER BY timestamp DESC LIMIT ?",
			[state.rowid, row_id_max, ...batch, limit],
			(row) => {
				if (row.id) {
					fresh.push({id: row.id, timestamp: row.timestamp});
				}
			});
	}

	state.rowid = row_id_max;
	const seen = {};
	state.recent = [].concat(fresh, state.recent)
		.filter((entry) => {
			if (seen[entry.id]) {
				return false;
			}
			seen[entry.id] = true;
			return true;
		})
		.sort((a, b) => b.timestamp - a.timestamp)
		.slice(0, limit);

	const serialized = JSON.stringify(state);
	if (stored != serialized) {
		await db.set(storage_key, serialized);
	}
	return state.recent.map(entry => entry.id);
}
/**
 * Returns, for each author in `ids` that posted after `start_time`, the id
 * of the first 'post' row returned for that author (rows are requested by
 * descending timestamp).
 *
 * A single-element `ids` is delegated to getRecentPostsSingleId() with a
 * limit of 20.
 *
 * @param db          Forwarded to getRecentPostsSingleId(); otherwise unused.
 * @param id          Unused by this function.
 * @param ids         Array of author ids to include.
 * @param start_time  Only messages with a greater timestamp are considered.
 * @returns           Array of message ids, one per author with a match.
 */
async function getRecentPostIds2(db, id, ids, start_time) {
	if (ids.length == 1) {
		return getRecentPostsSingleId(db, ids[0], 20);
	}

	const k_batch_max = 32;

	/* Pin the upper rowid bound so every batch sees the same snapshot. */
	let row_id_max = 0;
	await ssb.sqlStream(
		"SELECT MAX(rowid) as rowid FROM messages",
		[],
		(row) => {
			row_id_max = row.rowid;
		});

	const posts_by_author = {};
	const collect = (row) => {
		if (!row.id) {
			return;
		}
		if (!posts_by_author[row.author]) {
			posts_by_author[row.author] = [];
		}
		posts_by_author[row.author].push(row.id);
	};

	for (let start = 0; start < ids.length; start += k_batch_max) {
		const batch = ids.slice(start, start + k_batch_max);
		const placeholders = batch.map(() => '?').join(", ");
		await ssb.sqlStream(
			"SELECT "+
			" author, "+
			" id "+
			"FROM messages "+
			"WHERE "+
			" author IN (" + placeholders + ") AND "+
			" timestamp > ? AND "+
			" rowid <= ? AND "+
			" json_extract(content, '$.type') = 'post' "+
			"ORDER BY timestamp DESC",
			[...batch, start_time, row_id_max],
			collect);
	}

	return Object.values(posts_by_author).map(post_ids => post_ids[0]);
}
/**
 * Returns ids of 'post' messages belonging to the same thread as `message`.
 *
 * The thread root is `message.content`'s `root` field when the content
 * parses and has one, otherwise `message.id`.  Posts by any of `ids` are
 * matched when their id equals `message.id` or their `root` equals the
 * thread root, restricted to timestamps >= `message.timestamp` (or 0).
 *
 * @param db       Unused by this function.
 * @param message  Object with `id` and optionally `content` (JSON string)
 *                 and `timestamp`.
 * @param ids      Array of author ids to include.
 * @param limit    Maximum number of ids to return.
 * @returns        Array of message ids, newest timestamp first.
 */
async function getRelatedPostIds(db, message, ids, limit) {
	const k_batch_max = 16;

	/* Pin the upper rowid bound so every batch sees the same snapshot. */
	let row_id_max = 0;
	await ssb.sqlStream(
		"SELECT MAX(rowid) as rowid FROM messages",
		[],
		(row) => {
			row_id_max = row.rowid;
		});

	let root_id = message.id;
	try {
		root_id = JSON.parse(message.content).root || root_id;
	} catch {
		/* No usable content: treat the message itself as the root. */
	}

	const matches = [];
	for (let start = 0; start < ids.length; start += k_batch_max) {
		const batch = ids.slice(start, start + k_batch_max);
		const placeholders = batch.map(() => '?').join(", ");
		await ssb.sqlStream(
			"SELECT "+
			" rowid, "+
			" id, "+
			" timestamp "+
			"FROM messages "+
			"WHERE "+
			" timestamp >= ? AND "+
			" rowid <= ? AND "+
			" author IN (" + placeholders + ") AND "+
			" json_extract(content, '$.type') = 'post' AND "+
			" (id = ? OR json_extract(content, '$.root') = ?) "+
			"ORDER BY timestamp DESC LIMIT ?",
			[message.timestamp || 0, row_id_max, ...batch, message.id, root_id, limit],
			(row) => {
				if (row.id) {
					matches.push({id: row.id, timestamp: row.timestamp});
				}
			});
	}

	return matches
		.sort((a, b) => b.timestamp - a.timestamp)
		.slice(0, limit)
		.map(entry => entry.id);
}
/**
 * Returns the most recent 'vote' messages of a feed.
 *
 * State is loaded from the database key `<id>:votes`.  When g_sequence has
 * no entry for `id`, or reports a newer sequence than the stored one, up to
 * k_votes_max newer vote rows are fetched, placed in front of the stored
 * ones, truncated to k_votes_max and written back if the state changed.
 *
 * @param db  Key/value store exposing async get(key) / set(key, value).
 * @param id  Feed id whose votes are read.
 * @returns   Array of vote message rows.
 */
async function getVotes(db, id) {
	const k_version = 7;
	const storage_key = id + ":votes";
	const stored = await db.get(storage_key);
	let state = stored ? JSON.parse(stored) : stored;
	if (!state || state.version != k_version) {
		state = {votes: [], sequence: 0, version: k_version};
	}

	const known_sequence = g_sequence[id];
	if (known_sequence !== undefined && !(known_sequence > state.sequence)) {
		/* Stored state already covers everything we know about. */
		return state.votes;
	}

	const fresh = [];
	const query =
		"SELECT "+
		" author, "+
		" id, "+
		" sequence, "+
		" timestamp, "+
		" content "+
		"FROM messages "+
		"WHERE "+
		" author = ? AND "+
		" sequence > ? AND "+
		" json_extract(content, '$.type') = 'vote' "+
		"UNION SELECT NULL, NULL, MAX(sequence) AS sequence, NULL, NULL FROM messages WHERE author = ? "+
		"ORDER BY sequence DESC LIMIT ?";
	await ssb.sqlStream(query, [id, state.sequence, id, k_votes_max], (row) => {
		/* The UNION row has no id; it only advances the sequence. */
		if (row.id) {
			fresh.push(row);
		}
		if (row.sequence) {
			state.sequence = Math.max(state.sequence, row.sequence);
		}
	});
	g_sequence[id] = state.sequence;
	state.votes = [].concat(fresh, state.votes).slice(0, k_votes_max);

	const serialized = JSON.stringify(state);
	if (stored != serialized) {
		await db.set(storage_key, serialized);
	}
	return state.votes;
}
/**
 * Loads the full message rows for the given message ids.
 *
 * @param db   Unused by this function.
 * @param ids  Array of message ids; an empty array performs no query.
 * @returns    Array of rows (rowid plus all message columns) in the order
 *             delivered by the query, which requests descending timestamp.
 */
async function getPosts(db, ids) {
	if (!ids.length) {
		return [];
	}
	const rows = [];
	const placeholders = ids.map(() => "?").join(", ");
	await ssb.sqlStream(
		"SELECT rowid, * FROM messages WHERE id IN (" + placeholders + ") ORDER BY timestamp DESC",
		ids,
		row => rows.push(row));
	return rows;
}
/**
 * RPC `ready`: sends the available identities to the client, marks the
 * app as ready and starts a forced refresh with the current
 * whoami/selection.  The refresh is started but not awaited here.
 */
tfrpc.register(async function ready() {
	await tfrpc.rpc.set_identities(await ssb.getIdentities());
	g_ready = true;
	refresh_internal(g_whoami, g_selected, true);
});
/**
 * RPC `store_blob`: stores a blob via ssb.blobStore() and returns its result.
 * A plain array argument is converted to a Uint8Array first.
 */
tfrpc.register(async function store_blob(blob) {
	const data = Array.isArray(blob) ? Uint8Array.from(blob) : blob;
	return ssb.blobStore(data);
});
/* On every 'broadcasts' event, forward the current broadcast list to the client. */
ssb.addEventListener('broadcasts', async function() {
	const broadcasts = await ssb.getBroadcasts();
	await tfrpc.rpc.set('broadcasts', broadcasts);
});
/* When the set of connections changes, forward the current list to the client. */
core.register('onConnectionsChanged', async function() {
	await tfrpc.rpc.set('connections', await ssb.connections());
});
/**
 * Refreshes g_sequence with the current highest sequence of every author
 * it already tracks.
 *
 * Authors are queried in batches.  The query groups by author so that one
 * row per author is returned; without GROUP BY the aggregate collapses each
 * batch into a single row and only one author per batch would be updated.
 *
 * @param db  Unused by this function.
 * @returns   Object mapping each author whose sequence changed to its new
 *            sequence.
 */
async function updateSequences(db) {
	const k_batch_max = 100;
	const changes = {};
	const keys = Object.keys(g_sequence);
	for (let i = 0; i < keys.length; i += k_batch_max) {
		const ids_batch = keys.slice(i, i + k_batch_max);
		await ssb.sqlStream(
			"SELECT "+
			" author, "+
			" MAX(sequence) AS sequence "+
			"FROM messages "+
			"WHERE "+
			" author IN (" + ids_batch.map(x => '?').join(", ") + ") "+
			"GROUP BY author",
			ids_batch,
			function(row) {
				if (g_sequence[row.author] != row.sequence) {
					g_sequence[row.author] = row.sequence;
					changes[row.author] = row.sequence;
				}
			});
	}
	return changes;
}
/**
 * Recomputes the view for the given identity and selection and pushes it to
 * the client through tfrpc.
 *
 * Does nothing until the client has signalled `ready`, and nothing when
 * neither `whoami` nor `selected` changed unless `force` is set.  Otherwise
 * it resets the per-refresh caches, reloads the known sequences, determines
 * the blocked and followed feeds, pushes posts (plus missing thread roots),
 * about info, votes, follow/block lists and finally reports per-step timings
 * via the `ready` RPC.
 *
 * @param whoami    Identity to view as; anything but a string stops after
 *                  recording the new state.
 * @param selected  Current selection: a falsy value (all followed feeds), a
 *                  message id starting with '%', or a feed id.
 * @param force     Refresh even if whoami/selected are unchanged.
 */
async function refresh_internal(whoami, selected, force) {
	if (!g_ready) {
		return;
	}
	if (whoami === g_whoami && selected === g_selected && !force) {
		return;
	}
	g_whoami = whoami;
	g_selected = selected;
	if (typeof(whoami) !== 'string') {
		return;
	}
	var timing = [];
	timing.push({name: 'start', time: new Date()});
	/* Drop all memoized contact state so new follow/block messages are seen.
	 * The blocking cache was previously never reset, leaving block lists stale. */
	g_following_cache = {};
	g_following_deep_cache = {};
	g_blocking_cache = {};
	await tfrpc.rpc.clear();
	await tfrpc.rpc.set_identities(await ssb.getIdentities());
	var db = await database("ssb");
	g_sequence = {};
	try {
		/* Fall back to an empty map if the stored value parses to null. */
		g_sequence = JSON.parse(await db.get('sequence')) || {};
	} catch (e) {
		/* Nothing stored yet (or unparseable): start with no known sequences. */
	}
	await updateSequences(db);
	timing.push({name: 'init', time: new Date()});
	var blocked = await blocking(db, whoami);
	timing.push({name: 'blocked', time: new Date()});
	var all_followed = await followingDeep(db, [whoami], 2, blocked);
	timing.push({name: 'all_followed', time: new Date()});
	let actual_selected = (selected ? [selected] : all_followed) ?? [];
	await Promise.all([
		tfrpc.rpc.set('whoami', whoami),
		tfrpc.rpc.set('broadcasts', await ssb.getBroadcasts()),
		tfrpc.rpc.set('connections', await ssb.connections()),
		tfrpc.rpc.set('apps', await core.apps()),
	]);
	timing.push({name: 'core', time: new Date()});
	var ids;
	if (selected && selected.startsWith('%')) {
		/* A message is selected: show its thread. */
		var m = await getPosts(db, [selected]);
		m = m.length ? m[0] : {id: selected};
		ids = await getRelatedPostIds(db, m, all_followed, k_posts_max);
	} else {
		/* Otherwise show posts from the last 24 hours. */
		ids = await getRecentPostIds2(db, whoami, actual_selected, (new Date()).valueOf() - (24 * 60 * 60 * 1000));
	}
	timing.push({name: 'get_post_ids', time: new Date()});
	var posts = await getPosts(db, ids);
	timing.push({name: 'get_posts', time: new Date()});
	var roots = posts.map(function(x) {
		try {
			return JSON.parse(x.content).root;
		} catch {
			return null;
		}
	});
	/* Also fetch thread roots that are referenced but not already loaded. */
	var have = new Set(posts.map(x => x.id));
	roots = [...new Set(roots)].filter(x => x && !have.has(x));
	var all_posts = [].concat(posts, await getPosts(db, roots));
	timing.push({name: 'get_root_posts', time: new Date()});
	await tfrpc.rpc.push_posts(all_posts);
	timing.push({name: 'send_posts', time: new Date()});
	let all_users = {};
	await Promise.all(all_followed.map(id => getAbout(db, id).then(function(results) {
		if (Object.keys(results).length) {
			all_users[id] = results;
		}
	})));
	await tfrpc.rpc.push_users(all_users);
	timing.push({name: 'about', time: new Date()});
	var all_votes = [];
	for (let id of all_followed) {
		var results = await getVotes(db, id);
		if (results.length) {
			all_votes.push(results);
		}
	}
	all_votes = all_votes.flat();
	await tfrpc.rpc.push_votes(all_votes);
	timing.push({name: 'votes', time: new Date()});
	/* `selected` is a string here, not an array: when a feed is selected,
	 * report the total size of its message content. */
	if (typeof selected === 'string' && selected.startsWith('@')) {
		let size = 0;
		await ssb.sqlStream(
			'SELECT SUM(LENGTH(content)) AS length FROM messages WHERE author = ?1',
			[selected],
			function(row) {
				size = row.length;
			});
		await tfrpc.rpc.push_users({[selected]: {size: size}});
	}
	await tfrpc.rpc.push_following(Object.fromEntries(all_followed.map(id => [id, [...(g_following_cache[id] || [])]])));
	timing.push({name: 'following', time: new Date()});
	await tfrpc.rpc.push_blocking(whoami, [...(g_blocking_cache[whoami] || [])]);
	timing.push({name: 'send_blocking', time: new Date()});
	await db.set('sequence', JSON.stringify(g_sequence));
	/* Convert the timestamps into per-step durations for the client. */
	var times = {};
	var previous = null;
	for (let t of timing) {
		times[t.name] = (t.time - (previous || t).time) / 1000.0 + ' s';
		previous = t;
	}
	times.total = (new Date() - timing[0].time) / 1000.0 + ' s';
	await tfrpc.rpc.ready(times);
}
/*
 * On every new message: load it and either push it to the client right away
 * (own messages and anything that is not a 'post') or just bump the client's
 * unread counter (posts by others).
 */
ssb.addEventListener('message', async function(id) {
	const db = await database("ssb");
	for (const post of await getPosts(db, [id])) {
		const show_now = post.author == g_whoami ||
			JSON.parse(post.content).type != 'post';
		if (show_now) {
			await tfrpc.rpc.push_posts([post]);
		} else {
			await tfrpc.rpc.add_unread(1);
		}
	}
});
/** RPC `refresh`: forwards its arguments unchanged to refresh_internal(). */
tfrpc.register(async function refresh(whoami, selected, force) {
	return await refresh_internal(whoami, selected, force);
});
/** RPC `createIdentity`: returns the result of ssb.createIdentity(). */
tfrpc.register(async function createIdentity() {
	return await ssb.createIdentity();
});
/**
 * Expands application mentions in a message before it is appended.
 *
 * For every mention of type 'application/tildefriends', the referenced blob
 * is fetched and parsed as JSON, and one extra mention ({name, link}) is
 * appended to `message.mentions` for each entry of its `files` map.
 *
 * @param message  Message content object; `message.mentions` is extended in
 *                 place.  Messages without mentions are left untouched.
 */
async function addAppSources(message) {
	if (!message.mentions) {
		return;
	}
	/* Iterate over a snapshot: the entries pushed below must not be visited
	 * by this same loop. */
	for (const mention of [...message.mentions]) {
		if (mention.type != 'application/tildefriends') {
			continue;
		}
		const blob = await ssb.blobGet(mention.link);
		const json = JSON.parse(utf8Decode(blob));
		/* An app blob without a files map simply contributes no mentions. */
		for (const [name, link] of Object.entries(json?.files ?? {})) {
			message.mentions.push({
				name: name,
				link: link,
			});
		}
	}
}
/*
 * Handles messages posted to the app.
 *
 *  - {message: {connect}}        connects to the given address.
 *  - {message: {appendMessage}}  expands app mentions and appends the
 *                                message as the current identity.
 *  - {event: 'hashChange'}       parses "#<whoami>:<selected>" and refreshes,
 *                                forcing only when either part changed.
 *  - {event: 'focus' | 'blur'}   ignored.
 *  - anything else               printed as JSON.
 */
core.register('message', async function(m) {
	const request = m.message;
	if (request) {
		if (request.connect) {
			await ssb.connect(request.connect);
		} else if (request.appendMessage) {
			await addAppSources(request.appendMessage);
			await ssb.appendMessageWithIdentity(g_whoami, request.appendMessage);
		}
		return;
	}

	if (m.event == 'hashChange') {
		/* Strip the leading '#'; an empty hash is ignored. */
		const hash = m.hash.length > 1 ? m.hash.substring(1) : null;
		if (!hash) {
			return;
		}
		const [whoami, selected] = hash.split(':');
		const changed = g_whoami !== whoami || g_selected !== selected;
		g_whoami = whoami;
		g_selected = selected;
		await refresh_internal(g_whoami, g_selected, changed);
		return;
	}

	if (m.event == 'focus' || m.event == 'blur') {
		/* Shh. */
		return;
	}

	print(JSON.stringify(m));
});
/**
 * Entry point: serves index.html to authenticated users and a sign-in
 * notice to everyone else.
 */
async function main() {
	const authenticated = core.user?.credentials?.permissions?.authenticated;
	if (!authenticated) {
		await app.setDocument('<div style="color: #f00">You must be signed in to use this app at this time. Login at the top right.</div>');
		return;
	}
	const html = utf8Decode(await getFile("index.html"));
	await app.setDocument(html);
}
main();