2022-11-16 20:36:24 -05:00
# include "ssb.rpc.h"
2023-06-17 10:05:23 -04:00
# include "log.h"
2022-11-16 20:36:24 -05:00
# include "mem.h"
# include "ssb.h"
# include "ssb.db.h"
# include "util.js.h"
2023-05-21 17:36:51 -04:00
# include "sqlite3.h"
2023-06-17 10:05:23 -04:00
# include "uv.h"
2022-12-29 20:23:44 -05:00
2022-11-16 20:36:24 -05:00
# include <inttypes.h>
2023-07-26 22:51:42 -04:00
# include <stdlib.h>
2022-11-16 20:36:24 -05:00
# include <string.h>
# include <time.h>
2023-01-07 19:25:38 -05:00
/*
 * Number of elements in an array, as an int.  Only defined here when no
 * earlier header has already provided _countof.  The argument must be a real
 * array: with a pointer the sizeof division does not give an element count.
 */
# if !defined(_countof)
# define _countof(a) ((int)(sizeof((a)) / sizeof(*(a))))
# endif
2023-01-18 18:43:49 -05:00
/* Forward declarations of file-local (static) helpers so they can be called ahead of their definitions. */
static void _tf_ssb_connection_send_history_stream ( tf_ssb_connection_t * connection , int32_t request_number , const char * author , int64_t sequence , bool keys , bool live ) ;
2023-04-12 19:22:33 -04:00
static void _tf_ssb_connection_send_history_stream_internal ( tf_ssb_connection_t * connection , int32_t request_number , const char * author , int64_t sequence , bool keys , bool live ) ;
2023-07-26 22:51:42 -04:00
static void _tf_ssb_rpc_start_delete_blobs ( tf_ssb_t * ssb , int delay_ms ) ;
/*
 * Reads an integer entry from the global settings.
 *
 * The settings are a JSON document stored in the properties row with
 * id = 'core' and key = 'settings'; name is appended to the JSON path "$."
 * to select the entry.
 *
 * ssb: instance whose database is queried.
 * name: settings entry to look up.
 * default_value: value returned when the statement cannot be prepared or
 *   bound, when there is no settings row, or when the entry is not present.
 *
 * Returns the stored value, or default_value.
 */
static int64_t _get_global_setting_int64(tf_ssb_t* ssb, const char* name, int64_t default_value)
{
	int64_t result = default_value;
	sqlite3* db = tf_ssb_acquire_db_reader(ssb);
	sqlite3_stmt* statement = NULL;
	if (sqlite3_prepare(db, "SELECT json_extract(value, '$.' || ?) FROM properties WHERE id = 'core' AND key = 'settings'", -1, &statement, NULL) == SQLITE_OK)
	{
		/*
		 * json_extract() produces SQL NULL when the path does not exist, and
		 * sqlite3_column_int64() converts NULL to 0.  Without the type check
		 * an entry that was never set would override the default with 0.
		 */
		if (sqlite3_bind_text(statement, 1, name, -1, NULL) == SQLITE_OK &&
			sqlite3_step(statement) == SQLITE_ROW &&
			sqlite3_column_type(statement, 0) != SQLITE_NULL)
		{
			result = sqlite3_column_int64(statement, 0);
		}
		sqlite3_finalize(statement);
	}
	else
	{
		tf_printf("prepare failed: %s\n", sqlite3_errmsg(db));
	}
	tf_ssb_release_db_reader(ssb, db);
	return result;
}
/*
 * Reads a boolean entry from the global settings.
 *
 * The settings are a JSON document stored in the properties row with
 * id = 'core' and key = 'settings'; name is appended to the JSON path "$."
 * to select the entry.  Any non-zero integer value counts as true.
 *
 * ssb: instance whose database is queried.
 * name: settings entry to look up.
 * default_value: value returned when the statement cannot be prepared or
 *   bound, when there is no settings row, or when the entry is not present.
 *
 * Returns the stored value, or default_value.
 */
static bool _get_global_setting_bool(tf_ssb_t* ssb, const char* name, bool default_value)
{
	bool result = default_value;
	sqlite3* db = tf_ssb_acquire_db_reader(ssb);
	sqlite3_stmt* statement = NULL;
	if (sqlite3_prepare(db, "SELECT json_extract(value, '$.' || ?) FROM properties WHERE id = 'core' AND key = 'settings'", -1, &statement, NULL) == SQLITE_OK)
	{
		/*
		 * json_extract() produces SQL NULL when the path does not exist, and
		 * sqlite3_column_int() converts NULL to 0.  Without the type check an
		 * entry that was never set would read as false even when the caller's
		 * default is true.
		 */
		if (sqlite3_bind_text(statement, 1, name, -1, NULL) == SQLITE_OK &&
			sqlite3_step(statement) == SQLITE_ROW &&
			sqlite3_column_type(statement, 0) != SQLITE_NULL)
		{
			result = sqlite3_column_int(statement, 0) != 0;
		}
		sqlite3_finalize(statement);
	}
	else
	{
		tf_printf("prepare failed: %s\n", sqlite3_errmsg(db));
	}
	tf_ssb_release_db_reader(ssb, db);
	return result;
}
/*
 * Reads a string entry from the global settings into a caller buffer.
 *
 * The settings are a JSON document stored in the properties row with
 * id = 'core' and key = 'settings'; name is appended to the JSON path "$."
 * to select the entry.
 *
 * ssb: instance whose database is queried.
 * name: settings entry to look up.
 * out_value: destination buffer.  It is written only when a value is found,
 *   so a caller may pre-fill it with a default.
 * size: capacity of out_value in bytes; longer values are truncated by
 *   snprintf.
 *
 * Returns true when a value was found and copied, false otherwise.
 */
static bool _get_global_setting_string(tf_ssb_t* ssb, const char* name, char* out_value, size_t size)
{
	bool result = false;
	sqlite3* db = tf_ssb_acquire_db_reader(ssb);
	sqlite3_stmt* statement = NULL;
	if (sqlite3_prepare(db, "SELECT json_extract(value, '$.' || ?) FROM properties WHERE id = 'core' AND key = 'settings'", -1, &statement, NULL) == SQLITE_OK)
	{
		if (sqlite3_bind_text(statement, 1, name, -1, NULL) == SQLITE_OK &&
			sqlite3_step(statement) == SQLITE_ROW &&
			sqlite3_column_type(statement, 0) != SQLITE_NULL)
		{
			/*
			 * A missing entry comes back as SQL NULL, for which
			 * sqlite3_column_text() returns a NULL pointer.  Passing that to
			 * "%s" is undefined and would also clobber the caller's default,
			 * so NULL is treated as "not found".
			 */
			const char* text = (const char*)sqlite3_column_text(statement, 0);
			if (text)
			{
				snprintf(out_value, size, "%s", text);
				result = true;
			}
		}
		sqlite3_finalize(statement);
	}
	else
	{
		tf_printf("prepare failed: %s\n", sqlite3_errmsg(db));
	}
	tf_ssb_release_db_reader(ssb, db);
	return result;
}
2023-01-16 21:17:29 -05:00
2023-01-10 21:30:07 -05:00
/*
 * Request callback installed by _tf_ssb_rpc_gossip_ping.
 *
 * Answers every message with the current time(NULL) value multiplied by
 * 1000, written as decimal text, using the incoming flags and the negated
 * request number.  When the incoming flags carry k_ssb_rpc_flag_end_error the
 * request is removed from the connection afterwards.
 */
static void _tf_ssb_rpc_gossip_ping_callback(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	int64_t now_ms = (int64_t)time(NULL) * 1000;
	char reply[256];
	/* The buffer is far larger than any int64 rendering, so the return value is the exact text length. */
	int reply_length = snprintf(reply, sizeof(reply), "%" PRId64, now_ms);
	tf_ssb_connection_rpc_send(connection, flags, -request_number, (const uint8_t*)reply, (size_t)reply_length, NULL, NULL, NULL);

	bool stream_ended = (flags & k_ssb_rpc_flag_end_error) != 0;
	if (stream_ended)
	{
		tf_ssb_connection_remove_request(connection, request_number);
	}
}
2023-01-10 21:30:07 -05:00
/*
 * RPC handler for gossip ping requests.  It sends nothing itself; it only
 * registers _tf_ssb_rpc_gossip_ping_callback on the connection under the
 * negated request number, with no cleanup function or user data.
 * NOTE(review): presumably later messages on this request are then routed to
 * that callback -- confirm against tf_ssb_connection_add_request.
 */
