Page 1 of 1

Sockets and Threads

Posted: 09 Jul 2008, 20:46
by Sharp
Hi all! I write some little network code for TMW, which use sockets and threads(pthread and Critical Section) for the network communication instead of SDL_net and SDL_thread.
It run under Linux(pthread) and Windows.
Unfortunately i couldn't compile TMW under windows, so i couldn't test my code there, but under linux it run.
If it doesn't problem, I put my code to here:
network.h

Code: Select all

#ifndef _TMW_NETWORK_
#define _TMW_NETWORK_

#include <map>
#include <SDL_net.h>
#include <SDL_thread.h>
#include <string>

 #ifdef _WIN32
 		#include <windows.h>
		#include <socket.h>
		//wsock32.lib is needed!!!
 #else
 		#include <pthread.h>
		#include <sys/types.h>
		#include <sys/socket.h>
		#include <sys/time.h> //struct timeval
		#include <unistd.h>
		#include <netdb.h> // struct hostent
		#include <arpa/inet.h> //for inet_* functions
		#include <netinet/in.h> //sockaddr_in
		#include <errno.h> //error handling
 #endif


class MessageHandler;
class MessageIn;

class Network;

class Network
{
    public:
        friend void* networkThread(void *data);
        friend class MessageOut;

        Network();

        ~Network();

        /*
         * I had to change name of this funcions
         * *Don't forget change it in main.cpp too*
         */
        bool
        setConnect(const std::string &address, short port);

        void
        disconnect();

        void
        registerHandler(MessageHandler *handler);

        void
        unregisterHandler(MessageHandler *handler);

        void
        clearHandlers();

        int
        getState() const { return mState; }

        const std::string&
        getError() const { return mError; }

        bool
        isConnected() const { return mState == CONNECTED; }

        int
        getInSize() const { return mInSize; }

        void
        skip(int len);

        bool
        messageReady();

        MessageIn
        getNextMessage();

        void
        dispatchMessages();

        void
        flush();

        // ERROR replaced by NET_ERROR because already defined in Windows
        enum {
            IDLE,
            CONNECTED,
            CONNECTING,
            DATA,
            NET_ERROR
        };

    protected:
        void
        setError(const std::string& error);

        Uint16
        readWord(int pos);

        bool
        realConnect();

        void
        receive();

        std::string mAddress;
        short mPort;

        char *mInBuffer, *mOutBuffer;
        unsigned int mInSize, mOutSize;

        unsigned int mToSkip;

        int mState;
        std::string mError;

        fd_set mSelect;
        #ifdef _WIN32
         		HANDLE 				mWorkerThread;
         		CRITIAL_SECTION 	mMutex;
         		SOCKET 				mSocket;
         		WSAData				wsa;
        #else
         		pthread_t 			mWorkerThread;
         		pthread_mutex_t 	mMutex;
         		int					mSocket;
        #endif
        typedef std::map<Uint16, MessageHandler*> MessageHandlers;
        typedef MessageHandlers::iterator MessageHandlerIterator;
        MessageHandlers mMessageHandlers;
};

/** Convert an address from int format to string */
char *iptostring(int address);

#endif

network.cpp

Code: Select all

#include "network.h"

#include "messagehandler.h"
#include "messagein.h"

#include "../log.h"

#include <sstream>


#ifdef _WIN32
	void
	Lock(CRITICAL_SECTION &mMutex)
	{
		EnterCriticalSection(&mMutex);
	}
	void
	UnLock(CRITICAL_SECTION &mMutex)
	{
		LeaveCriticalSection(&mMutex);
	}
#else
	void
	Lock(pthread_mutex_t mMutex)
	{
		pthread_mutex_lock(&mMutex);
	}
	void
	UnLock(pthread_mutex_t mMutex)
	{
		pthread_mutex_unlock(&mMutex);
	}
#endif

int
GetSocketError()
{
	#ifdef _WIN32
		return WSAGetLastError()
	#else
		return errno;
	#endif
}


/** Warning: buffers and other variables are shared,
    so there can be only one connection active at a time */

