import * as tfrpc from '/tfrpc.js';

/*
 * NOTE(review): `ssb`, `core`, `app`, `database`, `print`, `utf8Decode` and
 * `getFile` are used below but not defined in this file; presumably they are
 * globals supplied by the host environment -- confirm.
 */

/** Limit passed to getRelatedPostIds() when a single message ('%...') is selected. */
const k_posts_max = 40;
/** Maximum number of votes kept (and queried) per author in getVotes(). */
const k_votes_max = 20;

/** Set to true once the client has called the `ready` RPC. */
let g_ready = false;
/** `selected` argument of the most recent refresh that was actually processed. */
let g_selected = null;
/** `whoami` argument of the most recent refresh that was actually processed. */
let g_whoami = null;
/** The single selected id of the most recent refresh, or null. */
let g_hash = null;
/** In-memory cache: author id => Set of blocked ids. */
let g_blocking_cache = {};
/** In-memory cache: author id => Set of followed ids. Reset on every refresh. */
let g_following_cache = {};
/** In-memory cache of followingDeep() results. Reset on every refresh. */
let g_following_deep_cache = {};
/** Author id => highest message sequence observed; persisted under the 'sequence' key. */
let g_sequence = {};

/**
 * Returns the highest rowid currently present in the messages table.
 *
 * @returns {Promise<number>} The maximum rowid, or 0 if the query yields no value.
 */
async function getMaxRowId() {
  let row_id_max = 0;
  await ssb.sqlStream(
    "SELECT MAX(rowid) as rowid FROM messages",
    [],
    function(row) {
      row_id_max = row.rowid;
    });
  return row_id_max;
}

/**
 * Computes the set of ids that `id` follows.
 *
 * The result is kept in `g_following_cache` and also persisted under the
 * database key `<id>:following` together with the last processed sequence, so
 * only 'contact' messages newer than that sequence are replayed.
 *
 * @param db Key/value store exposing async get(key) and set(key, value).
 * @param {string} id Author whose follows are wanted.
 * @returns {Promise<Set<string>>} Ids followed by `id`.
 */
async function following(db, id) {
  if (g_following_cache[id]) {
    return g_following_cache[id];
  }
  const k_version = 5;
  const o = await db.get(id + ":following");
  let f = o ? JSON.parse(o) : o;
  if (!f || f.version !== k_version) {
    // Missing or stale persisted state: start over from sequence 0.
    f = {users: [], sequence: 0, version: k_version};
  }
  f.users = new Set(f.users);
  await ssb.sqlStream(
    "SELECT "+
    " sequence, "+
    " json_extract(content, '$.contact') AS contact, "+
    " json_extract(content, '$.following') AS following "+
    "FROM messages "+
    "WHERE "+
    " author = ?1 AND "+
    " sequence > ?2 AND "+
    " json_extract(content, '$.type') = 'contact' "+
    "UNION SELECT MAX(sequence) AS sequence, NULL, NULL FROM messages WHERE author = ?1 "+
    "ORDER BY sequence",
    [id, f.sequence],
    function(row) {
      if (row.following) {
        f.users.add(row.contact);
      } else {
        f.users.delete(row.contact);
      }
      if (row.sequence) {
        f.sequence = row.sequence;
      }
    });
  g_sequence[id] = f.sequence;

  // Serialize with a sorted array so the persisted JSON is stable, then put
  // the Set back for in-memory use.
  const as_set = f.users;
  f.users = Array.from(f.users).sort();
  const j = JSON.stringify(f);
  if (o !== j) {
    await db.set(id + ":following", j);
  }
  f.users = as_set;
  g_following_cache[id] = f.users;
  return f.users;
}

/**
 * Expands `seed_ids` by following follow relationships `depth` levels deep.
 *
 * @param db Key/value store passed through to following().
 * @param {string[]} seed_ids Starting ids; always included in the result.
 * @param {number} depth Number of follow hops to take; <= 0 returns `seed_ids` as is.
 * @param {Set<string>} [blocked] Ids removed from every hop's result.
 * @returns {Promise<string[]>} Sorted, de-duplicated ids.
 */
