r3228 - in trunk/libmsip: include/libmsip source
mikma at minisip.org
mikma at minisip.org
Wed Mar 7 10:37:18 CET 2007
Author: mikma
Date: 2007-03-07 10:37:17 +0100 (Wed, 07 Mar 2007)
New Revision: 3228
Modified:
trunk/libmsip/include/libmsip/SipLayerTransport.h
trunk/libmsip/include/libmsip/SipSocketServer.h
trunk/libmsip/source/SipLayerTransport.cxx
trunk/libmsip/source/SipSocketServer.cxx
Log:
Use SocketServer in SipSocketServer and SipLayerTransport. Fixes delay during stack shutdown
Modified: trunk/libmsip/include/libmsip/SipLayerTransport.h
===================================================================
--- trunk/libmsip/include/libmsip/SipLayerTransport.h 2007-03-07 09:32:19 UTC (rev 3227)
+++ trunk/libmsip/include/libmsip/SipLayerTransport.h 2007-03-07 09:37:17 UTC (rev 3228)
@@ -46,6 +46,7 @@
class SipLayerTransport;
class SipCommandDispatcher;
class StreamThreadData;
+class StreamThreadServer;
class SipLayerTransport : public SipSMCommandReceiver {
public:
@@ -68,6 +69,7 @@
bool addVia);
void addSocket(MRef<StreamSocket *> sock);
+ void removeSocket(MRef<StreamSocket *> sock);
void addServer(MRef<SipSocketServer *> server);
@@ -125,21 +127,14 @@
Mutex serversLock;
std::list<MRef<SipSocketServer *> > servers;
- std::list<MRef<StreamThreadData *> > workers;
+ MRef<SocketServer*> manager;
- Mutex socksLock;
- std::list<MRef<StreamSocket *> > socks;
- Mutex socksPendingLock;
- std::list<MRef<StreamSocket *> > socksPending;
-
MRef<certificate_chain *> cert_chain;
MRef<ca_db *> cert_db;
void * tls_ctx;
MRef<SipCommandDispatcher*> dispatcher;
- Semaphore semaphore;
-
friend class StreamThreadData;
};
Modified: trunk/libmsip/include/libmsip/SipSocketServer.h
===================================================================
--- trunk/libmsip/include/libmsip/SipSocketServer.h 2007-03-07 09:32:19 UTC (rev 3227)
+++ trunk/libmsip/include/libmsip/SipSocketServer.h 2007-03-07 09:37:17 UTC (rev 3228)
@@ -31,6 +31,7 @@
#include<libmnetutil/Socket.h>
#include<libmnetutil/ServerSocket.h>
+#include<libmnetutil/SocketServer.h>
#include<libmutil/Thread.h>
class SipLayerTransport;
@@ -40,7 +41,7 @@
* when a client connects to it.
*
*/
-class SipSocketServer : public Runnable{
+class SipSocketServer : public SocketServer, InputReadyHandler {
public:
SipSocketServer(MRef<SipLayerTransport*> r, MRef<Socket*> sock );
virtual ~SipSocketServer();
@@ -59,20 +60,16 @@
void setExternalPort(int32_t port) { externalPort = port; }
int32_t getExternalPort() const { return externalPort; }
- void run();
- void start();
- void stop();
- void join();
-
virtual void inputReady();
+ protected:
+ virtual void inputReady( MRef<Socket*> socket );
+
private:
MRef<Socket *> ssock;
MRef<SipLayerTransport *> receiver;
- bool doStop;
std::string externalIp;
int32_t externalPort;
- ThreadHandle th;
};
Modified: trunk/libmsip/source/SipLayerTransport.cxx
===================================================================
--- trunk/libmsip/source/SipLayerTransport.cxx 2007-03-07 09:32:19 UTC (rev 3227)
+++ trunk/libmsip/source/SipLayerTransport.cxx 2007-03-07 09:37:17 UTC (rev 3228)
@@ -269,43 +269,29 @@
return 0;
}
-class StreamThreadData : public MObject{
+class StreamThreadData : public InputReadyHandler{
public:
- StreamThreadData( MRef<SipLayerTransport *> );
- void run();
- void stop();
- void join();
+ StreamThreadData( MRef<StreamSocket*>,
+ MRef<SipLayerTransport *> );
+ void inputReady( MRef<Socket*> socket );
+
+ protected:
void streamSocketRead( MRef<StreamSocket *> socket );
+
private:
SipMessageParser parser;
MRef<SipLayerTransport *> transport;
- bool doStop;
- ThreadHandle th;
+ MRef<StreamSocket *> ssocket;
};
-static void * streamThread( void * arg ){
- StreamThreadData * data;
- data = (StreamThreadData *)arg;
- // We want to keep a reference to the object so that it
- // exist (at least) until we exit this function.
- MRef<StreamThreadData *> ref=data;
-
- ref->run();
- return NULL;
-}
-
-StreamThreadData::StreamThreadData( MRef<SipLayerTransport *> transport){
- doStop=false;
+StreamThreadData::StreamThreadData( MRef<StreamSocket *> theSocket,
+ MRef<SipLayerTransport *> transport)
+ :ssocket( theSocket ){
this->transport = transport;
- th=Thread::createThread(streamThread, this);
}
-void StreamThreadData::join(){
- Thread::join(th);
-}
-
bool sipdebug_print_packets=false;
void set_debug_print_packets(bool f){
@@ -347,18 +333,12 @@
mout << end;
}
-static void * streamThread( void * arg );
-
SipLayerTransport::SipLayerTransport(MRef<certificate_chain *> cchain,
MRef<ca_db *> cert_db):
cert_chain(cchain), cert_db(cert_db), tls_ctx(NULL)
{
- int i;
-
- for( i=0; i < NB_THREADS ; i++ ){
- StreamThreadData *worker = new StreamThreadData(this);
- workers.push_back(worker);
- }
+ manager = new SocketServer();
+ manager->start();
}
@@ -387,12 +367,8 @@
serversLock.lock();
list<MRef<StreamThreadData*> >::iterator j;
+ manager->stop();
- for( j=workers.begin(); j != workers.end(); j++ ){
- MRef<StreamThreadData*> w = *j;
- w->stop();
- }
-
//tell the threads to stop. Don't "join" just yet
//since that might take some time and we want that
//time to be spent in parallel for all servers.
@@ -402,18 +378,9 @@
server->stop();
}
- //wake up blocking threads
- int n;
- for (n=0; n< NB_THREADS; n++){
- semaphore.inc();
- }
+ manager->join();
+ manager = NULL;
- for( j=workers.begin(); j != workers.end(); j++ ){
- MRef<StreamThreadData*> w = *j;
- w->join();
- *j=NULL;
- }
-
//wait for the threads in the servers.
//NOTE: this can take about five seconds
//(that is the read timeout in the socket
@@ -425,7 +392,6 @@
*i=NULL;
}
- workers.clear();
servers.clear();
serversLock.unlock();
}
@@ -996,27 +962,25 @@
}
void SipLayerTransport::addSocket(MRef<StreamSocket *> sock){
- socksLock.lock();
- this->socks.push_back(sock);
- socksLock.unlock();
- socksPendingLock.lock();
- this->socksPending.push_back(sock);
- socksPendingLock.unlock();
- semaphore.inc();
+ MRef<StreamThreadData*> worker = new StreamThreadData( sock, this );
+
+ manager->addSocket( *sock, dynamic_cast<InputReadyHandler*>(*worker) );
}
+void SipLayerTransport::removeSocket( MRef<StreamSocket *> sock ){
+ manager->removeSocket( *sock );
+}
+
+
MRef<StreamSocket *> SipLayerTransport::findStreamSocket( IPAddress &address, uint16_t port ){
- list<MRef<StreamSocket *> >::iterator i;
+ MRef<Socket *> sock =
+ manager->findStreamSocketPeer( address, port );
- socksLock.lock();
- for( i=socks.begin(); i != socks.end(); i++ ){
- if( (*i)->matchesPeer(address, port) ){
- socksLock.unlock();
- return *i;
- }
+ if( !sock ){
+ return NULL;
}
- socksLock.unlock();
- return NULL;
+
+ return dynamic_cast<StreamSocket*>(*sock);
}
static void updateVia(MRef<SipMessage*> pack, MRef<IPAddress *>from,
@@ -1142,46 +1106,10 @@
} // if event
}
-void StreamThreadData::stop(){
- doStop=true;
- transport=NULL;
+void StreamThreadData::inputReady( MRef<Socket*> socket ){
+ streamSocketRead( ssocket );
}
-void StreamThreadData::run(){
- while(!doStop){
-
- MRef<StreamSocket *> socket;
-
- //Keep a local reference of transport so that
- //it is not deleted until we are done with it.
- MRef<SipLayerTransport*> transp = transport;
- if (!transp)
- break;
- transp->semaphore.dec();
- if (doStop)
- break;
-
- /* Take the last socket pending to be read */
- transp->socksPendingLock.lock();
- socket = transp->socksPending.front();
- transp->socksPending.pop_front();
- transp->socksPendingLock.unlock();
-
- /* Read from it until it gets closed */
- streamSocketRead( socket );
-
- /* The socket was closed */
- transp->socksLock.lock();
- transp->socks.remove( socket );
- transp->socksLock.unlock();
-#ifdef DEBUG_OUTPUT
- mdbg << "StreamSocket closed" << end;
-#endif
-
- parser.init();
- }
-}
-
#define STREAM_MAX_PKT_SIZE 65536
void StreamThreadData::streamSocketRead( MRef<StreamSocket *> socket ){
@@ -1189,46 +1117,21 @@
for (int i=0; i< STREAM_MAX_PKT_SIZE+1; i++){
buffer[i]=0;
}
- int avail;
MRef<SipMessage*> pack;
- while( !doStop){
- fd_set set;
-
- do{
- struct timeval tv;
- // Timeout needs to be set before each call to select since
- // it should be consider undefined after select() returns.
- tv.tv_sec = 600;
- tv.tv_usec = 0;
-
- // We need update the fd set before call to select()
- FD_ZERO(&set);
- FD_SET((SOCKET)socket->getFd(), &set);
-
- avail = select(socket->getFd()+1,&set,NULL,NULL,&tv );
- } while( avail <= 0 );
-
- if( avail == 0 ){
-#ifdef DEBUG_OUTPUT
- mdbg << "Closing Stream socket due to inactivity" << end;
-#endif
- break;
- }
-
- if( FD_ISSET( socket->getFd(), &set )){
int32_t nread;
nread = socket->read( buffer, STREAM_MAX_PKT_SIZE);
if (nread == -1){
mdbg << "Some error occured while reading from StreamSocket" << end;
- continue;
+ return;
}
if ( nread == 0){
// Connection was closed
mdbg << "Connection was closed" << end;
- break;
+ transport->removeSocket( socket );
+ return;
}
#ifdef ENABLE_TS
//ts.save( PACKET_IN );
@@ -1277,7 +1180,7 @@
#endif
/* Probably we don't have enough data
* so go back to reading */
- continue;
+// continue;
}
catch(SipExceptionInvalidStart & ){
@@ -1285,10 +1188,9 @@
// packet, close the connection
mdbg << "This does not look like a SIP packet, close the connection" << endl;
- break;
+ socket->close();
+ transport->removeSocket( socket );
}
- } // if event
- }// while true
}
Modified: trunk/libmsip/source/SipSocketServer.cxx
===================================================================
--- trunk/libmsip/source/SipSocketServer.cxx 2007-03-07 09:32:19 UTC (rev 3227)
+++ trunk/libmsip/source/SipSocketServer.cxx 2007-03-07 09:37:17 UTC (rev 3228)
@@ -38,8 +38,10 @@
//
-SipSocketServer::SipSocketServer(MRef<SipLayerTransport*> r, MRef<Socket*> sock): ssock(sock), receiver(r),doStop(false){
+SipSocketServer::SipSocketServer(MRef<SipLayerTransport*> r, MRef<Socket*> sock): ssock(sock), receiver(r){
externalPort = ssock->getPort();
+
+ addSocket( sock, this );
}
SipSocketServer::~SipSocketServer(){
@@ -61,53 +63,10 @@
return receiver;
}
-void SipSocketServer::run(){
- struct timeval timeout;
- fd_set set;
- int fd = ssock->getFd();
- while (!doStop){
-
- int avail;
- do{
- FD_ZERO(&set);
- #ifdef WIN32
- FD_SET( (uint32_t) fd, &set);
- #else
- FD_SET(fd, &set);
- #endif
-
- timeout.tv_sec = 5;
- timeout.tv_usec= 0;
- avail = select(fd+1,&set,NULL,NULL,&timeout );
- if (avail<0){
- Thread::msleep(500);
- }
- } while( avail < 0 );
- if (avail==0){
-// cerr<< "SipSocketServer::run(): Timeout"<< endl;
- }
- MRef<SipLayerTransport *> r = receiver;
- if (avail && !doStop && r){
- inputReady();
- }
-
- }
-
-} // "myself" will be freed here and the object can be freed.
-
-void SipSocketServer::start(){
- Thread t(this);
- th=t.getHandle();
+void SipSocketServer::inputReady( MRef<Socket*> socket ){
+ inputReady();
}
-void SipSocketServer::stop(){
- doStop=true;
-}
-
-void SipSocketServer::join(){
- Thread::join(th);
-}
-
void SipSocketServer::inputReady(){
}
More information about the Minisip-devel
mailing list