2021-12-28 10:25:04 -05:00
"use strict" ;
2021-09-06 13:50:38 -04:00
var g _wants _requests = { } ;
2021-11-03 19:23:20 -04:00
var g _database = new Database ( 'core' ) ;
2021-12-27 14:52:42 -05:00
const k _use _create _history _stream = false ;
2021-12-28 10:25:04 -05:00
const k _blobs _concurrent _target = 8 ;
2021-09-06 13:50:38 -04:00
2021-12-27 14:52:42 -05:00
function following ( db , id ) {
var o = db . get ( id + ":following" ) ;
2021-09-08 20:15:57 -04:00
const k _version = 5 ;
var f = o ? JSON . parse ( o ) : o ;
if ( ! f || f . version != k _version ) {
f = { users : [ ] , sequence : 0 , version : k _version } ;
}
f . users = new Set ( f . users ) ;
2021-12-27 14:52:42 -05:00
ssb . sqlStream (
2021-09-08 20:15:57 -04:00
"SELECT " +
" sequence, " +
" json_extract(content, '$.contact') AS contact, " +
" json_extract(content, '$.following') AS following " +
"FROM messages " +
"WHERE " +
" author = ?1 AND " +
" sequence > ?2 AND " +
" json_extract(content, '$.type') = 'contact' " +
"UNION SELECT MAX(sequence) AS sequence, NULL, NULL FROM messages WHERE author = ?1 " +
"ORDER BY sequence" ,
[ id , f . sequence ] ,
function ( row ) {
if ( row . following ) {
f . users . add ( row . contact ) ;
} else {
f . users . delete ( row . contact ) ;
}
f . sequence = row . sequence ;
} ) ;
f . users = Array . from ( f . users ) ;
var j = JSON . stringify ( f ) ;
if ( o != j ) {
2021-12-27 14:52:42 -05:00
db . set ( id + ":following" , j ) ;
2021-09-08 20:15:57 -04:00
}
return f . users ;
}
2021-12-27 14:52:42 -05:00
function followingDeep ( db , seed _ids , depth ) {
2021-09-08 20:15:57 -04:00
if ( depth <= 0 ) {
return seed _ids ;
}
2021-12-27 14:52:42 -05:00
var f = seed _ids . map ( x => following ( db , x ) ) ;
2021-09-08 20:15:57 -04:00
var ids = [ ] . concat ( ... f ) ;
2021-12-27 14:52:42 -05:00
var x = followingDeep ( db , [ ... new Set ( ids ) ] . sort ( ) , depth - 1 ) ;
2021-11-05 22:10:13 -04:00
x = [ ... new Set ( [ ] . concat ( ... x , ... seed _ids ) ) ] . sort ( ) ;
2021-09-08 20:15:57 -04:00
return x ;
}
2021-10-31 15:39:16 -04:00
function get _latest _sequence _for _author ( author ) {
2021-10-30 17:07:01 -04:00
var sequence = 0 ;
2021-10-31 15:39:16 -04:00
ssb . sqlStream (
2022-02-01 21:13:38 -05:00
'SELECT MIN(a.sequence) FROM messages a LEFT OUTER JOIN messages b ON a.author = b.author AND a.sequence + 1 = b.sequence WHERE a.author = ?1 AND b.content IS NULL' ,
// TODO: Should we never have allowed inserting if we couldn't
// validate previous? The query above helps us fill in gaps in
// feeds.
//'SELECT MAX(sequence) AS sequence FROM messages WHERE author = ?1',
2021-10-30 17:07:01 -04:00
[ author ] ,
function ( row ) {
2021-10-31 15:39:16 -04:00
if ( row . sequence ) {
2021-12-27 14:52:42 -05:00
sequence = row . sequence ;
2021-10-31 15:39:16 -04:00
}
2021-10-30 17:07:01 -04:00
} ) ;
return sequence ;
}
2021-12-22 14:57:34 -05:00
function storeMessage ( message ) {
var payload = message . message . value ? message . message . value : message . message ;
if ( typeof ( payload ) == 'object' ) {
ssb . storeMessage ( payload ) ;
}
}
2021-11-07 17:28:58 -05:00
ssb . addEventListener ( 'connections' , function ( change , connection ) {
2021-09-06 13:50:38 -04:00
if ( change == 'add' ) {
2021-10-31 15:39:16 -04:00
var sequence = get _latest _sequence _for _author ( connection . id ) ;
2021-12-27 14:52:42 -05:00
if ( k _use _create _history _stream ) {
connection . send _json ( { 'name' : [ 'createHistoryStream' ] , 'type' : 'source' , 'args' : [ { 'id' : connection . id , 'seq' : sequence , 'live' : true , 'keys' : false } ] } , storeMessage ) ;
var me = ssb . whoami ( ) ;
followingDeep ( g _database , [ me ] , 2 ) . then ( function ( ids ) {
for ( let id of ids ) {
if ( id == me ) {
continue ;
}
var sequence = get _latest _sequence _for _author ( id ) ;
connection . send _json ( { 'name' : [ 'createHistoryStream' ] , 'type' : 'source' , 'args' : [ { 'id' : id , 'seq' : sequence , 'live' : true , 'keys' : false } ] } , storeMessage ) ;
}
} ) ;
} else {
if ( connection . is _client ) {
connection . send _json ( { "name" : [ "ebt" , "replicate" ] , "args" : [ { "version" : 3 , "format" : "classic" } ] , "type" : "duplex" } , ebtReplicateClient ) ;
}
}
2021-12-28 10:25:04 -05:00
connection . active _blob _wants = { } ;
2021-09-06 13:50:38 -04:00
connection . send _json ( { 'name' : [ 'blobs' , 'createWants' ] , 'type' : 'source' , 'args' : [ ] } , function ( message ) {
Object . keys ( message . message ) . forEach ( function ( id ) {
if ( message . message [ id ] < 0 ) {
2022-02-01 21:13:38 -05:00
if ( g _wants _requests [ connection . id ] ) {
delete connection . active _blob _wants [ id ] ;
var blob = ssb . blobGet ( id ) ;
if ( blob ) {
var out _message = { } ;
out _message [ id ] = blob . byteLength ;
g _wants _requests [ connection . id ] . send _json ( out _message ) ;
}
2021-09-06 13:50:38 -04:00
}
} else {
2021-10-31 17:15:18 -04:00
var received _bytes = 0 ;
var expected _bytes = message . message [ id ] ;
var buffer = new Uint8Array ( expected _bytes ) ;
2021-10-30 17:07:01 -04:00
connection . send _json ( { 'name' : [ 'blobs' , 'get' ] , 'type' : 'source' , 'args' : [ id ] } , function ( message ) {
2021-12-28 10:25:04 -05:00
if ( message . flags & 0x4 /* end */ ) {
delete connection . active _blob _wants [ id ] ;
} else {
buffer . set ( new Uint8Array ( message . message , 0 , message . message . byteLength ) , received _bytes ) ;
received _bytes += message . message . byteLength ;
if ( received _bytes == expected _bytes ) {
ssb . blobStore ( buffer ) ;
}
2021-10-31 17:15:18 -04:00
}
2021-09-06 13:50:38 -04:00
} ) ;
2021-12-28 10:25:04 -05:00
if ( Object . keys ( connection . active _blob _wants ) . length < k _blobs _concurrent _target ) {
requestMoreBlobs ( g _wants _requests [ connection . id ] ) ;
}
2021-09-06 13:50:38 -04:00
}
} ) ;
} ) ;
} else if ( change == 'remove' ) {
2021-11-03 18:15:46 -04:00
print ( 'REMOVE' , connection . id ) ;
2021-09-06 13:50:38 -04:00
delete g _wants _requests [ connection . id ] ;
} else {
2021-11-03 18:15:46 -04:00
print ( 'CHANGE' , change ) ;
2021-09-06 13:50:38 -04:00
}
} ) ;
2021-12-28 10:25:04 -05:00
function blob _want _discovered ( request , id ) {
if ( Object . keys ( request . connection . active _blob _wants ) . length > k _blobs _concurrent _target ) {
return ;
2021-09-06 13:50:38 -04:00
}
2021-12-28 10:25:04 -05:00
var message = { } ;
message [ id ] = - 1 ;
request . send _json ( message ) ;
request . connection . active _blob _wants [ id ] = true ;
}
function requestMoreBlobs ( request ) {
2021-09-06 13:50:38 -04:00
ssb . sqlStream (
2021-12-28 10:25:04 -05:00
'SELECT id FROM blob_wants LIMIT ' + k _blobs _concurrent _target ,
2021-09-06 13:50:38 -04:00
[ ] ,
2021-12-28 10:25:04 -05:00
row => blob _want _discovered ( request , row . id ) ) ;
}
ssb . addRpc ( [ 'blobs' , 'createWants' ] , function ( request ) {
g _wants _requests [ request . connection . id ] = request ;
ssb . addEventListener ( 'blob_want_added' , id => blob _want _discovered ( request , id ) ) ;
requestMoreBlobs ( request ) ;
2021-09-06 13:50:38 -04:00
} ) ;
2021-11-07 17:28:58 -05:00
ssb . addRpc ( [ 'blobs' , 'has' ] , function ( request ) {
2021-09-06 13:50:38 -04:00
var found = false ;
ssb . sqlStream (
'SELECT 1 FROM blobs where id = ?1' ,
[ request . args [ 0 ] ] ,
function ( row ) {
found = true ;
} ) ;
request . send _json ( found ) ;
} ) ;
2021-11-07 17:28:58 -05:00
ssb . addRpc ( [ 'blobs' , 'get' ] , function ( request ) {
2021-12-28 16:48:03 -05:00
for ( let arg of request . args ) {
var blob ;
if ( arg . key ) {
blob = ssb . blobGet ( arg . key ) ;
} else {
blob = ssb . blobGet ( arg ) ;
}
const k _send _max = 8192 ;
if ( blob . byteLength > k _send _max ) {
for ( var i = 0 ; i < blob . byteLength ; i += k _send _max ) {
var buffer = new Uint8Array ( blob , i , Math . min ( blob . byteLength - i , k _send _max ) ) ;
request . send _binary ( buffer ) ;
}
} else {
request . send _binary ( blob ) ;
}
request . send _json _end ( true ) ;
2021-10-31 15:39:16 -04:00
}
2021-12-28 16:48:03 -05:00
request . more ( function ( request ) { } ) ;
2021-09-06 13:50:38 -04:00
} ) ;
2021-12-22 14:57:34 -05:00
ssb . addRpc ( [ 'gossip' , 'ping' ] , function ( request ) {
request . more ( function ( message ) {
2021-12-27 14:52:42 -05:00
message . send _json ( Date . now ( ) ) ;
2021-12-22 14:57:34 -05:00
} ) ;
} ) ;
2021-12-27 14:52:42 -05:00
ssb . addRpc ( [ 'tunnel' , 'isRoom' ] , function ( request ) {
request . send _json ( false ) ;
} ) ;
function ebtReplicateSendClock ( request , have ) {
var me = ssb . whoami ( ) ;
var message = { } ;
2022-01-22 15:13:14 -05:00
var last _sent = request . connection . sent _clock || { } ;
2021-12-28 10:25:04 -05:00
var ids = followingDeep ( g _database , [ me ] , 2 ) . concat ( [ request . connection . id ] ) ;
2022-01-22 15:47:10 -05:00
if ( ! Object . keys ( last _sent ) . length ) {
2022-01-22 15:13:14 -05:00
for ( let id of ids ) {
message [ id ] = get _latest _sequence _for _author ( id ) ;
}
2021-12-27 14:52:42 -05:00
}
2021-12-28 10:25:04 -05:00
for ( let id of Object . keys ( have ) ) {
if ( message [ id ] === undefined ) {
var sequence = get _latest _sequence _for _author ( id ) ;
message [ id ] = sequence ? sequence : - 1 ;
}
}
2021-12-27 14:52:42 -05:00
var to _send = { }
for ( let id of ids ) {
if ( last _sent [ id ] === undefined || message [ id ] > last _sent [ id ] ) {
last _sent [ id ] = to _send [ id ] = message [ id ] === - 1 ? - 1 : message [ id ] << 1 ;
}
2021-12-28 10:25:04 -05:00
if ( Object . keys ( to _send ) . length >= 32 ) {
request . send _json ( to _send ) ;
to _send = { } ;
}
2021-12-27 14:52:42 -05:00
}
request . connection . sent _clock = last _sent ;
if ( Object . keys ( to _send ) . length ) {
request . send _json ( to _send ) ;
}
}
function formatMessage ( row ) {
2022-02-02 21:00:05 -05:00
if ( row . sequence _before _author ) {
return {
previous : row . previous ,
sequence : row . sequence ,
author : row . author ,
timestamp : row . timestamp ,
hash : row . hash ,
content : JSON . parse ( row . content ) ,
signature : row . signature ,
} ;
} else {
return {
previous : row . previous ,
author : row . author ,
sequence : row . sequence ,
timestamp : row . timestamp ,
hash : row . hash ,
content : JSON . parse ( row . content ) ,
signature : row . signature ,
} ;
}
2021-12-27 14:52:42 -05:00
}
function ebtReplicateRegisterMessageCallback ( request ) {
var me = ssb . whoami ( ) ;
ssb . addEventListener ( 'message' , function ( message _id ) {
ssb . sqlStream (
'SELECT previous, author, id, sequence, timestamp, hash, content, signature FROM messages WHERE id = ?1 AND author = ?2' ,
[ message _id , me ] ,
function ( row ) {
request . send _json ( formatMessage ( row ) ) ;
} ) ;
} ) ;
}
function ebtReplicateCommon ( request ) {
var me = ssb . whoami ( ) ;
if ( request . message . author ) {
storeMessage ( request ) ;
} else {
ebtReplicateSendClock ( request , request . message ) ;
for ( let id of Object . keys ( request . message ) ) {
if ( request . message [ id ] >= 0 && ( request . message [ id ] & 1 ) == 0 ) {
ssb . sqlStream (
'SELECT previous, author, id, sequence, timestamp, hash, content, signature FROM messages WHERE author = ?1 AND sequence >= ?2 ORDER BY sequence' ,
[ id , request . message [ id ] >> 1 ] ,
function ( row ) {
request . send _json ( formatMessage ( row ) ) ;
} ) ;
}
}
}
}
function ebtReplicateClient ( request ) {
if ( ! request . connection . message _registered ) {
ebtReplicateRegisterMessageCallback ( request ) ;
request . connection . message _registered = true ;
}
ebtReplicateCommon ( request ) ;
}
function ebtReplicateServer ( request ) {
ebtReplicateRegisterMessageCallback ( request ) ;
ebtReplicateSendClock ( request , { } ) ;
request . more ( ebtReplicateCommon ) ;
}
ssb . addRpc ( [ 'ebt' , 'replicate' ] , ebtReplicateServer ) ;
2021-11-07 17:28:58 -05:00
ssb . addRpc ( [ 'createHistoryStream' ] , function ( request ) {
2021-09-06 13:50:38 -04:00
var id = request . args [ 0 ] . id ;
var seq = request . args [ 0 ] . seq ;
2021-11-05 22:10:13 -04:00
var keys = request . args [ 0 ] . keys || request . args [ 0 ] . keys === undefined ;
2021-11-10 19:05:07 -05:00
function sendMessage ( row ) {
if ( keys ) {
var message = {
key : row . id ,
2022-02-02 21:00:05 -05:00
value : formatMessage ( row ) ,
2021-11-10 19:05:07 -05:00
timestamp : row . timestamp ,
} ;
} else {
2022-02-02 21:00:05 -05:00
var message = formatMessage ( row ) ;
2021-11-10 19:05:07 -05:00
}
request . send _json ( message ) ;
}
ssb . sqlStream (
2022-02-02 21:00:05 -05:00
'SELECT previous, author, id, sequence, timestamp, hash, content, signature, sequence_before_author FROM messages WHERE author = ?1 AND sequence >= ?2 ORDER BY sequence' ,
2021-11-10 19:05:07 -05:00
[ id , seq ? ? 0 ] ,
sendMessage ) ;
2021-12-20 12:00:25 -05:00
ssb . addEventListener ( 'message' , function ( message _id ) {
2021-11-10 19:05:07 -05:00
ssb . sqlStream (
2022-02-02 21:00:05 -05:00
'SELECT previous, author, id, sequence, timestamp, hash, content, signature, sequence_before_author FROM messages WHERE id = ?1 AND author = ?2' ,
2021-12-27 14:52:42 -05:00
[ message _id , id ] ,
2021-12-20 12:00:25 -05:00
function ( row ) {
2021-12-27 14:52:42 -05:00
sendMessage ( row ) ;
2021-12-20 12:00:25 -05:00
} ) ;
2021-11-10 19:05:07 -05:00
} ) ;
2021-09-06 13:50:38 -04:00
} ) ;