short packet_lengths[] = {
   10,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
    0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
    0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
    0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
// #0x0040
    0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
    0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
    0,  0,  0,  0, 55, 17,  3, 37, 46, -1, 23, -1,  3,108,  3,  2,
    3, 28, 19, 11,  3, -1,  9,  5, 54, 53, 58, 60, 41,  2,  6,  6,
// #0x0080
    7,  3,  2,  2,  2,  5, 16, 12, 10,  7, 29, 23, -1, -1, -1,  0,
    7, 22, 28,  2,  6, 30, -1, -1,  3, -1, -1,  5,  9, 17, 17,  6,
   23,  6,  6, -1, -1, -1, -1,  8,  7,  6,  7,  4,  7,  0, -1,  6,
    8,  8,  3,  3, -1,  6,  6, -1,  7,  6,  2,  5,  6, 44,  5,  3,
// #0x00C0
    7,  2,  6,  8,  6,  7, -1, -1, -1, -1,  3,  3,  6,  6,  2, 27,
    3,  4,  4,  2, -1, -1,  3, -1,  6, 14,  3, -1, 28, 29, -1, -1,
   30, 30, 26,  2,  6, 26,  3,  3,  8, 19,  5,  2,  3,  2,  2,  2,
    3,  2,  6,  8, 21,  8,  8,  2,  2, 26,  3, -1,  6, 27, 30, 10,
// #0x0100
    2,  6,  6, 30, 79, 31, 10, 10, -1, -1,  4,  6,  6,  2, 11, -1,
   10, 39,  4, 10, 31, 35, 10, 18,  2, 13, 15, 20, 68,  2,  3, 16,
    6, 14, -1, -1, 21,  8,  8,  8,  8,  8,  2,  2,  3,  4,  2, -1,
    6, 86,  6, -1, -1,  7, -1,  6,  3, 16,  4,  4,  4,  6, 24, 26,
// #0x0140
   22, 14,  6, 10, 23, 19,  6, 39,  8,  9,  6, 27, -1,  2,  6,  6,
  110,  6, -1, -1, -1, -1, -1,  6, -1, 54, 66, 54, 90, 42,  6, 42,
   -1, -1, -1, -1, -1, 30, -1,  3, 14,  3, 30, 10, 43, 14,186,182,
   14, 30, 10,  3, -1,  6,106, -1,  4,  5,  4, -1,  6,  7, -1, -1,
// #0x0180
    6,  3,106, 10, 10, 34,  0,  6,  8,  4,  4,  4, 29, -1, 10,  6,
   90, 86, 24,  6, 30,102,  9,  4,  8,  4, 14, 10,  4,  6,  2,  6,
    3,  3, 35,  5, 11, 26, -1,  4,  4,  6, 10, 12,  6, -1,  4,  4,
   11,  7, -1, 67, 12, 18,114,  6,  3,  6, 26, 26, 26, 26,  2,  3,
// #0x01C0
    2, 14, 10, -1, 22, 22,  4,  2, 13, 97,  0,  9,  9, 29,  6, 28,
    8, 14, 10, 35,  6,  8,  4, 11, 54, 53, 60,  2, -1, 47, 33,  6,
   30,  8, 34, 14,  2,  6, 26,  2, 28, 81,  6, 10, 26,  2, -1, -1,
   -1, -1, 20, 10, 32,  9, 34, 14,  2,  6, 48, 56, -1,  4,  5, 10,
// #0x200
   26,  0,  0,  0, 18,  0,  0,  0,  0,  0,  0, 19,  0,  0,  0,  0,
    0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
};

const unsigned int BUFFER_SIZE = 65536;

void*
networkThread(void *data)
{
    Network *network = static_cast<Network*>(data);

    if (!network->realConnect())
        pthread_exit(0);

    network->receive();
}

Network::Network():
    mSocket(0),
    mAddress(""), mPort(0),
    mInBuffer(new char[BUFFER_SIZE]),
    mOutBuffer(new char[BUFFER_SIZE]),
    mInSize(0), mOutSize(0),
    mToSkip(0),
    mState(IDLE),
    mWorkerThread(0)
{
      #ifdef _WIN32
    		WSAStartup(MAKEWORD(1, 1), &wsa);
    #endif
}

Network::~Network()
{
    clearHandlers();

    if (mState != IDLE && mState != NET_ERROR)
        disconnect();

    #ifdef _WIN32
		DeleteCriticalSection(&mMutex);
		WSACleanup(&wsa);
		closesocket(mSocket);
    #else
     	pthread_mutex_destroy(&mMutex);
     	close(mSocket);
    #endif

    delete[] mInBuffer;
    delete[] mOutBuffer;
}

