r3214 - in trunk/libmnetutil: . include include/libmnetutil source

mikma at minisip.org mikma at minisip.org
Sun Feb 18 17:58:19 CET 2007


Author: mikma
Date: 2007-02-18 17:58:17 +0100 (Sun, 18 Feb 2007)
New Revision: 3214

Added:
   trunk/libmnetutil/include/libmnetutil/SocketServer.h
   trunk/libmnetutil/source/SocketServer.cxx
Modified:
   trunk/libmnetutil/Makefile.am
   trunk/libmnetutil/include/Makefile.am
Log:
Add SocketServer, a thread listening for socket input. A pipe is used
to wake the thread when a socket is added, removed or
when it should terminate.


Modified: trunk/libmnetutil/Makefile.am
===================================================================
--- trunk/libmnetutil/Makefile.am	2007-02-18 16:26:17 UTC (rev 3213)
+++ trunk/libmnetutil/Makefile.am	2007-02-18 16:58:17 UTC (rev 3214)
@@ -46,6 +46,7 @@
 		    source/NetworkFunctions.cxx \
 		    source/ServerSocket.cxx \
 		    source/Socket.cxx \
+		    source/SocketServer.cxx \
 		    source/TCPSocket.cxx \
 		    source/NetUtil.cxx \
 		    source/UDPSocket.cxx \

Modified: trunk/libmnetutil/include/Makefile.am
===================================================================
--- trunk/libmnetutil/include/Makefile.am	2007-02-18 16:26:17 UTC (rev 3213)
+++ trunk/libmnetutil/include/Makefile.am	2007-02-18 16:58:17 UTC (rev 3214)
@@ -12,6 +12,7 @@
 			libmnetutil/NetworkFunctions.h \
 			libmnetutil/ServerSocket.h \
 			libmnetutil/Socket.h \
+			libmnetutil/SocketServer.h \
 			libmnetutil/StreamSocket.h \
 			libmnetutil/TCPSocket.h \
 			libmnetutil/NetUtil.h \

Added: trunk/libmnetutil/include/libmnetutil/SocketServer.h
===================================================================
--- trunk/libmnetutil/include/libmnetutil/SocketServer.h	2007-02-18 16:26:17 UTC (rev 3213)
+++ trunk/libmnetutil/include/libmnetutil/SocketServer.h	2007-02-18 16:58:17 UTC (rev 3214)
@@ -0,0 +1,73 @@
+/*
+  Copyright (C) 2006-2007 Mikael Magnusson
+
+  This library is free software; you can redistribute it and/or
+  modify it under the terms of the GNU Lesser General Public
+  License as published by the Free Software Foundation; either
+  version 2.1 of the License, or (at your option) any later version.
+
+  This library is distributed in the hope that it will be useful,
+  but WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+  Lesser General Public License for more details.
+
+  You should have received a copy of the GNU Lesser General Public
+  License along with this library; if not, write to the Free Software
+  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+*/
+
+/*
+ * Authors: Mikael Magnusson <mikma at users.sourceforge.net>
+ */
+
+#ifndef LIBMNETUTIL_SOCKETSERVER_H
+#define LIBMNETUTIL_SOCKETSERVER_H
+
+#include<libmnetutil/libmnetutil_config.h>
+
+#include<libmutil/MemObject.h>
+#include<libmutil/Mutex.h>
+#include<libmutil/Thread.h>
+#include<libmnetutil/Socket.h>
+
+#include<map>
+
+#ifdef WIN32
+# include <winsock2.h>
+#endif
+
+class LIBMNETUTIL_API InputReadyHandler: public virtual MObject {
+	public:
+		virtual ~InputReadyHandler();
+		virtual void inputReady( MRef<Socket*> socket )=0;
+};
+
+class LIBMNETUTIL_API SocketServer : public Runnable {
+	public:
+		SocketServer();
+		virtual ~SocketServer();
+
+		void start();
+		void stop();
+		void join();
+
+		void addSocket( MRef<Socket*> socket, MRef<InputReadyHandler*> handler );
+		void removeSocket( MRef<Socket*> socket );
+
+		MRef<Socket*> findStreamSocketPeer( const IPAddress &addr,
+						    uint16_t port );
+
+	protected:
+		void run();
+		void signal();
+		int buildFdSet( fd_set *set, int pipeFd );
+
+	private:
+		typedef std::map< MRef<Socket*>, MRef<InputReadyHandler*> > Sockets;
+		Mutex csMutex;
+		MRef<Thread *> thread;
+		Sockets sockets;
+		int fdSignal;
+		bool doStop;
+};
+#endif