async function followingDeep(db, seed_ids, depth, blocked) {
  if (depth <= 0) {
    return seed_ids;
  }
  // A Set stringifies as "{}", so put its sorted members in the key; otherwise
  // different block lists would share one cache entry.
  const blocked_key = blocked ? [...blocked].sort() : null;
  const key = JSON.stringify([seed_ids, depth, blocked_key]);
  if (g_following_deep_cache[key]) {
    return g_following_deep_cache[key];
  }
  const per_seed = await Promise.all(seed_ids.map(x => following(db, x).then(users => [...users])));
  let ids = per_seed.flat();
  if (blocked) {
    ids = ids.filter(x => !blocked.has(x));
  }
  const deeper = await followingDeep(db, [...new Set(ids)].sort(), depth - 1, blocked);
  const result = [...new Set([...deeper, ...seed_ids])].sort();
  g_following_deep_cache[key] = result;
  return result;
}

/**
 * Computes the set of ids that `id` blocks.
 *
 * Mirrors following(), but only re-queries messages when no sequence is known
 * for `id` or the known sequence is newer than the persisted one.
 *
 * @param db Key/value store exposing async get(key) and set(key, value).
 * @param {string} id Author whose block list is wanted.
 * @returns {Promise<Set<string>>} Ids blocked by `id`.
 */
async function blocking(db, id) {
  if (g_blocking_cache[id]) {
    return g_blocking_cache[id];
  }
  const k_version = 5;
  const o = await db.get(id + ":blocking");
  let f = o ? JSON.parse(o) : o;
  if (!f || f.version !== k_version) {
    f = {users: [], sequence: 0, version: k_version};
  }
  f.users = new Set(f.users);
  if (!g_sequence[id] || g_sequence[id] > f.sequence) {
    await ssb.sqlStream(
      "SELECT "+
      " sequence, "+
      " json_extract(content, '$.contact') AS contact, "+
      " json_extract(content, '$.blocking') AS blocking "+
      "FROM messages "+
      "WHERE "+
      " author = ?1 AND "+
      " sequence > ?2 AND "+
      " json_extract(content, '$.type') = 'contact' "+
      "UNION SELECT MAX(sequence) AS sequence, NULL, NULL FROM messages WHERE author = ?1 "+
      "ORDER BY sequence",
      [id, f.sequence],
      function(row) {
        if (row.blocking) {
          f.users.add(row.contact);
        } else {
          f.users.delete(row.contact);
        }
        if (row.sequence) {
          f.sequence = row.sequence;
        }
      });
    g_sequence[id] = f.sequence;
  }

  // Persist with a sorted array for stable JSON; keep the Set in memory.
  const as_set = f.users;
  f.users = Array.from(f.users).sort();
  const j = JSON.stringify(f);
  if (o !== j) {
    await db.set(id + ":blocking", j);
  }
  f.users = as_set;
  g_blocking_cache[id] = f.users;
  return f.users;
}

/**
 * Returns the merged 'about' fields that `id` has published about itself.
 *
 * State is persisted under `<id>:about`; newer 'about' messages are merged in
 * sequence order, later values overwriting earlier ones.
 *
 * @param db Key/value store exposing async get(key) and set(key, value).
 * @param {string} id Author id.
 * @returns {Promise<object>} Merged about fields (without `about` and `type`).
 */
async function getAbout(db, id) {
  const k_version = 5;
  const o = await db.get(id + ":about");
  let f = o ? JSON.parse(o) : o;
  if (!f || f.version !== k_version) {
    f = {about: {}, sequence: 0, version: k_version};
  }
  if (g_sequence[id] === undefined || g_sequence[id] > f.sequence) {
    await ssb.sqlStream(
      "SELECT "+
      " sequence, "+
      " content "+
      "FROM messages "+
      "WHERE "+
      " author = ?1 AND "+
      " sequence > ?2 AND "+
      " json_extract(content, '$.type') = 'about' AND "+
      " json_extract(content, '$.about') = ?1 "+
      "UNION SELECT MAX(sequence) as sequence, NULL FROM messages WHERE author = ?1 "+
      "ORDER BY sequence",
      [id, f.sequence],
      function(row) {
        if (row.content) {
          let about = {};
          try {
            about = JSON.parse(row.content);
          } catch {
            // Best effort: unparseable content contributes nothing.
          }
          delete about.about;
          delete about.type;
          f.about = Object.assign(f.about, about);
        }
        if (row.sequence) {
          f.sequence = Math.max(f.sequence, row.sequence);
        }
      });
    g_sequence[id] = f.sequence;
    const j = JSON.stringify(f);
    if (o !== j) {
      await db.set(id + ":about", j);
    }
  }
  return f.about;
}

