127 lines
3.9 KiB
C
127 lines
3.9 KiB
C
|
#include "packetstream.h"

#include <uv.h>

#include <malloc.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>
|
||
|
|
||
|
typedef struct _tf_packetstream_t {
|
||
|
tf_packetstream_onreceive_t* onreceive;
|
||
|
void* onreceive_user_data;
|
||
|
uv_pipe_t stream;
|
||
|
char* buffer;
|
||
|
size_t buffer_size;
|
||
|
bool destroyed;
|
||
|
} tf_packetstream_t;
|
||
|
|
||
|
tf_packetstream_t* tf_packetstream_create() {
|
||
|
tf_packetstream_t* impl = malloc(sizeof(tf_packetstream_t));
|
||
|
*impl = (tf_packetstream_t) { 0 };
|
||
|
return impl;
|
||
|
}
|
||
|
|
||
|
/**
 * Destroy a packet stream.
 *
 * The receive callback is cleared immediately, so no further packets are
 * delivered. If the pipe handle is still registered (its data pointer is
 * set), the handle is closed and the memory is released later by the close
 * callback; otherwise the stream is freed right away.
 *
 * @param stream Stream to destroy; NULL is ignored.
 */
void tf_packetstream_destroy(tf_packetstream_t* stream) {
	if (!stream) {
		return;
	}

	stream->onreceive = NULL;
	stream->onreceive_user_data = NULL;
	stream->destroyed = true;

	if (stream->stream.data) {
		/* The handle is still live: the struct must outlive uv_close(), so
		 * freeing is deferred to the close callback. The receive buffer is
		 * deliberately left alone here because this function may be running
		 * inside a receive callback while that buffer is being walked. */
		tf_packetstream_close(stream);
	} else {
		/* Release any bytes still buffered from before the handle closed. */
		free(stream->buffer);
		free(stream);
	}
}
|
||
|
|
||
|
/**
 * libuv allocation callback: supply a heap buffer of the suggested size for
 * the next read. The buffer is released by the read callback.
 *
 * @param handle         Handle being read from (unused).
 * @param suggested_size Size requested by libuv.
 * @param buffer         Receives the buffer pointer and its length.
 */
static void _packetstream_allocate(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buffer) {
	(void)handle;

	char* storage = malloc(suggested_size);
	buffer->len = suggested_size;
	buffer->base = storage;
}
|
||
|
|
||
|
/**
 * Deliver every complete packet currently held in the receive buffer.
 *
 * Each packet is laid out as a native int packet type, a native size_t
 * payload length, then the payload bytes. Complete packets are passed to the
 * receive callback (when one is set) in order; a trailing partial packet is
 * kept at the front of the buffer for the next read.
 *
 * @param stream Stream whose buffer should be drained.
 */
static void _packetstream_process_messages(tf_packetstream_t* stream) {
	const size_t header_size = sizeof(int) + sizeof(size_t);
	size_t offset = 0;

	while (stream->buffer_size - offset >= header_size) {
		int packet_type = 0;
		size_t length = 0;
		memcpy(&packet_type, stream->buffer + offset, sizeof(packet_type));
		memcpy(&length, stream->buffer + offset + sizeof(packet_type), sizeof(length));

		/* Compare against the bytes actually available instead of summing
		 * header and length: the length comes off the wire, and a huge
		 * value would wrap the sum and slip past the bounds check. */
		if (length > stream->buffer_size - offset - header_size) {
			break;
		}

		if (stream->onreceive) {
			stream->onreceive(packet_type, stream->buffer + offset + header_size, length, stream->onreceive_user_data);
		}
		offset += header_size + length;
	}

	if (offset == 0) {
		return;
	}

	/* Compact once after the loop rather than after every packet. */
	size_t remaining = stream->buffer_size - offset;
	if (remaining == 0) {
		/* Avoid realloc(ptr, 0), whose behavior is not portable. */
		free(stream->buffer);
		stream->buffer = NULL;
	} else {
		memmove(stream->buffer, stream->buffer + offset, remaining);
		/* Shrinking is only an optimization: on failure keep the original
		 * (larger) allocation instead of losing the buffered bytes. */
		char* shrunk = realloc(stream->buffer, remaining);
		if (shrunk) {
			stream->buffer = shrunk;
		}
	}
	stream->buffer_size = remaining;
}
|
||
|
|
||
|
/**
 * libuv read callback: append newly received bytes to the stream's buffer
 * and deliver any packets that are now complete.
 *
 * @param handle Stream handle; its data pointer is the owning packet stream.
 * @param count  Number of bytes read, 0 when there is nothing to do, or a
 *               negative value on error / end of stream.
 * @param buffer Buffer supplied by the allocation callback; always freed here.
 */
