#include "packetstream.h" #include "log.h" #include "mem.h" #include "uv.h" #include #include typedef struct _tf_packetstream_t { tf_packetstream_onreceive_t* onreceive; void* onreceive_user_data; tf_packetstream_on_close_t* on_close; void* on_close_user_data; uv_pipe_t stream; char* buffer; size_t buffer_size; size_t buffer_capacity; bool destroyed; } tf_packetstream_t; tf_packetstream_t* tf_packetstream_create() { tf_packetstream_t* impl = tf_malloc(sizeof(tf_packetstream_t)); *impl = (tf_packetstream_t) { 0 }; return impl; } void tf_packetstream_destroy(tf_packetstream_t* stream) { stream->onreceive = NULL; stream->onreceive_user_data = NULL; stream->on_close = NULL; stream->on_close_user_data = NULL; stream->destroyed = true; if (stream->buffer) { tf_free(stream->buffer); stream->buffer = NULL; stream->buffer_size = 0; } if (stream->stream.data) { tf_packetstream_close(stream); } else { tf_free(stream); } } static void _packetstream_allocate(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buffer) { buffer->base = tf_malloc(suggested_size); buffer->len = suggested_size; } static void _packetstream_process_messages(tf_packetstream_t* stream) { int packet_type = 0; size_t length = 0; while (stream->buffer_size >= sizeof(packet_type) + sizeof(length)) { memcpy(&packet_type, stream->buffer, sizeof(packet_type)); memcpy(&length, stream->buffer + sizeof(packet_type), sizeof(length)); if (stream->buffer_size >= sizeof(packet_type) + sizeof(length) + length) { if (stream->onreceive) { stream->onreceive(packet_type, stream->buffer + sizeof(length) + sizeof(packet_type), length, stream->onreceive_user_data); } size_t consumed = sizeof(length) + sizeof(packet_type) + length; memmove(stream->buffer, stream->buffer + consumed, stream->buffer_size - consumed); stream->buffer_size -= consumed; } else { break; } } } static void _packetstream_on_read(uv_stream_t* handle, ssize_t count, const uv_buf_t* buffer) { tf_packetstream_t* stream = handle->data; if (count >= 0) { if (count > 0) { char* write_buffer = stream->buffer; if (stream->buffer_size + count > stream->buffer_capacity) { if (!stream->buffer_capacity) { stream->buffer_capacity = 256; } while (stream->buffer_capacity < stream->buffer_size + count) { stream->buffer_capacity *= 2; } write_buffer = tf_realloc(write_buffer, stream->buffer_capacity); } if (write_buffer) { memcpy(write_buffer + stream->buffer_size, buffer->base, count); stream->buffer = write_buffer; stream->buffer_size += count; } _packetstream_process_messages(stream); } } else { tf_packetstream_on_close_t* on_close = stream->on_close; void* user_data = stream->on_close_user_data; stream->on_close = NULL; stream->on_close_user_data = NULL; if (on_close) { on_close(user_data); } tf_packetstream_close(stream); } tf_free(buffer->base); } void tf_packetstream_start(tf_packetstream_t* stream) { stream->stream.data = stream; int result = uv_read_start((uv_stream_t*)&stream->stream, _packetstream_allocate, _packetstream_on_read); if (result) { tf_printf("uv_read_start: %s\n", uv_strerror(result)); } } static void _packetstream_on_write(uv_write_t* request, int status) { tf_free(request); } void tf_packetstream_send(tf_packetstream_t* stream, int packet_type, const char* begin, size_t length) { if (stream) { size_t buffer_length = sizeof(uv_write_t) + sizeof(packet_type) + sizeof(length) + length; uv_write_t* request = tf_malloc(buffer_length); memset(request, 0, sizeof(uv_write_t)); char* buffer = (char*)(request + 1); memcpy(buffer, &packet_type, sizeof(packet_type)); memcpy(buffer + sizeof(packet_type), &length, sizeof(length)); if (length) { memcpy(buffer + sizeof(packet_type) + sizeof(length), begin, length); } uv_buf_t write_buffer; write_buffer.base = buffer; write_buffer.len = sizeof(packet_type) + sizeof(length) + length; int result = uv_write(request, (uv_stream_t*)&stream->stream, &write_buffer, 1, _packetstream_on_write); if (result) { tf_printf("tf_packetstream_send: uv_write: %s\n", uv_strerror(result)); tf_free(request); } } } void tf_packetstream_set_on_receive(tf_packetstream_t* stream, tf_packetstream_onreceive_t* callback, void* user_data) { stream->onreceive = callback; stream->onreceive_user_data = user_data; } void tf_packetstream_set_on_close(tf_packetstream_t* stream, tf_packetstream_on_close_t* callback, void* user_data) { stream->on_close = callback; stream->on_close_user_data = user_data; } static void _tf_packetstream_handle_closed(uv_handle_t* handle) { tf_packetstream_t* packetstream = handle->data; handle->data = NULL; if (packetstream->buffer) { tf_free(packetstream->buffer); packetstream->buffer = NULL; } if (packetstream->destroyed) { tf_free(packetstream); } } void tf_packetstream_close(tf_packetstream_t* stream) { if (stream->stream.data && !uv_is_closing((uv_handle_t*)&stream->stream)) { uv_close((uv_handle_t*)&stream->stream, _tf_packetstream_handle_closed); } } uv_pipe_t* tf_packetstream_get_pipe(tf_packetstream_t* stream) { return &stream->stream; }