static void _tf_ssb_rpc_gossip_ping ( tf_ssb_connection_t * connection , uint8_t flags , int32_t request_number , JSValue args , const uint8_t * message , size_t size , void * user_data )
{
tf_ssb_connection_add_request ( connection , - request_number , _tf_ssb_rpc_gossip_ping_callback , NULL , NULL , NULL ) ;
}
2022-11-16 20:36:24 -05:00
/*
 * RPC handler for blobs.get.
 *
 * Messages flagged k_ssb_rpc_flag_end_error are ignored.  Otherwise every
 * element of the request's "args" array names a blob, either directly as a
 * string or as an object with a "key" property.  Each blob found in the
 * database is streamed back as binary messages of at most 8192 bytes.  The
 * stream is then closed with a JSON "true" if at least one blob was found, or
 * "false" otherwise.
 */
static void _tf_ssb_rpc_blobs_get(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	if (flags & k_ssb_rpc_flag_end_error)
	{
		return;
	}

	const size_t k_chunk_max = 8192;
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	JSContext* context = tf_ssb_connection_get_context(connection);
	JSValue requested = JS_GetPropertyStr(context, args, "args");
	int requested_count = tf_util_get_length(context, requested);
	bool any_found = false;

	for (int index = 0; index < requested_count; index++)
	{
		JSValue entry = JS_GetPropertyUint32(context, requested, index);
		const char* blob_id = NULL;
		if (JS_IsString(entry))
		{
			blob_id = JS_ToCString(context, entry);
		}
		else
		{
			JSValue key = JS_GetPropertyStr(context, entry, "key");
			blob_id = JS_ToCString(context, key);
			JS_FreeValue(context, key);
		}

		uint8_t* data = NULL;
		size_t data_size = 0;
		if (tf_ssb_db_blob_get(ssb, blob_id, &data, &data_size))
		{
			size_t offset = 0;
			while (offset < data_size)
			{
				size_t remaining = data_size - offset;
				size_t chunk = remaining < k_chunk_max ? remaining : k_chunk_max;
				tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_binary | k_ssb_rpc_flag_stream, -request_number, data + offset, chunk, NULL, NULL, NULL);
				offset += chunk;
			}
			/* An empty blob sends no chunks but still counts as found. */
			any_found = true;
			tf_free(data);
		}

		JS_FreeCString(context, blob_id);
		JS_FreeValue(context, entry);
	}
	JS_FreeValue(context, requested);

	const char* verdict = any_found ? "true" : "false";
	tf_ssb_connection_rpc_send(
		connection,
		k_ssb_rpc_flag_json | k_ssb_rpc_flag_end_error | k_ssb_rpc_flag_stream,
		-request_number,
		(const uint8_t*)verdict,
		strlen(verdict),
		NULL,
		NULL,
		NULL);
}
2022-11-16 20:49:34 -05:00
/*
 * RPC handler for blobs.has.
 *
 * Converts the first element of the request's "args" array to a string, asks
 * the database whether that blob exists, and replies with the JSON text
 * "true" or "false" under the negated request number.
 */
static void _tf_ssb_rpc_blobs_has(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	JSContext* context = tf_ssb_connection_get_context(connection);
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);

	JSValue request_args = JS_GetPropertyStr(context, args, "args");
	JSValue first = JS_GetPropertyUint32(context, request_args, 0);
	const char* blob_id = JS_ToCString(context, first);
	const char* answer = tf_ssb_db_blob_has(ssb, blob_id) ? "true" : "false";
	JS_FreeCString(context, blob_id);
	JS_FreeValue(context, first);
	JS_FreeValue(context, request_args);

	tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json, -request_number, (const uint8_t*)answer, strlen(answer), NULL, NULL, NULL);
}
2023-01-04 19:52:23 -05:00
/*
 * Callback registered through tf_ssb_add_blob_want_added_callback with the
 * connection as user data.
 *
 * Sends the object { <id>: -1 } on the connection's blob wants request
 * (negated request number) as a stream message.
 */
static void _tf_ssb_rpc_blob_wants_added_callback(tf_ssb_t* ssb, const char* id, void* user_data)
{
	tf_ssb_connection_t* connection = user_data;
	tf_ssb_blob_wants_t* wants = tf_ssb_connection_get_blob_wants_state(connection);
	JSContext* context = tf_ssb_get_context(ssb);

	JSValue want = JS_NewObject(context);
	JSValue distance = JS_NewInt64(context, -1);
	JS_SetPropertyStr(context, want, id, distance);
	tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream, -wants->request_number, want, NULL, NULL, NULL);
	JS_FreeValue(context, want);
}
/*
 * Sends the next batch of blob wants on a connection.
 *
 * The "blob_fetch_age_seconds" setting (default -1) controls what is asked
 * for: 0 disables fetching entirely, a positive value restricts the query to
 * rows whose timestamp is greater than now minus that many seconds (in
 * milliseconds), and a negative value leaves the lower bound at -1.
 *
 * Up to 32 ids greater than blob_wants->last_id are read from
 * blob_wants_view in id order.  For each one, { <id>: -1 } is sent on the
 * wants stream, last_id is advanced to that id and wants_sent is incremented.
 */
static void _tf_ssb_rpc_request_more_blobs(tf_ssb_connection_t* connection)
{
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	tf_ssb_blob_wants_t* wants = tf_ssb_connection_get_blob_wants_state(connection);
	JSContext* context = tf_ssb_connection_get_context(connection);

	int64_t age = _get_global_setting_int64(ssb, "blob_fetch_age_seconds", -1);
	if (age == 0)
	{
		/* Don't fetch any blobs. */
		return;
	}

	int64_t oldest_timestamp = -1;
	if (age > 0)
	{
		int64_t now = (int64_t)time(NULL) * 1000ULL;
		oldest_timestamp = now - age * 1000ULL;
	}

	sqlite3* db = tf_ssb_acquire_db_reader(ssb);
	sqlite3_stmt* statement;
	if (sqlite3_prepare(db, "SELECT id FROM blob_wants_view WHERE id > ? AND timestamp > ? ORDER BY id LIMIT 32", -1, &statement, NULL) != SQLITE_OK)
	{
		tf_printf("prepare failed: %s\n", sqlite3_errmsg(db));
	}
	else
	{
		bool bound = sqlite3_bind_text(statement, 1, wants->last_id, -1, NULL) == SQLITE_OK &&
			sqlite3_bind_int64(statement, 2, oldest_timestamp) == SQLITE_OK;
		while (bound && sqlite3_step(statement) == SQLITE_ROW)
		{
			const char* blob_id = (const char*)sqlite3_column_text(statement, 0);
			JSValue want = JS_NewObject(context);
			JS_SetPropertyStr(context, want, blob_id, JS_NewInt32(context, -1));
			tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream, -wants->request_number, want, NULL, NULL, NULL);
			JS_FreeValue(context, want);
			/* Remember how far we got so the next batch continues after this id. */
			snprintf(wants->last_id, sizeof(wants->last_id), "%s", blob_id);
			wants->wants_sent++;
		}
		sqlite3_finalize(statement);
	}
	tf_ssb_release_db_reader(ssb, db);
}
/*
 * RPC handler for blobs.createWants.
 *
 * Registers _tf_ssb_rpc_blob_wants_added_callback for this connection,
 * records the request number in the connection's blob wants state and sends
 * the first batch of wants.
 */
static void _tf_ssb_rpc_blobs_createWants(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	tf_ssb_blob_wants_t* wants = tf_ssb_connection_get_blob_wants_state(connection);

	tf_ssb_add_blob_want_added_callback(ssb, _tf_ssb_rpc_blob_wants_added_callback, NULL, connection);

	/* The request number must be stored before any want is sent, because the senders read it back from the wants state. */
	wants->request_number = request_number;
	_tf_ssb_rpc_request_more_blobs(connection);
}
2022-11-26 22:12:24 -05:00
/*
 * Forwarding destination for one direction of a tunnel.  Passed as user data
 * to _tf_ssb_rpc_tunnel_callback, which re-sends each incoming message to
 * `connection` under `request_number`.
 */
typedef struct tunnel_t
{
/* Connection that forwarded messages are sent on. */
tf_ssb_connection_t * connection ;
/* Request number used for the forwarded messages. */
int32_t request_number ;
} tunnel_t ;
/*
 * Request callback for one direction of a tunnel.
 *
 * user_data is a tunnel_t naming the other end.  The raw message bytes are
 * re-sent there with the same flags, under the request number stored in the
 * tunnel_t.
 */
void _tf_ssb_rpc_tunnel_callback(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	const tunnel_t* destination = user_data;
	tf_ssb_connection_t* forward_to = destination->connection;
	int32_t forward_request = destination->request_number;
	tf_ssb_connection_rpc_send(forward_to, flags, forward_request, message, size, NULL, NULL, NULL);
}
/*
 * Cleanup function paired with _tf_ssb_rpc_tunnel_callback when a tunnel
 * request is registered: releases the tunnel_t that was allocated with
 * tf_malloc and passed as user data.  ssb is unused.
 */