bool
Network::setConnect(const std::string &address, short port)
{
	#ifdef _WIN32
		WSAStartup(MAKEWORD(1, 1), &wsa);
		InitializeCriticalSecition(&mMutex);

	#else
		pthread_mutex_init(&mMutex, NULL);
	#endif
	if ( (mSocket = socket(AF_INET, SOCK_STREAM, 0)) == -1 )
	{
		std::string error = "Error: Socket Init(): ";
		error += GetSocketError();
		setError(error);
	}
	FD_ZERO(&mSelect);
	FD_SET(mSocket, &mSelect);
	logger->log("Socket creating...DONE");

    if (mState != IDLE && mState != NET_ERROR)
    {
        logger->log("Tried to connect an already connected socket!");
        return false;
    }

    if (address.empty())
    {
        setError("Empty address given to Network::connect()!");
        return false;
    }

    logger->log("Network::Connecting to %s:%i", address.c_str(), port);

    mAddress = address;
    mPort = port;

    // Reset to sane values
    mOutSize = 0;
    mInSize = 0;
    mToSkip = 0;

    mState = CONNECTING;

    #ifdef _WIN32
		mWorkerThread = CreateThread(NULL,
									 0,
									 networkThread,
									 (void*)this,
									 0);
		if ( mWorkerThread == NULL )
    	{
   			setError("Unable to create network worker thread");
   	        return false;
   	    }
    #else
    	if ( pthread_create(&mWorkerThread, NULL, networkThread, (void*)this) != 0 )
    	{
   			setError("Unable to create network worker thread");
   	        return false;
   	    }
    #endif
    logger->log("Create network worker thread...DONE");
    return true;
}

void
Network::disconnect()
{
    mState = IDLE;

    if (mWorkerThread)
    {
    	pthread_join(mWorkerThread, NULL);
        mWorkerThread = NULL;
    }

    if (mSocket)
    {
		#ifdef _WIN32
			closesocket(mSocket);
		#else
			close(mSocket);
		#endif
		mSocket = 0;
    }
}

void
Network::registerHandler(MessageHandler *handler)
{
    for (const Uint16 *i = handler->handledMessages; *i; i++)
    {
        mMessageHandlers[*i] = handler;
    }

    handler->setNetwork(this);
}

void
Network::unregisterHandler(MessageHandler *handler)
{
    for (const Uint16 *i = handler->handledMessages; *i; i++)
    {
        mMessageHandlers.erase(*i);
    }

    handler->setNetwork(0);
}

void
Network::clearHandlers()
{
    MessageHandlerIterator i;
    for (i = mMessageHandlers.begin(); i != mMessageHandlers.end(); i++)
    {
        i->second->setNetwork(0);
    }
    mMessageHandlers.clear();
}

void
Network::dispatchMessages()
{
    while (messageReady())
    {
        MessageIn msg = getNextMessage();

        MessageHandlerIterator iter = mMessageHandlers.find(msg.getId());

        if (iter != mMessageHandlers.end())
            iter->second->handleMessage(&msg);
        else
            logger->log("Unhandled packet: %x", msg.getId());

        skip(msg.getLength());
    }
}

void
Network::flush()
{
    if (!mOutSize || mState != CONNECTED)
        return;

    int ret;

    Lock(mMutex);

#ifdef DEBUG
    logger->log("[PACKET]");
    logger->log("-->Sending packet[%d]: %s", mOutSize, mOutBuffer);
#endif
    ret = send(mSocket, mOutBuffer, mOutSize, 0);
#ifdef DEBUG
    logger->log("-->Sended data size: %d", ret);
    logger->log("[END]");
#endif
    if (ret < (int)mOutSize)
    {
    	std::string error = "Error in Send(): ";
    	error += GetSocketError();
        setError(error);
    }
    mOutSize = 0;
    UnLock(mMutex);
}

void
Network::skip(int len)
{
	Lock(mMutex);
    mToSkip += len;
    if (!mInSize)
    {
    	UnLock(mMutex);
        return;
    }

    if (mInSize >= mToSkip)
    {
        mInSize -= mToSkip;
        memmove(mInBuffer, mInBuffer + mToSkip, mInSize);
        mToSkip = 0;
    }
    else
    {
        mToSkip -= mInSize;
        mInSize = 0;
    }
    UnLock(mMutex);
}

bool
Network::messageReady()
{
    int len = -1;

    Lock(mMutex);
    if (mInSize >= 2)
    {
        len = packet_lengths[readWord(0)];

        if (len == -1 && mInSize > 4)
            len = readWord(2);

    }

    bool ret = (mInSize >= static_cast<unsigned int>(len));
    UnLock(mMutex);
    return ret;
}

MessageIn
Network::getNextMessage()
{
    while (!messageReady())
    {
        if (mState == NET_ERROR)
            break;
    }

    Lock(mMutex);
    int msgId = readWord(0);
    int len = packet_lengths[msgId];

    if (len == -1)
        len = readWord(2);

#ifdef DEBUG
    logger->log("Received packet 0x%x of length %d", msgId, length);
#endif

    MessageIn msg(mInBuffer, len);
    UnLock(mMutex);

    return msg;
}

