Sockets and Threads
Posted: 09 Jul 2008, 20:46
Hi all! I write some little network code for TMW, which use sockets and threads(pthread and Critical Section) for the network communication instead of SDL_net and SDL_thread.
It run under Linux(pthread) and Windows.
Unfortunately i couldn't compile TMW under windows, so i couldn't test my code there, but under linux it run.
If it doesn't problem, I put my code to here:
network.h
network.cpp
Sharp
It run under Linux(pthread) and Windows.
Unfortunately i couldn't compile TMW under windows, so i couldn't test my code there, but under linux it run.
If it doesn't problem, I put my code to here:
network.h
Code: Select all
#ifndef _TMW_NETWORK_
#define _TMW_NETWORK_
#include <map>
#include <SDL_net.h>
#include <SDL_thread.h>
#include <string>
#ifdef _WIN32
#include <windows.h>
#include <socket.h>
//wsock32.lib is needed!!!
#else
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/time.h> //struct timeval
#include <unistd.h>
#include <netdb.h> // struct hostent
#include <arpa/inet.h> //for inet_* functions
#include <netinet/in.h> //sockaddr_in
#include <errno.h> //error handling
#endif
class MessageHandler;
class MessageIn;
class Network;
class Network
{
public:
friend void* networkThread(void *data);
friend class MessageOut;
Network();
~Network();
/*
* I had to change name of this funcions
* *Don't forget change it in main.cpp too*
*/
bool
setConnect(const std::string &address, short port);
void
disconnect();
void
registerHandler(MessageHandler *handler);
void
unregisterHandler(MessageHandler *handler);
void
clearHandlers();
int
getState() const { return mState; }
const std::string&
getError() const { return mError; }
bool
isConnected() const { return mState == CONNECTED; }
int
getInSize() const { return mInSize; }
void
skip(int len);
bool
messageReady();
MessageIn
getNextMessage();
void
dispatchMessages();
void
flush();
// ERROR replaced by NET_ERROR because already defined in Windows
enum {
IDLE,
CONNECTED,
CONNECTING,
DATA,
NET_ERROR
};
protected:
void
setError(const std::string& error);
Uint16
readWord(int pos);
bool
realConnect();
void
receive();
std::string mAddress;
short mPort;
char *mInBuffer, *mOutBuffer;
unsigned int mInSize, mOutSize;
unsigned int mToSkip;
int mState;
std::string mError;
fd_set mSelect;
#ifdef _WIN32
HANDLE mWorkerThread;
CRITIAL_SECTION mMutex;
SOCKET mSocket;
WSAData wsa;
#else
pthread_t mWorkerThread;
pthread_mutex_t mMutex;
int mSocket;
#endif
typedef std::map<Uint16, MessageHandler*> MessageHandlers;
typedef MessageHandlers::iterator MessageHandlerIterator;
MessageHandlers mMessageHandlers;
};
/** Convert an address from int format to string */
char *iptostring(int address);
#endif
Code: Select all
#include "network.h"
#include "messagehandler.h"
#include "messagein.h"
#include "../log.h"
#include <sstream>
#ifdef _WIN32
void
Lock(CRITICAL_SECTION &mMutex)
{
EnterCriticalSection(&mMutex);
}
void
UnLock(CRITICAL_SECTION &mMutex)
{
LeaveCriticalSection(&mMutex);
}
#else
void
Lock(pthread_mutex_t mMutex)
{
pthread_mutex_lock(&mMutex);
}
void
UnLock(pthread_mutex_t mMutex)
{
pthread_mutex_unlock(&mMutex);
}
#endif
int
GetSocketError()
{
#ifdef _WIN32
return WSAGetLastError()
#else
return errno;
#endif
}
/** Warning: buffers and other variables are shared,
so there can be only one connection active at a time */
short packet_lengths[] = {
10, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
// #0x0040
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 55, 17, 3, 37, 46, -1, 23, -1, 3,108, 3, 2,
3, 28, 19, 11, 3, -1, 9, 5, 54, 53, 58, 60, 41, 2, 6, 6,
// #0x0080
7, 3, 2, 2, 2, 5, 16, 12, 10, 7, 29, 23, -1, -1, -1, 0,
7, 22, 28, 2, 6, 30, -1, -1, 3, -1, -1, 5, 9, 17, 17, 6,
23, 6, 6, -1, -1, -1, -1, 8, 7, 6, 7, 4, 7, 0, -1, 6,
8, 8, 3, 3, -1, 6, 6, -1, 7, 6, 2, 5, 6, 44, 5, 3,
// #0x00C0
7, 2, 6, 8, 6, 7, -1, -1, -1, -1, 3, 3, 6, 6, 2, 27,
3, 4, 4, 2, -1, -1, 3, -1, 6, 14, 3, -1, 28, 29, -1, -1,
30, 30, 26, 2, 6, 26, 3, 3, 8, 19, 5, 2, 3, 2, 2, 2,
3, 2, 6, 8, 21, 8, 8, 2, 2, 26, 3, -1, 6, 27, 30, 10,
// #0x0100
2, 6, 6, 30, 79, 31, 10, 10, -1, -1, 4, 6, 6, 2, 11, -1,
10, 39, 4, 10, 31, 35, 10, 18, 2, 13, 15, 20, 68, 2, 3, 16,
6, 14, -1, -1, 21, 8, 8, 8, 8, 8, 2, 2, 3, 4, 2, -1,
6, 86, 6, -1, -1, 7, -1, 6, 3, 16, 4, 4, 4, 6, 24, 26,
// #0x0140
22, 14, 6, 10, 23, 19, 6, 39, 8, 9, 6, 27, -1, 2, 6, 6,
110, 6, -1, -1, -1, -1, -1, 6, -1, 54, 66, 54, 90, 42, 6, 42,
-1, -1, -1, -1, -1, 30, -1, 3, 14, 3, 30, 10, 43, 14,186,182,
14, 30, 10, 3, -1, 6,106, -1, 4, 5, 4, -1, 6, 7, -1, -1,
// #0x0180
6, 3,106, 10, 10, 34, 0, 6, 8, 4, 4, 4, 29, -1, 10, 6,
90, 86, 24, 6, 30,102, 9, 4, 8, 4, 14, 10, 4, 6, 2, 6,
3, 3, 35, 5, 11, 26, -1, 4, 4, 6, 10, 12, 6, -1, 4, 4,
11, 7, -1, 67, 12, 18,114, 6, 3, 6, 26, 26, 26, 26, 2, 3,
// #0x01C0
2, 14, 10, -1, 22, 22, 4, 2, 13, 97, 0, 9, 9, 29, 6, 28,
8, 14, 10, 35, 6, 8, 4, 11, 54, 53, 60, 2, -1, 47, 33, 6,
30, 8, 34, 14, 2, 6, 26, 2, 28, 81, 6, 10, 26, 2, -1, -1,
-1, -1, 20, 10, 32, 9, 34, 14, 2, 6, 48, 56, -1, 4, 5, 10,
// #0x200
26, 0, 0, 0, 18, 0, 0, 0, 0, 0, 0, 19, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
};
const unsigned int BUFFER_SIZE = 65536;
void*
networkThread(void *data)
{
Network *network = static_cast<Network*>(data);
if (!network->realConnect())
pthread_exit(0);
network->receive();
}
Network::Network():
mSocket(0),
mAddress(""), mPort(0),
mInBuffer(new char[BUFFER_SIZE]),
mOutBuffer(new char[BUFFER_SIZE]),
mInSize(0), mOutSize(0),
mToSkip(0),
mState(IDLE),
mWorkerThread(0)
{
#ifdef _WIN32
WSAStartup(MAKEWORD(1, 1), &wsa);
#endif
}
Network::~Network()
{
clearHandlers();
if (mState != IDLE && mState != NET_ERROR)
disconnect();
#ifdef _WIN32
DeleteCriticalSection(&mMutex);
WSACleanup(&wsa);
closesocket(mSocket);
#else
pthread_mutex_destroy(&mMutex);
close(mSocket);
#endif
delete[] mInBuffer;
delete[] mOutBuffer;
}
bool
Network::setConnect(const std::string &address, short port)
{
#ifdef _WIN32
WSAStartup(MAKEWORD(1, 1), &wsa);
InitializeCriticalSecition(&mMutex);
#else
pthread_mutex_init(&mMutex, NULL);
#endif
if ( (mSocket = socket(AF_INET, SOCK_STREAM, 0)) == -1 )
{
std::string error = "Error: Socket Init(): ";
error += GetSocketError();
setError(error);
}
FD_ZERO(&mSelect);
FD_SET(mSocket, &mSelect);
logger->log("Socket creating...DONE");
if (mState != IDLE && mState != NET_ERROR)
{
logger->log("Tried to connect an already connected socket!");
return false;
}
if (address.empty())
{
setError("Empty address given to Network::connect()!");
return false;
}
logger->log("Network::Connecting to %s:%i", address.c_str(), port);
mAddress = address;
mPort = port;
// Reset to sane values
mOutSize = 0;
mInSize = 0;
mToSkip = 0;
mState = CONNECTING;
#ifdef _WIN32
mWorkerThread = CreateThread(NULL,
0,
networkThread,
(void*)this,
0);
if ( mWorkerThread == NULL )
{
setError("Unable to create network worker thread");
return false;
}
#else
if ( pthread_create(&mWorkerThread, NULL, networkThread, (void*)this) != 0 )
{
setError("Unable to create network worker thread");
return false;
}
#endif
logger->log("Create network worker thread...DONE");
return true;
}
void
Network::disconnect()
{
mState = IDLE;
if (mWorkerThread)
{
pthread_join(mWorkerThread, NULL);
mWorkerThread = NULL;
}
if (mSocket)
{
#ifdef _WIN32
closesocket(mSocket);
#else
close(mSocket);
#endif
mSocket = 0;
}
}
void
Network::registerHandler(MessageHandler *handler)
{
for (const Uint16 *i = handler->handledMessages; *i; i++)
{
mMessageHandlers[*i] = handler;
}
handler->setNetwork(this);
}
void
Network::unregisterHandler(MessageHandler *handler)
{
for (const Uint16 *i = handler->handledMessages; *i; i++)
{
mMessageHandlers.erase(*i);
}
handler->setNetwork(0);
}
void
Network::clearHandlers()
{
MessageHandlerIterator i;
for (i = mMessageHandlers.begin(); i != mMessageHandlers.end(); i++)
{
i->second->setNetwork(0);
}
mMessageHandlers.clear();
}
void
Network::dispatchMessages()
{
while (messageReady())
{
MessageIn msg = getNextMessage();
MessageHandlerIterator iter = mMessageHandlers.find(msg.getId());
if (iter != mMessageHandlers.end())
iter->second->handleMessage(&msg);
else
logger->log("Unhandled packet: %x", msg.getId());
skip(msg.getLength());
}
}
void
Network::flush()
{
if (!mOutSize || mState != CONNECTED)
return;
int ret;
Lock(mMutex);
#ifdef DEBUG
logger->log("[PACKET]");
logger->log("-->Sending packet[%d]: %s", mOutSize, mOutBuffer);
#endif
ret = send(mSocket, mOutBuffer, mOutSize, 0);
#ifdef DEBUG
logger->log("-->Sended data size: %d", ret);
logger->log("[END]");
#endif
if (ret < (int)mOutSize)
{
std::string error = "Error in Send(): ";
error += GetSocketError();
setError(error);
}
mOutSize = 0;
UnLock(mMutex);
}
void
Network::skip(int len)
{
Lock(mMutex);
mToSkip += len;
if (!mInSize)
{
UnLock(mMutex);
return;
}
if (mInSize >= mToSkip)
{
mInSize -= mToSkip;
memmove(mInBuffer, mInBuffer + mToSkip, mInSize);
mToSkip = 0;
}
else
{
mToSkip -= mInSize;
mInSize = 0;
}
UnLock(mMutex);
}
bool
Network::messageReady()
{
int len = -1;
Lock(mMutex);
if (mInSize >= 2)
{
len = packet_lengths[readWord(0)];
if (len == -1 && mInSize > 4)
len = readWord(2);
}
bool ret = (mInSize >= static_cast<unsigned int>(len));
UnLock(mMutex);
return ret;
}
MessageIn
Network::getNextMessage()
{
while (!messageReady())
{
if (mState == NET_ERROR)
break;
}
Lock(mMutex);
int msgId = readWord(0);
int len = packet_lengths[msgId];
if (len == -1)
len = readWord(2);
#ifdef DEBUG
logger->log("Received packet 0x%x of length %d", msgId, length);
#endif
MessageIn msg(mInBuffer, len);
UnLock(mMutex);
return msg;
}
bool
Network::realConnect()
{
struct hostent* h = gethostbyname(mAddress.c_str());
if ( h == NULL )
{
std::string error = "Unable to resolve host \"" + mAddress + "\"";
setError(error);
logger->log("gethostbyname: %s", error.c_str());
return false;
}
mState = CONNECTING;
struct sockaddr_in addr;
memset(&addr, '\0', sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(mPort);
addr.sin_addr.s_addr = inet_addr( inet_ntoa(*((struct in_addr*)h->h_addr)) );
logger->log("trying to connect to %s : %d", inet_ntoa(*((struct in_addr*)h->h_addr)), mPort);
if ( connect(mSocket, (sockaddr*)&addr, sizeof(addr)) == -1 )
{
std::string error = "Error: Socket Connect(): ";
error += GetSocketError();
setError(error);
logger->log("Error in connection: %d", GetSocketError());
return false;
}
logger->log("Network::Started session with %s:%i",
inet_ntoa(*((struct in_addr*)h->h_addr)), mPort);
mState = CONNECTED;
return true;
}
void
Network::receive()
{
fd_set read_fds;
int ret;
struct timeval tv;
// 1 sec
tv.tv_sec = 1;
tv.tv_usec = 0;
while (mState == CONNECTED)
{
// TODO Try to get this to block all the time while still being able
// to escape the loop
read_fds = mSelect;
if ( select(10, &read_fds, NULL, NULL, &tv) == -1 )
{
std::string error = "Error in Select(): ";
error += GetSocketError();
setError(error);
break;
}
if (FD_ISSET(mSocket, &read_fds))
{
// Receive data from the socket
ret = recv(mSocket, mInBuffer + mInSize, BUFFER_SIZE - mInSize, 0);
Lock(mMutex);
if ( !ret )
{
mState = IDLE;
logger->log("Disconnected.");
}
else if (ret < 0)
{
std::string error = "Error in Recv(): ";
error += GetSocketError();
setError(error);
}
else
{
mInSize += ret;
if (mToSkip)
{
if (mInSize >= mToSkip)
{
mInSize -= mToSkip;
memmove(mInBuffer, mInBuffer + mToSkip, mInSize);
mToSkip = 0;
}
else
{
mToSkip -= mInSize;
mInSize = 0;
}
}
}
UnLock(mMutex);
}
}
#ifdef _WIN32
closesocke(mSocket);
#else
close(mSocket);
#endif
}
char*
iptostring(int address)
{
static char asciiIP[16];
sprintf(asciiIP, "%i.%i.%i.%i",
(unsigned char)(address),
(unsigned char)(address >> 8),
(unsigned char)(address >> 16),
(unsigned char)(address >> 24));
return asciiIP;
}
void
Network::setError(const std::string& error)
{
logger->log("Network error: %s", error.c_str());
mError = error;
mState = NET_ERROR;
}
Uint16
Network::readWord(int pos)
{
#if SDL_BYTEORDER == SDL_BIG_ENDIAN
return SDL_Swap16((*(Uint16*)(mInBuffer+(pos))));
#else
return (*(Uint16*)(mInBuffer+(pos)));
#endif
}