Property changes on: trunk/libmnetutil/include/libmnetutil/SocketServer.h
___________________________________________________________________
Name: svn:mime-type
   + text/plain
Name: svn:eol-style
   + native

Added: trunk/libmnetutil/source/SocketServer.cxx
===================================================================
--- trunk/libmnetutil/source/SocketServer.cxx	2007-02-18 16:26:17 UTC (rev 3213)
+++ trunk/libmnetutil/source/SocketServer.cxx	2007-02-18 16:58:17 UTC (rev 3214)
@@ -0,0 +1,263 @@
+/*
+  Copyright (C) 2006-2007 Mikael Magnusson
+
+  This library is free software; you can redistribute it and/or
+  modify it under the terms of the GNU Lesser General Public
+  License as published by the Free Software Foundation; either
+  version 2.1 of the License, or (at your option) any later version.
+
+  This library is distributed in the hope that it will be useful,
+  but WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+  Lesser General Public License for more details.
+
+  You should have received a copy of the GNU Lesser General Public
+  License along with this library; if not, write to the Free Software
+  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+*/
+
+/*
+ * Authors: Mikael Magnusson <mikma at users.sourceforge.net>
+ */
+
+#include<config.h>
+
+#include<libmnetutil/SocketServer.h>
+#include<libmutil/CriticalSection.h>
+#include<libmnetutil/NetworkException.h>
+#include<libmnetutil/StreamSocket.h>
+
+#ifdef WIN32
+# include<io.h>
+# include<fcntl.h>
+# define write _write
+# define pipe(fds) _pipe((fds), 256, O_BINARY)
+#endif
+
+#ifndef SOCKET
+# ifdef WIN32
+#  define SOCKET uint32_t
+# else
+#  define SOCKET int32_t
+# endif
+#endif
+
+using namespace std;
+
+class EqualAddrPort: public std::unary_function< std::pair<const MRef<Socket*>, MRef<InputReadyHandler*> >&, bool>{
+	public:
+		EqualAddrPort( const IPAddress &theAddress,
+			       uint16_t thePort )
+				: address( theAddress ), port( thePort ){}
+
+		bool operator() ( pair<const MRef<Socket*>, MRef<InputReadyHandler*> > &pair  ) {
+			MRef<Socket *> sock = pair.first;
+			if( sock.isNull() )
+				return false;
+
+			StreamSocket *ssock =
+				dynamic_cast<StreamSocket*>(*sock);
+
+			if( !ssock )
+				return false;
+
+			return ssock->matchesPeer( address, port );
+		}
+	private:
+		const IPAddress &address;
+		uint16_t port;
+};
+
+
+InputReadyHandler::~InputReadyHandler()
+{
+}
+
+
+SocketServer::SocketServer(): fdSignal( -1 ), doStop( false )
+{
+}
+
+SocketServer::~SocketServer()
+{
+	stop();
+}
+
+void SocketServer::start()
+{
+	CriticalSection cs( csMutex );
+	if( !thread.isNull() )
+		return;
+
+	thread = new Thread( this );
+}
+
+void SocketServer::stop()
+{
+	CriticalSection cs( csMutex );
+
+	if( thread.isNull() )
+		return;
+
+	doStop = true;
+	signal();
+}
+
+void SocketServer::join()
+{
+	CriticalSection cs( csMutex );
+
+	if( thread.isNull() ){
+		return;
+	}
+
+	thread->join();
+	thread = NULL;
+}
+
+void SocketServer::addSocket( MRef<Socket*> socket,
+			      MRef<InputReadyHandler*> handler )
+{
+	CriticalSection cs( csMutex );
+
+	sockets[ socket ] = handler;
+	signal();
+}
+
+void SocketServer::removeSocket( MRef<Socket*> socket )
+{
+	CriticalSection cs( csMutex );
+
+	Sockets::iterator pos = sockets.find( socket );
+	if( pos != sockets.end() ){
+		sockets.erase( pos );
+		signal();
+	}
+}
+
+MRef<Socket*> SocketServer::findStreamSocketPeer( const IPAddress &addr,
+						  uint16_t port )
+{
+	CriticalSection cs( csMutex );
+
+	EqualAddrPort pred( addr, port );
+
+	Sockets::iterator iter =
+		find_if( sockets.begin(), sockets.end(), pred );
+
+	if( iter == sockets.end() )
+		return NULL;
+
+	return iter->first;
+}
+
+void SocketServer::signal()
+{
+	char c = 0;
+
+	if( fdSignal < 0 )
+		return;
+
+	if( write( fdSignal, &c, sizeof( c )) < 0 ) {
+		cerr << "Write failed " << fdSignal << endl;
+		throw NetworkException( errno );
+	}
+}
+
+int SocketServer::buildFdSet( fd_set *set, int pipeFd )
+{
+	SOCKET maxFd;
+	Sockets::const_iterator i;
+
+	FD_ZERO( set );
+	FD_SET( (SOCKET)pipeFd, set );
+
+	maxFd = pipeFd;
+
+	for( i = sockets.begin(); i != sockets.end(); i++ ){
+		MRef<Socket*> socket = i->first;
+		SOCKET fd = socket->getFd();
+
+		FD_SET( fd, set );
+		if( fd > maxFd )
+			maxFd = fd;
+	}
+
+	return maxFd;
+}
+
+void SocketServer::run()
+{
+	struct timeval timeout;
+	fd_set tmpl;
+	fd_set set;
+	int pipeFds[2] = {-1,-1};
+	int maxFd;
+	Sockets::const_iterator i;
+
+	csMutex.lock();
+
+	if( fdSignal >= 0 ){
+		close( fdSignal );
+		fdSignal = -1;
+	}
+
+	if( ::pipe( pipeFds ) ){
+		throw Exception( "Can't create pipe" );
+	}
+
+	fdSignal = pipeFds[1];
+
+	csMutex.unlock();
+
+	maxFd = buildFdSet( &tmpl, pipeFds[0] );
+
+	while (!doStop){
+
+		int avail;
+		do{
+			memcpy( &set, &tmpl, sizeof( set ) );
+
+			timeout.tv_sec = 5;
+			timeout.tv_usec= 0;
+			avail = select( maxFd + 1, &set, NULL, NULL, &timeout );
+			if (avail<0){
+				Thread::msleep(500);
+			}
+		} while( avail < 0 );
+		if (avail==0){
+// 			cerr<< "SipSocketServer::run(): Timeout"<< endl;
+		}
+
+		if( FD_ISSET( pipeFds[0], &set ) ){
+			char buf[255];
+
+			if( read( pipeFds[0], buf, sizeof(buf) ) < 0){
+				cerr << "Read failed" << endl;
+				throw NetworkException( errno );
+			}
+
+			maxFd = buildFdSet( &tmpl, pipeFds[0] );
+		}
+
+		for( i = sockets.begin(); i != sockets.end(); i++ ){
+			MRef<Socket*> socket = i->first;
+			MRef<InputReadyHandler*> handler = i->second;
+
+			if( FD_ISSET( socket->getFd(), &set ) ){
+				if( !handler.isNull() ){
+					handler->inputReady( socket );
+				}
+			}
+		}
+	}
+
+// 	csMutex.lock();
+
+	fdSignal = -1;
+	close( pipeFds[0] );
+	close( pipeFds[1] );
+	doStop = false;
+
+// 	csMutex.unlock();
+}


Property changes on: trunk/libmnetutil/source/SocketServer.cxx
___________________________________________________________________
Name: svn:mime-type
   + text/plain
Name: svn:eol-style
   + native



More information about the Minisip-devel mailing list