r3228 - in trunk/libmsip: include/libmsip source

mikma at minisip.org mikma at minisip.org
Wed Mar 7 10:37:18 CET 2007


Author: mikma
Date: 2007-03-07 10:37:17 +0100 (Wed, 07 Mar 2007)
New Revision: 3228

Modified:
   trunk/libmsip/include/libmsip/SipLayerTransport.h
   trunk/libmsip/include/libmsip/SipSocketServer.h
   trunk/libmsip/source/SipLayerTransport.cxx
   trunk/libmsip/source/SipSocketServer.cxx
Log:
Use SocketServer in SipSocketServer and SipLayerTransport. Fixes delay during stack shutdown

Modified: trunk/libmsip/include/libmsip/SipLayerTransport.h
===================================================================
--- trunk/libmsip/include/libmsip/SipLayerTransport.h	2007-03-07 09:32:19 UTC (rev 3227)
+++ trunk/libmsip/include/libmsip/SipLayerTransport.h	2007-03-07 09:37:17 UTC (rev 3228)
@@ -46,6 +46,7 @@
 class SipLayerTransport;
 class SipCommandDispatcher;
 class StreamThreadData;
+class StreamThreadServer;
 
 class SipLayerTransport : public SipSMCommandReceiver {
 	public:
@@ -68,6 +69,7 @@
 				 bool addVia);
 
 		void addSocket(MRef<StreamSocket *> sock);
+		void removeSocket(MRef<StreamSocket *> sock);
 
 		void addServer(MRef<SipSocketServer *> server);
 
@@ -125,21 +127,14 @@
                 
 		Mutex serversLock;
 		std::list<MRef<SipSocketServer *> > servers;
-		std::list<MRef<StreamThreadData *> > workers;
+		MRef<SocketServer*> manager;
 
-		Mutex socksLock;
-		std::list<MRef<StreamSocket *> > socks;
-		Mutex socksPendingLock;
-		std::list<MRef<StreamSocket *> > socksPending;
-
 		MRef<certificate_chain *> cert_chain;
 		MRef<ca_db *> cert_db;
 		void * tls_ctx;
 
 		MRef<SipCommandDispatcher*> dispatcher;
 
-		Semaphore semaphore;
-
 		friend class StreamThreadData;
 
 };

Modified: trunk/libmsip/include/libmsip/SipSocketServer.h
===================================================================
--- trunk/libmsip/include/libmsip/SipSocketServer.h	2007-03-07 09:32:19 UTC (rev 3227)
+++ trunk/libmsip/include/libmsip/SipSocketServer.h	2007-03-07 09:37:17 UTC (rev 3228)
@@ -31,6 +31,7 @@
 
 #include<libmnetutil/Socket.h>
 #include<libmnetutil/ServerSocket.h>
+#include<libmnetutil/SocketServer.h>
 #include<libmutil/Thread.h>
 
 class SipLayerTransport;
@@ -40,7 +41,7 @@
  * when a client connects to it.
  *
  */
-class SipSocketServer : public Runnable{
+class SipSocketServer : public SocketServer, InputReadyHandler {
 	public:
 		SipSocketServer(MRef<SipLayerTransport*> r, MRef<Socket*> sock );
 		virtual ~SipSocketServer();
@@ -59,20 +60,16 @@
 		void setExternalPort(int32_t port) { externalPort = port; }
 		int32_t getExternalPort() const { return externalPort; }
 
-		void run();
-		void start();
-		void stop();
-		void join();
-
 		virtual void inputReady();
 
+	protected:
+		virtual void inputReady( MRef<Socket*> socket );
+
 	private:
 		MRef<Socket *> ssock;
 		MRef<SipLayerTransport *> receiver;
-		bool doStop;
 		std::string externalIp;
 		int32_t externalPort;
-		ThreadHandle th;
 };
 
 

Modified: trunk/libmsip/source/SipLayerTransport.cxx
===================================================================
--- trunk/libmsip/source/SipLayerTransport.cxx	2007-03-07 09:32:19 UTC (rev 3227)
+++ trunk/libmsip/source/SipLayerTransport.cxx	2007-03-07 09:37:17 UTC (rev 3228)
@@ -269,43 +269,29 @@
 	return 0;
 }
 