bool
Network::realConnect()
{
	struct hostent* h = gethostbyname(mAddress.c_str());
	if ( h == NULL )
	{
        std::string error = "Unable to resolve host \"" + mAddress + "\"";
        setError(error);
        logger->log("gethostbyname: %s", error.c_str());
        return false;
	}

    mState = CONNECTING;

    struct sockaddr_in addr;
    memset(&addr, '\0', sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(mPort);

    addr.sin_addr.s_addr = inet_addr( inet_ntoa(*((struct in_addr*)h->h_addr)) );
    logger->log("trying to connect to %s : %d", inet_ntoa(*((struct in_addr*)h->h_addr)), mPort);
    if ( connect(mSocket, (sockaddr*)&addr, sizeof(addr)) == -1 )
    {
        std::string error = "Error: Socket Connect(): ";
        error += GetSocketError();
        setError(error);
        logger->log("Error in connection: %d", GetSocketError());
        return false;
    }

    logger->log("Network::Started session with %s:%i",
    		inet_ntoa(*((struct in_addr*)h->h_addr)), mPort);

    mState = CONNECTED;

    return true;
}

void
Network::receive()
{
	fd_set read_fds;
    int ret;
    struct timeval tv;
    // 1 sec
    tv.tv_sec = 1;
    tv.tv_usec = 0;
    while (mState == CONNECTED)
    {
        // TODO Try to get this to block all the time while still being able
        // to escape the loop
    	read_fds = mSelect;
    	if ( select(10, &read_fds, NULL, NULL, &tv) == -1 )
    	{
    		std::string error = "Error in Select(): ";
    		error += GetSocketError();
            setError(error);
    		break;
    	}
    	if (FD_ISSET(mSocket, &read_fds))
    	{
    		// Receive data from the socket
    		ret = recv(mSocket, mInBuffer + mInSize, BUFFER_SIZE - mInSize, 0);

    		Lock(mMutex);
    		if ( !ret )
    		{
                mState = IDLE;
                logger->log("Disconnected.");
    		}
    		else if (ret < 0)
    		{
    			std::string error = "Error in Recv(): ";
    			error += GetSocketError();
                setError(error);
    		}
    		else
    		{
    			mInSize += ret;
    			if (mToSkip)
    			{
    				if (mInSize >= mToSkip)
    				{
    				    mInSize -= mToSkip;
    				    memmove(mInBuffer, mInBuffer + mToSkip, mInSize);
    				    mToSkip = 0;
    				}
    				else
    				{
    				    mToSkip -= mInSize;
    				    mInSize = 0;
    				 }
    			}
    		}
    		UnLock(mMutex);
    	}
    }
	#ifdef _WIN32
		closesocke(mSocket);
	#else
		close(mSocket);
	#endif
}

char*
iptostring(int address)
{
    static char asciiIP[16];

    sprintf(asciiIP, "%i.%i.%i.%i",
            (unsigned char)(address),
            (unsigned char)(address >> 8),
            (unsigned char)(address >> 16),
            (unsigned char)(address >> 24));

    return asciiIP;
}

void
Network::setError(const std::string& error)
{
    logger->log("Network error: %s", error.c_str());
    mError = error;
    mState = NET_ERROR;
}

Uint16
Network::readWord(int pos)
{
#if SDL_BYTEORDER == SDL_BIG_ENDIAN
    return SDL_Swap16((*(Uint16*)(mInBuffer+(pos))));
#else
    return (*(Uint16*)(mInBuffer+(pos)));
#endif
}

Sharp

Re: Sockets and Threads

Posted: 09 Jul 2008, 21:35
by Shaggy
Thanks. Can you use Mantis in future.

Re: Sockets and Threads

Posted: 09 Jul 2008, 22:35
by Crush
In what way is this code better than the code we are currently using?

Re: Sockets and Threads

Posted: 09 Jul 2008, 22:59
by Jaxad0127
In many ways it's worse. With SDL, you only need to code to one thread/socket spec. This method codes for two different ones, and chooses which one at compile time. Twice the code, twice the chance of bugs, twice as difficult to maintain, etc. New platforms would require new code specifically for them, verses SDL where they handle that for us.

Re: Sockets and Threads

Posted: 09 Jul 2008, 23:08
by Sharp
Yes...
I write this to practice.
But I like much better my code, because I can feel that this code is really mine, and I don't have to use others code and libs...You can be independent from these libs, and You can understanding the network communication (as I mentioned above, I did it to my practice)
But I love SDL too, but don't SDL_net, just the other SDL_stuff.
Sharp

Re: Sockets and Threads

Posted: 11 Jul 2008, 12:21
by Crush
When you want to help TMW while practicing programming then how about picking a task from our bugtracker?