void _tf_ssb_rpc_tunnel_cleanup ( tf_ssb_t * ssb , void * user_data )
{
tf_free ( user_data ) ;
}
/*
 * RPC handler for tunnel.connect.
 *
 * When the "room" setting (default true) is off, the request is rejected with
 * a method-not-allowed error.  Otherwise the first element of "args" is
 * inspected:
 *
 * - portal and target present, origin absent: if a connection to target
 *   exists, a new tunnel.connect duplex request is sent to it with origin set
 *   to the requester's id, and the two requests are cross-registered so that
 *   messages on either one are forwarded to the other.  If no such connection
 *   exists nothing is sent.
 * - origin, portal and target all present: a tunnel connection is created via
 *   tf_ssb_connection_tunnel_create.
 *
 * Any other combination is ignored.
 */
static void _tf_ssb_rpc_tunnel_connect(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	if (!_get_global_setting_bool(ssb, "room", true))
	{
		tf_ssb_connection_rpc_send_error_method_not_allowed(connection, flags, -request_number, "tunnel.connect");
		return;
	}

	JSContext* context = tf_ssb_connection_get_context(connection);
	JSValue request_args = JS_GetPropertyStr(context, args, "args");
	JSValue options = JS_GetPropertyUint32(context, request_args, 0);
	JSValue origin = JS_GetPropertyStr(context, options, "origin");
	JSValue portal = JS_GetPropertyStr(context, options, "portal");
	JSValue target = JS_GetPropertyStr(context, options, "target");

	bool has_origin = !JS_IsUndefined(origin);
	bool has_endpoints = !JS_IsUndefined(portal) && !JS_IsUndefined(target);

	if (has_endpoints && !has_origin)
	{
		const char* target_id = JS_ToCString(context, target);
		tf_ssb_connection_t* peer = tf_ssb_connection_get(ssb, target_id);
		if (peer)
		{
			int32_t peer_request = tf_ssb_connection_next_request_number(peer);
			const char* portal_id = JS_ToCString(context, portal);
			char origin_id[k_id_base64_len] = "";
			tf_ssb_connection_get_id(connection, origin_id, sizeof(origin_id));

			/* Build { name: ["tunnel", "connect"], args: [{ origin, portal, target }], type: "duplex" }. */
			JSValue method = JS_NewArray(context);
			JS_SetPropertyUint32(context, method, 0, JS_NewString(context, "tunnel"));
			JS_SetPropertyUint32(context, method, 1, JS_NewString(context, "connect"));

			JSValue endpoints = JS_NewObject(context);
			JS_SetPropertyStr(context, endpoints, "origin", JS_NewString(context, origin_id));
			JS_SetPropertyStr(context, endpoints, "portal", JS_NewString(context, portal_id));
			JS_SetPropertyStr(context, endpoints, "target", JS_NewString(context, target_id));

			JSValue forwarded_args = JS_NewArray(context);
			JS_SetPropertyUint32(context, forwarded_args, 0, endpoints);

			JSValue forwarded = JS_NewObject(context);
			JS_SetPropertyStr(context, forwarded, "name", method);
			JS_SetPropertyStr(context, forwarded, "args", forwarded_args);
			JS_SetPropertyStr(context, forwarded, "type", JS_NewString(context, "duplex"));

			tf_ssb_connection_rpc_send_json(peer, k_ssb_rpc_flag_stream, peer_request, forwarded, NULL, NULL, NULL);

			/* Each side's request forwards to the other side; the tunnel_t is freed by the cleanup callback. */
			tunnel_t* to_peer = tf_malloc(sizeof(tunnel_t));
			to_peer->connection = peer;
			to_peer->request_number = peer_request;

			tunnel_t* to_requester = tf_malloc(sizeof(tunnel_t));
			to_requester->connection = connection;
			to_requester->request_number = -request_number;

			tf_ssb_connection_add_request(connection, -request_number, _tf_ssb_rpc_tunnel_callback, _tf_ssb_rpc_tunnel_cleanup, to_peer, peer);
			tf_ssb_connection_add_request(peer, peer_request, _tf_ssb_rpc_tunnel_callback, _tf_ssb_rpc_tunnel_cleanup, to_requester, connection);

			JS_FreeValue(context, forwarded);
			JS_FreeCString(context, portal_id);
		}
		JS_FreeCString(context, target_id);
	}
	else if (has_endpoints && has_origin)
	{
		const char* origin_id = JS_ToCString(context, origin);
		const char* portal_id = JS_ToCString(context, portal);
		const char* target_id = JS_ToCString(context, target);
		tf_ssb_connection_tunnel_create(ssb, portal_id, -request_number, origin_id);
		JS_FreeCString(context, origin_id);
		JS_FreeCString(context, portal_id);
		JS_FreeCString(context, target_id);
	}

	JS_FreeValue(context, origin);
	JS_FreeValue(context, portal);
	JS_FreeValue(context, target);
	JS_FreeValue(context, options);
	JS_FreeValue(context, request_args);
}
2023-01-10 20:43:35 -05:00
/*
 * RPC handler that describes this server's room.
 *
 * When the "room" setting (default true) is on, the reply is an object with
 * "name" (the "room_name" setting, pre-filled with "tilde friends tunnel"),
 * "membership" set to false and "features" set to
 * ["tunnel", "room1", "room2"].  When it is off the reply is the JSON value
 * false.  The reply uses the incoming flags and the negated request number.
 */
static void _tf_ssb_rpc_room_meta(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	static const char* const k_features[] = { "tunnel", "room1", "room2" };

	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	JSContext* context = tf_ssb_get_context(ssb);
	JSValue reply = JS_FALSE;

	if (_get_global_setting_bool(ssb, "room", true))
	{
		char room_name[1024] = "tilde friends tunnel";
		_get_global_setting_string(ssb, "room_name", room_name, sizeof(room_name));

		JSValue features = JS_NewArray(context);
		for (uint32_t index = 0; index < sizeof(k_features) / sizeof(k_features[0]); index++)
		{
			JS_SetPropertyUint32(context, features, index, JS_NewString(context, k_features[index]));
		}

		reply = JS_NewObject(context);
		JS_SetPropertyStr(context, reply, "name", JS_NewString(context, room_name));
		JS_SetPropertyStr(context, reply, "membership", JS_FALSE);
		JS_SetPropertyStr(context, reply, "features", features);
	}

	tf_ssb_connection_rpc_send_json(connection, flags, -request_number, reply, NULL, NULL, NULL);
	JS_FreeValue(context, reply);
}
2023-01-07 19:25:38 -05:00
/*
 * RPC handler for room.attendants.
 *
 * Rejected with a method-not-allowed error when the "room" setting (default
 * true) is off.  Otherwise:
 *
 * - every connection that is already an attendant is sent
 *   { type: "joined", id: <requester id> } on its own attendants request;
 * - the requester is sent { type: "state", ids: [...] } listing those
 *   existing attendants;
 * - finally the requester is itself marked as an attendant, so it does not
 *   appear in its own state message.
 */
static void _tf_ssb_rpc_room_attendants(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	if (!_get_global_setting_bool(ssb, "room", true))
	{
		tf_ssb_connection_rpc_send_error_method_not_allowed(connection, flags, -request_number, "room.attendants");
		return;
	}

	JSContext* context = tf_ssb_get_context(ssb);

	/* Notification broadcast to the existing attendants. */
	JSValue joined = JS_NewObject(context);
	JS_SetPropertyStr(context, joined, "type", JS_NewString(context, "joined"));
	char requester_id[k_id_base64_len] = "";
	if (tf_ssb_connection_get_id(connection, requester_id, sizeof(requester_id)))
	{
		JS_SetPropertyStr(context, joined, "id", JS_NewString(context, requester_id));
	}

	/* Snapshot returned to the requester. */
	JSValue state = JS_NewObject(context);
	JS_SetPropertyStr(context, state, "type", JS_NewString(context, "state"));
	JSValue attendant_ids = JS_NewArray(context);
	int attendant_count = 0;

	tf_ssb_connection_t* peers[1024];
	int peer_count = tf_ssb_get_connections(ssb, peers, _countof(peers));
	for (int index = 0; index < peer_count; index++)
	{
		tf_ssb_connection_t* peer = peers[index];
		char peer_id[k_id_base64_len] = { 0 };
		if (!tf_ssb_connection_is_attendant(peer) || !tf_ssb_connection_get_id(peer, peer_id, sizeof(peer_id)))
		{
			continue;
		}
		JS_SetPropertyUint32(context, attendant_ids, attendant_count++, JS_NewString(context, peer_id));
		tf_ssb_connection_rpc_send_json(peer, flags, -tf_ssb_connection_get_attendant_request_number(peer), joined, NULL, NULL, NULL);
	}

	JS_SetPropertyStr(context, state, "ids", attendant_ids);
	tf_ssb_connection_rpc_send_json(connection, flags, -request_number, state, NULL, NULL, NULL);
	JS_FreeValue(context, joined);
	JS_FreeValue(context, state);

	tf_ssb_connection_set_attendant(connection, true, request_number);
}
2023-01-04 19:52:23 -05:00
/*
 * State of one outgoing blobs.get request.  Allocated by
 * _tf_ssb_rpc_connection_blobs_get with expected_size extra bytes for the
 * trailing buffer.  It is freed by whichever of the request cleanup and the
 * store callback runs last (see the done / storing flags).
 */
