Move the HTTP timeout into C where we can manage it better as writes are active. Fixes an accidental 45 second GET timeout from httpd.js.
git-svn-id: https://www.unprompted.com/svn/projects/tildefriends/trunk@4466 ed5197a5-7fde-0310-b194-c3ffbd925b24
This commit is contained in:
		| @@ -4,7 +4,7 @@ let gHandlers = []; | |||||||
| let gSocketHandlers = []; | let gSocketHandlers = []; | ||||||
| let gBadRequests = {}; | let gBadRequests = {}; | ||||||
|  |  | ||||||
| const kRequestTimeout = 15000; | const kRequestTimeout = 5000; | ||||||
| const kStallTimeout = 60000; | const kStallTimeout = 60000; | ||||||
|  |  | ||||||
| function logError(error) { | function logError(error) { | ||||||
| @@ -395,41 +395,10 @@ function handleConnection(client) { | |||||||
| 	let parsing_header = true; | 	let parsing_header = true; | ||||||
| 	let bodyToRead = -1; | 	let bodyToRead = -1; | ||||||
| 	let body; | 	let body; | ||||||
| 	let requestCount = -1; |  | ||||||
| 	let readCount = 0; | 	let readCount = 0; | ||||||
| 	let isWebsocket = false; | 	let isWebsocket = false; | ||||||
|  |  | ||||||
| 	function resetTimeout(requestIndex) { | 	client.setActivityTimeout(kRequestTimeout); | ||||||
| 		if (isWebsocket) { |  | ||||||
| 			return; |  | ||||||
| 		} |  | ||||||
| 		if (bodyToRead == -1) { |  | ||||||
| 			setTimeout(function() { |  | ||||||
| 				if (requestCount == requestIndex) { |  | ||||||
| 					client.info = 'timed out'; |  | ||||||
| 					if (requestCount == 0) { |  | ||||||
| 						badRequest(client, 'Timed out waiting for request.'); |  | ||||||
| 					} else { |  | ||||||
| 						client.close(); |  | ||||||
| 					} |  | ||||||
| 				} |  | ||||||
| 			}, kRequestTimeout); |  | ||||||
| 		} else { |  | ||||||
| 			let lastReadCount = readCount; |  | ||||||
| 			setTimeout(function() { |  | ||||||
| 				if (readCount == lastReadCount) { |  | ||||||
| 					client.info = 'stalled'; |  | ||||||
| 					if (requestCount == 0) { |  | ||||||
| 						badRequest(client, 'Request stalled.'); |  | ||||||
| 					} else { |  | ||||||
| 						client.close(); |  | ||||||
| 					} |  | ||||||
| 				} |  | ||||||
| 			}, kStallTimeout); |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	resetTimeout(++requestCount); |  | ||||||
|  |  | ||||||
| 	function reset() { | 	function reset() { | ||||||
| 		request = undefined; | 		request = undefined; | ||||||
| @@ -438,7 +407,7 @@ function handleConnection(client) { | |||||||
| 		bodyToRead = -1; | 		bodyToRead = -1; | ||||||
| 		body = undefined; | 		body = undefined; | ||||||
| 		client.info = 'reset'; | 		client.info = 'reset'; | ||||||
| 		resetTimeout(++requestCount); | 		client.setActivityTimeout(kRequestTimeout); | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	function finish() { | 	function finish() { | ||||||
| @@ -463,9 +432,6 @@ function handleConnection(client) { | |||||||
| 	client.read(function(data) { | 	client.read(function(data) { | ||||||
| 		readCount++; | 		readCount++; | ||||||
| 		if (data) { | 		if (data) { | ||||||
| 			if (bodyToRead != -1 && !isWebsocket) { |  | ||||||
| 				resetTimeout(requestCount); |  | ||||||
| 			} |  | ||||||
| 			let newBuffer = new Uint8Array(inputBuffer.length + data.length); | 			let newBuffer = new Uint8Array(inputBuffer.length + data.length); | ||||||
| 			newBuffer.set(inputBuffer, 0); | 			newBuffer.set(inputBuffer, 0); | ||||||
| 			newBuffer.set(data, inputBuffer.length); | 			newBuffer.set(data, inputBuffer.length); | ||||||
| @@ -483,6 +449,7 @@ function handleConnection(client) { | |||||||
| 							return; | 							return; | ||||||
| 						} | 						} | ||||||
| 					} else if (typeof result === 'object')  { | 					} else if (typeof result === 'object')  { | ||||||
|  | 						client.setActivityTimeout(kStallTimeout); | ||||||
| 						request = [ | 						request = [ | ||||||
| 							result.method, | 							result.method, | ||||||
| 							result.path, | 							result.path, | ||||||
| @@ -509,7 +476,6 @@ function handleConnection(client) { | |||||||
| 							} | 							} | ||||||
| 							body = new Uint8Array(bodyToRead); | 							body = new Uint8Array(bodyToRead); | ||||||
| 							client.info = 'waiting for body'; | 							client.info = 'waiting for body'; | ||||||
| 							resetTimeout(requestCount); |  | ||||||
| 						} else if (headers["connection"] | 						} else if (headers["connection"] | ||||||
| 							&& headers["connection"].toLowerCase().split(",").map(x => x.trim()).indexOf("upgrade") != -1 | 							&& headers["connection"].toLowerCase().split(",").map(x => x.trim()).indexOf("upgrade") != -1 | ||||||
| 							&& headers["upgrade"] | 							&& headers["upgrade"] | ||||||
| @@ -520,7 +486,7 @@ function handleConnection(client) { | |||||||
| 							let response = new Response(requestObject, client); | 							let response = new Response(requestObject, client); | ||||||
| 							handleWebSocketRequest(requestObject, response, client); | 							handleWebSocketRequest(requestObject, response, client); | ||||||
| 							/* Prevent the timeout from disconnecting us. */ | 							/* Prevent the timeout from disconnecting us. */ | ||||||
| 							requestCount++; | 							client.setActivityTimeout(); | ||||||
| 						} else { | 						} else { | ||||||
| 							finish(); | 							finish(); | ||||||
| 						} | 						} | ||||||
|   | |||||||
| @@ -30,6 +30,7 @@ typedef enum _socket_direction_t { | |||||||
| typedef struct _socket_t { | typedef struct _socket_t { | ||||||
| 	tf_task_t* _task; | 	tf_task_t* _task; | ||||||
| 	uv_tcp_t _socket; | 	uv_tcp_t _socket; | ||||||
|  | 	uv_timer_t _timer; | ||||||
| 	tf_tls_session_t* _tls; | 	tf_tls_session_t* _tls; | ||||||
| 	promiseid_t _startTlsPromise; | 	promiseid_t _startTlsPromise; | ||||||
| 	promiseid_t _closePromise; | 	promiseid_t _closePromise; | ||||||
| @@ -37,6 +38,7 @@ typedef struct _socket_t { | |||||||
| 	bool _noDelay; | 	bool _noDelay; | ||||||
| 	bool _reading; | 	bool _reading; | ||||||
| 	bool _listening; | 	bool _listening; | ||||||
|  | 	int _active; | ||||||
| 	char _peerName[256]; | 	char _peerName[256]; | ||||||
| 	socket_direction_t _direction; | 	socket_direction_t _direction; | ||||||
| 	JSValue _object; | 	JSValue _object; | ||||||
| @@ -44,6 +46,7 @@ typedef struct _socket_t { | |||||||
| 	JSValue _onRead; | 	JSValue _onRead; | ||||||
| 	JSValue _onError; | 	JSValue _onError; | ||||||
| 	uint64_t created_ms; | 	uint64_t created_ms; | ||||||
|  | 	uint64_t timeout_ms; | ||||||
| } socket_t; | } socket_t; | ||||||
|  |  | ||||||
| static JSValue _socket_create(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv); | static JSValue _socket_create(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv); | ||||||
| @@ -66,6 +69,7 @@ static JSValue _socket_getPeerCertificate(JSContext* context, JSValueConst this_ | |||||||
| static JSValue _socket_getNoDelay(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv); | static JSValue _socket_getNoDelay(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv); | ||||||
| static JSValue _socket_setNoDelay(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv); | static JSValue _socket_setNoDelay(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv); | ||||||
| static JSValue _sockets_get(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv); | static JSValue _sockets_get(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv); | ||||||
|  | static JSValue _socket_setActivityTimeout(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv); | ||||||
|  |  | ||||||
| static void _socket_onClose(uv_handle_t* handle); | static void _socket_onClose(uv_handle_t* handle); | ||||||
| static void _socket_onShutdown(uv_shutdown_t* request, int status); | static void _socket_onShutdown(uv_shutdown_t* request, int status); | ||||||
| @@ -77,6 +81,9 @@ static void _socket_allocateBuffer(uv_handle_t* handle, size_t suggestedSize, uv | |||||||
| static void _socket_onRead(uv_stream_t* stream, ssize_t readSize, const uv_buf_t* buffer); | static void _socket_onRead(uv_stream_t* stream, ssize_t readSize, const uv_buf_t* buffer); | ||||||
| static void _socket_onWrite(uv_write_t* request, int status); | static void _socket_onWrite(uv_write_t* request, int status); | ||||||
| static void _socket_onTlsShutdown(uv_write_t* request, int status); | static void _socket_onTlsShutdown(uv_write_t* request, int status); | ||||||
|  | static void _socket_resetTimeout(socket_t* socket); | ||||||
|  | static void _socket_pauseTimeout(socket_t* socket); | ||||||
|  | static void _socket_resumeTimeout(socket_t* socket); | ||||||
|  |  | ||||||
| static void _socket_notifyDataRead(socket_t* socket, const char* data, size_t length); | static void _socket_notifyDataRead(socket_t* socket, const char* data, size_t length); | ||||||
| static int _socket_writeBytes(socket_t* socket, promiseid_t promise, int (*callback)(socket_t* socket, promiseid_t promise, const char*, size_t), JSValue value, int* outLength); | static int _socket_writeBytes(socket_t* socket, promiseid_t promise, int (*callback)(socket_t* socket, promiseid_t promise, const char*, size_t), JSValue value, int* outLength); | ||||||
| @@ -188,6 +195,7 @@ socket_t* _socket_create_internal(JSContext* context) | |||||||
| 	JS_SetPropertyStr(context, object, "read", JS_NewCFunction(context, _socket_read, "read", 0)); | 	JS_SetPropertyStr(context, object, "read", JS_NewCFunction(context, _socket_read, "read", 0)); | ||||||
| 	JS_SetPropertyStr(context, object, "onError", JS_NewCFunction(context, _socket_onError, "onError", 1)); | 	JS_SetPropertyStr(context, object, "onError", JS_NewCFunction(context, _socket_onError, "onError", 1)); | ||||||
| 	JS_SetPropertyStr(context, object, "write", JS_NewCFunction(context, _socket_write, "write", 1)); | 	JS_SetPropertyStr(context, object, "write", JS_NewCFunction(context, _socket_write, "write", 1)); | ||||||
|  | 	JS_SetPropertyStr(context, object, "setActivityTimeout", JS_NewCFunction(context, _socket_setActivityTimeout, "setActivityTimeout", 1)); | ||||||
|  |  | ||||||
| 	JSAtom atom = JS_NewAtom(context, "isConnected"); | 	JSAtom atom = JS_NewAtom(context, "isConnected"); | ||||||
| 	JS_DefinePropertyGetSet(context, object, atom, JS_NewCFunction(context, _socket_isConnected, "isConnected", 0), JS_NULL, 0); | 	JS_DefinePropertyGetSet(context, object, atom, JS_NewCFunction(context, _socket_isConnected, "isConnected", 0), JS_NULL, 0); | ||||||
| @@ -210,6 +218,8 @@ socket_t* _socket_create_internal(JSContext* context) | |||||||
| 	++_open_count; | 	++_open_count; | ||||||
| 	uv_tcp_init(tf_task_get_loop(socket->_task), &socket->_socket); | 	uv_tcp_init(tf_task_get_loop(socket->_task), &socket->_socket); | ||||||
| 	socket->_socket.data = socket; | 	socket->_socket.data = socket; | ||||||
|  | 	uv_timer_init(tf_task_get_loop(socket->_task), &socket->_timer); | ||||||
|  | 	socket->_timer.data = socket; | ||||||
|  |  | ||||||
| 	return socket; | 	return socket; | ||||||
| } | } | ||||||
| @@ -234,8 +244,14 @@ void _socket_close_internal(socket_t* socket) | |||||||
| 	{ | 	{ | ||||||
| 		uv_close((uv_handle_t*)&socket->_socket, _socket_onClose); | 		uv_close((uv_handle_t*)&socket->_socket, _socket_onClose); | ||||||
| 	} | 	} | ||||||
|  | 	if (socket->_timer.data && | ||||||
|  | 		!uv_is_closing((uv_handle_t*)&socket->_timer)) | ||||||
|  | 	{ | ||||||
|  | 		uv_close((uv_handle_t*)&socket->_timer, _socket_onClose); | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	if (!socket->_socket.data && | 	if (!socket->_socket.data && | ||||||
|  | 		!socket->_timer.data && | ||||||
| 		JS_IsUndefined(socket->_object)) | 		JS_IsUndefined(socket->_object)) | ||||||
| 	{ | 	{ | ||||||
| 		--_count; | 		--_count; | ||||||
| @@ -381,6 +397,7 @@ bool _socket_processSomeOutgoingTls(socket_t* socket, promiseid_t promise, uv_wr | |||||||
| 			.len = result, | 			.len = result, | ||||||
| 		}; | 		}; | ||||||
|  |  | ||||||
|  | 		_socket_pauseTimeout(socket); | ||||||
| 		int writeResult = uv_write(request, (uv_stream_t*)&socket->_socket, &writeBuffer, 1, callback); | 		int writeResult = uv_write(request, (uv_stream_t*)&socket->_socket, &writeBuffer, 1, callback); | ||||||
| 		if (writeResult != 0) | 		if (writeResult != 0) | ||||||
| 		{ | 		{ | ||||||
| @@ -706,6 +723,7 @@ JSValue _socket_read(JSContext* context, JSValueConst this_val, int argc, JSValu | |||||||
| 	JSValue read_result = tf_task_allocate_promise(socket->_task, &promise); | 	JSValue read_result = tf_task_allocate_promise(socket->_task, &promise); | ||||||
| 	if (!socket->_reading && socket->_socket.data) | 	if (!socket->_reading && socket->_socket.data) | ||||||
| 	{ | 	{ | ||||||
|  | 		_socket_resetTimeout(socket); | ||||||
| 		int result = uv_read_start((uv_stream_t*)&socket->_socket, _socket_allocateBuffer, _socket_onRead); | 		int result = uv_read_start((uv_stream_t*)&socket->_socket, _socket_allocateBuffer, _socket_onRead); | ||||||
| 		if (result != 0) | 		if (result != 0) | ||||||
| 		{ | 		{ | ||||||
| @@ -733,6 +751,7 @@ void _socket_allocateBuffer(uv_handle_t* handle, size_t suggestedSize, uv_buf_t* | |||||||
| void _socket_onRead(uv_stream_t* stream, ssize_t readSize, const uv_buf_t* buffer) | void _socket_onRead(uv_stream_t* stream, ssize_t readSize, const uv_buf_t* buffer) | ||||||
| { | { | ||||||
| 	socket_t* socket = stream->data; | 	socket_t* socket = stream->data; | ||||||
|  | 	_socket_resetTimeout(socket); | ||||||
| 	JSContext* context = tf_task_get_context(socket->_task); | 	JSContext* context = tf_task_get_context(socket->_task); | ||||||
| 	JSValue ref = JS_DupValue(context, socket->_object); | 	JSValue ref = JS_DupValue(context, socket->_object); | ||||||
| 	if (readSize <= 0) | 	if (readSize <= 0) | ||||||
| @@ -907,6 +926,7 @@ int _socket_writeInternal(socket_t* socket, promiseid_t promise, const char* dat | |||||||
| 	}; | 	}; | ||||||
|  |  | ||||||
| 	request->data = (void*)(intptr_t)promise; | 	request->data = (void*)(intptr_t)promise; | ||||||
|  | 	_socket_pauseTimeout(socket); | ||||||
| 	int result = uv_write(request, (uv_stream_t*)&socket->_socket, &buffer, 1, _socket_onWrite); | 	int result = uv_write(request, (uv_stream_t*)&socket->_socket, &buffer, 1, _socket_onWrite); | ||||||
| 	if (result != 0) | 	if (result != 0) | ||||||
| 	{ | 	{ | ||||||
| @@ -970,6 +990,7 @@ JSValue _socket_write(JSContext* context, JSValueConst this_val, int argc, JSVal | |||||||
| void _socket_onWrite(uv_write_t* request, int status) | void _socket_onWrite(uv_write_t* request, int status) | ||||||
| { | { | ||||||
| 	socket_t* socket = request->handle->data; | 	socket_t* socket = request->handle->data; | ||||||
|  | 	_socket_resumeTimeout(socket); | ||||||
| 	promiseid_t promise = (intptr_t)request->data; | 	promiseid_t promise = (intptr_t)request->data; | ||||||
| 	if (promise != -1) | 	if (promise != -1) | ||||||
| 	{ | 	{ | ||||||
| @@ -985,6 +1006,26 @@ void _socket_onWrite(uv_write_t* request, int status) | |||||||
| 	tf_free(request); | 	tf_free(request); | ||||||
| } | } | ||||||
|  |  | ||||||
|  | static void _socket_timeout(uv_timer_t* timer) | ||||||
|  | { | ||||||
|  | 	socket_t* socket = timer->data; | ||||||
|  | 	_socket_close_internal(socket); | ||||||
|  | } | ||||||
|  |  | ||||||
|  | JSValue _socket_setActivityTimeout(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) | ||||||
|  | { | ||||||
|  | 	socket_t* socket = JS_GetOpaque(this_val, _classId); | ||||||
|  | 	int64_t timeout = 0; | ||||||
|  | 	if (JS_ToInt64(context, &timeout, argv[0]) == 0 && | ||||||
|  | 		socket->timeout_ms > 0) { | ||||||
|  | 		socket->timeout_ms = timeout; | ||||||
|  | 		uv_timer_start(&socket->_timer, _socket_timeout, socket->timeout_ms, 0); | ||||||
|  | 	} else { | ||||||
|  | 		uv_timer_stop(&socket->_timer); | ||||||
|  | 	} | ||||||
|  | 	return JS_UNDEFINED; | ||||||
|  | } | ||||||
|  |  | ||||||
| JSValue _socket_isConnected(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) | JSValue _socket_isConnected(JSContext* context, JSValueConst this_val, int argc, JSValueConst* argv) | ||||||
| { | { | ||||||
| 	socket_t* socket = JS_GetOpaque(this_val, _classId); | 	socket_t* socket = JS_GetOpaque(this_val, _classId); | ||||||
| @@ -1016,6 +1057,7 @@ void _socket_onClose(uv_handle_t* handle) | |||||||
| void _socket_onShutdown(uv_shutdown_t* request, int status) | void _socket_onShutdown(uv_shutdown_t* request, int status) | ||||||
| { | { | ||||||
| 	socket_t* socket = request->handle->data; | 	socket_t* socket = request->handle->data; | ||||||
|  | 	_socket_resetTimeout(socket); | ||||||
| 	promiseid_t promise = (intptr_t)request->data; | 	promiseid_t promise = (intptr_t)request->data; | ||||||
| 	if (status == 0) | 	if (status == 0) | ||||||
| 	{ | 	{ | ||||||
| @@ -1102,3 +1144,27 @@ JSValue _sockets_get(JSContext* context, JSValueConst this_val, int argc, JSValu | |||||||
| 	} | 	} | ||||||
| 	return array; | 	return array; | ||||||
| } | } | ||||||
|  |  | ||||||
|  | static void _socket_resetTimeout(socket_t* socket) | ||||||
|  | { | ||||||
|  | 	if (socket->timeout_ms && socket->_active == 0) | ||||||
|  | 	{ | ||||||
|  | 		uv_timer_start(&socket->_timer, _socket_timeout, socket->timeout_ms, 0); | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | static void _socket_pauseTimeout(socket_t* socket) | ||||||
|  | { | ||||||
|  | 	if (socket->_active++ == 1) | ||||||
|  | 	{ | ||||||
|  | 		uv_timer_stop(&socket->_timer); | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | static void _socket_resumeTimeout(socket_t* socket) | ||||||
|  | { | ||||||
|  | 	if (--socket->_active == 0 && socket->timeout_ms > 0) | ||||||
|  | 	{ | ||||||
|  | 		uv_timer_start(&socket->_timer, _socket_timeout, socket->timeout_ms, 0); | ||||||
|  | 	} | ||||||
|  | } | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user