/**
 * Hashes a string to an unsigned 32-bit integer using shift-and-add arithmetic
 * seeded with 0x811c9dc5.
 *
 * @param {string} value Text to hash (consumed one UTF-16 code unit at a time).
 * @returns {number} Unsigned 32-bit hash.
 */
function fnv32a(value) {
  let result = 0x811c9dc5;
  for (let i = 0; i < value.length; i++) {
    result ^= value.charCodeAt(i);
    result += (result << 1) + (result << 4) + (result << 7) + (result << 8) + (result << 24);
  }
  return result >>> 0;
}

/**
 * Returns the ids of the most recent posts by one author.
 *
 * @param db Unused; kept for signature parity with the other getters.
 * @param {string} id Author id.
 * @param {number} limit Maximum number of posts to fetch.
 * @returns {Promise<string[]>} Message ids, newest timestamp first.
 */
async function getRecentPostsSingleId(db, id, limit) {
  const recent = [];
  await ssb.sqlStream(
    "SELECT "+
    " rowid, "+
    " id, "+
    " timestamp "+
    "FROM messages "+
    "WHERE "+
    " author = ? AND "+
    " json_extract(content, '$.type') = 'post' "+
    "ORDER BY sequence DESC LIMIT ?",
    [id, limit],
    function(row) {
      if (row.id) {
        recent.push({id: row.id, timestamp: row.timestamp});
      }
    });
  recent.sort((x, y) => y.timestamp - x.timestamp);
  return recent.map(x => x.id);
}

/**
 * Returns the ids of the most recent messages by any of `ids`, using a list
 * persisted under `<id>:recent_posts` that is extended with rows newer than
 * the last processed rowid.
 *
 * @param db Key/value store exposing async get(key) and set(key, value).
 * @param {string} id Owner of the persisted list.
 * @param {string[]} ids Authors to include.
 * @param {number} limit Maximum number of ids to return.
 * @returns {Promise<string[]>} Message ids, newest timestamp first.
 */
async function getRecentPostIds(db, id, ids, limit) {
  if (ids.length === 1) {
    return await getRecentPostsSingleId(db, ids[0], limit);
  }

  const k_version = 11;
  const k_batch_max = 32;
  const o = await db.get(id + ':recent_posts');
  const recent = [];
  let f = o ? JSON.parse(o) : o;
  // The persisted list is only valid for the exact author list it was built from.
  const ids_hash = fnv32a(JSON.stringify(ids));
  if (!f || f.version !== k_version || f.ids_hash !== ids_hash) {
    f = {recent: [], rowid: 0, version: k_version, ids_hash: ids_hash};
  }

  const row_id_max = await getMaxRowId();
  for (let i = 0; i < ids.length; i += k_batch_max) {
    const ids_batch = ids.slice(i, Math.min(i + k_batch_max, ids.length));
    await ssb.sqlStream(
      "SELECT "+
      " rowid, "+
      " id, "+
      " timestamp "+
      "FROM messages "+
      "WHERE "+
      " rowid > ? AND "+
      " rowid <= ? AND "+
      " author IN (" + ids_batch.map(x => '?').join(", ") + ") "+
      "ORDER BY timestamp DESC LIMIT ?",
      [f.rowid, row_id_max, ...ids_batch, limit],
      function(row) {
        if (row.id) {
          recent.push({id: row.id, timestamp: row.timestamp});
        }
      });
  }
  f.rowid = row_id_max;

  // New rows first, then drop duplicates keeping the first occurrence.
  const have = new Set();
  f.recent = [...recent, ...f.recent].filter(function(x) {
    if (have.has(x.id)) {
      return false;
    }
    have.add(x.id);
    return true;
  });
  f.recent.sort((x, y) => y.timestamp - x.timestamp);
  f.recent = f.recent.slice(0, limit);

  const j = JSON.stringify(f);
  if (o !== j) {
    await db.set(id + ":recent_posts", j);
  }
  return f.recent.map(x => x.id);
}