typedef struct _blobs_get_t
{
/* Identifier of the requested blob. */
char id [ k_blob_id_len ] ;
/* Number of bytes copied into buffer so far. */
size_t received ;
/* Size of buffer in bytes; incoming data that would exceed it is not copied. */
size_t expected_size ;
2023-06-17 10:05:23 -04:00
/* Set by the request cleanup once the request is finished. */
bool done ;
2023-07-18 19:46:15 -04:00
/* True while an asynchronous store of buffer is in flight. */
bool storing ;
2023-06-17 10:05:23 -04:00
/* Owning SSB instance. */
tf_ssb_t * ssb ;
/* Blob bytes received so far (flexible array member). */
uint8_t buffer [ ] ;
2023-01-04 19:52:23 -05:00
} blobs_get_t ;
2023-07-18 19:46:15 -04:00
static void _tf_ssb_rpc_blob_store_callback ( const char * id , bool is_new , void * user_data )
2023-06-17 10:05:23 -04:00
{
2023-07-18 19:46:15 -04:00
blobs_get_t * get = user_data ;
get - > storing = false ;
2023-06-17 10:05:23 -04:00
if ( get - > done )
{
tf_free ( get ) ;
}
}
2023-01-04 19:52:23 -05:00
/*
 * Response callback for an outgoing blobs.get request.
 *
 * Non-empty binary messages are appended to the receive buffer as long as
 * they fit within expected_size; anything else binary is dropped.  A JSON
 * message ends the transfer: when it is truthy the bytes received so far are
 * stored asynchronously, and in every case the stream is closed with a JSON
 * "true".
 */
static void _tf_ssb_rpc_connection_blobs_get_callback(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	blobs_get_t* request = user_data;
	int message_type = flags & k_ssb_rpc_mask_type;

	if (message_type == k_ssb_rpc_flag_binary && size > 0 && request->received + size <= request->expected_size)
	{
		memcpy(request->buffer + request->received, message, size);
		request->received += size;
		return;
	}
	if (message_type != k_ssb_rpc_flag_json)
	{
		return;
	}

	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	JSContext* context = tf_ssb_get_context(ssb);
	if (JS_ToBool(context, args))
	{
		/* Mark the state as busy first so the request cleanup does not free it while the store is in flight. */
		request->storing = true;
		tf_ssb_db_blob_store_async(ssb, request->buffer, request->received, _tf_ssb_rpc_blob_store_callback, request);
	}

	/* TODO: Should we send the response in the callback? */
	bool stored = true;
	const char* reply = stored ? "true" : "false";
	tf_ssb_connection_rpc_send(
		connection,
		k_ssb_rpc_flag_json | k_ssb_rpc_flag_stream | k_ssb_rpc_flag_end_error,
		-request_number,
		(const uint8_t*)reply,
		strlen(reply),
		NULL,
		NULL,
		NULL);
}
/*
 * Cleanup for an outgoing blobs.get request.
 *
 * Marks the state as done.  If an asynchronous store is still running, the
 * state is left for _tf_ssb_rpc_blob_store_callback to free; otherwise it is
 * freed here.
 */
static void _tf_ssb_rpc_connection_blobs_get_cleanup(tf_ssb_t* ssb, void* user_data)
{
	blobs_get_t* request = user_data;
	request->done = true;
	if (request->storing)
	{
		return;
	}
	tf_free(request);
}
/*
 * Starts fetching a blob from a peer.
 *
 * Allocates a zeroed blobs_get_t with room for size bytes and sends
 * { name: ["blobs", "get"], type: "source", args: [blob_id] } as a new stream
 * request.  The response callback collects the data, and the cleanup callback
 * releases the state.
 *
 * connection: peer to request the blob from.
 * blob_id: identifier of the blob to request.
 * size: number of bytes to reserve for the blob's contents.
 *
 * NOTE(review): size is used directly as an allocation size and the result of
 * tf_malloc is not checked here -- confirm that callers bound it.
 */
static void _tf_ssb_rpc_connection_blobs_get(tf_ssb_connection_t* connection, const char* blob_id, size_t size)
{
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	JSContext* context = tf_ssb_get_context(ssb);

	size_t allocation_size = sizeof(blobs_get_t) + size;
	blobs_get_t* request = tf_malloc(allocation_size);
	/* Zero the header and the receive buffer in one pass, then fill in the non-zero fields. */
	memset(request, 0, allocation_size);
	request->ssb = ssb;
	request->expected_size = size;
	snprintf(request->id, sizeof(request->id), "%s", blob_id);

	JSValue method = JS_NewArray(context);
	JS_SetPropertyUint32(context, method, 0, JS_NewString(context, "blobs"));
	JS_SetPropertyUint32(context, method, 1, JS_NewString(context, "get"));

	JSValue request_args = JS_NewArray(context);
	JS_SetPropertyUint32(context, request_args, 0, JS_NewString(context, blob_id));

	JSValue outgoing = JS_NewObject(context);
	JS_SetPropertyStr(context, outgoing, "name", method);
	JS_SetPropertyStr(context, outgoing, "type", JS_NewString(context, "source"));
	JS_SetPropertyStr(context, outgoing, "args", request_args);

	int32_t new_request = tf_ssb_connection_next_request_number(connection);
	tf_ssb_connection_rpc_send_json(
		connection,
		k_ssb_rpc_flag_stream,
		new_request,
		outgoing,
		_tf_ssb_rpc_connection_blobs_get_callback,
		_tf_ssb_rpc_connection_blobs_get_cleanup,
		request);
	JS_FreeValue(context, outgoing);
}
/*
 * Response callback for our outgoing blobs.createWants request.
 *
 * Each message from the peer is an object mapping blob ids to a number:
 *
 * - a negative number means the peer wants that blob.  If we have it we
 *   answer with its size; if we do not and the number is exactly -1 we answer
 *   with -2.
 * - a non-negative number means the peer has that blob with that size, and we
 *   start fetching it.
 *
 * For every entry processed wants_sent is decremented, and when it reaches
 * zero the next batch of our own wants is sent.  Messages that carry a "name"
 * property (error objects) are ignored, as are calls on connections without
 * blob wants state.
 *
 * NOTE(review): the non-negative size comes from the peer and is passed
 * unchecked to _tf_ssb_rpc_connection_blobs_get, which allocates that many
 * bytes -- consider bounding it.
 */
static void _tf_ssb_rpc_connection_blobs_createWants_callback(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	tf_ssb_blob_wants_t* blob_wants = tf_ssb_connection_get_blob_wants_state(connection);
	if (!blob_wants)
	{
		return;
	}
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	JSContext* context = tf_ssb_connection_get_context(connection);

	JSValue name = JS_GetPropertyStr(context, args, "name");
	if (!JS_IsUndefined(name))
	{
		/* { name: "Error" } */
		JS_FreeValue(context, name);
		return;
	}

	/*
	 * JS_GetOwnPropertyNames() fails without touching its outputs, for
	 * example when the peer sent something that is not an object.  Iterating
	 * and freeing uninitialized ptab/plen would be undefined, so bail out.
	 */
	JSPropertyEnum* ptab = NULL;
	uint32_t plen = 0;
	if (JS_GetOwnPropertyNames(context, &ptab, &plen, args, JS_GPN_STRING_MASK) != 0)
	{
		return;
	}

	for (uint32_t i = 0; i < plen; ++i)
	{
		JSValue key = JS_AtomToString(context, ptab[i].atom);
		JSPropertyDescriptor desc;
		JSValue key_value = JS_NULL;
		if (JS_GetOwnProperty(context, &desc, args, ptab[i].atom) == 1)
		{
			/* Ownership of desc.value moves to key_value; the accessors are not needed. */
			key_value = desc.value;
			JS_FreeValue(context, desc.setter);
			JS_FreeValue(context, desc.getter);
		}
		const char* blob_id = JS_ToCString(context, key);
		int64_t peer_size = 0;
		JS_ToInt64(context, &peer_size, key_value);

		if (--blob_wants->wants_sent == 0)
		{
			_tf_ssb_rpc_request_more_blobs(connection);
		}

		if (peer_size < 0)
		{
			/* The peer wants this blob: tell it our size, or pass the want on one hop further as -2. */
			size_t blob_size = 0;
			bool have_reply = false;
			int64_t reply_value = 0;
			if (tf_ssb_db_blob_get(ssb, blob_id, NULL, &blob_size))
			{
				reply_value = (int64_t)blob_size;
				have_reply = true;
			}
			else if (peer_size == -1LL)
			{
				reply_value = -2;
				have_reply = true;
			}

			if (have_reply)
			{
				JSValue reply = JS_NewObject(context);
				JS_SetPropertyStr(context, reply, blob_id, JS_NewInt64(context, reply_value));
				tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream, -blob_wants->request_number, reply, NULL, NULL, NULL);
				JS_FreeValue(context, reply);
			}
		}
		else
		{
			/* The peer has this blob: fetch it. */
			_tf_ssb_rpc_connection_blobs_get(connection, blob_id, peer_size);
		}

		JS_FreeCString(context, blob_id);
		JS_FreeValue(context, key);
		JS_FreeValue(context, key_value);
	}

	for (uint32_t i = 0; i < plen; ++i)
	{
		JS_FreeAtom(context, ptab[i].atom);
	}
	js_free(context, ptab);
}
2023-01-07 19:25:38 -05:00
/*
 * Response callback for our outgoing room.attendants request.
 *
 * Dispatches on the message's "type" property:
 *
 * - "state": clears the connection's attendant list and adds every entry of
 *   "ids" that converts to a string;
 * - "joined": adds "id" to the attendant list;
 * - "left": removes "id" from the attendant list.
 *
 * A type that cannot be converted to a string, or any other type value, is
 * answered with an error.
 */
