Attempt to track requests better. New requests need to be flagged as such. Still trying to chase tunnel instability.

git-svn-id: https://www.unprompted.com/svn/projects/tildefriends/trunk@4412 ed5197a5-7fde-0310-b194-c3ffbd925b24
This commit is contained in:
Cory McWilliams 2023-08-20 19:55:59 +00:00
parent 85acac3a30
commit f1b55ddd64
5 changed files with 88 additions and 56 deletions

123
src/ssb.c
View File

@ -624,25 +624,40 @@ static bool _tf_ssb_connection_get_request_callback(tf_ssb_connection_t* connect
void tf_ssb_connection_add_request(tf_ssb_connection_t* connection, int32_t request_number, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data, tf_ssb_connection_t* dependent_connection)
{
tf_ssb_connection_remove_request(connection, request_number);
tf_ssb_request_t request =
tf_ssb_request_t* existing = connection->requests_count ? bsearch(&request_number, connection->requests, connection->requests_count, sizeof(tf_ssb_request_t), _request_compare) : NULL;
if (existing)
{
.request_number = request_number,
.callback = callback,
.cleanup = cleanup,
.user_data = user_data,
.dependent_connection = dependent_connection,
};
int index = tf_util_insert_index(&request_number, connection->requests, connection->requests_count, sizeof(tf_ssb_request_t), _request_compare);
connection->requests = tf_resize_vec(connection->requests, sizeof(tf_ssb_request_t) * (connection->requests_count + 1));
if (connection->requests_count - index)
{
memmove(connection->requests + index + 1, connection->requests + index, sizeof(tf_ssb_request_t) * (connection->requests_count - index));
assert(!existing->callback);
assert(!existing->cleanup);
assert(!existing->user_data);
assert(!existing->dependent_connection);
existing->callback = callback;
existing->cleanup = cleanup;
existing->user_data = user_data;
existing->dependent_connection = dependent_connection;
}
connection->requests[index] = request;
connection->requests_count++;
else
{
tf_ssb_connection_remove_request(connection, request_number);
tf_ssb_request_t request =
{
.request_number = request_number,
.callback = callback,
.cleanup = cleanup,
.user_data = user_data,
.dependent_connection = dependent_connection,
};
int index = tf_util_insert_index(&request_number, connection->requests, connection->requests_count, sizeof(tf_ssb_request_t), _request_compare);
connection->requests = tf_resize_vec(connection->requests, sizeof(tf_ssb_request_t) * (connection->requests_count + 1));
if (connection->requests_count - index)
{
memmove(connection->requests + index + 1, connection->requests + index, sizeof(tf_ssb_request_t) * (connection->requests_count - index));
}
connection->requests[index] = request;
connection->requests_count++;
connection->ssb->request_count++;
connection->ssb->request_count++;
}
}
static int _message_request_compare(const void* a, const void* b)
@ -714,8 +729,19 @@ void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags,
}
return;
}
if (flags & k_ssb_rpc_flag_new_request)
{
assert(request_number > 0);
assert(!_tf_ssb_connection_get_request_callback(connection, request_number, NULL, NULL));
}
else if (!_tf_ssb_connection_get_request_callback(connection, request_number, NULL, NULL))
{
tf_printf("Dropping message with no active request (%d): %.*s\n", request_number, (int)size, message);
return;
}
uint8_t* combined = tf_malloc(9 + size);
*combined = flags;
*combined = flags & k_ssb_rpc_mask_send;
uint32_t u32size = htonl((uint32_t)size);
memcpy(combined + 1, &u32size, sizeof(u32size));
uint32_t rn = htonl((uint32_t)request_number);
@ -723,20 +749,20 @@ void tf_ssb_connection_rpc_send(tf_ssb_connection_t* connection, uint8_t flags,
memcpy(combined + 1 + 2 * sizeof(uint32_t), message, size);
if (connection->ssb->verbose)
{
tf_printf(MAGENTA "%s RPC SEND" RESET " flags=%x RN=%d: [%zd B] %.*s\n", connection->name, flags, request_number, size, (flags & k_ssb_rpc_mask_type) == k_ssb_rpc_flag_binary ? 0 : (int)size, message);
tf_printf(MAGENTA "%s RPC SEND" RESET " flags=%x RN=%d: [%zd B] %.*s\n", connection->name, flags & k_ssb_rpc_mask_send, request_number, size, (flags & k_ssb_rpc_mask_type) == k_ssb_rpc_flag_binary ? 0 : (int)size, message);
}
_tf_ssb_connection_add_debug_message(connection, true, flags, request_number, message, size);
_tf_ssb_connection_add_debug_message(connection, true, flags & k_ssb_rpc_mask_send, request_number, message, size);
_tf_ssb_connection_box_stream_send(connection, combined, 1 + 2 * sizeof(uint32_t) + size);
tf_free(combined);
connection->ssb->rpc_out++;
if (request_number > 0 && callback)
if (flags & k_ssb_rpc_flag_end_error)
{
tf_ssb_connection_remove_request(connection, request_number);
}
else if (flags & k_ssb_rpc_flag_new_request)
{
tf_ssb_connection_add_request(connection, request_number, callback, cleanup, user_data, NULL);
}
else if (cleanup)
{
cleanup(connection->ssb, user_data);
}
}
void tf_ssb_connection_rpc_send_json(tf_ssb_connection_t* connection, uint8_t flags, int32_t request_number, JSValue message, tf_ssb_rpc_callback_t* callback, tf_ssb_callback_cleanup_t* cleanup, void* user_data)
@ -747,7 +773,7 @@ void tf_ssb_connection_rpc_send_json(tf_ssb_connection_t* connection, uint8_t fl
const char* json_string = JS_ToCStringLen(context, &size, json);
tf_ssb_connection_rpc_send(
connection,
k_ssb_rpc_flag_json | (flags & k_ssb_rpc_flag_stream) | (flags & k_ssb_rpc_flag_end_error),
k_ssb_rpc_flag_json | (flags & ~k_ssb_rpc_mask_type),
request_number,
(const uint8_t*)json_string,
size,
@ -1474,6 +1500,7 @@ static void _tf_ssb_connection_rpc_recv(tf_ssb_connection_t* connection, uint8_t
{
connection->ssb->rpc_in++;
_tf_ssb_connection_add_debug_message(connection, false, flags, request_number, message, size);
bool close_connection = false;
if (size == 0)
{
_tf_ssb_connection_close(connection, "rpc recv zero");
@ -1492,9 +1519,25 @@ static void _tf_ssb_connection_rpc_recv(tf_ssb_connection_t* connection, uint8_t
if (!JS_IsUndefined(val))
{
bool found = false;
if (JS_IsObject(val))
tf_ssb_rpc_callback_t* callback = NULL;
void* user_data = NULL;
if (_tf_ssb_connection_get_request_callback(connection, -request_number, &callback, &user_data))
{
if (callback)
{
char buffer[64];
snprintf(buffer, sizeof(buffer), "request %d", request_number);
tf_trace_begin(connection->ssb->trace, buffer);
PRE_CALLBACK(connection->ssb, callback);
callback(connection, flags, request_number, val, message, size, user_data);
POST_CALLBACK(connection->ssb, callback);
tf_trace_end(connection->ssb->trace);
}
}
else if (JS_IsObject(val))
{
bool found = false;
tf_ssb_connection_add_request(connection, -request_number, NULL, NULL, NULL, NULL);
for (tf_ssb_rpc_callback_node_t* it = connection->ssb->rpc; it; it = it->next)
{
if (_tf_ssb_name_equals(context, val, it->name))
@ -1508,35 +1551,19 @@ static void _tf_ssb_connection_rpc_recv(tf_ssb_connection_t* connection, uint8_t
break;
}
}
}
if (!found)
{
tf_ssb_rpc_callback_t* callback = NULL;
void* user_data = NULL;
if (_tf_ssb_connection_get_request_callback(connection, -request_number, &callback, &user_data))
{
if (callback)
{
char buffer[64];
snprintf(buffer, sizeof(buffer), "request %d", request_number);
tf_trace_begin(connection->ssb->trace, buffer);
PRE_CALLBACK(connection->ssb, callback);
callback(connection, flags, request_number, val, message, size, user_data);
POST_CALLBACK(connection->ssb, callback);
tf_trace_end(connection->ssb->trace);
}
}
else if (!_tf_ssb_name_equals(context, val, (const char*[]) { "Error", NULL }))
if (!found)
{
char buffer[256];
_tf_ssb_name_to_string(context, val, buffer, sizeof(buffer));
tf_ssb_connection_rpc_send_error_method_not_allowed(connection, flags, -request_number, buffer);
}
}
}
else
{
tf_printf("Failed to parse %.*s\n", (int)size, message);
close_connection = true;
}
JS_FreeValue(context, val);
@ -1568,9 +1595,9 @@ static void _tf_ssb_connection_rpc_recv(tf_ssb_connection_t* connection, uint8_t
}
}
if (flags & k_ssb_rpc_flag_end_error)
if (close_connection)
{
tf_ssb_connection_remove_request(connection, -request_number);
tf_ssb_connection_close(connection);
}
}

View File

@ -15,6 +15,11 @@ enum
k_ssb_rpc_flag_end_error = 0x4,
k_ssb_rpc_flag_stream = 0x8,
k_ssb_rpc_mask_message = 0xC,
k_ssb_rpc_mask_send = 0xf,
k_ssb_rpc_flag_new_request = 0x10,
k_ssb_blob_bytes_max = 5 * 1024 * 1024,
};

View File

@ -1096,7 +1096,7 @@ static JSValue _tf_ssb_createTunnel(JSContext* context, JSValueConst this_val, i
JS_SetPropertyUint32(context, args, 0, arg);
JS_SetPropertyStr(context, message, "args", args);
JS_SetPropertyStr(context, message, "type", JS_NewString(context, "duplex"));
tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream, request_number, message, NULL, NULL, NULL);
tf_ssb_connection_rpc_send_json(connection, k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request, request_number, message, NULL, NULL, NULL);
JS_FreeValue(context, message);
tf_ssb_connection_tunnel_create(ssb, portal_id, request_number, target_id);

View File

@ -351,7 +351,7 @@ static void _tf_ssb_rpc_tunnel_connect(tf_ssb_connection_t* connection, uint8_t
tf_ssb_connection_rpc_send_json(
target_connection,
k_ssb_rpc_flag_stream,
k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request,
tunnel_request_number,
message,
NULL,
@ -565,7 +565,7 @@ static void _tf_ssb_rpc_connection_blobs_get(tf_ssb_connection_t* connection, co
tf_ssb_connection_rpc_send_json(
connection,
k_ssb_rpc_flag_stream,
k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request,
tf_ssb_connection_next_request_number(connection),
message,
_tf_ssb_rpc_connection_blobs_get_callback,
@ -756,7 +756,7 @@ static void _tf_ssb_rpc_connection_tunnel_isRoom_callback(tf_ssb_connection_t* c
JS_SetPropertyStr(context, message, "args", JS_NewArray(context));
tf_ssb_connection_rpc_send_json(
connection,
k_ssb_rpc_flag_stream,
k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request,
tf_ssb_connection_next_request_number(connection),
message,
_tf_ssb_rpc_connection_room_attendants_callback,
@ -1074,7 +1074,7 @@ static void _tf_ssb_rpc_send_ebt_replicate(tf_ssb_connection_t* connection)
int32_t request_number = tf_ssb_connection_next_request_number(connection);
tf_ssb_connection_rpc_send_json(
connection,
k_ssb_rpc_flag_stream,
k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request,
request_number,
message,
_tf_ssb_rpc_ebt_replicate_client,
@ -1111,7 +1111,7 @@ static void _tf_ssb_rpc_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_chang
JS_SetPropertyStr(context, message, "args", JS_NewArray(context));
tf_ssb_connection_rpc_send_json(
connection,
k_ssb_rpc_flag_stream,
k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request,
tf_ssb_connection_next_request_number(connection),
message,
_tf_ssb_rpc_connection_blobs_createWants_callback,
@ -1129,7 +1129,7 @@ static void _tf_ssb_rpc_connections_changed_callback(tf_ssb_t* ssb, tf_ssb_chang
JS_SetPropertyStr(context, message, "args", JS_NewArray(context));
tf_ssb_connection_rpc_send_json(
connection,
0,
k_ssb_rpc_flag_new_request,
tf_ssb_connection_next_request_number(connection),
message,
_tf_ssb_rpc_connection_tunnel_isRoom_callback,

View File

@ -443,7 +443,7 @@ void tf_ssb_test_rooms(const tf_test_options_t* options)
tf_ssb_connection_rpc_send_json(
connections[0],
k_ssb_rpc_flag_stream,
k_ssb_rpc_flag_stream | k_ssb_rpc_flag_new_request,
tunnel_request_number,
message,
NULL,