ssb: Sync on demand fixes. Avoid keeping message streams live in this mode.

This commit is contained in:
Cory McWilliams 2024-10-06 11:50:49 -04:00
parent 8a6147d512
commit 85fce59c0c
4 changed files with 22 additions and 7 deletions

View File

@ -4323,3 +4323,8 @@ bool tf_ssb_tunnel_create(tf_ssb_t* ssb, const char* portal_id, const char* targ
} }
return connection != NULL; return connection != NULL;
} }
int tf_ssb_connection_get_flags(tf_ssb_connection_t* connection)
{
return connection->flags;
}

View File

@ -309,7 +309,7 @@ static void _tf_ssb_connections_get_all_work(tf_ssb_t* ssb, void* user_data)
int port = sqlite3_column_int(statement, 1); int port = sqlite3_column_int(statement, 1);
const char* key = (const char*)sqlite3_column_text(statement, 2); const char* key = (const char*)sqlite3_column_text(statement, 2);
char connection[1024] = { 0 }; char connection[1024] = { 0 };
snprintf(connection, sizeof(connection), "net:%s:%d~shs:%s", host, port, key); snprintf(connection, sizeof(connection), "net:%s:%d~shs:%s", host, port, *key == '@' ? key + 1 : key);
char* dot = strrchr(connection, '.'); char* dot = strrchr(connection, '.');
if (dot && strcmp(dot, ".ed25519") == 0) if (dot && strcmp(dot, ".ed25519") == 0)
{ {
@ -332,7 +332,6 @@ static void _tf_ssb_connections_get_all_after_work(tf_ssb_t* ssb, int status, vo
tf_ssb_connections_get_all_work_t* work = user_data; tf_ssb_connections_get_all_work_t* work = user_data;
for (int i = 0; i < work->connections_count; i++) for (int i = 0; i < work->connections_count; i++)
{ {
tf_printf("connections[%d] = %s\n", i, work->connections[i]);
tf_ssb_connect_str(ssb, work->connections[i], k_tf_ssb_connect_flag_one_shot); tf_ssb_connect_str(ssb, work->connections[i], k_tf_ssb_connect_flag_one_shot);
tf_free(work->connections[i]); tf_free(work->connections[i]);
} }

View File

@ -1101,4 +1101,10 @@ bool tf_ssb_tunnel_create(tf_ssb_t* ssb, const char* portal_id, const char* targ
*/ */
void tf_ssb_sync_start(tf_ssb_t* ssb); void tf_ssb_sync_start(tf_ssb_t* ssb);
/**
** Get a connection's flags.
** @param connection The connection.
*/
int tf_ssb_connection_get_flags(tf_ssb_connection_t* connection);
/** @} */ /** @} */

View File

@ -858,11 +858,12 @@ static void _tf_ssb_connection_send_history_stream_after_work(tf_ssb_connection_
tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_json, request->request_number, NULL, (const uint8_t*)request->out_messages[i], tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_json, request->request_number, NULL, (const uint8_t*)request->out_messages[i],
strlen(request->out_messages[i]), NULL, NULL, NULL); strlen(request->out_messages[i]), NULL, NULL, NULL);
} }
bool live = request->live && (tf_ssb_connection_get_flags(connection) & k_tf_ssb_connect_flag_one_shot) == 0;
if (!request->out_finished) if (!request->out_finished)
{ {
_tf_ssb_connection_send_history_stream(connection, request->request_number, request->author, request->out_max_sequence_seen, request->keys, request->live); _tf_ssb_connection_send_history_stream(connection, request->request_number, request->author, request->out_max_sequence_seen, request->keys, live);
} }
else if (!request->live) else if (!live)
{ {
tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json, request->request_number, NULL, (const uint8_t*)"false", strlen("false"), NULL, NULL, NULL); tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json, request->request_number, NULL, (const uint8_t*)"false", strlen("false"), NULL, NULL, NULL);
} }
@ -922,7 +923,7 @@ static void _tf_ssb_rpc_createHistoryStream(
JSValue keys = JS_GetPropertyStr(context, arg, "keys"); JSValue keys = JS_GetPropertyStr(context, arg, "keys");
JSValue live = JS_GetPropertyStr(context, arg, "live"); JSValue live = JS_GetPropertyStr(context, arg, "live");
bool is_keys = JS_IsUndefined(keys) || JS_ToBool(context, keys) > 0; bool is_keys = JS_IsUndefined(keys) || JS_ToBool(context, keys) > 0;
bool is_live = JS_ToBool(context, live) > 0; bool is_live = JS_ToBool(context, live) > 0 && (tf_ssb_connection_get_flags(connection) & k_tf_ssb_connect_flag_one_shot) == 0;
int64_t sequence = 0; int64_t sequence = 0;
JS_ToInt64(context, &sequence, seq); JS_ToInt64(context, &sequence, seq);
const char* author = JS_ToCString(context, id); const char* author = JS_ToCString(context, id);
@ -1108,9 +1109,13 @@ static void _tf_ssb_rpc_ebt_replicate_send_messages(tf_ssb_connection_t* connect
if (sequence >= 0 && (sequence & 1) == 0) if (sequence >= 0 && (sequence & 1) == 0)
{ {
int32_t request_number = tf_ssb_connection_get_ebt_request_number(connection); int32_t request_number = tf_ssb_connection_get_ebt_request_number(connection);
_tf_ssb_connection_send_history_stream(connection, request_number, author, sequence >> 1, false, true); bool live = (tf_ssb_connection_get_flags(connection) & k_tf_ssb_connect_flag_one_shot) == 0;
_tf_ssb_connection_send_history_stream(connection, request_number, author, sequence >> 1, false, live);
if (live)
{
tf_ssb_connection_add_new_message_request(connection, author, request_number, false); tf_ssb_connection_add_new_message_request(connection, author, request_number, false);
} }
}
else else
{ {
tf_ssb_connection_remove_new_message_request(connection, author); tf_ssb_connection_remove_new_message_request(connection, author);