static void _tf_ssb_rpc_connection_room_attendants_callback(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	JSContext* context = tf_ssb_get_context(ssb);
	JSValue type = JS_GetPropertyStr(context, args, "type");
	const char* type_name = JS_ToCString(context, type);

	if (type_name == NULL)
	{
		tf_ssb_connection_rpc_send_error(connection, flags, -request_number, "Missing type.");
	}
	else if (strcmp(type_name, "state") == 0)
	{
		tf_ssb_connection_clear_room_attendants(connection);
		JSValue ids = JS_GetPropertyStr(context, args, "ids");
		int id_count = tf_util_get_length(context, ids);
		for (int index = 0; index < id_count; index++)
		{
			JSValue entry = JS_GetPropertyUint32(context, ids, index);
			const char* attendant = JS_ToCString(context, entry);
			if (attendant)
			{
				tf_ssb_connection_add_room_attendant(connection, attendant);
			}
			JS_FreeCString(context, attendant);
			JS_FreeValue(context, entry);
		}
		JS_FreeValue(context, ids);
	}
	else if (strcmp(type_name, "joined") == 0 || strcmp(type_name, "left") == 0)
	{
		/* Both events carry a single "id"; only the list operation differs. */
		bool joined = strcmp(type_name, "joined") == 0;
		JSValue id = JS_GetPropertyStr(context, args, "id");
		const char* attendant = JS_ToCString(context, id);
		if (attendant && joined)
		{
			tf_ssb_connection_add_room_attendant(connection, attendant);
		}
		else if (attendant)
		{
			tf_ssb_connection_remove_room_attendant(connection, attendant);
		}
		JS_FreeCString(context, attendant);
		JS_FreeValue(context, id);
	}
	else
	{
		tf_ssb_connection_rpc_send_error(connection, flags, -request_number, "Unexpected room.attendants response type.");
	}

	JS_FreeCString(context, type_name);
	JS_FreeValue(context, type);
}
2023-01-09 17:37:34 -05:00
/*
 * Tells whether a message is an error object.
 *
 * context: JS context that owns message.
 * message: value to inspect.
 *
 * Returns true when the message's "name" property converts to exactly the
 * string "Error", false otherwise.
 */
static bool _is_error(JSContext* context, JSValue message)
{
	JSValue name = JS_GetPropertyStr(context, message, "name");
	const char* name_text = JS_ToCString(context, name);
	bool matches = name_text != NULL && strcmp(name_text, "Error") == 0;
	JS_FreeCString(context, name_text);
	JS_FreeValue(context, name);
	return matches;
}
2023-01-07 19:25:38 -05:00
/*
 * Response callback for our outgoing tunnel.isRoom request.
 *
 * Error responses and non-object responses are ignored.  For an object
 * response, a room.attendants source request with empty args is sent on the
 * same connection, with _tf_ssb_rpc_connection_room_attendants_callback as
 * its response callback.
 */
static void _tf_ssb_rpc_connection_tunnel_isRoom_callback(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	JSContext* context = tf_ssb_get_context(ssb);
	if (_is_error(context, args) || !JS_IsObject(args))
	{
		return;
	}

	JSValue method = JS_NewArray(context);
	JS_SetPropertyUint32(context, method, 0, JS_NewString(context, "room"));
	JS_SetPropertyUint32(context, method, 1, JS_NewString(context, "attendants"));

	JSValue outgoing = JS_NewObject(context);
	JS_SetPropertyStr(context, outgoing, "name", method);
	JS_SetPropertyStr(context, outgoing, "type", JS_NewString(context, "source"));
	JS_SetPropertyStr(context, outgoing, "args", JS_NewArray(context));

	int32_t new_request = tf_ssb_connection_next_request_number(connection);
	tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream, new_request, outgoing, _tf_ssb_rpc_connection_room_attendants_callback, NULL, NULL);
	JS_FreeValue(context, outgoing);
}
2023-01-16 21:17:29 -05:00
/*
 * Saved arguments of a history stream request.  Handed as user data to
 * _tf_ssb_connection_send_history_stream_callback, which passes the fields
 * on to _tf_ssb_connection_send_history_stream_internal and frees the struct.
 */
typedef struct _tf_ssb_connection_send_history_stream_t
{
/* Request number the messages are sent under. */
int32_t request_number ;
/* Identity whose feed is being streamed. */
char author [ k_id_base64_len ] ;
/* First sequence number to send. */
int64_t sequence ;
/* When true, each message is wrapped in an object with "key", "value" and "timestamp". */
bool keys ;
2023-01-18 18:43:49 -05:00
/* Forwarded unchanged as the `live` argument. */
bool live ;
2023-01-16 21:17:29 -05:00
} tf_ssb_connection_send_history_stream_t ;
/*
 * Runs a saved history stream request.
 *
 * user_data is a heap-allocated tf_ssb_connection_send_history_stream_t.  If
 * the connection is still connected its fields are passed to
 * _tf_ssb_connection_send_history_stream_internal.  The struct is freed in
 * either case.
 */
static void _tf_ssb_connection_send_history_stream_callback(tf_ssb_connection_t* connection, void* user_data)
{
	tf_ssb_connection_send_history_stream_t* pending = user_data;
	bool still_connected = tf_ssb_connection_is_connected(connection);
	if (still_connected)
	{
		_tf_ssb_connection_send_history_stream_internal(
			connection,
			pending->request_number,
			pending->author,
			pending->sequence,
			pending->keys,
			pending->live);
	}
	tf_free(pending);
}
2023-04-12 19:22:33 -04:00
/*
 * Sends one batch of stored messages for a history stream.
 *
 * Queries the messages by author whose sequence lies in
 * [sequence, sequence + k_max) and sends each one on request_number.  When
 * keys is set, each message is wrapped in an object carrying its key, value
 * and timestamp.  If the batch ran up to its last possible sequence number,
 * the next batch is scheduled to start right after it; otherwise, unless live
 * is set, a JSON false is sent on request_number (presumably to end the
 * stream -- confirm against the protocol).
 */
