#include "PacketStream.h" #include #include PacketStream::PacketStream() : _onReceive(0), _onReceiveUserData(0) { } PacketStream::~PacketStream() { _onReceive = 0; _onReceiveUserData = 0; close(); } void PacketStream::close() { if (!uv_is_closing(reinterpret_cast(&_stream))) { uv_close(reinterpret_cast(&_stream), 0); } } void PacketStream::start() { _stream.data = this; uv_read_start(reinterpret_cast(&_stream), onAllocate, onRead); } void PacketStream::send(int packetType, char* begin, size_t length) { size_t bufferLength = sizeof(uv_write_t) + sizeof(packetType) + sizeof(length) + length; char* buffer = new char[bufferLength]; uv_write_t* request = reinterpret_cast(buffer); buffer += sizeof(uv_write_t); memcpy(buffer, &packetType, sizeof(packetType)); memcpy(buffer + sizeof(packetType), &length, sizeof(length)); memcpy(buffer + sizeof(packetType) + sizeof(length), begin, length); uv_buf_t writeBuffer; writeBuffer.base = buffer; writeBuffer.len = sizeof(packetType) + sizeof(length) + length; uv_write(request, reinterpret_cast(&_stream), &writeBuffer, 1, onWrite); } void PacketStream::setOnReceive(OnReceive* onReceiveCallback, void* userData) { _onReceive = onReceiveCallback; _onReceiveUserData = userData; } void PacketStream::onWrite(uv_write_t* request, int status) { delete[] reinterpret_cast(request); } void PacketStream::onAllocate(uv_handle_t* handle, size_t suggestedSize, uv_buf_t* buffer) { buffer->base = new char[suggestedSize]; buffer->len = suggestedSize; } void PacketStream::onRead(uv_stream_t* handle, ssize_t count, const uv_buf_t* buffer) { PacketStream* owner = reinterpret_cast(handle->data); if (count >= 0) { if (count > 0) { owner->_buffer.insert(owner->_buffer.end(), buffer->base, buffer->base + count); owner->processMessages(); } delete[] reinterpret_cast(buffer->base); } else { owner->close(); } } void PacketStream::processMessages() { int packetType = 0; size_t length = 0; while (_buffer.size() >= sizeof(packetType) + sizeof(length)) { memcpy(&packetType, &*_buffer.begin(), sizeof(packetType)); memcpy(&length, &*_buffer.begin() + sizeof(packetType), sizeof(length)); if (_buffer.size() >= sizeof(packetType) + sizeof(length) + length) { if (_onReceive) { _onReceive(packetType, &*_buffer.begin() + sizeof(length) + sizeof(packetType), length, _onReceiveUserData); } _buffer.erase(_buffer.begin(), _buffer.begin() + sizeof(length) + sizeof(packetType) + length); } else { break; } } }