2021-01-02 13:10:00 -05:00
# include "ssb.connections.h"
2023-03-07 12:50:17 -05:00
# include "log.h"
2022-06-04 13:04:51 -04:00
# include "mem.h"
2021-01-02 13:10:00 -05:00
# include "ssb.h"
2023-05-21 17:36:51 -04:00
# include "uv.h"
# include "sqlite3.h"
2021-01-02 13:10:00 -05:00
# include <string.h>
# if !defined(_countof)
# define _countof(a) ((int)(sizeof((a)) / sizeof(*(a))))
# endif
/**
** State of the connection tracker.
*/
struct _tf_ssb_connections_t
{
	/** SSB instance whose event loop, database and connection list are used. */
	tf_ssb_t* ssb;
	/** Repeating timer; its data field points back at this struct. */
	uv_timer_t timer;
};
typedef struct _tf_ssb_connections_t tf_ssb_connections_t;
/**
** Connection-change hook that records connection attempts and successes.
**
** @param ssb The SSB instance (unused here).
** @param change Which connection event occurred.
** @param connection The connection the event refers to.
** @param user_data The tf_ssb_connections_t passed when the callback was registered.
*/
static void _tf_ssb_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_change_t change, tf_ssb_connection_t* connection, void* user_data)
{
	tf_ssb_connections_t* connections = user_data;
	switch (change)
	{
	case k_tf_ssb_change_create:
		{
			char key[k_id_base64_len];
			const char* host = tf_ssb_connection_get_host(connection);
			/* Only connections with a non-empty host, a non-zero port and a known id are recorded. */
			if (host &&
				*host &&
				tf_ssb_connection_get_port(connection) &&
				tf_ssb_connection_get_id(connection, key, sizeof(key)))
			{
				tf_ssb_connections_set_attempted(connections, host, tf_ssb_connection_get_port(connection), key);
			}
		}
		break;
	case k_tf_ssb_change_connect:
		{
			char key[k_id_base64_len];
			const char* host = tf_ssb_connection_get_host(connection);
			/*
			** The host may be NULL (the create case checks for it); it must not
			** reach the "%s" formatting done by tf_ssb_connections_set_succeeded.
			*/
			if (host && tf_ssb_connection_get_id(connection, key, sizeof(key)))
			{
				tf_ssb_connections_set_succeeded(connections, host, tf_ssb_connection_get_port(connection), key);
			}
		}
		break;
	case k_tf_ssb_change_remove:
		/* Nothing is recorded when a connection goes away. */
		break;
	}
}
/**
** Look up the stored connection that should be tried next.
**
** Selects one row from the connections table whose last_attempt is NULL or
** older than the bound threshold, ordered by last_attempt.
**
** @param connections The connection tracker.
** @param[out] host Receives the formatted host column.
** @param host_size Size of the host buffer in bytes.
** @param[out] port Receives the port column.
** @param[out] key Receives the formatted key column.
** @param key_size Size of the key buffer in bytes.
** @return true if a row was found and the outputs were written, false
** otherwise (no eligible row, NULL host/key column, or a database error).
*/
static bool _tf_ssb_connections_get_next_connection(tf_ssb_connections_t* connections, char* host, size_t host_size, int* port, char* key, size_t key_size)
{
	bool result = false;
	sqlite3_stmt* statement = NULL;
	sqlite3* db = tf_ssb_acquire_db_reader(connections->ssb);
	if (sqlite3_prepare_v2(db, " SELECT host, port, key FROM connections WHERE last_attempt IS NULL OR (strftime('%s', 'now') - last_attempt > $1) ORDER BY last_attempt LIMIT 1 ", -1, &statement, NULL) == SQLITE_OK)
	{
		/*
		** NOTE(review): last_attempt is written with strftime('%s', 'now'),
		** i.e. in seconds, so this threshold is 60000 seconds rather than
		** 60000 milliseconds -- confirm that is the intended retry interval.
		*/
		if (sqlite3_bind_int(statement, 1, 60000) == SQLITE_OK &&
			sqlite3_step(statement) == SQLITE_ROW)
		{
			/* sqlite3_column_text() yields NULL for NULL values or on allocation failure; "%s" must never see that. */
			const unsigned char* row_host = sqlite3_column_text(statement, 0);
			const unsigned char* row_key = sqlite3_column_text(statement, 2);
			if (row_host && row_key)
			{
				snprintf(host, host_size, " %s ", row_host);
				*port = sqlite3_column_int(statement, 1);
				snprintf(key, key_size, " %s ", row_key);
				result = true;
			}
		}
		sqlite3_finalize(statement);
	}
	else
	{
		tf_printf(" prepare: %s \n ", sqlite3_errmsg(db));
	}
	tf_ssb_release_db_reader(connections->ssb, db);
	return result;
}
2023-10-09 21:11:32 -04:00
/**
** A queued request to find the next stored connection to try.
*/
struct _tf_ssb_connections_get_next_t
{
	/** Work request handed to uv_queue_work; its data field points at this struct. */
	uv_work_t work;
	/** The connection tracker that issued the request. */
	tf_ssb_connections_t* connections;
	/** The SSB instance to connect with. */
	tf_ssb_t* ssb;
	/** Set by the work callback when host, port and key were filled in. */
	bool ready;
	/** Host of the connection to try. */
	char host[256];
	/** Port of the connection to try. */
	int port;
	/** Identity of the peer, in string form. */
	char key[k_id_base64_len];
};
typedef struct _tf_ssb_connections_get_next_t tf_ssb_connections_get_next_t;
/**
** Work callback: queries the database for the next connection to try and
** stores the outcome in the request.
**
** @param work The work request; its data field is the tf_ssb_connections_get_next_t.
*/
static void _tf_ssb_connections_get_next_work(uv_work_t* work)
{
	tf_ssb_connections_get_next_t* request = work->data;
	bool found = _tf_ssb_connections_get_next_connection(
		request->connections,
		request->host,
		sizeof(request->host),
		&request->port,
		request->key,
		sizeof(request->key));
	request->ready = found;
}
/**
** Completion callback: starts a connection if the work callback found one,
** then frees the request.
**
** @param work The work request; its data field is the tf_ssb_connections_get_next_t.
** @param status Completion status (not inspected).
*/
static void _tf_ssb_connections_get_next_after_work(uv_work_t* work, int status)
{
	tf_ssb_connections_get_next_t* request = work->data;
	uint8_t public_key[k_id_bin_len];
	/* Connect only when a row was found and its key converts to binary form. */
	if (request->ready && tf_ssb_id_str_to_bin(public_key, request->key))
	{
		tf_ssb_connect(request->ssb, request->host, request->port, public_key);
	}
	tf_free(request);
}
2021-01-02 13:10:00 -05:00
/**
** Timer callback: when fewer connections are reported than the local array
** can hold, queues a lookup for another stored connection to try.
**
** @param timer The timer; its data field is the tf_ssb_connections_t.
*/
static void _tf_ssb_connections_timer(uv_timer_t* timer)
{
	tf_ssb_connections_t* connections = timer->data;
	tf_ssb_connection_t* active[4];
	int active_count = tf_ssb_get_connections(connections->ssb, active, _countof(active));
	if (active_count >= (int)_countof(active))
	{
		return;
	}

	tf_ssb_connections_get_next_t* request = tf_malloc(sizeof(tf_ssb_connections_get_next_t));
	*request = (tf_ssb_connections_get_next_t)
	{
		.work = { .data = request },
		.connections = connections,
		.ssb = connections->ssb,
	};

	int status = uv_queue_work(tf_ssb_get_loop(connections->ssb), &request->work, _tf_ssb_connections_get_next_work, _tf_ssb_connections_get_next_after_work);
	if (status != 0)
	{
		/* Queuing failed: run the completion step directly so the request is freed. */
		_tf_ssb_connections_get_next_after_work(&request->work, status);
	}
}
/**
** Create a connection tracker for an SSB instance.
**
** Registers a connections-changed callback and starts a repeating timer
** (first fire after 2000, then every 2000, in uv_timer_start units). The
** timer handle is unreferenced after being started.
**
** @param ssb The SSB instance to track connections for.
** @return The new tracker; release it with tf_ssb_connections_destroy().
*/
tf_ssb_connections_t* tf_ssb_connections_create(tf_ssb_t* ssb)
{
	tf_ssb_connections_t* tracker = tf_malloc(sizeof(tf_ssb_connections_t));
	memset(tracker, 0, sizeof(*tracker));
	tracker->ssb = ssb;

	tf_ssb_add_connections_changed_callback(ssb, _tf_ssb_connections_changed_callback, NULL, tracker);

	uv_timer_t* timer = &tracker->timer;
	timer->data = tracker;
	uv_timer_init(tf_ssb_get_loop(ssb), timer);
	uv_timer_start(timer, _tf_ssb_connections_timer, 2000, 2000);
	uv_unref((uv_handle_t*)timer);
	return tracker;
}
/**
** Close callback for the tracker's timer: frees the tracker the handle
** points at.
**
** @param handle The closed handle; its data field is the tf_ssb_connections_t.
*/
static void _tf_ssb_connections_on_handle_close(uv_handle_t* handle)
{
	tf_ssb_connections_t* tracker = handle->data;
	/* Clear the back-pointer before the memory goes away. */
	handle->data = NULL;
	tf_free(tracker);
}
/**
** Destroy a connection tracker.
**
** Closes the tracker's timer; the tracker itself is freed from the close
** callback, not before this function returns.
**
** NOTE(review): the connections-changed callback registered in
** tf_ssb_connections_create() is not unregistered here -- confirm it cannot
** fire after the tracker has been freed.
**
** @param connections The tracker to destroy.
*/
void tf_ssb_connections_destroy(tf_ssb_connections_t* connections)
{
	uv_handle_t* timer_handle = (uv_handle_t*)&connections->timer;
	uv_close(timer_handle, _tf_ssb_connections_on_handle_close);
}
2023-10-09 21:11:32 -04:00
typedef struct _tf_ssb_connections_update_t
2021-01-02 13:10:00 -05:00
{
2023-10-09 21:11:32 -04:00
uv_work_t work ;
tf_ssb_t * ssb ;
char host [ 256 ] ;
int port ;
char key [ k_id_base64_len ] ;
bool attempted ;
bool succeeded ;
} tf_ssb_connections_update_t ;
/**
** Work callback: applies one queued write to the connections table.
**
** Depending on the request flags this stamps last_attempt, stamps
** last_success, or inserts the (host, port, key) row if it is not already
** present. All three statements bind host, port and key as $1, $2 and $3.
**
** @param work The work request; its data field is the tf_ssb_connections_update_t.
*/
static void _tf_ssb_connections_update_work(uv_work_t* work)
{
	tf_ssb_connections_update_t* update = work->data;
	tf_ssb_record_thread_busy(update->ssb, true);

	/* Pick the statement for this request; attempted takes precedence over succeeded. */
	const char* sql = NULL;
	if (update->attempted)
	{
		sql = " UPDATE connections SET last_attempt = strftime('%s', 'now') WHERE host = $1 AND port = $2 AND key = $3 ";
	}
	else if (update->succeeded)
	{
		sql = " UPDATE connections SET last_success = strftime('%s', 'now') WHERE host = $1 AND port = $2 AND key = $3 ";
	}
	else
	{
		sql = " INSERT INTO connections (host, port, key) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING ";
	}

	sqlite3_stmt* statement = NULL;
	sqlite3* db = tf_ssb_acquire_db_writer(update->ssb);
	if (sqlite3_prepare_v2(db, sql, -1, &statement, NULL) == SQLITE_OK)
	{
		/* The bound strings live in the request, which outlives the statement, so no destructor is needed. */
		if (sqlite3_bind_text(statement, 1, update->host, -1, NULL) == SQLITE_OK &&
			sqlite3_bind_int(statement, 2, update->port) == SQLITE_OK &&
			sqlite3_bind_text(statement, 3, update->key, -1, NULL) == SQLITE_OK)
		{
			int r = sqlite3_step(statement);
			if (r != SQLITE_DONE)
			{
				if (update->attempted)
				{
					tf_printf(" tf_ssb_connections_set_attempted: %s. \n ", sqlite3_errmsg(db));
				}
				else if (update->succeeded)
				{
					tf_printf(" tf_ssb_connections_set_succeeded: %s. \n ", sqlite3_errmsg(db));
				}
				else
				{
					tf_printf(" tf_ssb_connections_store: %d, %s. \n ", r, sqlite3_errmsg(db));
				}
			}
		}
		sqlite3_finalize(statement);
	}
	else
	{
		/* Report prepare failures instead of dropping the write silently. */
		tf_printf(" prepare: %s \n ", sqlite3_errmsg(db));
	}
	tf_ssb_release_db_writer(update->ssb, db);

	tf_ssb_record_thread_busy(update->ssb, false);
}
/**
** Completion callback for a queued write: frees the request.
**
** @param work The work request; its data field is the tf_ssb_connections_update_t.
** @param status Completion status (not inspected).
*/
static void _tf_ssb_connections_update_after_work(uv_work_t* work, int status)
{
	tf_ssb_connections_update_t* finished = work->data;
	tf_free(finished);
}
/**
** Queue a prepared write request on the SSB instance's event loop.
**
** The request is freed by the completion callback, including when queuing
** fails.
**
** @param connections The connection tracker.
** @param update The heap-allocated request to run.
*/
static void _tf_ssb_connections_queue_update(tf_ssb_connections_t* connections, tf_ssb_connections_update_t* update)
{
	uv_work_t* request = &update->work;
	request->data = update;

	int status = uv_queue_work(tf_ssb_get_loop(connections->ssb), request, _tf_ssb_connections_update_work, _tf_ssb_connections_update_after_work);
	if (status != 0)
	{
		/* Nothing was queued: run the completion step directly so the request is released. */
		_tf_ssb_connections_update_after_work(request, status);
	}
}
/**
** Build a write request for (host, port, key) and queue it.
**
** Requests with a NULL host or key are dropped: those values would otherwise
** be passed to a "%s" conversion.
**
** @param connections The connection tracker.
** @param host Host of the connection.
** @param port Port of the connection.
** @param key Identity of the peer, in string form.
** @param attempted Record a connection attempt.
** @param succeeded Record a successful connection.
*/
static void _tf_ssb_connections_submit_update(tf_ssb_connections_t* connections, const char* host, int port, const char* key, bool attempted, bool succeeded)
{
	if (!host || !key)
	{
		return;
	}

	tf_ssb_connections_update_t* update = tf_malloc(sizeof(tf_ssb_connections_update_t));
	*update = (tf_ssb_connections_update_t)
	{
		.ssb = connections->ssb,
		.port = port,
		.attempted = attempted,
		.succeeded = succeeded,
	};
	snprintf(update->host, sizeof(update->host), " %s ", host);
	snprintf(update->key, sizeof(update->key), " %s ", key);
	_tf_ssb_connections_queue_update(connections, update);
}

/**
** Queue an insert of a connection into the connections table; an existing
** conflicting row is left untouched.
**
** @param connections The connection tracker.
** @param host Host of the connection.
** @param port Port of the connection.
** @param key Identity of the peer, in string form.
*/
void tf_ssb_connections_store(tf_ssb_connections_t* connections, const char* host, int port, const char* key)
{
	_tf_ssb_connections_submit_update(connections, host, port, key, false, false);
}

/**
** Queue an update of the last attempt time of a stored connection.
**
** @param connections The connection tracker.
** @param host Host of the connection.
** @param port Port of the connection.
** @param key Identity of the peer, in string form.
*/
void tf_ssb_connections_set_attempted(tf_ssb_connections_t* connections, const char* host, int port, const char* key)
{
	_tf_ssb_connections_submit_update(connections, host, port, key, true, false);
}

/**
** Queue an update of the last success time of a stored connection.
**
** @param connections The connection tracker.
** @param host Host of the connection.
** @param port Port of the connection.
** @param key Identity of the peer, in string form.
*/
void tf_ssb_connections_set_succeeded(tf_ssb_connections_t* connections, const char* host, int port, const char* key)
{
	_tf_ssb_connections_submit_update(connections, host, port, key, false, true);
}