// // // Copyright (C) 2004 SIPfoundry Inc. // Licensed by SIPfoundry under the LGPL license. // // Copyright (C) 2004 Pingtel Corp. // Licensed to SIPfoundry under a Contributor Agreement. // // $$ ////////////////////////////////////////////////////////////////////////////// // SYSTEM INCLUDES //#include <...> // APPLICATION INCLUDES #include "utl/UtlInt.h" #include "os/OsLock.h" #include "os/OsDateTime.h" #include "os/OsFS.h" #include "os/OsSysLog.h" #include "xmlparser/tinyxml.h" #include "fastdb/fastdb.h" #include "net/Url.h" #include "sipdb/ResultSet.h" #include "sipdb/SIPDBManager.h" #include "sipdb/SubscriptionRow.h" #include "sipdb/SubscriptionDB.h" // DEFINES REGISTER( SubscriptionRow ); // MACROS // EXTERNAL FUNCTIONS // EXTERNAL VARIABLES // CONSTANTS // STRUCTS // TYPEDEFS // FORWARD DECLARATIONS // Static Initializers SubscriptionDB* SubscriptionDB::spInstance = NULL; OsMutex SubscriptionDB::sLockMutex (OsMutex::Q_FIFO); UtlString SubscriptionDB::gUriKey("uri"); UtlString SubscriptionDB::gCallidKey("callid"); UtlString SubscriptionDB::gContactKey("contact"); UtlString SubscriptionDB::gNotifycseqKey("notifycseq"); UtlString SubscriptionDB::gSubscribecseqKey("subscribecseq"); UtlString SubscriptionDB::gExpiresKey("expires"); UtlString SubscriptionDB::gEventtypeKey("eventtype"); UtlString SubscriptionDB::gIdKey("id"); UtlString SubscriptionDB::gToKey("to"); UtlString SubscriptionDB::gFromKey("from"); UtlString SubscriptionDB::gFileKey("file"); UtlString SubscriptionDB::gKeyKey("key"); UtlString SubscriptionDB::gRecordrouteKey("recordroute"); /* ============================ CREATORS ================================== */ SubscriptionDB::SubscriptionDB( const UtlString& name ) : mDatabaseName( name ) { // Access the shared table databse SIPDBManager* pSIPDBManager = SIPDBManager::getInstance(); m_pFastDB = pSIPDBManager->getDatabase(name); // If we are the first process to attach // then we need to load the DB int users = pSIPDBManager->getNumDatabaseProcesses(name); if ( users == 1 ) { // Load the file implicitly this->load(); } } SubscriptionDB::~SubscriptionDB() { OsSysLog::add(FAC_DB, PRI_DEBUG, "<><>## SubscriptionDB:: DESTRUCTOR"); } /* ============================ MANIPULATORS ============================== */ void SubscriptionDB::releaseInstance() { OsSysLog::add(FAC_DB, PRI_DEBUG, "<><>## SubscriptionDB:: releaseInstance() spInstance=%p", spInstance); // Critical Section here OsLock lock( sLockMutex ); // if it exists, delete the object and NULL out the pointer if (spInstance != NULL) { // unregister this table/process from the IMDB SIPDBManager::getInstance()->removeDatabase ( spInstance->mDatabaseName ); // NULL out the fastDB pointer also spInstance->m_pFastDB = NULL; delete spInstance; spInstance = NULL; } } OsStatus SubscriptionDB::load() { // Critical Section here OsLock lock( sLockMutex ); OsStatus result = OS_SUCCESS; if ( m_pFastDB != NULL ) { // Clean out the existing DB rows before loading // a new set from persistent storage removeAllRows (); UtlString fileName = SIPDBManager::getInstance()-> getConfigDirectory() + OsPath::separator + mDatabaseName + ".xml"; OsSysLog::add(FAC_DB, PRI_DEBUG, "SubscriptionDB::load loading \"%s\"", fileName.data()); TiXmlDocument doc ( fileName ); // Verify that we can load the file (i.e it must exist) if( doc.LoadFile() ) { TiXmlNode * rootNode = doc.FirstChild ( "items" ); if (rootNode != NULL) { // the folder node contains at least the name/displayname/ // and autodelete elements, it may contain others for( TiXmlNode *itemNode = rootNode->FirstChild( "item" ); itemNode; itemNode = itemNode->NextSibling( "item" ) ) { // Create a hash dictionary for element attributes UtlHashMap nvPairs; for( TiXmlNode *elementNode = itemNode->FirstChild(); elementNode; elementNode = elementNode->NextSibling() ) { // Bypass comments and other element types only interested // in parsing element attributes if ( elementNode->Type() == TiXmlNode::ELEMENT ) { UtlString elementName = elementNode->Value(); UtlString elementValue; result = SIPDBManager::getAttributeValue ( *itemNode, elementName, elementValue); if (result == OS_SUCCESS) { UtlString* collectableKey = new UtlString( elementName ); UtlString* collectableValue = new UtlString( elementValue ); nvPairs.insertKeyAndValue ( collectableKey, collectableValue ); } else if ( elementNode->FirstChild() == NULL ) { // Null Element value creaete a special // char string we have key and value so insert UtlString* collectableKey = new UtlString( elementName ); UtlString* collectableValue = new UtlString( SPECIAL_IMDB_NULL_VALUE ); nvPairs.insertKeyAndValue ( collectableKey, collectableValue ); } } } // Insert the item row into the IMDB insertRow ( nvPairs ); } } } else { OsSysLog::add(FAC_SIP, PRI_WARNING, "SubscriptionDB::load failed to load \"%s\"", fileName.data()); } } else { OsSysLog::add(FAC_DB, PRI_ERR, "SubscriptionDB::load failed - no DB"); result = OS_FAILED; } return result; } OsStatus SubscriptionDB::store() { // Critical Section here OsLock lock( sLockMutex ); OsStatus result = OS_SUCCESS; if ( m_pFastDB != NULL ) { UtlString fileName = SIPDBManager::getInstance()-> getConfigDirectory() + OsPath::separator + mDatabaseName + ".xml"; // Thread Local Storage m_pFastDB->attach(); // Search our memory for rows dbCursor< SubscriptionRow > cursor; // Select everything in the IMDB and add as item elements if present int rows = cursor.select(); if ( rows > 0 ) { OsSysLog::add( FAC_SIP, PRI_DEBUG ,"SubscriptionDB::store writing %d rows\n" ,rows ); // Create an empty document TiXmlDocument document; // Create a hard coded standalone declaration section document.Parse (""); // Create the root node container TiXmlElement itemsElement ( "items" ); itemsElement.SetAttribute( "type", mDatabaseName.data() ); int timeNow = OsDateTime::getSecsSinceEpoch(); itemsElement.SetAttribute( "timestamp", timeNow ); // metadata contains column names dbTableDescriptor* pTableMetaData = &SubscriptionRow::dbDescriptor; do { // Create an item container TiXmlElement itemElement ("item"); byte* base = (byte*)cursor.get(); // Add the column name value pairs for ( dbFieldDescriptor* fd = pTableMetaData->getFirstField(); fd != NULL; fd = fd->nextField ) { // if the column name does not contain the // np_prefix we must_presist it if ( strstr( fd->name, "np_" ) == NULL ) { // Create the a column element named after the IMDB column name TiXmlElement element (fd->name ); // See if the IMDB has the predefined SPECIAL_NULL_VALUE UtlString textValue; SIPDBManager::getFieldValue(base, fd, textValue); // If the value is not null append a text child element if ( textValue != SPECIAL_IMDB_NULL_VALUE ) { // Text type assumed here... @todo change this TiXmlText value ( textValue.data() ); // Store the column value in the element making this // coltextvalue element.InsertEndChild ( value ); } // Store this in the item tag as follows // // .. col1textvalue // .. col2textvalue // .... etc itemElement.InsertEndChild ( element ); } } // add the line to the element itemsElement.InsertEndChild ( itemElement ); } while ( cursor.next() ); // Attach the root node to the document document.InsertEndChild ( itemsElement ); document.SaveFile ( fileName ); } else { // database contains no rows so delete the file // :TODO: This is bogus, we should write out a file with no rows // rather than deleting the file, so that when the file is missing // we know that's bad. Get rid of this clause, we don't // need to treat this as a special case. UtlString fileName = SIPDBManager::getInstance()-> getConfigDirectory() + OsPath::separator + mDatabaseName + ".xml"; if ( OsFileSystem::exists ( fileName ) ) { OsFileSystem::remove( fileName ); } } // Commit rows to memory - multiprocess workaround m_pFastDB->detach(0); } else { result = OS_FAILED; } return result; } UtlBoolean SubscriptionDB::insertRow (const UtlHashMap& nvPairs) { UtlString expStr = *((UtlString*)nvPairs.findValue(&gExpiresKey)); int expires = (int) atoi( expStr ); UtlString cSubseqStr = *((UtlString*)nvPairs.findValue(&gSubscribecseqKey)); int subCseq = (int) atoi( cSubseqStr ); UtlString cNotifySeqStr = *((UtlString*)nvPairs.findValue(&gNotifycseqKey)); int notifyCseq = (int) atoi( cNotifySeqStr ); // Note: identity inferred from the uri return insertRow ( *((UtlString*)nvPairs.findValue(&gUriKey)), *((UtlString*)nvPairs.findValue(&gCallidKey)), *((UtlString*)nvPairs.findValue(&gContactKey)), expires, subCseq, *((UtlString*)nvPairs.findValue(&gEventtypeKey)), *((UtlString*)nvPairs.findValue(&gIdKey)), *((UtlString*)nvPairs.findValue(&gToKey)), *((UtlString*)nvPairs.findValue(&gFromKey)), *((UtlString*)nvPairs.findValue(&gKeyKey)), *((UtlString*)nvPairs.findValue(&gRecordrouteKey)), notifyCseq); } UtlBoolean SubscriptionDB::insertRow ( const UtlString& uri, const UtlString& callid, const UtlString& contact, const int& expires, const int& subscribeCseq, const UtlString& eventType, const UtlString& id, const UtlString& to, const UtlString& from, const UtlString& key, const UtlString& recordRoute, const int& notifyCseq) { UtlBoolean result = FALSE; if ( !uri.isNull() && ( m_pFastDB != NULL ) ) { // Thread Local Storage m_pFastDB->attach(); // Search for a matching row before deciding to update or insert dbCursor< SubscriptionRow > cursor( dbCursorForUpdate ); dbQuery query; // Firstly purge all expired field entries from the DB that are expired for this identity // we can not purge all rows because a Notify should be to user int timeNow = OsDateTime::getSecsSinceEpoch(); query = "expires <", static_cast(timeNow); if ( cursor.select( query ) > 0 ) { cursor.removeAllSelected(); } // query all sessions (should only be one here) query="to=",to, "and from=",from, "and callid=",callid, "and eventtype=",eventType, "and id=",( id.isNull() ? SPECIAL_IMDB_NULL_VALUE : id.data() ); if ( cursor.select( query ) > 0 ) { // Should only be one row, only updating this do { // only update the row if the subscribe is newer // than the last IMDB update if ( cursor->subscribecseq < subscribeCseq ) { // refreshing subscribe request cursor->uri = uri; cursor->expires = static_cast(expires); cursor->subscribecseq = subscribeCseq; cursor->recordroute = recordRoute; cursor->contact = contact; cursor.update(); } // do nothing as the the input cseq is <= the db cseq } while ( cursor.nextAvailable() ); } else { // Insert new row SubscriptionRow row; row.callid = callid; row.contact = contact; row.expires = expires; row.uri = uri; row.subscribecseq = subscribeCseq; row.notifycseq = notifyCseq; row.eventtype = eventType; row.id = id; row.from = from; row.key = key; row.to = to; row.recordroute = recordRoute; insert (row); } // Either did an insert or an update // Commit rows to memory - multiprocess workaround m_pFastDB->detach(0); result = TRUE; } // Most likely an arg problem return result; } void SubscriptionDB::removeRow ( const UtlString& to, const UtlString& from, const UtlString& callid, const int& subscribeCseq ) { if ( m_pFastDB != NULL ) { // Thread Local Storage m_pFastDB->attach(); dbCursor< SubscriptionRow > cursor(dbCursorForUpdate); // ensure we filter off of the subcribecseq since there // could be multiple removes from the IMDB and the one with // the highest cseq should be remain, note that the // < subscribeCseq comparison is important since under UDP conditions // the Status server may be busy and UDP may retransmit the // same message multiple times, this would cause the just subscribed row // to be incirrectly removed while the status server was sending down // its acknowledgement dbQuery query; query="to=",to, "and from=",from, "and callid=",callid, "and subcribecseq <",subscribeCseq; if (cursor.select(query) > 0) { cursor.removeAllSelected(); } else { OsSysLog::add(FAC_DB, PRI_DEBUG, "SubscriptionDB::removeRow row not found:\n" "to='%s' from='%s' callid='%s'\n" "cseq='%d'", to.data(), from.data(), callid.data(), subscribeCseq ); } // Commit rows to memory - multiprocess workaround m_pFastDB->detach(0); } } void SubscriptionDB::removeErrorRow ( const UtlString& to, const UtlString& from, const UtlString& callid ) { if ( m_pFastDB != NULL ) { // Thread Local Storage m_pFastDB->attach(); dbCursor< SubscriptionRow > cursor(dbCursorForUpdate); dbQuery query; query="to=",to, "and from=",from, "and callid=",callid; if (cursor.select(query) > 0) { cursor.removeAllSelected(); } else { OsSysLog::add(FAC_DB, PRI_DEBUG, "SubscriptionDB::removeErrorRow row not found:\n" "to='%s' from='%s' callid='%s'\n", to.data(), from.data(), callid.data() ); } // Commit rows to memory - multiprocess workaround m_pFastDB->detach(0); } } void SubscriptionDB::removeRows ( const UtlString& key ) { if ( !key.isNull() && (m_pFastDB != NULL) ) { // Thread Local Storage m_pFastDB->attach(); dbCursor< SubscriptionRow > cursor(dbCursorForUpdate); dbQuery query; query="key=",key; if (cursor.select(query) > 0) { cursor.removeAllSelected(); } // Commit rows to memory - multiprocess workaround m_pFastDB->detach(0); } } void SubscriptionDB::removeAllRows () { if ( m_pFastDB != NULL ) { // Thread Local Storage m_pFastDB->attach(); dbCursor< SubscriptionRow > cursor( dbCursorForUpdate ); if (cursor.select() > 0) { cursor.removeAllSelected(); } // Commit rows to memory - multiprocess workaround m_pFastDB->detach(0); } } void SubscriptionDB::getAllRows ( ResultSet& rResultSet ) const { // Clear the results rResultSet.destroyAll(); if ( m_pFastDB != NULL ) { // Thread Local Storage m_pFastDB->attach(); dbCursor< SubscriptionRow > cursor; if ( cursor.select() > 0 ) { do { UtlHashMap record; UtlString* uriValue = new UtlString ( cursor->uri ); UtlString* callidValue = new UtlString ( cursor->callid ); UtlString* contactValue = new UtlString ( cursor->contact ); UtlInt* expiresValue = new UtlInt ( cursor->expires ); UtlInt* subscribecseqValue = new UtlInt ( cursor->subscribecseq ); UtlString* eventtypeValue = new UtlString ( cursor->eventtype ); UtlString* idValue = new UtlString ( cursor->id ); UtlString* toValue = new UtlString ( cursor->to ); UtlString* fromValue = new UtlString ( cursor->from ); UtlString* keyValue = new UtlString ( cursor->key ); UtlString* recordrouteValue = new UtlString ( cursor->recordroute ); UtlInt* notifycseqValue = new UtlInt ( cursor->notifycseq ); // Memory Leak fixes, make shallow copies of static keys UtlString* uriKey = new UtlString( gUriKey ); UtlString* callidKey = new UtlString( gCallidKey ); UtlString* contactKey = new UtlString( gContactKey ); UtlString* expiresKey = new UtlString( gExpiresKey ); UtlString* subscribecseqKey = new UtlString( gSubscribecseqKey ); UtlString* eventtypeKey = new UtlString( gEventtypeKey ); UtlString* idKey = new UtlString( gIdKey ); UtlString* toKey = new UtlString( gToKey ); UtlString* fromKey = new UtlString( gFromKey ); UtlString* keyKey = new UtlString( gKeyKey ); UtlString* recordrouteKey = new UtlString( gRecordrouteKey ); UtlString* notifycseqKey = new UtlString( gNotifycseqKey ); record.insertKeyAndValue ( uriKey, uriValue); record.insertKeyAndValue ( callidKey, callidValue); record.insertKeyAndValue ( contactKey, contactValue); record.insertKeyAndValue ( expiresKey, expiresValue); record.insertKeyAndValue ( subscribecseqKey, subscribecseqValue); record.insertKeyAndValue ( eventtypeKey, eventtypeValue); record.insertKeyAndValue ( idKey, idValue); record.insertKeyAndValue ( toKey, toValue); record.insertKeyAndValue ( fromKey, fromValue); record.insertKeyAndValue ( keyKey, keyValue); record.insertKeyAndValue ( recordrouteKey, recordrouteValue); record.insertKeyAndValue ( notifycseqKey, notifycseqValue); rResultSet.addValue(record); } while (cursor.next()); } // Commit the tx m_pFastDB->detach(0); } } void SubscriptionDB::updateUnexpiredSubscription ( const UtlString& to, const UtlString& from, const UtlString& callid, const UtlString& eventType, const UtlString& id, const int& timeNow, const int& updatedNotifyCseq ) const { if ( m_pFastDB != NULL ) { // Thread Local Storage m_pFastDB->attach(); // Create an update cursor to purge the DB // of all expired contacts. This is a hack since this // should be implemented via a daemon garbage collector // thread. dbCursor< SubscriptionRow > cursor(dbCursorForUpdate); dbQuery query; query="to=",to, "and from=",from, "and callid=",callid, "and eventtype=",eventType, "and id=",(id.isNull() ? SPECIAL_IMDB_NULL_VALUE : id.data()) ; if ( cursor.select(query) > 0 ) { do { // Purge any expired rows if ( cursor->expires < timeNow ) { // note cursor.remove() auto updates the database cursor.remove(); } else { cursor->notifycseq = updatedNotifyCseq; // Update to the new NotifySCeq Number cursor.update(); } // Next replaced with nextAvailable - better when // selective updates applied to the cursor object } while ( cursor.nextAvailable() ); } // Commit rows to memory - multiprocess workaround m_pFastDB->detach(0); } } void SubscriptionDB::getUnexpiredSubscriptions ( const UtlString& key, const UtlString& eventType, const int& timeNow, ResultSet& rResultSet ) const { // Clear the results rResultSet.destroyAll(); if ( !key.isNull() && (m_pFastDB != NULL) ) { // Thread Local Storage m_pFastDB->attach(); // Create an update cursor to purge the DB // of all expired contacts. // This should be eventually done via a daemon thread dbCursor< SubscriptionRow > cursor( dbCursorForUpdate ); dbQuery query; query="expires <",timeNow; if ( cursor.select( query ) > 0 ) { cursor.removeAllSelected(); } // Now select the remaining current events query="key=",key,"and eventtype=",eventType; if ( cursor.select(query) > 0 ) { do { UtlHashMap record; UtlString* uriValue = new UtlString ( cursor->uri ); UtlString* callidValue = new UtlString ( cursor->callid ); UtlString* contactValue = new UtlString ( cursor->contact ); UtlInt* expiresValue = new UtlInt ( cursor->expires - timeNow ); UtlInt* subscribecseqValue = new UtlInt ( cursor->subscribecseq ); UtlString* eventtypeValue = new UtlString ( cursor->eventtype ); UtlString* idValue = new UtlString( ( (0 == strcmp(cursor->id, SPECIAL_IMDB_NULL_VALUE)) ? "" : cursor->id )); UtlString* toValue = new UtlString ( cursor->to ); UtlString* fromValue = new UtlString ( cursor->from ); UtlString* keyValue = new UtlString ( cursor->key ); UtlString* recordrouteValue = new UtlString ( cursor->recordroute ); UtlInt* notifycseqValue = new UtlInt ( cursor->notifycseq ); // Memory Leak fixes, make shallow copies of static keys UtlString* uriKey = new UtlString( gUriKey ); UtlString* callidKey = new UtlString( gCallidKey ); UtlString* contactKey = new UtlString( gContactKey ); UtlString* expiresKey = new UtlString( gExpiresKey ); UtlString* subscribecseqKey = new UtlString( gSubscribecseqKey ); UtlString* eventtypeKey = new UtlString( gEventtypeKey ); UtlString* idKey = new UtlString( gIdKey ); UtlString* toKey = new UtlString( gToKey ); UtlString* fromKey = new UtlString( gFromKey ); UtlString* keyKey = new UtlString( gKeyKey ); UtlString* recordrouteKey = new UtlString( gRecordrouteKey ); UtlString* notifycseqKey = new UtlString( gNotifycseqKey ); record.insertKeyAndValue ( uriKey, uriValue); record.insertKeyAndValue ( callidKey, callidValue); record.insertKeyAndValue ( contactKey, contactValue); record.insertKeyAndValue ( expiresKey, expiresValue); record.insertKeyAndValue ( subscribecseqKey, subscribecseqValue); record.insertKeyAndValue ( eventtypeKey, eventtypeValue); record.insertKeyAndValue ( idKey, idValue); record.insertKeyAndValue ( toKey, toValue); record.insertKeyAndValue ( fromKey, fromValue); record.insertKeyAndValue ( keyKey, keyValue); record.insertKeyAndValue ( recordrouteKey, recordrouteValue); record.insertKeyAndValue ( notifycseqKey, notifycseqValue); rResultSet.addValue(record); } while ( cursor.nextAvailable() ); } // Commit rows to memory - multiprocess workaround m_pFastDB->detach(0); } } SubscriptionDB* SubscriptionDB::getInstance( const UtlString& name ) { // Critical Section here OsLock lock( sLockMutex ); // See if this is the first time through for this process // Note that this being null => pgDatabase is also null if ( spInstance == NULL ) { // Create the singleton class for clients to use spInstance = new SubscriptionDB ( name ); } return spInstance; }