/**
 * Returns, for each author in `ids`, the id of that author's newest post with
 * a timestamp after `start_time`. A single author instead yields up to 20 of
 * its most recent posts.
 *
 * @param db Passed through to getRecentPostsSingleId().
 * @param {string} id Unused.
 * @param {string[]} ids Authors to include.
 * @param {number} start_time Only posts with a greater timestamp are considered.
 * @returns {Promise<string[]>} Message ids.
 */
async function getRecentPostIds2(db, id, ids, start_time) {
  if (ids.length === 1) {
    return getRecentPostsSingleId(db, ids[0], 20);
  }

  const k_batch_max = 32;
  const row_id_max = await getMaxRowId();
  const posts_by_author = {};
  for (let i = 0; i < ids.length; i += k_batch_max) {
    const ids_batch = ids.slice(i, Math.min(i + k_batch_max, ids.length));
    await ssb.sqlStream(
      "SELECT "+
      " author, "+
      " id "+
      "FROM messages "+
      "WHERE "+
      " author IN (" + ids_batch.map(x => '?').join(", ") + ") AND "+
      " timestamp > ? AND "+
      " rowid <= ? AND "+
      " json_extract(content, '$.type') = 'post' "+
      "ORDER BY timestamp DESC",
      [...ids_batch, start_time, row_id_max],
      function(row) {
        if (row.id) {
          if (!posts_by_author[row.author]) {
            posts_by_author[row.author] = [];
          }
          posts_by_author[row.author].push(row.id);
        }
      });
  }
  // Rows arrive newest first, so the first entry per author is its latest post.
  return Object.values(posts_by_author).map(x => x[0]);
}

/**
 * Returns ids of posts by `ids` that are `message` itself or share its thread
 * root, restricted to timestamps at or after the message's timestamp.
 *
 * @param db Unused.
 * @param {object} message Row or stub with `id` and optionally `content` and `timestamp`.
 * @param {string[]} ids Authors to include.
 * @param {number} limit Maximum number of ids to return.
 * @returns {Promise<string[]>} Message ids, newest timestamp first.
 */
async function getRelatedPostIds(db, message, ids, limit) {
  const k_batch_max = 16;
  let recent = [];
  const row_id_max = await getMaxRowId();

  // Use the thread root when the message declares one, else the message itself.
  let id = message.id;
  try {
    id = JSON.parse(message.content).root || id;
  } catch {
    // Missing or unparseable content: keep the message's own id.
  }

  for (let i = 0; i < ids.length; i += k_batch_max) {
    const ids_batch = ids.slice(i, Math.min(i + k_batch_max, ids.length));
    await ssb.sqlStream(
      "SELECT "+
      " rowid, "+
      " id, "+
      " timestamp "+
      "FROM messages "+
      "WHERE "+
      " timestamp >= ? AND "+
      " rowid <= ? AND "+
      " author IN (" + ids_batch.map(x => '?').join(", ") + ") AND "+
      " json_extract(content, '$.type') = 'post' AND "+
      " (id = ? OR json_extract(content, '$.root') = ?) "+
      "ORDER BY timestamp DESC LIMIT ?",
      [message.timestamp || 0, row_id_max, ...ids_batch, message.id, id, limit],
      function(row) {
        if (row.id) {
          recent.push({id: row.id, timestamp: row.timestamp});
        }
      });
  }
  recent.sort((x, y) => y.timestamp - x.timestamp);
  recent = recent.slice(0, limit);
  return recent.map(x => x.id);
}