static void _packetstream_on_read(uv_stream_t* handle, ssize_t count, const uv_buf_t* buffer) {
	tf_packetstream_t* stream = handle->data;

	if (count < 0) {
		/* Read error or end of stream. */
		tf_packetstream_close(stream);
	} else if (count > 0) {
		size_t received = (size_t)count;
		char* new_buffer = realloc(stream->buffer, stream->buffer_size + received);
		if (new_buffer) {
			memcpy(new_buffer + stream->buffer_size, buffer->base, received);
			stream->buffer = new_buffer;
			stream->buffer_size += received;
			_packetstream_process_messages(stream);
		} else {
			/* Dropping these bytes would desynchronize the packet framing
			 * for everything that follows, so give up on the stream rather
			 * than deliver corrupted packets. */
			tf_packetstream_close(stream);
		}
	}

	/* The close above is asynchronous, so the read buffer is still ours. */
	free(buffer->base);
}
|
||
|
|
||
|
/**
 * Begin reading from the stream's pipe.
 *
 * Stores a back-pointer to the stream in the handle's data field so the
 * libuv callbacks can find it, then starts reading.
 *
 * @param stream Stream to start reading on.
 */
void tf_packetstream_start(tf_packetstream_t* stream) {
	uv_stream_t* handle = (uv_stream_t*)&stream->stream;

	handle->data = stream;
	uv_read_start(handle, _packetstream_allocate, _packetstream_on_read);
}
|
||
|
|
||
|
/**
 * libuv write-completion callback: release the write request, which shares
 * its allocation with the packet bytes that were sent.
 *
 * @param request Completed write request.
 * @param status  Result of the write (unused).
 */
static void _packetstream_on_write(uv_write_t* request, int status) {
	(void)status;

	free(request);
}
|
||
|
|
||
|
/**
 * Queue one packet for writing to the stream's pipe.
 *
 * The packet is written as a native int packet type, a native size_t
 * length, then the payload. The payload is copied, so the caller's buffer
 * may be reused as soon as this function returns.
 *
 * The function returns nothing, so a packet that cannot be allocated or
 * queued is dropped without notice to the caller.
 *
 * @param stream      Stream to write to.
 * @param packet_type Application-defined packet type.
 * @param begin       Payload bytes; not read when length is 0.
 * @param length      Payload size in bytes.
 */
void tf_packetstream_send(tf_packetstream_t* stream, int packet_type, char* begin, size_t length) {
	const size_t header_size = sizeof(packet_type) + sizeof(length);

	/* Refuse lengths that would wrap the allocation size. */
	if (length > SIZE_MAX - sizeof(uv_write_t) - header_size) {
		return;
	}

	/* One allocation holds the write request followed by the packet bytes,
	 * so the completion callback releases both with a single free(). */
	uv_write_t* request = malloc(sizeof(uv_write_t) + header_size + length);
	if (!request) {
		return;
	}
	memset(request, 0, sizeof(uv_write_t));

	char* buffer = (char*)(request + 1);
	memcpy(buffer, &packet_type, sizeof(packet_type));
	memcpy(buffer + sizeof(packet_type), &length, sizeof(length));
	if (length) {
		memcpy(buffer + header_size, begin, length);
	}

	uv_buf_t write_buffer;
	write_buffer.base = buffer;
	write_buffer.len = header_size + length;

	/* When uv_write() fails synchronously the completion callback is never
	 * invoked, so the request has to be released here. */
	if (uv_write(request, (uv_stream_t*)&stream->stream, &write_buffer, 1, _packetstream_on_write) < 0) {
		free(request);
	}
}
|
||
|
|
||
|
/**
 * Install the callback that receives each complete packet.
 *
 * @param stream    Stream to configure.
 * @param callback  Function called once per complete packet; NULL disables
 *                  delivery.
 * @param user_data Opaque pointer passed to callback on every call.
 */
void tf_packetstream_set_on_receive(tf_packetstream_t* stream, tf_packetstream_onreceive_t* callback, void* user_data) {
	stream->onreceive_user_data = user_data;
	stream->onreceive = callback;
}
|
||
|
|
||
|
/**
 * libuv close callback: the pipe handle is fully closed.
 *
 * Clears the handle's back-pointer, releases the receive buffer (no more
 * data can arrive on a closed handle, so any partial packet left in it can
 * never be completed), and frees the stream itself if it was destroyed
 * while the close was pending.
 *
 * @param handle The closed handle; its data pointer is the owning stream.
 */
static void _tf_packetstream_handle_closed(uv_handle_t* handle) {
	tf_packetstream_t* packetstream = handle->data;
	handle->data = NULL;

	/* Previously the buffer was never released, leaking it on every close. */
	free(packetstream->buffer);
	packetstream->buffer = NULL;
	packetstream->buffer_size = 0;

	if (packetstream->destroyed) {
		free(packetstream);
	}
}
|
||
|
|
||
|
/**
 * Close the stream's pipe handle.
 *
 * Does nothing when the handle has no back-pointer set or is already
 * closing. The close completes asynchronously in the close callback.
 *
 * @param stream Stream whose pipe should be closed.
 */
void tf_packetstream_close(tf_packetstream_t* stream) {
	uv_handle_t* handle = (uv_handle_t*)&stream->stream;

	if (!stream->stream.data) {
		return;
	}
	if (uv_is_closing(handle)) {
		return;
	}
	uv_close(handle, _tf_packetstream_handle_closed);
}
|
||
|
|
||
|
/**
 * Get the libuv pipe embedded in the stream.
 *
 * @param stream Stream to query.
 * @return Pointer to the stream's pipe handle; it lives inside the stream
 *         struct and is not a separate allocation.
 */
uv_pipe_t* tf_packetstream_get_pipe(tf_packetstream_t* stream) {
	uv_pipe_t* handle = &stream->stream;
	return handle;
}
|