-class StreamThreadData : public MObject{
+class StreamThreadData : public InputReadyHandler{
 	public:
-		StreamThreadData( MRef<SipLayerTransport *> );
-		void run();
-		void stop();
-		void join();
+		StreamThreadData( MRef<StreamSocket*>,
+				  MRef<SipLayerTransport *> );
+		void inputReady( MRef<Socket*> socket );
+
+	protected:
 		void streamSocketRead( MRef<StreamSocket *> socket );
+
 	private:
 		SipMessageParser parser;
 		MRef<SipLayerTransport  *> transport;
-		bool doStop;
-		ThreadHandle th;
+		MRef<StreamSocket *> ssocket;
 };
 
-static void * streamThread( void * arg ){
-	StreamThreadData * data;
-	data = (StreamThreadData *)arg;
 
-		// We want to keep a reference to the object so that it
-		// exist (at least) until we exit this function.
-	MRef<StreamThreadData *> ref=data;
-
-	ref->run();
-	return NULL;
-}
-
-StreamThreadData::StreamThreadData( MRef<SipLayerTransport *> transport){
-	doStop=false;
+StreamThreadData::StreamThreadData( MRef<StreamSocket *> theSocket,
+				    MRef<SipLayerTransport *> transport)
+		:ssocket( theSocket ){
 	this->transport = transport;
-	th=Thread::createThread(streamThread, this);
 }
 
-void StreamThreadData::join(){
-	Thread::join(th);
-}
 
-
 bool sipdebug_print_packets=false;
 
 void set_debug_print_packets(bool f){
@@ -347,18 +333,12 @@
 	mout << end;
 }
 
-static void * streamThread( void * arg );
-
 SipLayerTransport::SipLayerTransport(MRef<certificate_chain *> cchain,
 				     MRef<ca_db *> cert_db):
 		cert_chain(cchain), cert_db(cert_db), tls_ctx(NULL)
 {
-	int i;
-
-	for( i=0; i < NB_THREADS ; i++ ){
-		StreamThreadData *worker = new StreamThreadData(this);
-		workers.push_back(worker);
-	}
+	manager = new SocketServer();
+	manager->start();
 }
 
 
@@ -387,12 +367,8 @@
 	serversLock.lock();
 	list<MRef<StreamThreadData*> >::iterator j;
 
+	manager->stop();
 
-	for( j=workers.begin(); j != workers.end(); j++ ){
-		MRef<StreamThreadData*> w = *j;
-		w->stop();
-	}
-
 		//tell the threads to stop. Don't "join" just yet
 		//since that might take some time and we want that
 		//time to be spent in parallel for all servers.
@@ -402,18 +378,9 @@
 		server->stop();
 	}
 
-		//wake up blocking threads
-	int n;
-	for (n=0; n< NB_THREADS; n++){
-		semaphore.inc();
-	}
+	manager->join();
+	manager = NULL;
 
-	for( j=workers.begin(); j != workers.end(); j++ ){
-		MRef<StreamThreadData*> w = *j;
-		w->join();
-		*j=NULL;
-	}
-
 		//wait for the threads in the servers.
 		//NOTE: this can take about five seconds
 		//(that is the read timeout in the socket
@@ -425,7 +392,6 @@
 		*i=NULL;
 	}
 
-	workers.clear();
 	servers.clear();
 	serversLock.unlock();
 }
@@ -996,27 +962,25 @@
 }
 
 void SipLayerTransport::addSocket(MRef<StreamSocket *> sock){
-	socksLock.lock();
-	this->socks.push_back(sock);
-	socksLock.unlock();
-	socksPendingLock.lock();
-	this->socksPending.push_back(sock);
-	socksPendingLock.unlock();
-        semaphore.inc();
+	MRef<StreamThreadData*> worker = new StreamThreadData( sock, this );
+
+	manager->addSocket( *sock, dynamic_cast<InputReadyHandler*>(*worker) );
 }
 
+void SipLayerTransport::removeSocket( MRef<StreamSocket *> sock ){
+	manager->removeSocket( *sock );
+}
+
+
 MRef<StreamSocket *> SipLayerTransport::findStreamSocket( IPAddress &address, uint16_t port ){
-	list<MRef<StreamSocket *> >::iterator i;
+	MRef<Socket *> sock =
+		manager->findStreamSocketPeer( address, port );
 
-	socksLock.lock();
-	for( i=socks.begin(); i != socks.end(); i++ ){
-		if( (*i)->matchesPeer(address, port) ){
-			socksLock.unlock();
-			return *i;
-		}
+	if( !sock ){
+		return NULL;
 	}
-	socksLock.unlock();
-	return NULL;
+
+	return dynamic_cast<StreamSocket*>(*sock);
 }
 
 static void updateVia(MRef<SipMessage*> pack, MRef<IPAddress *>from,
@@ -1142,46 +1106,10 @@
 		} // if event
 }
 
