r3214 - in trunk/libmnetutil: . include include/libmnetutil source
mikma at minisip.org
mikma at minisip.org
Sun Feb 18 17:58:19 CET 2007
Author: mikma
Date: 2007-02-18 17:58:17 +0100 (Sun, 18 Feb 2007)
New Revision: 3214
Added:
trunk/libmnetutil/include/libmnetutil/SocketServer.h
trunk/libmnetutil/source/SocketServer.cxx
Modified:
trunk/libmnetutil/Makefile.am
trunk/libmnetutil/include/Makefile.am
Log:
Add SocketServer, a thread listening for socket input. A pipe is used
to wake the thread when a socket is added, removed or
when it should terminate.
Modified: trunk/libmnetutil/Makefile.am
===================================================================
--- trunk/libmnetutil/Makefile.am 2007-02-18 16:26:17 UTC (rev 3213)
+++ trunk/libmnetutil/Makefile.am 2007-02-18 16:58:17 UTC (rev 3214)
@@ -46,6 +46,7 @@
source/NetworkFunctions.cxx \
source/ServerSocket.cxx \
source/Socket.cxx \
+ source/SocketServer.cxx \
source/TCPSocket.cxx \
source/NetUtil.cxx \
source/UDPSocket.cxx \
Modified: trunk/libmnetutil/include/Makefile.am
===================================================================
--- trunk/libmnetutil/include/Makefile.am 2007-02-18 16:26:17 UTC (rev 3213)
+++ trunk/libmnetutil/include/Makefile.am 2007-02-18 16:58:17 UTC (rev 3214)
@@ -12,6 +12,7 @@
libmnetutil/NetworkFunctions.h \
libmnetutil/ServerSocket.h \
libmnetutil/Socket.h \
+ libmnetutil/SocketServer.h \
libmnetutil/StreamSocket.h \
libmnetutil/TCPSocket.h \
libmnetutil/NetUtil.h \
Added: trunk/libmnetutil/include/libmnetutil/SocketServer.h
===================================================================
--- trunk/libmnetutil/include/libmnetutil/SocketServer.h 2007-02-18 16:26:17 UTC (rev 3213)
+++ trunk/libmnetutil/include/libmnetutil/SocketServer.h 2007-02-18 16:58:17 UTC (rev 3214)
@@ -0,0 +1,73 @@
+/*
+ Copyright (C) 2006-2007 Mikael Magnusson
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2.1 of the License, or (at your option) any later version.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+*/
+
+/*
+ * Authors: Mikael Magnusson <mikma at users.sourceforge.net>
+ */
+
+#ifndef LIBMNETUTIL_SOCKETSERVER_H
+#define LIBMNETUTIL_SOCKETSERVER_H
+
+#include<libmnetutil/libmnetutil_config.h>
+
+#include<libmutil/MemObject.h>
+#include<libmutil/Mutex.h>
+#include<libmutil/Thread.h>
+#include<libmnetutil/Socket.h>
+
+#include<map>
+
+#ifdef WIN32
+# include <winsock2.h>
+#endif
+
+class LIBMNETUTIL_API InputReadyHandler: public virtual MObject {
+ public:
+ virtual ~InputReadyHandler();
+ virtual void inputReady( MRef<Socket*> socket )=0;
+};
+
+class LIBMNETUTIL_API SocketServer : public Runnable {
+ public:
+ SocketServer();
+ virtual ~SocketServer();
+
+ void start();
+ void stop();
+ void join();
+
+ void addSocket( MRef<Socket*> socket, MRef<InputReadyHandler*> handler );
+ void removeSocket( MRef<Socket*> socket );
+
+ MRef<Socket*> findStreamSocketPeer( const IPAddress &addr,
+ uint16_t port );
+
+ protected:
+ void run();
+ void signal();
+ int buildFdSet( fd_set *set, int pipeFd );
+
+ private:
+ typedef std::map< MRef<Socket*>, MRef<InputReadyHandler*> > Sockets;
+ Mutex csMutex;
+ MRef<Thread *> thread;
+ Sockets sockets;
+ int fdSignal;
+ bool doStop;
+};
+#endif
Property changes on: trunk/libmnetutil/include/libmnetutil/SocketServer.h
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Name: svn:eol-style
+ native
Added: trunk/libmnetutil/source/SocketServer.cxx
===================================================================
--- trunk/libmnetutil/source/SocketServer.cxx 2007-02-18 16:26:17 UTC (rev 3213)
+++ trunk/libmnetutil/source/SocketServer.cxx 2007-02-18 16:58:17 UTC (rev 3214)
@@ -0,0 +1,263 @@
+/*
+ Copyright (C) 2006-2007 Mikael Magnusson
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2.1 of the License, or (at your option) any later version.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+*/
+
+/*
+ * Authors: Mikael Magnusson <mikma at users.sourceforge.net>
+ */
+
+#include<config.h>
+
+#include<libmnetutil/SocketServer.h>
+#include<libmutil/CriticalSection.h>
+#include<libmnetutil/NetworkException.h>
+#include<libmnetutil/StreamSocket.h>
+
+#ifdef WIN32
+# include<io.h>
+# include<fcntl.h>
+# define write _write
+# define pipe(fds) _pipe((fds), 256, O_BINARY)
+#endif
+
+#ifndef SOCKET
+# ifdef WIN32
+# define SOCKET uint32_t
+# else
+# define SOCKET int32_t
+# endif
+#endif
+
+using namespace std;
+
+class EqualAddrPort: public std::unary_function< std::pair<const MRef<Socket*>, MRef<InputReadyHandler*> >&, bool>{
+ public:
+ EqualAddrPort( const IPAddress &theAddress,
+ uint16_t thePort )
+ : address( theAddress ), port( thePort ){}
+
+ bool operator() ( pair<const MRef<Socket*>, MRef<InputReadyHandler*> > &pair ) {
+ MRef<Socket *> sock = pair.first;
+ if( sock.isNull() )
+ return false;
+
+ StreamSocket *ssock =
+ dynamic_cast<StreamSocket*>(*sock);
+
+ if( !ssock )
+ return false;
+
+ return ssock->matchesPeer( address, port );
+ }
+ private:
+ const IPAddress &address;
+ uint16_t port;
+};
+
+
+InputReadyHandler::~InputReadyHandler()
+{
+}
+
+
+SocketServer::SocketServer(): fdSignal( -1 ), doStop( false )
+{
+}
+
+SocketServer::~SocketServer()
+{
+ stop();
+}
+
+void SocketServer::start()
+{
+ CriticalSection cs( csMutex );
+ if( !thread.isNull() )
+ return;
+
+ thread = new Thread( this );
+}
+
+void SocketServer::stop()
+{
+ CriticalSection cs( csMutex );
+
+ if( thread.isNull() )
+ return;
+
+ doStop = true;
+ signal();
+}
+
+void SocketServer::join()
+{
+ CriticalSection cs( csMutex );
+
+ if( thread.isNull() ){
+ return;
+ }
+
+ thread->join();
+ thread = NULL;
+}
+
+void SocketServer::addSocket( MRef<Socket*> socket,
+ MRef<InputReadyHandler*> handler )
+{
+ CriticalSection cs( csMutex );
+
+ sockets[ socket ] = handler;
+ signal();
+}
+
+void SocketServer::removeSocket( MRef<Socket*> socket )
+{
+ CriticalSection cs( csMutex );
+
+ Sockets::iterator pos = sockets.find( socket );
+ if( pos != sockets.end() ){
+ sockets.erase( pos );
+ signal();
+ }
+}
+
+MRef<Socket*> SocketServer::findStreamSocketPeer( const IPAddress &addr,
+ uint16_t port )
+{
+ CriticalSection cs( csMutex );
+
+ EqualAddrPort pred( addr, port );
+
+ Sockets::iterator iter =
+ find_if( sockets.begin(), sockets.end(), pred );
+
+ if( iter == sockets.end() )
+ return NULL;
+
+ return iter->first;
+}
+
+void SocketServer::signal()
+{
+ char c = 0;
+
+ if( fdSignal < 0 )
+ return;
+
+ if( write( fdSignal, &c, sizeof( c )) < 0 ) {
+ cerr << "Write failed " << fdSignal << endl;
+ throw NetworkException( errno );
+ }
+}
+
+int SocketServer::buildFdSet( fd_set *set, int pipeFd )
+{
+ SOCKET maxFd;
+ Sockets::const_iterator i;
+
+ FD_ZERO( set );
+ FD_SET( (SOCKET)pipeFd, set );
+
+ maxFd = pipeFd;
+
+ for( i = sockets.begin(); i != sockets.end(); i++ ){
+ MRef<Socket*> socket = i->first;
+ SOCKET fd = socket->getFd();
+
+ FD_SET( fd, set );
+ if( fd > maxFd )
+ maxFd = fd;
+ }
+
+ return maxFd;
+}
+
+void SocketServer::run()
+{
+ struct timeval timeout;
+ fd_set tmpl;
+ fd_set set;
+ int pipeFds[2] = {-1,-1};
+ int maxFd;
+ Sockets::const_iterator i;
+
+ csMutex.lock();
+
+ if( fdSignal >= 0 ){
+ close( fdSignal );
+ fdSignal = -1;
+ }
+
+ if( ::pipe( pipeFds ) ){
+ throw Exception( "Can't create pipe" );
+ }
+
+ fdSignal = pipeFds[1];
+
+ csMutex.unlock();
+
+ maxFd = buildFdSet( &tmpl, pipeFds[0] );
+
+ while (!doStop){
+
+ int avail;
+ do{
+ memcpy( &set, &tmpl, sizeof( set ) );
+
+ timeout.tv_sec = 5;
+ timeout.tv_usec= 0;
+ avail = select( maxFd + 1, &set, NULL, NULL, &timeout );
+ if (avail<0){
+ Thread::msleep(500);
+ }
+ } while( avail < 0 );
+ if (avail==0){
+// cerr<< "SipSocketServer::run(): Timeout"<< endl;
+ }
+
+ if( FD_ISSET( pipeFds[0], &set ) ){
+ char buf[255];
+
+ if( read( pipeFds[0], buf, sizeof(buf) ) < 0){
+ cerr << "Read failed" << endl;
+ throw NetworkException( errno );
+ }
+
+ maxFd = buildFdSet( &tmpl, pipeFds[0] );
+ }
+
+ for( i = sockets.begin(); i != sockets.end(); i++ ){
+ MRef<Socket*> socket = i->first;
+ MRef<InputReadyHandler*> handler = i->second;
+
+ if( FD_ISSET( socket->getFd(), &set ) ){
+ if( !handler.isNull() ){
+ handler->inputReady( socket );
+ }
+ }
+ }
+ }
+
+// csMutex.lock();
+
+ fdSignal = -1;
+ close( pipeFds[0] );
+ close( pipeFds[1] );
+ doStop = false;
+
+// csMutex.unlock();
+}
Property changes on: trunk/libmnetutil/source/SocketServer.cxx
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Name: svn:eol-style
+ native
More information about the Minisip-devel
mailing list