2021-12-28 10:25:04 -05:00
"use strict" ;
2021-09-06 13:50:38 -04:00
var g _wants _requests = { } ;
2021-11-03 19:23:20 -04:00
var g _database = new Database ( 'core' ) ;
2022-11-02 19:34:44 -04:00
let g _attendants = { } ;
2021-12-27 14:52:42 -05:00
const k _use _create _history _stream = false ;
2021-12-28 10:25:04 -05:00
const k _blobs _concurrent _target = 8 ;
2021-09-06 13:50:38 -04:00
2021-12-27 14:52:42 -05:00
function following ( db , id ) {
var o = db . get ( id + ":following" ) ;
2021-09-08 20:15:57 -04:00
const k _version = 5 ;
var f = o ? JSON . parse ( o ) : o ;
if ( ! f || f . version != k _version ) {
f = { users : [ ] , sequence : 0 , version : k _version } ;
}
f . users = new Set ( f . users ) ;
2021-12-27 14:52:42 -05:00
ssb . sqlStream (
2021-09-08 20:15:57 -04:00
"SELECT " +
" sequence, " +
" json_extract(content, '$.contact') AS contact, " +
" json_extract(content, '$.following') AS following " +
"FROM messages " +
"WHERE " +
" author = ?1 AND " +
" sequence > ?2 AND " +
" json_extract(content, '$.type') = 'contact' " +
"UNION SELECT MAX(sequence) AS sequence, NULL, NULL FROM messages WHERE author = ?1 " +
"ORDER BY sequence" ,
[ id , f . sequence ] ,
function ( row ) {
if ( row . following ) {
f . users . add ( row . contact ) ;
} else {
f . users . delete ( row . contact ) ;
}
f . sequence = row . sequence ;
} ) ;
2022-02-05 22:28:29 -05:00
f . users = Array . from ( f . users ) . sort ( ) ;
2021-09-08 20:15:57 -04:00
var j = JSON . stringify ( f ) ;
if ( o != j ) {
2021-12-27 14:52:42 -05:00
db . set ( id + ":following" , j ) ;
2021-09-08 20:15:57 -04:00
}
return f . users ;
}
2021-12-27 14:52:42 -05:00
function followingDeep ( db , seed _ids , depth ) {
2021-09-08 20:15:57 -04:00
if ( depth <= 0 ) {
return seed _ids ;
}
2021-12-27 14:52:42 -05:00
var f = seed _ids . map ( x => following ( db , x ) ) ;
2021-09-08 20:15:57 -04:00
var ids = [ ] . concat ( ... f ) ;
2021-12-27 14:52:42 -05:00
var x = followingDeep ( db , [ ... new Set ( ids ) ] . sort ( ) , depth - 1 ) ;
2021-11-05 22:10:13 -04:00
x = [ ... new Set ( [ ] . concat ( ... x , ... seed _ids ) ) ] . sort ( ) ;
2021-09-08 20:15:57 -04:00
return x ;
}
2021-10-31 15:39:16 -04:00
function get _latest _sequence _for _author ( author ) {
2021-10-30 17:07:01 -04:00
var sequence = 0 ;
2021-10-31 15:39:16 -04:00
ssb . sqlStream (
2022-02-05 22:28:29 -05:00
'SELECT MAX(sequence) AS sequence FROM messages WHERE author = ?1' ,
2021-10-30 17:07:01 -04:00
[ author ] ,
function ( row ) {
2021-10-31 15:39:16 -04:00
if ( row . sequence ) {
2021-12-27 14:52:42 -05:00
sequence = row . sequence ;
2021-10-31 15:39:16 -04:00
}
2021-10-30 17:07:01 -04:00
} ) ;
return sequence ;
}
2021-12-22 14:57:34 -05:00
function storeMessage ( message ) {
var payload = message . message . value ? message . message . value : message . message ;
if ( typeof ( payload ) == 'object' ) {
ssb . storeMessage ( payload ) ;
}
}
2022-11-02 19:34:44 -04:00
function tunnel _attendants ( request ) {
if ( request . message . type !== 'state' ) {
throw Error ( 'Unexpected type: ' + request . message . type ) ;
}
let state = new Set ( request . message . ids ) ;
for ( let id of state ) {
request . add _room _attendant ( id ) ;
}
request . more ( function attendants ( message ) {
if ( message . message . type === 'joined' ) {
request . add _room _attendant ( message . message . id ) ;
state . add ( message . message . id ) ;
} else if ( message . message . type === 'left' ) {
request . remove _room _attendant ( message . message . id ) ;
state . delete ( message . message . id ) ;
} else {
throw Error ( 'Unexpected type: ' + message . type ) ;
}
} ) ;
}
2022-11-11 21:00:49 -05:00
function send _blobs _create _wants ( connection ) {
connection . send _json ( { 'name' : [ 'blobs' , 'createWants' ] , 'type' : 'source' , 'args' : [ ] } , function on _blob _create _wants ( message ) {
2022-11-13 17:02:54 -05:00
if ( message . message ? . name === 'Error' ) {
return ;
}
2022-11-11 21:00:49 -05:00
Object . keys ( message . message ) . forEach ( function ( id ) {
if ( message . message [ id ] < 0 ) {
if ( g _wants _requests [ connection . id ] ) {
delete connection . active _blob _wants [ id ] ;
var blob = ssb . blobGet ( id ) ;
if ( blob ) {
var out _message = { } ;
out _message [ id ] = blob . byteLength ;
g _wants _requests [ connection . id ] . send _json ( out _message ) ;
}
}
} else {
var received _bytes = 0 ;
var expected _bytes = message . message [ id ] ;
var buffer = new Uint8Array ( expected _bytes ) ;
connection . send _json ( { 'name' : [ 'blobs' , 'get' ] , 'type' : 'source' , 'args' : [ id ] } , function ( message ) {
if ( message . flags & 0x4 /* end */ ) {
delete connection . active _blob _wants [ id ] ;
} else {
buffer . set ( new Uint8Array ( message . message , 0 , message . message . byteLength ) , received _bytes ) ;
received _bytes += message . message . byteLength ;
if ( received _bytes == expected _bytes ) {
ssb . blobStore ( buffer ) ;
}
}
} ) ;
if ( g _wants _requests [ connection . id ] && Object . keys ( connection . active _blob _wants ) . length < k _blobs _concurrent _target ) {
requestMoreBlobs ( g _wants _requests [ connection . id ] ) ;
}
}
} ) ;
} ) ;
}
2022-11-02 19:34:44 -04:00
ssb . addEventListener ( 'connections' , function on _connections _changed ( change , connection ) {
2021-09-06 13:50:38 -04:00
if ( change == 'add' ) {
2022-11-11 21:00:49 -05:00
connection . active _blob _wants = { } ;
2021-10-31 15:39:16 -04:00
var sequence = get _latest _sequence _for _author ( connection . id ) ;
2021-12-27 14:52:42 -05:00
if ( k _use _create _history _stream ) {
connection . send _json ( { 'name' : [ 'createHistoryStream' ] , 'type' : 'source' , 'args' : [ { 'id' : connection . id , 'seq' : sequence , 'live' : true , 'keys' : false } ] } , storeMessage ) ;
2022-07-31 15:01:08 -04:00
var identities = ssb . getAllIdentities ( ) ;
followingDeep ( g _database , identities , 2 ) . then ( function ( ids ) {
2021-12-27 14:52:42 -05:00
for ( let id of ids ) {
2022-07-31 15:01:08 -04:00
if ( identities . indexOf ( id ) != - 1 ) {
2021-12-27 14:52:42 -05:00
continue ;
}
var sequence = get _latest _sequence _for _author ( id ) ;
connection . send _json ( { 'name' : [ 'createHistoryStream' ] , 'type' : 'source' , 'args' : [ { 'id' : id , 'seq' : sequence , 'live' : true , 'keys' : false } ] } , storeMessage ) ;
}
} ) ;
} else {
if ( connection . is _client ) {
2022-11-11 21:00:49 -05:00
connection . send _json ( { "name" : [ "ebt" , "replicate" ] , "args" : [ { "version" : 3 , "format" : "classic" } ] , "type" : "duplex" } , ebtReplicateClient ) ;
connection . send _json _async ( { 'name' : [ 'tunnel' , 'isRoom' ] , 'args' : [ ] } , function tunnel _is _room ( request ) {
2022-11-02 19:34:44 -04:00
if ( request . message ) {
2022-11-11 21:00:49 -05:00
request . connection . send _json ( { 'name' : [ 'room' , 'attendants' ] , 'args' : [ ] , 'type' : 'source' } , tunnel _attendants ) ;
2022-11-02 19:34:44 -04:00
}
} ) ;
2021-12-27 14:52:42 -05:00
}
2022-11-11 21:00:49 -05:00
send _blobs _create _wants ( connection ) ;
2021-12-27 14:52:42 -05:00
}
2021-09-06 13:50:38 -04:00
} else if ( change == 'remove' ) {
2021-11-03 18:15:46 -04:00
print ( 'REMOVE' , connection . id ) ;
2022-11-02 19:34:44 -04:00
notify _attendant _changed ( connection . id , 'left' ) ;
delete g _attendants [ connection . id ] ;
2021-09-06 13:50:38 -04:00
delete g _wants _requests [ connection . id ] ;
} else {
2021-11-03 18:15:46 -04:00
print ( 'CHANGE' , change ) ;
2021-09-06 13:50:38 -04:00
}
} ) ;
2021-12-28 10:25:04 -05:00
function blob _want _discovered ( request , id ) {
2022-11-09 18:25:22 -05:00
if ( ! request || ! request . connection || Object . keys ( request . connection . active _blob _wants ) . length > k _blobs _concurrent _target ) {
2021-12-28 10:25:04 -05:00
return ;
2021-09-06 13:50:38 -04:00
}
2021-12-28 10:25:04 -05:00
var message = { } ;
message [ id ] = - 1 ;
request . send _json ( message ) ;
request . connection . active _blob _wants [ id ] = true ;
}
function requestMoreBlobs ( request ) {
2021-09-06 13:50:38 -04:00
ssb . sqlStream (
2021-12-28 10:25:04 -05:00
'SELECT id FROM blob_wants LIMIT ' + k _blobs _concurrent _target ,
2021-09-06 13:50:38 -04:00
[ ] ,
2021-12-28 10:25:04 -05:00
row => blob _want _discovered ( request , row . id ) ) ;
}
ssb . addRpc ( [ 'blobs' , 'createWants' ] , function ( request ) {
g _wants _requests [ request . connection . id ] = request ;
ssb . addEventListener ( 'blob_want_added' , id => blob _want _discovered ( request , id ) ) ;
requestMoreBlobs ( request ) ;
2021-09-06 13:50:38 -04:00
} ) ;
2022-11-02 19:34:44 -04:00
function notify _attendant _changed ( id , type ) {
2022-11-13 17:02:54 -05:00
if ( ! id ) {
print ( ` notify_attendant_changed called with id= ${ id } ` ) ;
2022-11-15 19:11:03 -05:00
return ;
2022-11-13 17:02:54 -05:00
}
2022-11-02 19:34:44 -04:00
for ( let r of Object . values ( g _attendants ) ) {
try {
r . send _json ( {
type : type ,
id : id ,
} ) ;
} catch ( e ) {
2022-11-13 17:02:54 -05:00
print ( ` Removing ${ id } from g_attendants in ${ type } . ` , e ) ;
delete g _attendants [ id ] ;
2022-11-02 19:34:44 -04:00
}
}
}
ssb . addRpc ( [ 'room' , 'attendants' ] , function ( request ) {
let ids = Object . keys ( g _attendants ) . sort ( ) ;
request . send _json ( {
type : 'state' ,
ids : ids ,
} ) ;
notify _attendant _changed ( request . connection . id , 'joined' ) ;
g _attendants [ request . connection . id ] = request ;
} ) ;
2021-12-27 14:52:42 -05:00
function ebtReplicateSendClock ( request , have ) {
2022-07-31 15:01:08 -04:00
var identities = ssb . getAllIdentities ( ) ;
2021-12-27 14:52:42 -05:00
var message = { } ;
2022-01-22 15:13:14 -05:00
var last _sent = request . connection . sent _clock || { } ;
2022-07-31 15:01:08 -04:00
var ids = followingDeep ( g _database , identities , 2 ) . concat ( [ request . connection . id ] ) ;
2022-01-22 15:47:10 -05:00
if ( ! Object . keys ( last _sent ) . length ) {
2022-01-22 15:13:14 -05:00
for ( let id of ids ) {
2022-02-05 22:49:47 -05:00
message [ id ] = get _latest _sequence _for _author ( id ) ;
2022-01-22 15:13:14 -05:00
}
2021-12-27 14:52:42 -05:00
}
2021-12-28 10:25:04 -05:00
for ( let id of Object . keys ( have ) ) {
if ( message [ id ] === undefined ) {
var sequence = get _latest _sequence _for _author ( id ) ;
message [ id ] = sequence ? sequence : - 1 ;
}
}
2021-12-27 14:52:42 -05:00
var to _send = { }
2022-02-10 21:44:27 -05:00
var offset = Math . floor ( Math . random ( ) * ids . length ) ;
for ( var i = 0 ; i < ids . length ; i ++ ) {
var id = ids [ ( i + offset ) % ids . length ] ;
2021-12-27 14:52:42 -05:00
if ( last _sent [ id ] === undefined || message [ id ] > last _sent [ id ] ) {
last _sent [ id ] = to _send [ id ] = message [ id ] === - 1 ? - 1 : message [ id ] << 1 ;
}
2021-12-28 10:25:04 -05:00
if ( Object . keys ( to _send ) . length >= 32 ) {
request . send _json ( to _send ) ;
to _send = { } ;
}
2021-12-27 14:52:42 -05:00
}
request . connection . sent _clock = last _sent ;
if ( Object . keys ( to _send ) . length ) {
request . send _json ( to _send ) ;
}
}
function formatMessage ( row ) {
2022-02-02 21:00:05 -05:00
if ( row . sequence _before _author ) {
return {
previous : row . previous ,
sequence : row . sequence ,
author : row . author ,
timestamp : row . timestamp ,
hash : row . hash ,
content : JSON . parse ( row . content ) ,
signature : row . signature ,
} ;
} else {
return {
previous : row . previous ,
author : row . author ,
sequence : row . sequence ,
timestamp : row . timestamp ,
hash : row . hash ,
content : JSON . parse ( row . content ) ,
signature : row . signature ,
} ;
}
2021-12-27 14:52:42 -05:00
}
function ebtReplicateRegisterMessageCallback ( request ) {
ssb . addEventListener ( 'message' , function ( message _id ) {
ssb . sqlStream (
2022-07-31 15:01:08 -04:00
'SELECT previous, author, id, sequence, timestamp, hash, content, signature FROM messages WHERE id = ?1' ,
[ message _id ] ,
2021-12-27 14:52:42 -05:00
function ( row ) {
request . send _json ( formatMessage ( row ) ) ;
} ) ;
} ) ;
}
function ebtReplicateCommon ( request ) {
if ( request . message . author ) {
storeMessage ( request ) ;
} else {
ebtReplicateSendClock ( request , request . message ) ;
for ( let id of Object . keys ( request . message ) ) {
if ( request . message [ id ] >= 0 && ( request . message [ id ] & 1 ) == 0 ) {
ssb . sqlStream (
'SELECT previous, author, id, sequence, timestamp, hash, content, signature FROM messages WHERE author = ?1 AND sequence >= ?2 ORDER BY sequence' ,
[ id , request . message [ id ] >> 1 ] ,
function ( row ) {
request . send _json ( formatMessage ( row ) ) ;
} ) ;
}
}
}
}
function ebtReplicateClient ( request ) {
2022-11-11 21:00:49 -05:00
if ( request . message ? . name !== 'Error' ) {
if ( ! request . connection . message _registered ) {
ebtReplicateRegisterMessageCallback ( request ) ;
request . connection . message _registered = true ;
}
ebtReplicateCommon ( request ) ;
2021-12-27 14:52:42 -05:00
}
}
function ebtReplicateServer ( request ) {
ebtReplicateRegisterMessageCallback ( request ) ;
ebtReplicateSendClock ( request , { } ) ;
request . more ( ebtReplicateCommon ) ;
}
ssb . addRpc ( [ 'ebt' , 'replicate' ] , ebtReplicateServer ) ;
2021-11-07 17:28:58 -05:00
ssb . addRpc ( [ 'createHistoryStream' ] , function ( request ) {
2021-09-06 13:50:38 -04:00
var id = request . args [ 0 ] . id ;
var seq = request . args [ 0 ] . seq ;
2021-11-05 22:10:13 -04:00
var keys = request . args [ 0 ] . keys || request . args [ 0 ] . keys === undefined ;
2021-11-10 19:05:07 -05:00
function sendMessage ( row ) {
if ( keys ) {
var message = {
key : row . id ,
2022-02-02 21:00:05 -05:00
value : formatMessage ( row ) ,
2021-11-10 19:05:07 -05:00
timestamp : row . timestamp ,
} ;
} else {
2022-02-02 21:00:05 -05:00
var message = formatMessage ( row ) ;
2021-11-10 19:05:07 -05:00
}
request . send _json ( message ) ;
}
ssb . sqlStream (
2022-02-02 21:00:05 -05:00
'SELECT previous, author, id, sequence, timestamp, hash, content, signature, sequence_before_author FROM messages WHERE author = ?1 AND sequence >= ?2 ORDER BY sequence' ,
2021-11-10 19:05:07 -05:00
[ id , seq ? ? 0 ] ,
sendMessage ) ;
2021-12-20 12:00:25 -05:00
ssb . addEventListener ( 'message' , function ( message _id ) {
2021-11-10 19:05:07 -05:00
ssb . sqlStream (
2022-02-02 21:00:05 -05:00
'SELECT previous, author, id, sequence, timestamp, hash, content, signature, sequence_before_author FROM messages WHERE id = ?1 AND author = ?2' ,
2021-12-27 14:52:42 -05:00
[ message _id , id ] ,
2021-12-20 12:00:25 -05:00
function ( row ) {
2021-12-27 14:52:42 -05:00
sendMessage ( row ) ;
2021-12-20 12:00:25 -05:00
} ) ;
2021-11-10 19:05:07 -05:00
} ) ;
2021-09-06 13:50:38 -04:00
} ) ;