-void StreamThreadData::stop(){
-	doStop=true;
-	transport=NULL;
+void StreamThreadData::inputReady( MRef<Socket*> socket ){
+	streamSocketRead( ssocket );
 }
 
-void StreamThreadData::run(){
-	while(!doStop){
-
-		MRef<StreamSocket *> socket;
-		
-			//Keep a local reference of transport so that
-			//it is not deleted until we are done with it.
-		MRef<SipLayerTransport*> transp = transport;
-		if (!transp)
-			break;
-                transp->semaphore.dec();
-		if (doStop)
-			break;
-                
-		/* Take the last socket pending to be read */
-		transp->socksPendingLock.lock();
-		socket = transp->socksPending.front();
-		transp->socksPending.pop_front();
-		transp->socksPendingLock.unlock();
-                
-		/* Read from it until it gets closed */
-		streamSocketRead( socket );
-
-		/* The socket was closed */
-		transp->socksLock.lock();
-		transp->socks.remove( socket );
-		transp->socksLock.unlock();
-#ifdef DEBUG_OUTPUT
-		mdbg << "StreamSocket closed" << end;
-#endif
-
-		parser.init();
-	}
-}
-
 #define STREAM_MAX_PKT_SIZE 65536
 
 void StreamThreadData::streamSocketRead( MRef<StreamSocket *> socket ){
@@ -1189,46 +1117,21 @@
 	for (int i=0; i< STREAM_MAX_PKT_SIZE+1; i++){
 		buffer[i]=0;
 	}
-	int avail;
 	MRef<SipMessage*> pack;
 
-	while( !doStop){
-		fd_set set;
-
-		do{
-			struct timeval tv;
-			// Timeout needs to be set before each call to select since
-			// it should be consider undefined after select() returns.
-			tv.tv_sec = 600;
-			tv.tv_usec = 0;
-
-			// We need update the fd set before call to select()
-			FD_ZERO(&set);
-			FD_SET((SOCKET)socket->getFd(), &set);
-
-			avail = select(socket->getFd()+1,&set,NULL,NULL,&tv );
-		} while( avail <= 0 );
-
-		if( avail == 0 ){
-#ifdef DEBUG_OUTPUT
-			mdbg << "Closing Stream socket due to inactivity" << end;
-#endif
-			break;
-		}
-
-		if( FD_ISSET( socket->getFd(), &set )){
 			int32_t nread;
 			nread = socket->read( buffer, STREAM_MAX_PKT_SIZE);
 
 			if (nread == -1){
 				mdbg << "Some error occured while reading from StreamSocket" << end;
-				continue;
+				return;
 			}
 
 			if ( nread == 0){
 				// Connection was closed
 				mdbg << "Connection was closed" << end;
-				break;
+				transport->removeSocket( socket );
+				return;
 			}
 #ifdef ENABLE_TS
 			//ts.save( PACKET_IN );
@@ -1277,7 +1180,7 @@
 #endif
 				/* Probably we don't have enough data
 				 * so go back to reading */
-				continue;
+// 				continue;
 			}
 			
 			catch(SipExceptionInvalidStart & ){
@@ -1285,10 +1188,9 @@
 				// packet, close the connection
 				
 				mdbg << "This does not look like a SIP packet, close the connection" << endl;
-				break;
+				socket->close();
+				transport->removeSocket( socket );
 			}
-		} // if event
-	}// while true
 }
 
 

Modified: trunk/libmsip/source/SipSocketServer.cxx
===================================================================
--- trunk/libmsip/source/SipSocketServer.cxx	2007-03-07 09:32:19 UTC (rev 3227)
+++ trunk/libmsip/source/SipSocketServer.cxx	2007-03-07 09:37:17 UTC (rev 3228)
@@ -38,8 +38,10 @@
 // 
 
 
-SipSocketServer::SipSocketServer(MRef<SipLayerTransport*> r, MRef<Socket*> sock): ssock(sock), receiver(r),doStop(false){
+SipSocketServer::SipSocketServer(MRef<SipLayerTransport*> r, MRef<Socket*> sock): ssock(sock), receiver(r){
 	externalPort = ssock->getPort();
+
+	addSocket( sock, this );
 }
 
 SipSocketServer::~SipSocketServer(){
@@ -61,53 +63,10 @@
 	return receiver;
 }
 
-void SipSocketServer::run(){
-	struct timeval timeout;
-	fd_set set;
-	int fd = ssock->getFd();
-	while (!doStop){
-
-		int avail;
-		do{
-			FD_ZERO(&set);
-			#ifdef WIN32
-			FD_SET( (uint32_t) fd, &set);
-			#else
-			FD_SET(fd, &set);
-			#endif
-			
-			timeout.tv_sec = 5;
-			timeout.tv_usec= 0;
-			avail = select(fd+1,&set,NULL,NULL,&timeout );
-			if (avail<0){
-				Thread::msleep(500);
-			}
-		} while( avail < 0 );
-		if (avail==0){
-// 			cerr<< "SipSocketServer::run(): Timeout"<< endl;
-		}
-		MRef<SipLayerTransport *> r = receiver;
-		if (avail && !doStop && r){
-			inputReady();
-		}
-
-	}
-
-} // "myself" will be freed here and the object can be freed.
-
-void SipSocketServer::start(){
-	Thread t(this);
-	th=t.getHandle();
+void SipSocketServer::inputReady( MRef<Socket*> socket ){
+	inputReady();
 }
 
-void SipSocketServer::stop(){
-	doStop=true;
-}
-
-void SipSocketServer::join(){
-	Thread::join(th);
-}
-
 void SipSocketServer::inputReady(){
 }
 



More information about the Minisip-devel mailing list