static void _tf_ssb_connection_send_history_stream_internal(tf_ssb_connection_t* connection, int32_t request_number, const char* author, int64_t sequence, bool keys, bool live)
{
	const int k_max = 32;
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	JSContext* context = tf_ssb_get_context(ssb);
	const int64_t end_sequence = sequence + k_max;
	int64_t last_sequence = 0;

	sqlite3* db = tf_ssb_acquire_db_reader(ssb);
	sqlite3_stmt* statement;
	if (sqlite3_prepare(db, " SELECT previous, author, id, sequence, timestamp, hash, content, signature, sequence_before_author FROM messages WHERE author = ?1 AND sequence >= ?2 AND sequence < ?3 ORDER BY sequence ", -1, &statement, NULL) != SQLITE_OK)
	{
		tf_printf(" prepare failed: %s \n ", sqlite3_errmsg(db));
	}
	else
	{
		bool bound =
			sqlite3_bind_text(statement, 1, author, -1, NULL) == SQLITE_OK &&
			sqlite3_bind_int64(statement, 2, sequence) == SQLITE_OK &&
			sqlite3_bind_int64(statement, 3, end_sequence) == SQLITE_OK;

		/* Nothing is stepped when any bind failed. */
		while (bound && sqlite3_step(statement) == SQLITE_ROW)
		{
			const int64_t row_sequence = sqlite3_column_int64(statement, 3);
			last_sequence = row_sequence;

			JSValue formatted = tf_ssb_format_message(
				context,
				(const char*)sqlite3_column_text(statement, 0),
				(const char*)sqlite3_column_text(statement, 1),
				row_sequence,
				sqlite3_column_double(statement, 4),
				(const char*)sqlite3_column_text(statement, 5),
				(const char*)sqlite3_column_text(statement, 6),
				(const char*)sqlite3_column_text(statement, 7),
				sqlite3_column_int(statement, 8));

			/* Without keys the formatted message itself is what gets sent. */
			JSValue message = formatted;
			if (keys)
			{
				message = JS_NewObject(context);
				JS_SetPropertyStr(context, message, " key ", JS_NewString(context, (const char*)sqlite3_column_text(statement, 2)));
				JS_SetPropertyStr(context, message, " value ", formatted);
				JS_SetPropertyStr(context, message, " timestamp ", JS_NewString(context, (const char*)sqlite3_column_text(statement, 4)));
			}

			tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream, request_number, message, NULL, NULL, NULL);
			JS_FreeValue(context, message);
		}
		sqlite3_finalize(statement);
	}
	tf_ssb_release_db_reader(ssb, db);

	if (last_sequence == end_sequence - 1)
	{
		/* The batch was full up to its upper bound; there may be more. */
		_tf_ssb_connection_send_history_stream(connection, request_number, author, last_sequence + 1, keys, live);
	}
	else if (!live)
	{
		tf_ssb_connection_rpc_send(
			connection,
			k_ssb_rpc_flag_json,
			request_number,
			(const uint8_t*)" false ",
			strlen(" false "),
			NULL,
			NULL,
			NULL);
	}
}
2023-04-12 19:22:33 -04:00
/*
 * Schedules a history stream batch to be sent when the connection is idle.
 *
 * The arguments are copied into a heap-allocated request which the idle
 * callback frees.
 */
static void _tf_ssb_connection_send_history_stream(tf_ssb_connection_t* connection, int32_t request_number, const char* author, int64_t sequence, bool keys, bool live)
{
	tf_ssb_connection_send_history_stream_t* pending = tf_malloc(sizeof(*pending));
	*pending = (tf_ssb_connection_send_history_stream_t) { .request_number = request_number, .sequence = sequence, .keys = keys, .live = live };
	/* The author buffer is filled separately; snprintf bounds the copy. */
	snprintf(pending->author, sizeof(pending->author), " %s ", author);
	tf_ssb_connection_schedule_idle(connection, _tf_ssb_connection_send_history_stream_callback, pending);
}
2023-01-08 15:01:35 -05:00
/*
 * Handles an incoming createHistoryStream request.
 *
 * Reads id, seq, keys and live from the first element of the request's args
 * and starts sending the author's stored messages on -request_number.  keys
 * defaults to true when absent.  When live is set, the connection is also
 * registered for new messages from the author.
 *
 * If the first argument is missing, an error is sent and nothing else is
 * done.  If the id cannot be converted to a string, an error is sent instead
 * of starting the stream.
 */
static void _tf_ssb_rpc_createHistoryStream(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	JSContext* context = tf_ssb_get_context(ssb);
	JSValue arg_array = JS_GetPropertyStr(context, args, " args ");
	JSValue arg = JS_GetPropertyUint32(context, arg_array, 0);
	if (JS_IsUndefined(arg))
	{
		tf_ssb_connection_rpc_send_error(connection, flags, -request_number, " Missing request.args in createHistoryStream. ");
		/* Stop here: the request was already answered with an error. */
		JS_FreeValue(context, arg);
		JS_FreeValue(context, arg_array);
		return;
	}

	JSValue id = JS_GetPropertyStr(context, arg, " id ");
	JSValue seq = JS_GetPropertyStr(context, arg, " seq ");
	JSValue keys = JS_GetPropertyStr(context, arg, " keys ");
	JSValue live = JS_GetPropertyStr(context, arg, " live ");
	bool is_keys = JS_IsUndefined(keys) || JS_ToBool(context, keys) > 0;
	bool is_live = JS_ToBool(context, live) > 0;
	int64_t sequence = 0;
	JS_ToInt64(context, &sequence, seq);

	const char* author = JS_ToCString(context, id);
	if (author)
	{
		_tf_ssb_connection_send_history_stream(connection, -request_number, author, sequence, is_keys, is_live);
		if (is_live)
		{
			tf_ssb_connection_add_new_message_request(connection, author, -request_number, is_keys);
		}
		JS_FreeCString(context, author);
	}
	else
	{
		/* A NULL author must not reach the "%s" formatting downstream. */
		tf_ssb_connection_rpc_send_error(connection, flags, -request_number, "Invalid id in createHistoryStream.");
	}

	JS_FreeValue(context, live);
	JS_FreeValue(context, keys);
	JS_FreeValue(context, seq);
	JS_FreeValue(context, id);
	JS_FreeValue(context, arg);
	JS_FreeValue(context, arg_array);
}
2023-01-08 15:01:35 -05:00
/*
 * Builds and sends our EBT clock on -request_number.
 *
 * The clock maps identity to (latest stored sequence << 1), or -1 when the
 * lookup reports -1.  It covers the identities returned by
 * tf_ssb_db_get_all_visible_identities, the connection's own identity, and
 * every identity named in message (the peer's clock) that is not already
 * present.  message may be JS_UNDEFINED, in which case the last step is
 * skipped.
 */
static void _tf_ssb_rpc_ebt_replicate_send_clock(tf_ssb_connection_t* connection, int32_t request_number, JSValue message)
{
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	JSContext* context = tf_ssb_get_context(ssb);
	JSValue full_clock = JS_NewObject(context);

	/* Ask for every identity we know is being followed from local accounts. */
	const char** visible = tf_ssb_db_get_all_visible_identities(ssb, 2);
	for (int i = 0; visible[i]; i++)
	{
		int64_t sequence = 0;
		tf_ssb_db_get_latest_message_by_author(ssb, visible[i], &sequence, NULL, 0);
		JS_SetPropertyStr(context, full_clock, visible[i], JS_NewInt64(context, sequence == -1 ? -1 : (sequence << 1)));
	}
	tf_free(visible);

	/* Ask about the incoming connection, too. */
	char id[k_id_base64_len] = " ";
	if (tf_ssb_connection_get_id(connection, id, sizeof(id)))
	{
		JSValue in_clock = JS_GetPropertyStr(context, full_clock, id);
		if (JS_IsUndefined(in_clock))
		{
			int64_t sequence = 0;
			tf_ssb_db_get_latest_message_by_author(ssb, id, &sequence, NULL, 0);
			JS_SetPropertyStr(context, full_clock, id, JS_NewInt64(context, sequence == -1 ? -1 : (sequence << 1)));
		}
		JS_FreeValue(context, in_clock);
	}

	/* Also respond with what we know about all requested identities. */
	if (!JS_IsUndefined(message))
	{
		JSPropertyEnum* ptab = NULL;
		uint32_t plen = 0;
		/*
		 * The peer controls message; if it is not an object the enumeration
		 * fails and leaves ptab/plen untouched, so only use them on success.
		 */
		if (JS_GetOwnPropertyNames(context, &ptab, &plen, message, JS_GPN_STRING_MASK) == 0)
		{
			for (uint32_t i = 0; i < plen; ++i)
			{
				JSValue in_clock = JS_GetProperty(context, full_clock, ptab[i].atom);
				if (JS_IsUndefined(in_clock))
				{
					JSValue key = JS_AtomToString(context, ptab[i].atom);
					const char* key_string = JS_ToCString(context, key);
					if (key_string)
					{
						int64_t sequence = -1;
						tf_ssb_db_get_latest_message_by_author(ssb, key_string, &sequence, NULL, 0);
						JS_SetPropertyStr(context, full_clock, key_string, JS_NewInt64(context, sequence == -1 ? -1 : (sequence << 1)));
					}
					JS_FreeCString(context, key_string);
					JS_FreeValue(context, key);
				}
				JS_FreeValue(context, in_clock);
				JS_FreeAtom(context, ptab[i].atom);
			}
			js_free(context, ptab);
		}
	}

	tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream, -request_number, full_clock, NULL, NULL, NULL);
	JS_FreeValue(context, full_clock);
}
/*
 * Acts on each entry of a peer's EBT clock.
 *
 * For every own string-keyed property of message: a non-negative, even value
 * starts a live history stream for that author from (value >> 1) on the
 * connection's EBT request number and registers for the author's new
 * messages; any other value removes the new-message registration for that
 * author.  Does nothing when message is JS_UNDEFINED.
 */
