/* * CommunicationsChannel.cpp * Created by Woody Zenfell, III on Mon Sep 01 2003. */ /* Copyright (c) 2003, Woody Zenfell, III Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ #if !defined(DISABLE_NETWORKING) #include "CommunicationsChannel.h" #include "AStream.h" #include "MessageInflater.h" #include "MessageHandler.h" #include #include // debugging #include #include "cseries.h" #if defined(WIN32) #include // hacky non-cross-platform setting of nonblocking #elif defined(__MACOS__) #include // hacky non-cross-platform setting of nonblocking AND reading static int Our_TCP_Recv(TCPsocket, void *, int); // look below if you really want to see this mess; you almost certainly do not #else #include // hacky non-cross-platform setting of nonblocking #endif #include enum { // If any incoming message claims to be longer than this, we bail kMaximumMessageLength = 4 * 1024 * 1024, // Milliseconds we wait between pump() calls during receive[Specific]Message() kSSRPumpInterval = 50, // Milliseconds we wait between pump() calls during flushOutgoingMessages() kFlushPumpInterval = kSSRPumpInterval, }; // if you really want to read what this does, scroll down static void MakeTCPsocketNonBlocking(TCPsocket *socket); CommunicationsChannel::CommunicationsChannel() : mConnected(false), mSocket(NULL), mMessageInflater(NULL), mMessageHandler(NULL), mMemento(NULL), mIncomingHeaderPosition(0), mIncomingMessage(NULL), mIncomingMessagePosition(0), mOutgoingHeaderPosition(0), mOutgoingMessagePosition(0) { mTicksAtLastReceive = SDL_GetTicks(); mTicksAtLastSend = SDL_GetTicks(); } CommunicationsChannel::CommunicationsChannel(TCPsocket inSocket) : mConnected(inSocket != NULL), mSocket(inSocket), mMessageInflater(NULL), mMessageHandler(NULL), mMemento(NULL), mIncomingHeaderPosition(0), mIncomingMessage(NULL), mIncomingMessagePosition(0), mOutgoingHeaderPosition(0), mOutgoingMessagePosition(0) { mTicksAtLastReceive = SDL_GetTicks(); mTicksAtLastSend = SDL_GetTicks(); } CommunicationsChannel::~CommunicationsChannel() { disconnect(); } CommunicationsChannel::CommunicationResult CommunicationsChannel::receive_some(TCPsocket inSocket, byte* inBuffer, size_t& ioBufferPosition, size_t inBufferLength) { // std::cout << "Want to receive " << inBufferLength << " bytes; buffer position " << ioBufferPosition << std::endl; if (inBufferLength == 0) return kComplete; size_t theBytesLeft = inBufferLength - ioBufferPosition; if(theBytesLeft > 0) { #ifndef __MACOS__ int theResult = SDLNet_TCP_Recv(inSocket, inBuffer + ioBufferPosition, theBytesLeft); #else int theResult = Our_TCP_Recv(inSocket, inBuffer + ioBufferPosition, theBytesLeft); #endif // std::cout << " theResult is " << theResult << std::endl; // Unfortunately, SDLNet_TCP_Recv() often returns -1 even when there's no error, and I // don't think I have any legitimate way to distinguish this case from a true error condition. #ifdef SANE_RECV_RESULTS if(theResult < 0) { disconnect(); return kError; } else { if(theResult > 0) { mTicksAtLastReceive = SDL_GetTicks(); } ioBufferPosition += theResult; return (ioBufferPosition == inBufferLength) ? kComplete : kIncomplete; } } #else if(theResult == 0) { // For some reason we get 0 back if the connection is lost ... disconnect(); return kError; } if(theResult < 0) { // Please close your eyes for this part ... we get -1 back from SDL_net, // then peek around behind its back to try to figure out why. YUCK // Hmm surely this is doomed to fail on non-UNIXy systems? sigh ... // Perhaps we should treat 0 and < 0 the same here, and change what it // means to be connected. Maybe we could use the Get Peer function to // detect connected/disconnected. // grsmith: we could do that, or we could add another // platform-specific hack #ifdef WIN32 if (WSAGetLastError() == WSAEWOULDBLOCK) { theResult = 0; } else { std::cout << "theResult == " << theResult << std::endl; disconnect(); return kError; } #else if(errno == EAGAIN) { theResult = 0; } else { std::cout << "theResult == " << theResult << " ; errno == " << errno << " ; strerror() == " << strerror(errno) << " ; SDL_GetError() == " << SDL_GetError() << std::endl; disconnect(); return kError; } #endif } if(theResult > 0) { mTicksAtLastReceive = SDL_GetTicks(); } ioBufferPosition += theResult; } // if we actually expect to receive something return (ioBufferPosition == inBufferLength) ? kComplete : kIncomplete; #endif // SANE_RECV_RESULTS } CommunicationsChannel::CommunicationResult CommunicationsChannel::send_some(TCPsocket inSocket, byte* inBuffer, size_t& ioBufferPosition, size_t inBufferLength) { // std::cout << "Want to send " << inBufferLength << " bytes; buffer position " << ioBufferPosition << std::endl; size_t theBytesLeft = inBufferLength - ioBufferPosition; int theResult = SDLNet_TCP_Send(inSocket, inBuffer + ioBufferPosition, theBytesLeft); // std::cout << " theResult is " << theResult << std::endl; if(theResult < 0) { disconnect(); return kError; } else { if(theResult > 0) mTicksAtLastSend = SDL_GetTicks(); ioBufferPosition += theResult; return (ioBufferPosition == inBufferLength) ? kComplete : kIncomplete; } } bool CommunicationsChannel::receiveHeader() { CommunicationResult theResult = receive_some(mSocket, mIncomingHeader, mIncomingHeaderPosition, kHeaderPackedSize); if(theResult == kComplete) { // Finished receiving a header AIStreamBE theHeaderStream(mIncomingHeader, kHeaderPackedSize); uint16 theMagic; uint16 theMessageType; uint32 theMessageLength; theHeaderStream >> theMagic >> theMessageType >> theMessageLength; // Incoming length includes header length theMessageLength -= kHeaderPackedSize; if(theMagic != kHeaderMagic || theMessageLength > kMaximumMessageLength) { disconnect(); } else { // Successfully received a valid header; switch to receive-message mode mIncomingMessage = new UninflatedMessage(theMessageType, theMessageLength); mIncomingMessagePosition = 0; } // We should try to receive more stuff, since we got all we asked for. return true; } else { // We got less than we wanted - no sense in trying for more. return false; } } bool CommunicationsChannel::_receiveMessage() { CommunicationResult theResult = receive_some(mSocket, mIncomingMessage->buffer(), mIncomingMessagePosition, mIncomingMessage->length()); if(theResult == kComplete) { // Received a complete message; inflate (if possible) then enqueue it Message* theMessageToEnqueue = mIncomingMessage; if(mMessageInflater != NULL) { theMessageToEnqueue = mMessageInflater->inflate(*mIncomingMessage); delete mIncomingMessage; } mIncomingMessages.push_back(theMessageToEnqueue); // No longer receiving message body - prepare to receive next header mIncomingMessage = NULL; mIncomingHeaderPosition = 0; // We got all we wanted - so we should go again to see if there's more. return true; } else { // Ran out of data to receive, or error - no sense looking for more data return false; } } bool CommunicationsChannel::sendHeader() { CommunicationResult theResult = send_some(mSocket, mOutgoingHeader, mOutgoingHeaderPosition, kHeaderPackedSize); if(theResult == kComplete) { // Finished sending a header; switch to sending message now mOutgoingMessagePosition = 0; // We should try to send more stuff, since we sent all we asked to. return true; } else { // We sent less than we wanted - no sense in trying for more. return false; } } bool CommunicationsChannel::sendMessage() { UninflatedMessage* theOutgoingMessage = mOutgoingMessages.front(); CommunicationResult theResult = send_some(mSocket, theOutgoingMessage->buffer(), mOutgoingMessagePosition, theOutgoingMessage->length()); if(theResult == kComplete) { // Sent a complete message; delete and dequeue it delete theOutgoingMessage; mOutgoingMessages.pop_front(); // No longer sending message body - prepare to send next header mOutgoingHeaderPosition = 0; // We sent all we wanted - so we should go again to see if we can do more. return true; } else { // Could not send it all, or error - no sense trying for more data return false; } } void CommunicationsChannel::pumpReceivingSide() { bool keepGoing = true; while(keepGoing && mConnected) { if(mIncomingMessage != NULL) { // Already working on receiving message body keepGoing = _receiveMessage(); } else { // Not receiving message body - must be receiving message header then keepGoing = receiveHeader(); } } } void CommunicationsChannel::pumpSendingSide() { bool keepGoing = true; while(keepGoing && mConnected && !mOutgoingMessages.empty()) { if(mOutgoingHeaderPosition == 0) { // Need to fill packed header buffer with packed header // We may end up doing this more than once if for some reason we can't // send any data bytes to TCP ... but that's OK. UninflatedMessage* theMessage = mOutgoingMessages.front(); AOStreamBE theHeaderStream(mOutgoingHeader, kHeaderPackedSize); theHeaderStream << (Uint16)kHeaderMagic << theMessage->inflatedType() << (uint32)(theMessage->length() + kHeaderPackedSize); } if(mOutgoingHeaderPosition < kHeaderPackedSize) { keepGoing = sendHeader(); } else { keepGoing = sendMessage(); } } } void CommunicationsChannel::pump() { pumpSendingSide(); pumpReceivingSide(); } bool CommunicationsChannel::dispatchOneIncomingMessage() { if (mIncomingMessages.empty()) return false; Message* theMessage = mIncomingMessages.front(); if (messageHandler() != NULL) { messageHandler()->handle(theMessage, this); } delete theMessage; mIncomingMessages.pop_front(); return true; } void CommunicationsChannel::dispatchIncomingMessages() { while (dispatchOneIncomingMessage()); } void CommunicationsChannel::enqueueOutgoingMessage(const Message& inMessage) { if(isConnected()) { UninflatedMessage* theUninflatedMessage = inMessage.deflate(); mOutgoingMessages.push_back(theUninflatedMessage); } } IPaddress CommunicationsChannel::peerAddress() const { return *(SDLNet_TCP_GetPeerAddress(mSocket)); } void CommunicationsChannel::connect(const IPaddress& inAddress) { assert(!isConnected()); // Have to copy the address since we get a const, but SDL_net takes a non-const IPaddress theAddress = inAddress; mSocket = SDLNet_TCP_Open(&theAddress); if(mSocket != NULL) { mConnected = true; mTicksAtLastReceive = SDL_GetTicks(); mTicksAtLastSend = SDL_GetTicks(); MakeTCPsocketNonBlocking(&mSocket); } } void CommunicationsChannel::connect(const std::string& inAddressString, uint16 inPort) { IPaddress theAddress; // Have to copy the string since we get a const, but SDL_net takes a non-const char* theDuplicateString = strdup(inAddressString.c_str()); int theResult = SDLNet_ResolveHost(&theAddress, theDuplicateString, inPort); free(theDuplicateString); if(theResult == 0) { connect(theAddress); } } void CommunicationsChannel::disconnect() { if(mSocket != NULL) { SDLNet_TCP_Close(mSocket); mSocket = NULL; mConnected = false; } // Discard all data so next connect()ion starts with a clean slate mIncomingHeaderPosition = 0; mIncomingMessagePosition = 0; mOutgoingHeaderPosition = 0; mOutgoingMessagePosition = 0; delete mIncomingMessage; mIncomingMessage = NULL; for(MessageQueue::iterator i = mIncomingMessages.begin(); i != mIncomingMessages.end(); ++i) delete *i; mIncomingMessages.clear(); for(UninflatedMessageQueue::iterator i = mOutgoingMessages.begin(); i != mOutgoingMessages.end(); ++i) delete *i; mOutgoingMessages.clear(); } bool CommunicationsChannel::isMessageAvailable() { pump(); return !mIncomingMessages.empty(); } // Call does not return unless (1) times out (NULL); (2) disconnected (NULL); or // (3) some message received (pointer to inflated message object). Message* CommunicationsChannel::receiveMessage(Uint32 inOverallTimeout, Uint32 inInactivityTimeout) { // Here we give a backstop for our inactivity timeout Uint32 theTicksAtStart = SDL_GetTicks(); Uint32 theDeadline = SDL_GetTicks() + inOverallTimeout; pump(); while(SDL_GetTicks() - std::max(mTicksAtLastReceive, theTicksAtStart) < inInactivityTimeout && SDL_GetTicks() < theDeadline && isConnected() && mIncomingMessages.empty()) { SDL_Delay(kSSRPumpInterval); pump(); } Message* theMessage = NULL; if(!mIncomingMessages.empty()) { theMessage = mIncomingMessages.front(); mIncomingMessages.pop_front(); } return theMessage; } // As above, but if messages of type other than inType are received, they're handled // normally (so might want to install conservative Handler first) Message* CommunicationsChannel::receiveSpecificMessage( MessageTypeID inType, Uint32 inOverallTimeout, Uint32 inInactivityTimeout) { Message* theMessage = NULL; Uint32 theDeadline = SDL_GetTicks() + inOverallTimeout; while(SDL_GetTicks() < theDeadline) { theMessage = receiveMessage(theDeadline - SDL_GetTicks(), inInactivityTimeout); if(theMessage) { if(theMessage->type() == inType) // Got our message break; else { // Got some other message - handle it and destroy it if(messageHandler() != NULL) { messageHandler()->handle(theMessage, this); } delete theMessage; } } else // Other routine timed out or got disconnected break; } return theMessage; } void CommunicationsChannel::flushOutgoingMessages(bool shouldDispatchIncomingMessages, Uint32 inOverallTimeout, Uint32 inInactivityTimeout) { Uint32 theDeadline = SDL_GetTicks() + inOverallTimeout; Uint32 theTicksAtStart = SDL_GetTicks(); while(isConnected() && !mOutgoingMessages.empty() && SDL_GetTicks() < theDeadline && SDL_GetTicks() - std::max(mTicksAtLastSend, theTicksAtStart) < inInactivityTimeout) { SDL_Delay(kFlushPumpInterval); pump(); if(shouldDispatchIncomingMessages) dispatchIncomingMessages(); } } void CommunicationsChannel::multipleFlushOutgoingMessages( std::vector& channels, bool shouldDispatchIncomingMessages, Uint32 inOverallTimeout, Uint32 inInactivityTimeout) { Uint32 theDeadline = SDL_GetTicks() + inOverallTimeout; Uint32 theTicksAtStart = SDL_GetTicks(); bool someoneIsStillActive = true; while (SDL_GetTicks() < theDeadline && someoneIsStillActive) { someoneIsStillActive = false; SDL_Delay(kFlushPumpInterval); for (std::vector::iterator it = channels.begin(); it != channels.end(); it++) { if (!(*it)->mOutgoingMessages.empty() && SDL_GetTicks() - std::max((*it)->mTicksAtLastSend, theTicksAtStart) < inInactivityTimeout) { someoneIsStillActive = true; } (*it)->pump(); if (shouldDispatchIncomingMessages) (*it)->dispatchIncomingMessages(); } } } CommunicationsChannelFactory::CommunicationsChannelFactory(uint16 inPort) { IPaddress theAddress; theAddress.host = INADDR_ANY; theAddress.port = SDL_SwapBE16(inPort); mSocket = SDLNet_TCP_Open(&theAddress); } CommunicationsChannel* CommunicationsChannelFactory::newIncomingConnection() { CommunicationsChannel* theNewChannel = NULL; if(isFunctional()) { SDLNet_SocketSet theSocketSet = SDLNet_AllocSocketSet(1); SDLNet_TCP_AddSocket(theSocketSet, mSocket); if(SDLNet_CheckSockets(theSocketSet, 0) > 0) { // Yee-haw! There's an incoming connection request. TCPsocket theNewSocket = SDLNet_TCP_Accept(mSocket); theNewChannel = new CommunicationsChannel(theNewSocket); MakeTCPsocketNonBlocking(&theNewSocket); } SDLNet_FreeSocketSet(theSocketSet); } return theNewChannel; } CommunicationsChannelFactory::~CommunicationsChannelFactory() { SDLNet_TCP_Close(mSocket); } void MakeTCPsocketNonBlocking(TCPsocket *socket) { // SET NONBLOCKING MODE // XXX: this depends on intimate carnal knowledge of the SDL_net struct _UDPsocket // if it changes that structure, we are hosed. int fd = ((int *) (*socket))[1]; #if defined(WIN32) u_long val = 1; ioctlsocket(fd, FIONBIO, &val); #elif defined(__MACOS__) OTSetNonBlocking((TProvider *) fd); #else #ifdef __MWERKS__ /* out of /usr/include/sys/fcntl.h - mwerks doesn't have these defined */ #define F_SETFL 4 #define O_NONBLOCK 0x0004 #endif fcntl(fd, F_SETFL, O_NONBLOCK); #endif } #ifdef __MACOS__ // XXX Can it possibly get worse!? YES! // big chunks stolen from SDL_Net #define SOCKET EndpointRef struct _TCPsocket { int ready; SOCKET channel; // These are taken from GUSI interface. // I'm not sure if it's really necessary here yet // ( masahiro minami ) // ( 01/02/19 ) OTEventCode curEvent; OTEventCode newEvent; OTEventCode event; OTEventCode curCompletion; OTEventCode newCompletion; OTEventCode completion; OSStatus error; TEndpointInfo info; Boolean readShutdown; Boolean writeShutdown; Boolean connected; OTConfigurationRef config; // Master configuration. you can clone this. TCPsocket nextListener; // ( end of new members --- masahiro minami IPaddress remoteAddress; IPaddress localAddress; int sflag; // Maybe we don't need this---- it's from original SDL_net // (masahiro minami) // ( 01/02/20 ) int rcvdPassConn; }; static void AsyncTCPPopEvent( _TCPsocket *sock ) { // Make sure OT calls are not interrupted // Not sure if we really need this. OTEnterNotifier( sock->channel ); sock->event |= (sock->curEvent = sock->newEvent ); sock->completion |= ( sock->curCompletion = sock->newCompletion ); sock->newEvent = sock->newCompletion = 0; OTLeaveNotifier( sock->channel ); if( sock->curEvent & T_UDERR) { // We just clear the error. // Should we feed this back to users ? // (TODO ) OTRcvUDErr( sock->channel, NULL ); #ifdef DEBUG_NET printf("AsyncTCPPopEvent T_UDERR recognized"); #endif } // Remote is disconnecting... if( sock->curEvent & ( T_DISCONNECT | T_ORDREL )) { sock->readShutdown = true; } if( sock->curEvent &T_CONNECT ) { // Ignore the info of remote (second parameter). // Shoule we care ? // (TODO) OTRcvConnect( sock->channel, NULL ); sock->connected = 1; } if( sock->curEvent & T_ORDREL ) { OTRcvOrderlyDisconnect( sock->channel ); } if( sock->curEvent & T_DISCONNECT ) { OTRcvDisconnect( sock->channel, NULL ); } // Do we need to ? // (masahiro minami) //YieldToAnyThread(); } static int Our_TCP_Recv(_TCPsocket *sock, void *data, int maxlen) { int len = 0; OSStatus res; /* Server sockets are for accepting connections only */ if ( sock->sflag ) { SDLNet_SetError("Server sockets cannot receive"); return(-1); } // do { res = OTRcv(sock->channel, data, maxlen-len, 0); if (res > 0) { len = res; } #ifdef DEBUG_NET if ( res != kOTNoDataErr ) printf("SDLNet_TCP_Recv received ; %d\n", res ); #endif AsyncTCPPopEvent(sock); if( res == kOTLookErr ) { res = OTLook(sock->channel ); // continue; } } //while ( (len == 0) && (res == kOTNoDataErr) ); sock->ready = 0; if ( len == 0 && res != kOTNoDataErr) { /* Open Transport error */ #ifdef DEBUG_NET printf("Open Transport error: %d\n", res); #endif return(-1); } return(len); } #endif #endif // !defined(DISABLE_NETWORKING)