#include "packetstream.h"

#include "log.h"
#include "mem.h"

#include "uv.h"

#include <stdbool.h>
#include <string.h>

typedef struct _tf_packetstream_t
{
	tf_packetstream_onreceive_t* onreceive;
	void* onreceive_user_data;
	uv_pipe_t stream;
	char* buffer;
	size_t buffer_size;
	size_t buffer_capacity;
	bool destroyed;
} tf_packetstream_t;

tf_packetstream_t* tf_packetstream_create()
{
	tf_packetstream_t* impl = tf_malloc(sizeof(tf_packetstream_t));
	*impl = (tf_packetstream_t) { 0 };
	return impl;
}

void tf_packetstream_destroy(tf_packetstream_t* stream)
{
	stream->onreceive = NULL;
	stream->onreceive_user_data = NULL;
	stream->destroyed = true;
	if (stream->buffer)
	{
		tf_free(stream->buffer);
		stream->buffer = NULL;
		stream->buffer_size = 0;
	}
	if (stream->stream.data)
	{
		tf_packetstream_close(stream);
	}
	else
	{
		tf_free(stream);
	}
}

static void _packetstream_allocate(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buffer)
{
	buffer->base = tf_malloc(suggested_size);
	buffer->len = suggested_size;
}

static void _packetstream_process_messages(tf_packetstream_t* stream)
{
	int packet_type = 0;
	size_t length = 0;
	while (stream->buffer_size >= sizeof(packet_type) + sizeof(length))
	{
		memcpy(&packet_type, stream->buffer, sizeof(packet_type));
		memcpy(&length, stream->buffer + sizeof(packet_type), sizeof(length));

		if (stream->buffer_size >= sizeof(packet_type) + sizeof(length) + length)
		{
			if (stream->onreceive)
			{
				stream->onreceive(packet_type, stream->buffer + sizeof(length) + sizeof(packet_type), length, stream->onreceive_user_data);
			}
			size_t consumed = sizeof(length) + sizeof(packet_type) + length;
			memmove(stream->buffer, stream->buffer + consumed, stream->buffer_size - consumed);
			stream->buffer_size -= consumed;
		}
		else
		{
			break;
		}
	}
}

static void _packetstream_on_read(uv_stream_t* handle, ssize_t count, const uv_buf_t* buffer)
{
	tf_packetstream_t* stream = handle->data;
	if (count >= 0)
	{
		if (count > 0)
		{
			char* write_buffer = stream->buffer;
			if (stream->buffer_size + count > stream->buffer_capacity)
			{
				if (!stream->buffer_capacity)
				{
					stream->buffer_capacity = 256;
				}
				while (stream->buffer_capacity < stream->buffer_size + count)
				{
					stream->buffer_capacity *= 2;
				}
				write_buffer = tf_realloc(write_buffer, stream->buffer_capacity);
			}
			if (write_buffer)
			{
				memcpy(write_buffer + stream->buffer_size, buffer->base, count);
				stream->buffer = write_buffer;
				stream->buffer_size += count;
			}
			_packetstream_process_messages(stream);
		}
	}
	else
	{
		tf_packetstream_close(stream);
	}
	tf_free(buffer->base);
}

void tf_packetstream_start(tf_packetstream_t* stream)
{
	stream->stream.data = stream;
	int result = uv_read_start((uv_stream_t*)&stream->stream, _packetstream_allocate, _packetstream_on_read);
	if (result)
	{
		tf_printf("uv_read_start: %s\n", uv_strerror(result));
	}
}

static void _packetstream_on_write(uv_write_t* request, int status)
{
	tf_free(request);
}

void tf_packetstream_send(tf_packetstream_t* stream, int packet_type, const char* begin, size_t length)
{
	if (stream)
	{
		size_t buffer_length = sizeof(uv_write_t) + sizeof(packet_type) + sizeof(length) + length;
		uv_write_t* request = tf_malloc(buffer_length);
		memset(request, 0, sizeof(uv_write_t));
		char* buffer = (char*)(request + 1);
		memcpy(buffer, &packet_type, sizeof(packet_type));
		memcpy(buffer + sizeof(packet_type), &length, sizeof(length));
		if (length)
		{
			memcpy(buffer + sizeof(packet_type) + sizeof(length), begin, length);
		}
		uv_buf_t write_buffer;
		write_buffer.base = buffer;
		write_buffer.len = sizeof(packet_type) + sizeof(length) + length;
		int result = uv_write(request, (uv_stream_t*)&stream->stream, &write_buffer, 1, _packetstream_on_write);
		if (result)
		{
			tf_printf("uv_write: %s\n", uv_strerror(result));
			tf_free(request);
		}
	}
}

void tf_packetstream_set_on_receive(tf_packetstream_t* stream, tf_packetstream_onreceive_t* callback, void* user_data)
{
	stream->onreceive = callback;
	stream->onreceive_user_data = user_data;
}

static void _tf_packetstream_handle_closed(uv_handle_t* handle)
{
	tf_packetstream_t* packetstream = handle->data;
	handle->data = NULL;
	if (packetstream->buffer)
	{
		tf_free(packetstream->buffer);
		packetstream->buffer = NULL;
	}
	if (packetstream->destroyed)
	{
		tf_free(packetstream);
	}
}

void tf_packetstream_close(tf_packetstream_t* stream)
{
	if (stream->stream.data && !uv_is_closing((uv_handle_t*)&stream->stream))
	{
		uv_close((uv_handle_t*)&stream->stream, _tf_packetstream_handle_closed);
	}
}

uv_pipe_t* tf_packetstream_get_pipe(tf_packetstream_t* stream)
{
	return &stream->stream;
}