static void _tf_ssb_rpc_ebt_replicate_send_messages(tf_ssb_connection_t* connection, JSValue message)
{
	if (!JS_IsUndefined(message))
	{
		tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
		JSContext* context = tf_ssb_get_context(ssb);
		JSPropertyEnum* names = NULL;
		uint32_t count = 0;
		JS_GetOwnPropertyNames(context, &names, &count, message, JS_GPN_STRING_MASK);

		for (uint32_t index = 0; index < count; ++index)
		{
			JSValue clock_value = JS_GetProperty(context, message, names[index].atom);
			if (!JS_IsUndefined(clock_value))
			{
				JSValue key = JS_AtomToString(context, names[index].atom);
				int64_t encoded = -1;
				JS_ToInt64(context, &encoded, clock_value);
				const char* author = JS_ToCString(context, key);

				bool wants_messages = encoded >= 0 && (encoded & 1) == 0;
				if (!wants_messages)
				{
					tf_ssb_connection_remove_new_message_request(connection, author);
				}
				else
				{
					int32_t request_number = tf_ssb_connection_get_ebt_request_number(connection);
					_tf_ssb_connection_send_history_stream(connection, request_number, author, encoded >> 1, false, true);
					tf_ssb_connection_add_new_message_request(connection, author, request_number, false);
				}

				JS_FreeCString(context, author);
				JS_FreeValue(context, key);
			}
			JS_FreeValue(context, clock_value);
		}

		for (uint32_t index = 0; index < count; ++index)
		{
			JS_FreeAtom(context, names[index].atom);
		}
		js_free(context, names);
	}
}
/*
 * Shared handler for traffic on an ebt.replicate stream.
 *
 * Error responses are ignored.  The first call records -request_number as the
 * connection's EBT request number.  A payload with an author property is
 * treated as a message and stored; anything else is treated as an EBT clock:
 * it is saved on the connection, our own clock is sent once per connection,
 * and the requested messages are sent.
 */
static void _tf_ssb_rpc_ebt_replicate(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	JSContext* context = tf_ssb_get_context(ssb);
	if (_is_error(context, args))
	{
		/* TODO: Send createHistoryStream. */
		return;
	}

	if (tf_ssb_connection_get_ebt_request_number(connection) == 0)
	{
		tf_ssb_connection_set_ebt_request_number(connection, -request_number);
	}

	JSValue author = JS_GetPropertyStr(context, args, " author ");
	JSValue name = JS_GetPropertyStr(context, args, " name ");
	bool is_message = !JS_IsUndefined(author);
	if (is_message)
	{
		/* Looks like a message. */
		tf_ssb_verify_strip_and_store_message(ssb, args, NULL, NULL);
	}
	else
	{
		/* EBT clock.  A payload with a name property is not used as a clock. */
		JSValue in_clock = JS_IsUndefined(name) ? args : JS_UNDEFINED;
		tf_ssb_connection_set_ebt_send_clock(connection, args);
		if (!tf_ssb_connection_get_sent_clock(connection))
		{
			_tf_ssb_rpc_ebt_replicate_send_clock(connection, request_number, in_clock);
			tf_ssb_connection_set_sent_clock(connection, true);
		}
		_tf_ssb_rpc_ebt_replicate_send_messages(connection, in_clock);
	}

	JS_FreeValue(context, name);
	JS_FreeValue(context, author);
}
/*
 * Response callback for the ebt.replicate request we send ourselves; every
 * argument is forwarded unchanged to the shared handler.
 */
static void _tf_ssb_rpc_ebt_replicate_client(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	_tf_ssb_rpc_ebt_replicate(
		connection,
		flags,
		request_number,
		args,
		message,
		size,
		user_data);
}
/*
 * Sends an ebt.replicate duplex request (version 3, classic format) on the
 * connection's next request number.  If the connection has no EBT request
 * number yet, that request number is recorded as it.
 */
static void _tf_ssb_rpc_send_ebt_replicate(tf_ssb_connection_t* connection)
{
	tf_ssb_t* ssb = tf_ssb_connection_get_ssb(connection);
	JSContext* context = tf_ssb_get_context(ssb);

	JSValue name = JS_NewArray(context);
	JS_SetPropertyUint32(context, name, 0, JS_NewString(context, " ebt "));
	JS_SetPropertyUint32(context, name, 1, JS_NewString(context, " replicate "));

	JSValue options = JS_NewObject(context);
	JS_SetPropertyStr(context, options, " version ", JS_NewInt32(context, 3));
	JS_SetPropertyStr(context, options, " format ", JS_NewString(context, " classic "));
	JSValue args = JS_NewArray(context);
	JS_SetPropertyUint32(context, args, 0, options);

	/* Properties are attached in the same order as before: name, args, type. */
	JSValue message = JS_NewObject(context);
	JS_SetPropertyStr(context, message, " name ", name);
	JS_SetPropertyStr(context, message, " args ", args);
	JS_SetPropertyStr(context, message, " type ", JS_NewString(context, " duplex "));

	int32_t request_number = tf_ssb_connection_next_request_number(connection);
	tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream, request_number, message, _tf_ssb_rpc_ebt_replicate_client, NULL, NULL);
	if (tf_ssb_connection_get_ebt_request_number(connection) == 0)
	{
		tf_ssb_connection_set_ebt_request_number(connection, request_number);
	}
	JS_FreeValue(context, message);
}
2023-01-08 15:01:35 -05:00
2023-01-09 17:37:34 -05:00
/*
 * Handler for an incoming ebt.replicate request.
 *
 * Requests flagged as end/error are ignored.  Otherwise the request is
 * processed by the shared handler, which is then also registered for
 * -request_number on this connection.
 */
static void _tf_ssb_rpc_ebt_replicate_server(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data)
{
	bool ended = (flags & k_ssb_rpc_flag_end_error) != 0;
	if (!ended)
	{
		_tf_ssb_rpc_ebt_replicate(connection, flags, request_number, args, message, size, user_data);
		tf_ssb_connection_add_request(connection, -request_number, _tf_ssb_rpc_ebt_replicate, NULL, NULL, NULL);
	}
}
2023-01-04 19:52:23 -05:00
/*
 * Reacts to connections being added to or removed from ssb.
 *
 * On connect: sends a blobs.createWants source request; when we are the
 * client side of the connection, also sends tunnel.isRoom and starts EBT
 * replication.
 *
 * On remove: unregisters the blob-want callback for the connection and, if
 * the connection's id is available, sends a "left" notice to every attendant
 * connection.
 *
 * Other change kinds are ignored.
 */
static void _tf_ssb_rpc_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_change_t change, tf_ssb_connection_t* connection, void* user_data)
{
	JSContext* context = tf_ssb_get_context(ssb);

	if (change == k_tf_ssb_change_connect)
	{
		JSValue wants_name = JS_NewArray(context);
		JS_SetPropertyUint32(context, wants_name, 0, JS_NewString(context, " blobs "));
		JS_SetPropertyUint32(context, wants_name, 1, JS_NewString(context, " createWants "));
		JSValue wants = JS_NewObject(context);
		JS_SetPropertyStr(context, wants, " name ", wants_name);
		JS_SetPropertyStr(context, wants, " type ", JS_NewString(context, " source "));
		JS_SetPropertyStr(context, wants, " args ", JS_NewArray(context));
		tf_ssb_connection_rpc_send_json(
			connection,
			k_ssb_rpc_flag_stream,
			tf_ssb_connection_next_request_number(connection),
			wants,
			_tf_ssb_rpc_connection_blobs_createWants_callback,
			NULL,
			NULL);
		JS_FreeValue(context, wants);

		if (!tf_ssb_connection_is_client(connection))
		{
			return;
		}

		JSValue is_room_name = JS_NewArray(context);
		JS_SetPropertyUint32(context, is_room_name, 0, JS_NewString(context, " tunnel "));
		JS_SetPropertyUint32(context, is_room_name, 1, JS_NewString(context, " isRoom "));
		JSValue is_room = JS_NewObject(context);
		JS_SetPropertyStr(context, is_room, " name ", is_room_name);
		JS_SetPropertyStr(context, is_room, " args ", JS_NewArray(context));
		tf_ssb_connection_rpc_send_json(
			connection,
			0,
			tf_ssb_connection_next_request_number(connection),
			is_room,
			_tf_ssb_rpc_connection_tunnel_isRoom_callback,
			NULL,
			NULL);
		JS_FreeValue(context, is_room);

		_tf_ssb_rpc_send_ebt_replicate(connection);
		return;
	}

	if (change != k_tf_ssb_change_remove)
	{
		return;
	}

	tf_ssb_remove_blob_want_added_callback(ssb, _tf_ssb_rpc_blob_wants_added_callback, connection);

	char id[k_id_base64_len] = " ";
	if (!tf_ssb_connection_get_id(connection, id, sizeof(id)))
	{
		return;
	}

	JSValue left = JS_NewObject(context);
	JS_SetPropertyStr(context, left, " type ", JS_NewString(context, " left "));
	JS_SetPropertyStr(context, left, " id ", JS_NewString(context, id));

	tf_ssb_connection_t* connections[1024];
	int count = tf_ssb_get_connections(ssb, connections, _countof(connections));
	for (int i = 0; i < count; i++)
	{
		tf_ssb_connection_t* peer = connections[i];
		if (!tf_ssb_connection_is_attendant(peer))
		{
			continue;
		}
		tf_ssb_connection_rpc_send_json(
			peer,
			k_ssb_rpc_flag_stream,
			-tf_ssb_connection_get_attendant_request_number(peer),
			left,
			NULL,
			NULL,
			NULL);
	}
	JS_FreeValue(context, left);
}
2023-07-29 18:29:09 -04:00
/* State for one blob expiry pass run through uv_queue_work. */
typedef struct _delete_blobs_work_t
{
	/* libuv work request; its data member points back at this struct. */
	uv_work_t work;
	/* Thread the work callback ran on, recorded for thread time accounting. */
	uv_thread_t thread_id;
	tf_ssb_t* ssb;
	/* uv_hrtime() values taken at the start and end of the work callback. */
	uint64_t start_time;
	uint64_t end_time;
	/* Set by the work callback when another pass should be scheduled. */
	bool reschedule;
	/* Delay before the next pass; only meaningful when reschedule is set. */
	int next_delay_ms;
} delete_blobs_work_t;

