ssb: Indicate which muxrpc sends failed, and use that to fix some replication nonsense and log noise.
All checks were successful
Build Tilde Friends / Build-All (push) Successful in 15m22s
All checks were successful
Build Tilde Friends / Build-All (push) Successful in 15m22s
This commit is contained in:
@ -18,7 +18,8 @@
|
||||
#define _countof(a) ((int)(sizeof((a)) / sizeof(*(a))))
|
||||
#endif
|
||||
|
||||
static void _tf_ssb_connection_send_history_stream(tf_ssb_connection_t* connection, int32_t request_number, const char* author, int64_t sequence, bool keys, bool live);
|
||||
static void _tf_ssb_connection_send_history_stream(
|
||||
tf_ssb_connection_t* connection, int32_t request_number, const char* author, int64_t sequence, bool keys, bool live, bool end_request);
|
||||
static void _tf_ssb_rpc_send_peers_exchange(tf_ssb_connection_t* connection);
|
||||
static void _tf_ssb_rpc_start_delete_blobs(tf_ssb_t* ssb, int delay_ms);
|
||||
|
||||
@ -92,8 +93,11 @@ static void _tf_ssb_rpc_blobs_get_after_work(tf_ssb_connection_t* connection, in
|
||||
const size_t k_send_max = 8192;
|
||||
for (size_t offset = 0; offset < work->size; offset += k_send_max)
|
||||
{
|
||||
tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_binary | k_ssb_rpc_flag_stream, -work->request_number, NULL, work->blob + offset,
|
||||
offset + k_send_max <= work->size ? k_send_max : (work->size - offset), NULL, NULL, NULL);
|
||||
if (!tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_binary | k_ssb_rpc_flag_stream, -work->request_number, NULL, work->blob + offset,
|
||||
offset + k_send_max <= work->size ? k_send_max : (work->size - offset), NULL, NULL, NULL))
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
tf_free(work->blob);
|
||||
}
|
||||
@ -260,11 +264,12 @@ static void _tf_ssb_request_blob_wants_after_work(tf_ssb_connection_t* connectio
|
||||
{
|
||||
JSContext* context = tf_ssb_connection_get_context(connection);
|
||||
tf_ssb_blob_wants_t* blob_wants = tf_ssb_connection_get_blob_wants_state(connection);
|
||||
for (int i = 0; i < work->out_id_count; i++)
|
||||
bool send_failed = false;
|
||||
for (int i = 0; i < work->out_id_count && !send_failed; i++)
|
||||
{
|
||||
JSValue message = JS_NewObject(context);
|
||||
JS_SetPropertyStr(context, message, work->out_id[i], JS_NewInt32(context, -1));
|
||||
tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream, -blob_wants->request_number, NULL, message, NULL, NULL, NULL);
|
||||
send_failed = !tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream, -blob_wants->request_number, NULL, message, NULL, NULL, NULL);
|
||||
JS_FreeValue(context, message);
|
||||
blob_wants->wants_sent++;
|
||||
}
|
||||
@ -769,6 +774,7 @@ typedef struct _tf_ssb_connection_send_history_stream_t
|
||||
int64_t sequence;
|
||||
bool keys;
|
||||
bool live;
|
||||
bool end_request;
|
||||
|
||||
bool out_finished;
|
||||
int64_t out_max_sequence_seen;
|
||||
@ -855,15 +861,18 @@ static void _tf_ssb_connection_send_history_stream_after_work(tf_ssb_connection_
|
||||
{
|
||||
for (int i = 0; i < request->out_messages_count; i++)
|
||||
{
|
||||
tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_json, request->request_number, NULL, (const uint8_t*)request->out_messages[i],
|
||||
strlen(request->out_messages[i]), NULL, NULL, NULL);
|
||||
if (!tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_json, request->request_number, NULL, (const uint8_t*)request->out_messages[i],
|
||||
strlen(request->out_messages[i]), NULL, NULL, NULL))
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
bool live = request->live && (tf_ssb_connection_get_flags(connection) & k_tf_ssb_connect_flag_one_shot) == 0;
|
||||
if (!request->out_finished)
|
||||
{
|
||||
_tf_ssb_connection_send_history_stream(connection, request->request_number, request->author, request->out_max_sequence_seen, request->keys, live);
|
||||
_tf_ssb_connection_send_history_stream(
|
||||
connection, request->request_number, request->author, request->out_max_sequence_seen, request->keys, request->live, request->end_request);
|
||||
}
|
||||
else if (!live)
|
||||
else if (!request->live && request->end_request)
|
||||
{
|
||||
tf_ssb_connection_rpc_send(connection, k_ssb_rpc_flag_json, request->request_number, NULL, (const uint8_t*)"false", strlen("false"), NULL, NULL, NULL);
|
||||
}
|
||||
@ -889,7 +898,8 @@ static void _tf_ssb_connection_send_history_stream_callback(tf_ssb_connection_t*
|
||||
}
|
||||
}
|
||||
|
||||
static void _tf_ssb_connection_send_history_stream(tf_ssb_connection_t* connection, int32_t request_number, const char* author, int64_t sequence, bool keys, bool live)
|
||||
static void _tf_ssb_connection_send_history_stream(
|
||||
tf_ssb_connection_t* connection, int32_t request_number, const char* author, int64_t sequence, bool keys, bool live, bool end_request)
|
||||
{
|
||||
tf_ssb_connection_send_history_stream_t* async = tf_malloc(sizeof(tf_ssb_connection_send_history_stream_t));
|
||||
*async = (tf_ssb_connection_send_history_stream_t) {
|
||||
@ -897,6 +907,7 @@ static void _tf_ssb_connection_send_history_stream(tf_ssb_connection_t* connecti
|
||||
.sequence = sequence,
|
||||
.keys = keys,
|
||||
.live = live,
|
||||
.end_request = end_request,
|
||||
};
|
||||
snprintf(async->author, sizeof(async->author), "%s", author);
|
||||
tf_ssb_connection_schedule_idle(connection, _tf_ssb_connection_send_history_stream_callback, async);
|
||||
@ -928,7 +939,7 @@ static void _tf_ssb_rpc_createHistoryStream(
|
||||
JS_ToInt64(context, &sequence, seq);
|
||||
const char* author = JS_ToCString(context, id);
|
||||
|
||||
_tf_ssb_connection_send_history_stream(connection, -request_number, author, sequence, is_keys, is_live);
|
||||
_tf_ssb_connection_send_history_stream(connection, -request_number, author, sequence, is_keys, is_live, true);
|
||||
|
||||
if (is_live)
|
||||
{
|
||||
@ -1110,7 +1121,7 @@ static void _tf_ssb_rpc_ebt_replicate_send_messages(tf_ssb_connection_t* connect
|
||||
{
|
||||
int32_t request_number = tf_ssb_connection_get_ebt_request_number(connection);
|
||||
bool live = (tf_ssb_connection_get_flags(connection) & k_tf_ssb_connect_flag_one_shot) == 0;
|
||||
_tf_ssb_connection_send_history_stream(connection, request_number, author, sequence >> 1, false, live);
|
||||
_tf_ssb_connection_send_history_stream(connection, request_number, author, sequence >> 1, false, live, false);
|
||||
if (live)
|
||||
{
|
||||
tf_ssb_connection_add_new_message_request(connection, author, request_number, false);
|
||||
|
Reference in New Issue
Block a user