/**
 * Returns up to `k_votes_max` of the most recent 'vote' messages by `id`.
 *
 * State is persisted under `<id>:votes` and only refreshed when the known
 * sequence for `id` is missing or newer than the persisted one.
 *
 * @param db Key/value store exposing async get(key) and set(key, value).
 * @param {string} id Author id.
 * @returns {Promise<object[]>} Vote message rows.
 */
async function getVotes(db, id) {
  const k_version = 7;
  const o = await db.get(id + ":votes");
  let f = o ? JSON.parse(o) : o;
  if (!f || f.version !== k_version) {
    f = {votes: [], sequence: 0, version: k_version};
  }
  if (g_sequence[id] === undefined || g_sequence[id] > f.sequence) {
    const votes = [];
    await ssb.sqlStream(
      "SELECT "+
      " author, "+
      " id, "+
      " sequence, "+
      " timestamp, "+
      " content "+
      "FROM messages "+
      "WHERE "+
      " author = ? AND "+
      " sequence > ? AND "+
      " json_extract(content, '$.type') = 'vote' "+
      "UNION SELECT NULL, NULL, MAX(sequence) AS sequence, NULL, NULL FROM messages WHERE author = ? "+
      "ORDER BY sequence DESC LIMIT ?",
      [id, f.sequence, id, k_votes_max],
      function(row) {
        if (row.id) {
          votes.push(row);
        }
        if (row.sequence) {
          f.sequence = Math.max(f.sequence, row.sequence);
        }
      });
    g_sequence[id] = f.sequence;
    f.votes = [...votes, ...f.votes].slice(0, k_votes_max);
    const j = JSON.stringify(f);
    if (o !== j) {
      await db.set(id + ":votes", j);
    }
  }
  return f.votes;
}

/**
 * Fetches full message rows for the given ids.
 *
 * @param db Unused.
 * @param {string[]} ids Message ids; an empty list performs no query.
 * @returns {Promise<object[]>} Rows (including rowid), newest timestamp first.
 */
async function getPosts(db, ids) {
  const posts = [];
  if (ids.length) {
    await ssb.sqlStream(
      "SELECT rowid, * FROM messages WHERE id IN (" + ids.map(x => "?").join(", ") + ") ORDER BY timestamp DESC",
      ids,
      row => posts.push(row));
  }
  return posts;
}

/* RPC: the client announces it is ready; reply with the available identities. */
tfrpc.register(async function ready() {
  const identities = await ssb.getIdentities();
  await tfrpc.rpc.set_identities(identities);
  g_ready = true;
});

/* RPC: store a blob; plain arrays are converted to a Uint8Array first. */
tfrpc.register(async function store_blob(blob) {
  if (Array.isArray(blob)) {
    blob = Uint8Array.from(blob);
  }
  return await ssb.blobStore(blob);
});

/* Forward broadcast changes to the client. */
ssb.addEventListener('broadcasts', async function() {
  await tfrpc.rpc.set('broadcasts', await ssb.getBroadcasts());
});

/* Forward connection changes to the client. */
core.register('onConnectionsChanged', async function() {
  const connections = await ssb.connections();
  await tfrpc.rpc.set('connections', connections);
});

/**
 * Refreshes `g_sequence` for every author it already tracks.
 *
 * @param db Unused.
 * @returns {Promise<object>} Author id => new sequence, for each author whose value changed.
 */
async function updateSequences(db) {
  const k_batch_max = 100;
  const changes = {};
  const keys = Object.keys(g_sequence);
  for (let i = 0; i < keys.length; i += k_batch_max) {
    const ids_batch = keys.slice(i, Math.min(i + k_batch_max, keys.length));
    await ssb.sqlStream(
      "SELECT "+
      " author, "+
      " MAX(sequence) AS sequence "+
      "FROM messages "+
      "WHERE "+
      " author IN (" + ids_batch.map(x => '?').join(", ") + ") "+
      // Without GROUP BY the aggregate collapses to a single row, so only one
      // author per batch would ever be refreshed.
      "GROUP BY author",
      ids_batch,
      function(row) {
        if (g_sequence[row.author] != row.sequence) {
          g_sequence[row.author] = row.sequence;
          changes[row.author] = row.sequence;
        }
      });
  }
  return changes;
}