/*
 * Work callback: deletes up to k_limit blobs whose newest referencing
 * message is older than the blob_expire_age_seconds setting.
 *
 * Does nothing when the setting is missing or not positive; in that case no
 * further pass is requested.  Otherwise it records how soon the next pass
 * should run: after the time this pass took if anything was deleted, or after
 * 15 minutes if not.  The timer itself is created by the after-work callback,
 * because this function runs on a thread pool thread and must not touch the
 * loop.
 */
static void _tf_ssb_rpc_delete_blobs_work(uv_work_t* work)
{
	delete_blobs_work_t* delete = work->data;
	delete->start_time = uv_hrtime();
	delete->thread_id = uv_thread_self();
	tf_ssb_t* ssb = delete->ssb;

	int64_t age = _get_global_setting_int64(ssb, " blob_expire_age_seconds ", -1);
	if (age <= 0)
	{
		delete->end_time = uv_hrtime();
		return;
	}

	uint64_t start_ns = uv_hrtime();
	sqlite3* db = tf_ssb_acquire_db_writer(ssb);
	sqlite3_stmt* statement;
	int64_t now = (int64_t)time(NULL) * 1000LL;
	int64_t timestamp = now - age * 1000LL;
	const int k_limit = 256;
	int deleted = 0;
	if (sqlite3_prepare(db,
			" DELETE FROM blobs WHERE blobs.id IN ( "
			" SELECT blobs.id FROM blobs "
			" JOIN messages_refs ON blobs.id = messages_refs.ref "
			" JOIN messages ON messages.id = messages_refs.message "
			" GROUP BY blobs.id HAVING MAX(messages.timestamp) < ? LIMIT ?) ",
			-1, &statement, NULL) == SQLITE_OK)
	{
		if (sqlite3_bind_int64(statement, 1, timestamp) == SQLITE_OK &&
			sqlite3_bind_int(statement, 2, k_limit) == SQLITE_OK)
		{
			int r = sqlite3_step(statement);
			if (r != SQLITE_DONE)
			{
				tf_printf(" _tf_ssb_rpc_delete_blobs_work: %s \n ", sqlite3_errmsg(db));
			}
			else
			{
				/* Only trust the change count when the DELETE completed. */
				deleted = sqlite3_changes(db);
				tf_printf(" _tf_ssb_rpc_delete_blobs_work: %d rows \n ", deleted);
			}
		}
		/* Every successfully prepared statement must be finalized. */
		sqlite3_finalize(statement);
	}
	else
	{
		tf_printf(" prepare failed: %s \n ", sqlite3_errmsg(db));
	}
	tf_ssb_release_db_writer(ssb, db);

	int64_t duration_ms = (int64_t)((uv_hrtime() - start_ns) / 1000000ULL);
	tf_printf(" Deleted %d blobs in %d ms. \n ", deleted, (int)duration_ms);
	delete->reschedule = true;
	delete->next_delay_ms = deleted ? (int)duration_ms : (15 * 60 * 1000);
	delete->end_time = uv_hrtime();
}

/*
 * After-work callback, run on the loop thread: records the time the pass
 * spent on its worker thread, frees the work item, and starts the timer for
 * the next pass if the work callback asked for one.
 */
static void _tf_ssb_rpc_delete_blobs_after_work(uv_work_t* work, int status)
{
	delete_blobs_work_t* delete = work->data;
	tf_ssb_t* ssb = delete->ssb;
	tf_ssb_record_thread_time(ssb, (int64_t)delete->thread_id, delete->end_time - delete->start_time);

	/* A non-zero status means the work callback did not complete normally. */
	bool reschedule = status == 0 && delete->reschedule;
	int delay_ms = delete->next_delay_ms;
	tf_free(delete);

	if (reschedule)
	{
		_tf_ssb_rpc_start_delete_blobs(ssb, delay_ms);
	}
}
/* Close callback for the delete timer: releases the handle's memory. */
static void _tf_ssb_rpc_timer_on_close(uv_handle_t* handle)
{
	tf_free(handle);
}
/*
 * Timer callback: queues one blob deletion pass on the thread pool and then
 * closes the timer.  If queueing fails, the failure is logged and the work
 * item is freed immediately.
 */
static void _tf_ssb_rpc_start_delete_timer(uv_timer_t* timer)
{
	tf_ssb_t* ssb = timer->data;

	delete_blobs_work_t* request = tf_malloc(sizeof(*request));
	/* All members not named here start out zeroed. */
	*request = (delete_blobs_work_t) { .ssb = ssb, .work = { .data = request } };

	int result = uv_queue_work(tf_ssb_get_loop(ssb), &request->work, _tf_ssb_rpc_delete_blobs_work, _tf_ssb_rpc_delete_blobs_after_work);
	if (result != 0)
	{
		tf_printf(" uv_queue_work: %s \n ", uv_strerror(result));
		tf_free(request);
	}

	/* The timer is closed whether or not the work was queued. */
	uv_close((uv_handle_t*)timer, _tf_ssb_rpc_timer_on_close);
}
/*
 * Starts a non-repeating timer that queues a blob deletion pass after
 * delay_ms milliseconds.  The timer handle is heap-allocated and carries ssb
 * in its data member.
 */
static void _tf_ssb_rpc_start_delete_blobs(tf_ssb_t* ssb, int delay_ms)
{
	tf_printf(" will delete more blobs in %d ms \n ", delay_ms);

	uv_timer_t* handle = tf_malloc(sizeof(*handle));
	*handle = (uv_timer_t) { .data = ssb };
	uv_timer_init(tf_ssb_get_loop(ssb), handle);
	uv_timer_start(handle, _tf_ssb_rpc_start_delete_timer, delay_ms, 0);
}
2022-11-16 20:36:24 -05:00
/*
 * Registers this file's RPC handling with ssb: the connection change
 * callback, one handler per supported RPC name, and the first blob deletion
 * pass (scheduled with no delay).
 */
void tf_ssb_rpc_register(tf_ssb_t* ssb)
{
	tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_rpc_connections_changed_callback, NULL, NULL);

	/* Handlers are registered in table order; unused name slots are NULL. */
	struct
	{
		const char* name[3];
		void (*handler)(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue args, const uint8_t* message, size_t size, void* user_data);
	} handlers[] = {
		{ { " gossip ", " ping ", NULL }, _tf_ssb_rpc_gossip_ping }, /* DUPLEX */
		{ { " blobs ", " get ", NULL }, _tf_ssb_rpc_blobs_get }, /* SOURCE */
		{ { " blobs ", " has ", NULL }, _tf_ssb_rpc_blobs_has }, /* ASYNC */
		{ { " blobs ", " createWants ", NULL }, _tf_ssb_rpc_blobs_createWants }, /* SOURCE */
		{ { " tunnel ", " connect ", NULL }, _tf_ssb_rpc_tunnel_connect }, /* DUPLEX */
		{ { " tunnel ", " isRoom ", NULL }, _tf_ssb_rpc_room_meta }, /* FAKE-ASYNC */
		{ { " room ", " metadata ", NULL }, _tf_ssb_rpc_room_meta }, /* ASYNC */
		{ { " room ", " attendants ", NULL }, _tf_ssb_rpc_room_attendants }, /* SOURCE */
		{ { " createHistoryStream ", NULL }, _tf_ssb_rpc_createHistoryStream }, /* SOURCE */
		{ { " ebt ", " replicate ", NULL }, _tf_ssb_rpc_ebt_replicate_server }, /* DUPLEX */
	};
	for (int i = 0; i < _countof(handlers); i++)
	{
		tf_ssb_add_rpc_callback(ssb, handlers[i].name, handlers[i].handler, NULL, NULL);
	}

	_tf_ssb_rpc_start_delete_blobs(ssb, 0);
}