ssb: Tweak idle scheduling even more, still.  Fixes -t=bench.
	
		
			
	
		
	
	
		
	
		
			All checks were successful
		
		
	
	
		
			
				
	
				Build Tilde Friends / Build-All (push) Successful in 22m36s
				
			
		
		
	
	
				
					
				
			
		
			All checks were successful
		
		
	
	Build Tilde Friends / Build-All (push) Successful in 22m36s
				
			This commit is contained in:
		
							
								
								
									
										27
									
								
								src/ssb.c
									
									
									
									
									
								
							
							
						
						
									
										27
									
								
								src/ssb.c
									
									
									
									
									
								
							| @@ -17,6 +17,7 @@ | |||||||
| #include "sodium/crypto_scalarmult_curve25519.h" | #include "sodium/crypto_scalarmult_curve25519.h" | ||||||
| #include "sodium/crypto_secretbox.h" | #include "sodium/crypto_secretbox.h" | ||||||
| #include "sodium/crypto_sign.h" | #include "sodium/crypto_sign.h" | ||||||
|  | #include "sodium/randombytes.h" | ||||||
| #include "sodium/utils.h" | #include "sodium/utils.h" | ||||||
| #include "sqlite3.h" | #include "sqlite3.h" | ||||||
| #include "uv.h" | #include "uv.h" | ||||||
| @@ -261,7 +262,7 @@ typedef struct _tf_ssb_connection_message_request_t | |||||||
|  |  | ||||||
| typedef struct _tf_ssb_connection_scheduled_t | typedef struct _tf_ssb_connection_scheduled_t | ||||||
| { | { | ||||||
| 	char key[k_id_base64_len]; | 	char key[128]; | ||||||
| 	tf_ssb_scheduled_callback_t* callback; | 	tf_ssb_scheduled_callback_t* callback; | ||||||
| 	void* user_data; | 	void* user_data; | ||||||
| } tf_ssb_connection_scheduled_t; | } tf_ssb_connection_scheduled_t; | ||||||
| @@ -323,6 +324,7 @@ typedef struct _tf_ssb_connection_t | |||||||
| 	uint8_t secretbox_buf[k_tf_ssb_rpc_message_body_length_max]; | 	uint8_t secretbox_buf[k_tf_ssb_rpc_message_body_length_max]; | ||||||
|  |  | ||||||
| 	uint32_t send_request_number; | 	uint32_t send_request_number; | ||||||
|  | 	uint64_t prng; | ||||||
|  |  | ||||||
| 	tf_ssb_connection_t* next; | 	tf_ssb_connection_t* next; | ||||||
| 	tf_ssb_request_t* requests; | 	tf_ssb_request_t* requests; | ||||||
| @@ -592,17 +594,26 @@ static void _tf_ssb_connection_scheduled_async(uv_async_t* async) | |||||||
| 	_tf_ssb_connection_dispatch_scheduled(connection); | 	_tf_ssb_connection_dispatch_scheduled(connection); | ||||||
| } | } | ||||||
|  |  | ||||||
|  | static uint32_t _tf_ssb_connection_prng(tf_ssb_connection_t* connection) | ||||||
|  | { | ||||||
|  | 	uint64_t seed = connection->prng * 6364136223846793005ULL + 1442695040888963407ULL; | ||||||
|  | 	uint32_t result = (uint32_t)((seed ^ (seed >> 22)) >> (22 + (seed >> 61))); | ||||||
|  | 	connection->prng = seed; | ||||||
|  | 	return result; | ||||||
|  | } | ||||||
|  |  | ||||||
| static void _tf_ssb_connection_dispatch_scheduled(tf_ssb_connection_t* connection) | static void _tf_ssb_connection_dispatch_scheduled(tf_ssb_connection_t* connection) | ||||||
| { | { | ||||||
| 	while (((connection->active_write_count == 0 && connection->read_back_pressure == 0) || connection->closing) && connection->scheduled_count && connection->scheduled) | 	while (((connection->active_write_count == 0 && connection->read_back_pressure == 0) || connection->closing) && connection->scheduled_count && connection->scheduled) | ||||||
| 	{ | 	{ | ||||||
| 		int index = uv_hrtime() % connection->scheduled_count; | 		int index = _tf_ssb_connection_prng(connection) % connection->scheduled_count; | ||||||
| 		tf_ssb_connection_scheduled_t scheduled = connection->scheduled[index]; | 		tf_ssb_connection_scheduled_t scheduled = connection->scheduled[index]; | ||||||
| 		if (index != connection->scheduled_count - 1) | 		if (index != connection->scheduled_count - 1) | ||||||
| 		{ | 		{ | ||||||
| 			memmove(connection->scheduled + index, connection->scheduled + index + 1, sizeof(tf_ssb_connection_scheduled_t) * (connection->scheduled_count - index - 1)); | 			memmove(connection->scheduled + index, connection->scheduled + index + 1, sizeof(tf_ssb_connection_scheduled_t) * (connection->scheduled_count - index - 1)); | ||||||
| 		} | 		} | ||||||
| 		connection->scheduled_count--; | 		connection->scheduled_count--; | ||||||
|  |  | ||||||
| 		tf_trace_begin(connection->ssb->trace, "scheduled callback"); | 		tf_trace_begin(connection->ssb->trace, "scheduled callback"); | ||||||
| 		PRE_CALLBACK(connection->ssb, scheduled.callback); | 		PRE_CALLBACK(connection->ssb, scheduled.callback); | ||||||
| 		scheduled.callback(connection, false, scheduled.user_data); | 		scheduled.callback(connection, false, scheduled.user_data); | ||||||
| @@ -623,14 +634,11 @@ void tf_ssb_connection_schedule_idle(tf_ssb_connection_t* connection, const char | |||||||
| 	int index = tf_util_insert_index(key, connection->scheduled, connection->scheduled_count, sizeof(tf_ssb_connection_scheduled_t), _tf_ssb_connection_scheduled_compare); | 	int index = tf_util_insert_index(key, connection->scheduled, connection->scheduled_count, sizeof(tf_ssb_connection_scheduled_t), _tf_ssb_connection_scheduled_compare); | ||||||
| 	if (index != connection->scheduled_count && strcmp(key, connection->scheduled[index].key) == 0) | 	if (index != connection->scheduled_count && strcmp(key, connection->scheduled[index].key) == 0) | ||||||
| 	{ | 	{ | ||||||
| 		tf_ssb_connection_scheduled_t scheduled = connection->scheduled[index]; | 		/* Keep the old request.  Skip the new request. */ | ||||||
| 		connection->scheduled[index].callback = callback; |  | ||||||
| 		connection->scheduled[index].user_data = user_data; |  | ||||||
|  |  | ||||||
| 		tf_trace_begin(connection->ssb->trace, "scheduled callback (skip)"); | 		tf_trace_begin(connection->ssb->trace, "scheduled callback (skip)"); | ||||||
| 		PRE_CALLBACK(connection->ssb, scheduled.callback); | 		PRE_CALLBACK(connection->ssb, callback); | ||||||
| 		scheduled.callback(connection, true, scheduled.user_data); | 		callback(connection, true, user_data); | ||||||
| 		POST_CALLBACK(connection->ssb, scheduled.callback); | 		POST_CALLBACK(connection->ssb, callback); | ||||||
| 		tf_trace_end(connection->ssb->trace); | 		tf_trace_end(connection->ssb->trace); | ||||||
| 	} | 	} | ||||||
| 	else | 	else | ||||||
| @@ -2728,6 +2736,7 @@ static tf_ssb_connection_t* _tf_ssb_connection_create_internal(tf_ssb_t* ssb, co | |||||||
| 	snprintf(connection->name, sizeof(connection->name), "%s%d", name, index); | 	snprintf(connection->name, sizeof(connection->name), "%s%d", name, index); | ||||||
| 	connection->ssb = ssb; | 	connection->ssb = ssb; | ||||||
| 	connection->send_request_number = 1; | 	connection->send_request_number = 1; | ||||||
|  | 	randombytes_buf(&connection->prng, sizeof(connection->prng)); | ||||||
|  |  | ||||||
| 	connection->async.data = connection; | 	connection->async.data = connection; | ||||||
| 	uv_async_init(ssb->loop, &connection->async, _tf_ssb_connection_process_message_async); | 	uv_async_init(ssb->loop, &connection->async, _tf_ssb_connection_process_message_async); | ||||||
|   | |||||||
| @@ -807,7 +807,7 @@ typedef void(tf_ssb_scheduled_callback_t)(tf_ssb_connection_t* connection, bool | |||||||
| /** | /** | ||||||
| ** Schedule work to be run when the connection is next idle. | ** Schedule work to be run when the connection is next idle. | ||||||
| ** @param connection The owning connection. | ** @param connection The owning connection. | ||||||
| ** @param key A key identifying the work.  If work by the same key already exists, it will be replaced. | ** @param key A key identifying the work.  If work by the same key already exists, the new request will be discarded. | ||||||
| ** @param callback The callback to call. | ** @param callback The callback to call. | ||||||
| ** @param user_data User data to pass to the callback. | ** @param user_data User data to pass to the callback. | ||||||
| */ | */ | ||||||
|   | |||||||
| @@ -908,7 +908,7 @@ static void _tf_ssb_connection_send_history_stream_callback(tf_ssb_connection_t* | |||||||
| static void _tf_ssb_connection_send_history_stream( | static void _tf_ssb_connection_send_history_stream( | ||||||
| 	tf_ssb_connection_t* connection, int32_t request_number, const char* author, int64_t sequence, bool keys, bool live, bool end_request) | 	tf_ssb_connection_t* connection, int32_t request_number, const char* author, int64_t sequence, bool keys, bool live, bool end_request) | ||||||
| { | { | ||||||
| 	if (!tf_ssb_is_shutting_down(tf_ssb_connection_get_ssb(connection))) | 	if (tf_ssb_connection_is_connected(connection) && !tf_ssb_is_shutting_down(tf_ssb_connection_get_ssb(connection))) | ||||||
| 	{ | 	{ | ||||||
| 		tf_ssb_connection_send_history_stream_t* async = tf_malloc(sizeof(tf_ssb_connection_send_history_stream_t)); | 		tf_ssb_connection_send_history_stream_t* async = tf_malloc(sizeof(tf_ssb_connection_send_history_stream_t)); | ||||||
| 		*async = (tf_ssb_connection_send_history_stream_t) { | 		*async = (tf_ssb_connection_send_history_stream_t) { | ||||||
| @@ -919,7 +919,9 @@ static void _tf_ssb_connection_send_history_stream( | |||||||
| 			.end_request = end_request, | 			.end_request = end_request, | ||||||
| 		}; | 		}; | ||||||
| 		snprintf(async->author, sizeof(async->author), "%s", author); | 		snprintf(async->author, sizeof(async->author), "%s", author); | ||||||
| 		tf_ssb_connection_schedule_idle(connection, author, _tf_ssb_connection_send_history_stream_callback, async); | 		char key[128]; | ||||||
|  | 		snprintf(key, sizeof(key), "%s:%" PRId64, author, sequence); | ||||||
|  | 		tf_ssb_connection_schedule_idle(connection, key, _tf_ssb_connection_send_history_stream_callback, async); | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user