let g_refresh_limit = 5;

/**
 * Rebuilds the client's view: identities, posts, user info, votes, follows and
 * blocks for `whoami`, narrowed to `selected` when given.
 *
 * Does nothing when `whoami` is not a string, or when both arguments are
 * identical (by `===`) to those of the previous processed call.
 *
 * @param {string} whoami Identity the view is built for.
 * @param {?string[]} selected Optional selection; a single '%...' id selects a
 *     thread, a single '@...' id additionally reports that author's content size.
 * @returns {Promise<void>}
 */
async function refresh_internal(whoami, selected) {
  if (typeof(whoami) !== 'string') {
    return;
  }
  if (whoami === g_whoami && selected === g_selected) {
    return;
  }
  g_whoami = whoami;
  g_selected = selected;

  // Checkpoints reported to the client as per-phase durations.
  const timing = [];
  timing.push({name: 'start', time: new Date()});
  g_following_cache = {};
  g_following_deep_cache = {};
  await tfrpc.rpc.clear();
  await tfrpc.rpc.set_identities(await ssb.getIdentities());
  const db = await database("ssb");
  g_sequence = {};
  try {
    g_sequence = JSON.parse(await db.get('sequence'));
  } catch (e) {
    // No usable persisted sequences; start with an empty map.
  }
  await updateSequences(db);
  timing.push({name: 'init', time: new Date()});
  const blocked = await blocking(db, whoami);
  timing.push({name: 'blocked', time: new Date()});
  const all_followed = await followingDeep(db, [whoami], 2, blocked);
  timing.push({name: 'all_followed', time: new Date()});
  const actual_selected = selected ? selected : all_followed;

  const hash = selected && selected.length == 1 ? selected[0] : null;
  if (hash !== g_hash) {
    // Log before assigning so the old and new values are both visible.
    print(g_hash, '=>', hash);
    g_hash = hash;
  }

  await Promise.all([
    tfrpc.rpc.set('whoami', whoami),
    tfrpc.rpc.set('broadcasts', await ssb.getBroadcasts()),
    tfrpc.rpc.set('connections', await ssb.connections()),
    tfrpc.rpc.set('apps', await core.apps()),
  ]);
  timing.push({name: 'core', time: new Date()});

  let ids;
  if (selected && selected.length == 1 && selected[0].startsWith('%')) {
    // Thread view: look the message up, falling back to a stub with only its id.
    const found = await getPosts(db, selected);
    const m = found.length ? found[0] : {id: selected[0]};
    ids = await getRelatedPostIds(db, m, all_followed, k_posts_max);
  } else {
    // Feed view: posts from the last 24 hours.
    ids = await getRecentPostIds2(db, whoami, actual_selected, (new Date()).valueOf() - (24 * 60 * 60 * 1000));
  }
  timing.push({name: 'get_post_ids', time: new Date()});

  const posts = await getPosts(db, ids);
  timing.push({name: 'get_posts', time: new Date()});

  // Also fetch thread roots that are referenced but not already loaded.
  let roots = posts.map(function(x) {
    try {
      return JSON.parse(x.content).root;
    } catch {
      return null;
    }
  });
  const have = new Set(posts.map(x => x.id));
  roots = [...new Set(roots)].filter(x => x && !have.has(x));
  const all_posts = [...posts, ...(await getPosts(db, roots))];
  timing.push({name: 'get_root_posts', time: new Date()});

  await tfrpc.rpc.push_posts(all_posts);
  timing.push({name: 'send_posts', time: new Date()});

  const all_users = {};
  await Promise.all(all_followed.map(id => getAbout(db, id).then(function(results) {
    if (Object.keys(results).length) {
      all_users[id] = results;
    }
  })));
  await tfrpc.rpc.push_users(all_users);
  timing.push({name: 'about', time: new Date()});

  let all_votes = [];
  for (const id of all_followed) {
    const results = await getVotes(db, id);
    if (results.length) {
      all_votes.push(results);
    }
  }
  all_votes = all_votes.flat();
  await tfrpc.rpc.push_votes(all_votes);
  timing.push({name: 'votes', time: new Date()});

  if (selected && selected.length == 1 && selected[0].startsWith('@')) {
    let size = 0;
    await ssb.sqlStream(
      'SELECT SUM(LENGTH(content)) AS length FROM messages WHERE author = ?1',
      selected,
      function(row) {
        size = row.length;
      });
    const users = {};
    users[selected[0]] = {size: size};
    await tfrpc.rpc.push_users(users);
  }

  await tfrpc.rpc.push_following(Object.fromEntries(all_followed.map(id => [id, [...(g_following_cache[id] || [])]])));
  timing.push({name: 'following', time: new Date()});

  await tfrpc.rpc.push_blocking(whoami, [...(g_blocking_cache[whoami] || [])]);
  timing.push({name: 'send_blocking', time: new Date()});

  await db.set('sequence', JSON.stringify(g_sequence));

  // Convert checkpoints to elapsed seconds since the previous checkpoint.
  const times = {};
  let previous = null;
  for (const t of timing) {
    times[t.name] = (t.time - (previous || t).time) / 1000.0 + ' s';
    previous = t;
  }
  await tfrpc.rpc.ready(times);
}

/*
 * A new message arrived: push it to the client if it is our own or is not a
 * post; otherwise just bump the unread counter.
 */
ssb.addEventListener('message', async function(id) {
  const db = await database("ssb");
  const posts = await getPosts(db, [id]);
  for (const post of posts) {
    let type;
    try {
      type = JSON.parse(post.content).type;
    } catch {
      // Unparseable content is treated as a non-post rather than aborting the
      // handler, matching how content is parsed elsewhere in this file.
    }
    if (post.author === g_whoami || type !== 'post') {
      await tfrpc.rpc.push_posts([post]);
    } else {
      await tfrpc.rpc.add_unread(1);
    }
  }
});

/* RPC: the client requests a refresh. */
tfrpc.register(async function refresh(whoami, selected) {
  return refresh_internal(whoami, selected);
});

/**
 * For every 'application/tildefriends' mention in `message`, loads the linked
 * blob as JSON and appends a mention for each entry of its `files` map.
 *
 * Mutates `message.mentions` in place. Entries appended during the loop are
 * also visited, but they carry no `type` and are therefore skipped.
 *
 * @param {object} message Message about to be appended.
 * @returns {Promise<void>}
 */
async function addAppSources(message) {
  if (!message.mentions) {
    return;
  }
  for (const mention of message.mentions) {
    if (mention.type === 'application/tildefriends') {
      const blob = await ssb.blobGet(mention.link);
      const json = JSON.parse(utf8Decode(blob));
      for (const file of Object.keys(json.files)) {
        message.mentions.push({
          name: file,
          link: json.files[file],
        });
      }
    }
  }
}

/* Messages from the client: connect requests, message appends, hash changes. */
core.register('message', async function(m) {
  if (m.message) {
    if (m.message.connect) {
      await ssb.connect(m.message.connect);
    } else if (m.message.appendMessage) {
      await addAppSources(m.message.appendMessage);
      await ssb.appendMessageWithIdentity(g_whoami, m.message.appendMessage);
    }
  } else if (m.event == 'hashChange') {
    // Drop the hash's first character; a hash of length <= 1 selects nothing.
    const selected = m.hash.length > 1 ? [m.hash.substring(1)] : null;
    await refresh_internal(g_whoami, selected);
  } else if (m.event == 'focus' || m.event == 'blur') {
    /* Shh. */
  } else {
    print(JSON.stringify(m));
  }
});

/**
 * Entry point: serves index.html to administrators and a notice to everyone else.
 *
 * @returns {Promise<void>}
 */
async function main() {
  if (core.user &&
    core.user.credentials &&
    core.user.credentials.permissions &&
    core.user.credentials.permissions.administration) {
    await app.setDocument(utf8Decode(await getFile("index.html")));
  } else {
    // Written with \n escapes: a quoted string may not span source lines.
    await app.setDocument('\nOnly the administrator can use this app at this time. Login at the top right.\n');
  }
}

main();