/* Copyright (C) 2003 Frédéric Giudicelli (contact_nos@yahoo.com). All rights reserved. This product includes cryptographic software written by Eric Young (eay@cryptsoft.com) This program is released under the GPL with the additional exemption that compiling, linking, and/or using OpenSSL is allowed. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ // AsynchJobs.cpp: implementation of the AsynchJobs class. // ////////////////////////////////////////////////////////////////////// #include "AsynchJobs.h" #include "MIME.h" #include "svintl.h" ////////////////////////////////////////////////////////////////////// // Construction/Destruction ////////////////////////////////////////////////////////////////////// AsynchJobs::AsynchJobs() { m_StoreType = NEWPKISTORE_TYPE_NONE; m_DbConn = NULL; m_store = NULL; m_event = NULL; hThreadMailQueue.Create(ThreadMailQueue, this); hThreadRepositoriesSynchro.Create(ThreadRepositoriesSynchro, this); hThreadResponderWorker.Create(ThreadResponderWorker, this); hThreadConfigurationPush.Create(ThreadConfigurationPush, this); } AsynchJobs::~AsynchJobs() { StopAll(); if(m_DbConn) delete m_DbConn; } bool AsynchJobs::StartMailer() { if(!hThreadMailQueue.Start()) { NEWPKIerr(PKI_ERROR_TXT, ERROR_UNKNOWN); return false; } return true; } void AsynchJobs::StopMailer() { hThreadMailQueue.Stop(); } bool AsynchJobs::StartConfSync() { if(!hThreadRepositoriesSynchro.Start()) { NEWPKIerr(PKI_ERROR_TXT, ERROR_UNKNOWN); return false; } if( (m_StoreType & NEWPKISTORE_TYPE_RESPONDER) ) { if(!hThreadResponderWorker.Start()) { NEWPKIerr(PKI_ERROR_TXT, ERROR_UNKNOWN); return false; } } return true; } void AsynchJobs::StopConfSync() { hThreadRepositoriesSynchro.Stop(); hThreadResponderWorker.Stop(); } bool AsynchJobs::StartConfPush(bool PushToAllRep) { m_PushToAllRep = PushToAllRep; if(!hThreadConfigurationPush.Start()) { NEWPKIerr(PKI_ERROR_TXT, ERROR_UNKNOWN); return false; } return true; } void AsynchJobs::StopConfPush() { hThreadConfigurationPush.Stop(); } void AsynchJobs::StopAll() { StopMailer(); StopConfSync(); StopConfPush(); } void AsynchJobs::SetEventHandler(AsynchJobsEvent * event) { m_event = event; } void AsynchJobs::SetStore(NewPKIStore * store) { if(!store) return; m_store = store; m_StoreType = m_store->get_Type(); } void AsynchJobs::SetEntityCert(const PKI_CERT & cert) { m_EntityCert = cert; } void AsynchJobs::SetEntityName(const mString & EntityName) { m_EntityName = EntityName; } bool AsynchJobs::SetEntityParentsCert(const mVector & EntityParentCerts) { m_AdminMailQueueLock.EnterCS(); m_EntityParentCerts = EntityParentCerts; m_AdminMailQueueLock.LeaveCS(); return true; } void AsynchJobs::SetLogger(EntityLog *log) { m_Logging = log; } bool AsynchJobs::SetAdminMails(const mVector & Admins) { size_t i; m_AdminMailQueueLock.EnterCS(); m_AdminMails.clear(); for(i=0; im_AdminMailQueue.push_back(q); m_AdminMailQueueLock.LeaveCS(); return true; } void AsynchJobs::ThreadMailQueue(const NewpkiThread * Thread, void *param) { AsynchJobs * me_this = (AsynchJobs *)param; MIME mimeEncoder; bool ret; mString Body; mString mime; mVector Recipients; time_t lastLoad = 0;; time_t currTime; MailQueueEntry currMail; do { me_this->m_AdminMailQueueLock.EnterCS(); do { time(&currTime); if( (currTime - lastLoad) >= MAILS_DB_RELOAD ) { me_this->LoadMails(); lastLoad = currTime; } me_this->m_AdminMailQueueLock.LeaveCS(); NewpkiThread::Sleep(100); me_this->m_AdminMailQueueLock.EnterCS(); } while(!Thread->ShouldStop() && (!me_this->m_AdminMailQueue.size() || !me_this->m_Mailer || !me_this->m_EntityParentCerts.size())); if(Thread->ShouldStop()) { me_this->m_AdminMailQueueLock.LeaveCS(); return; } currMail = me_this->m_AdminMailQueue[0]; me_this->m_AdminMailQueue.erase(me_this->m_AdminMailQueue.begin()); me_this->m_AdminMailQueueLock.LeaveCS(); Body = _sv("Entity: "); Body += me_this->m_EntityName; Body += _sv("\nAuthor: "); Body += currMail.get_author(); Body += "\n\n"; Body += currMail.get_mail().get_body(); if(currMail.get_recipients().size()) Recipients = currMail.get_recipients(); else Recipients = me_this->m_AdminMails; if( (ret = mimeEncoder.GenerateMIME(mime, Body.c_str(), currMail.get_mail().get_attach().get_Buffer(), currMail.get_mail().get_attach().get_BufferLen(), currMail.get_mail().get_attachname().c_str(), currMail.get_mail().get_attachtype().c_str())) ) { if(currMail.get_mail().get_signmail()) ret = mimeEncoder.GenerateSMIME(mime, mime.c_str(), me_this->m_EntityCert.GetX509(), me_this->m_EntityCert.GetPrivateKey().GetRsaKey(), me_this->m_EntityParentCerts); } if(ret) { ERR_clear_error(); ret = me_this->m_Mailer.Send((currMail.get_adminmail() == 1), Recipients, currMail.get_mail().get_subject(), mime); } // The email was successfully sent we can // now remove it from the DB if(ret) { me_this->RemoveMail(currMail.get_id()); currMail.Clear(); } } while(true); } void AsynchJobs::RemoveMail(unsigned long id) { SQL sql(m_DbConn); mString req; req.sprintf(MAIL_DELETE, id); sql.Execute(req); } void AsynchJobs::LoadMails() { SQL sql(m_DbConn); mString pem; long NumRows; long i; size_t j; MailQueueEntry newEntry; if(!sql.Execute(MAIL_SELECT)) { return; } if(!sql.NumRows(&NumRows)) { return; } for(i=0; iShouldStop() && newConf) { NewpkiDebug(LOG_LEVEL_DEBUG, m_EntityName.c_str(), _sv("Importing new conf")); if(!m_event->OnNewConf(newConf)) { ERR_to_mstring(err); m_Logging->LogMessage(LOG_STATUS_TYPE_FAILURE, LOG_MESSAGE_TYPE_ENTITY_GET_MY_CONF, 0, m_EntityCert.GetStringName(), LOG_NO_OBJECTID, "Conf", err.c_str()); NewpkiDebug(LOG_LEVEL_DEBUG, m_EntityName.c_str(), _sv("Failed to import new conf: %s"), err.c_str()); } else { NewpkiDebug(LOG_LEVEL_DEBUG, m_EntityName.c_str(), _sv("New conf imported")); } } if(m_store) { // Process the new requests if(!Thread->ShouldStop() && (m_StoreType & NEWPKISTORE_TYPE_RESPONDER) && reqs) { // For each new request verify that we don't already have it // and trigger the event for(i=0; !Thread->ShouldStop() && iLogMessage(LOG_STATUS_TYPE_FAILURE, LOG_MESSAGE_TYPE_NEW_REQUEST, 0, SenderName.c_str(), LOG_NO_OBJECTID, TransactionId.c_str(), err.c_str()); NewpkiDebug(LOG_LEVEL_DEBUG, m_EntityName.c_str(), _sv("Failed to import new request %s : %s"), TransactionId.c_str(), err.c_str()); } else { m_Logging->LogMessage(LOG_STATUS_TYPE_SUCCESS, LOG_MESSAGE_TYPE_NEW_REQUEST, 0, SenderName.c_str(), LOG_NO_OBJECTID, TransactionId.c_str()); NewpkiDebug(LOG_LEVEL_DEBUG, m_EntityName.c_str(), _sv("Successfully imported new request %s"), TransactionId.c_str()); } } } reqs.Clear(); } // Process the new responses if(!Thread->ShouldStop() && (m_StoreType & NEWPKISTORE_TYPE_REQUESTER) && resps) { // For each new response verify that we don't already have it // and trigger the event for(i=0; !Thread->ShouldStop() && iLogMessage(LOG_STATUS_TYPE_FAILURE, LOG_MESSAGE_TYPE_ENTITY_GET_MY_CONF, 0, m_EntityCert.GetStringName(), LOG_NO_OBJECTID, _sv("Response"), err.c_str()); NewpkiDebug(LOG_LEVEL_DEBUG, m_EntityName.c_str(), _sv("Failed to import new response %s : %s"), TransactionId.c_str(), err.c_str()); } else { NewpkiDebug(LOG_LEVEL_DEBUG, m_EntityName.c_str(), _sv("Successfully imported new response %s"), TransactionId.c_str()); } } } resps.Clear(); } } return true; } else { return false; } } void AsynchJobs::ThreadRepositoriesSynchro(const NewpkiThread * Thread, void *param) { AsynchJobs * me_this = (AsynchJobs *)param; time_t currTime; time_t lastAllTime; time_t lastFullSend; int indexReqs; int indexResps; int what; do { NewpkiThread::Sleep(500); } while(!Thread->ShouldStop() && !me_this->m_event && !me_this->m_Repositories.size()); if(Thread->ShouldStop()) { return; } indexReqs = 0; indexResps = 0; lastFullSend = 0; lastAllTime = 0; while(!Thread->ShouldStop()) { time(&currTime); if(currTime - lastFullSend > 3600) { // Every hour we retry to send objects // that might have failed earlier lastFullSend = currTime; indexReqs = 0; indexResps = 0; } if(currTime - lastAllTime > ASYNCH_JOBS_INTERVAL) { what = SYNCH_ALL; lastAllTime = currTime; } else { what = SYNCH_REQS | SYNCH_RESPS; } // Get the requests and responses that weren't sent me_this->m_waitingReqs.clear(); me_this->m_store->Requester_GetUnsentRequests(me_this->m_waitingReqs, indexReqs, 100); me_this->m_waitingResps.get_responses().clear(); me_this->m_store->Responder_GetUnsentResponses(me_this->m_waitingResps.get_responses(), indexResps, 100); if(what == SYNCH_ALL || me_this->m_waitingReqs.size() || me_this->m_waitingResps.get_responses().size() || me_this->m_waitingDelResps.get_transactionids().size() ) { me_this->DoRepositoriesSynchro(Thread, SYNCH_ALL); if(me_this->m_waitingReqs.size()) indexReqs++; if(me_this->m_waitingResps.get_responses().size()) indexResps++; } // We still wait a bit time(&currTime); do { NewpkiThread::Sleep(100); } while( ((time(NULL) - currTime) < 5) && !Thread->ShouldStop()); } } bool AsynchJobs::SynchronizeWithRepository(const NewpkiThread * Thread, EntityConfCrypted & myConf, CryptedNewpkiRequests &reqs, CryptedNewpkiResponses &resps, const mString & RepName, const mString & RepAddress, unsigned int RepPort, const PKI_CERT & RepCert, PkiClient * ClientPki, mString & Err, int what) { TransactionIds m_transactionIds; Asn1OctetString transactionId; StackRequest currReq; CryptedNewpkiResponse currResp; Asn1OctetString currDel; size_t i; mString strTID; NewpkiDebug(LOG_LEVEL_DEBUG, m_EntityName.c_str(), _sv("Connecting to repository %s (%s:%ld)"), RepName.c_str(), RepAddress.c_str(), RepPort); //We connect to the repository if(!ConnectToRepository(RepName, RepAddress, RepPort, RepCert, ClientPki, 30, Err)) { NewpkiDebug(LOG_LEVEL_DEBUG, m_EntityName.c_str(), _sv("Failed to connect to repository %s (%s:%ld)"), RepName.c_str(), RepAddress.c_str(), RepPort); return false; } NewpkiDebug(LOG_LEVEL_DEBUG, m_EntityName.c_str(), _sv("Connected to repository %s (%s:%ld)"), RepName.c_str(), RepAddress.c_str(), RepPort); if(Thread->ShouldStop()) { return true; } //Send each request if(m_store && (m_StoreType & NEWPKISTORE_TYPE_REQUESTER) && (what & SYNCH_REQS)) { while(!Thread->ShouldStop() && m_waitingReqs.size()) { currReq = m_waitingReqs[0]; if(!ClientPki->SendRequest(currReq.get_req())) { NewpkiDebug(LOG_LEVEL_DEBUG, m_EntityName.c_str(), _sv("Failed to sent request %ld to repository %s (%s:%ld)"), currReq.get_id(), RepName.c_str(), RepAddress.c_str(), RepPort); Err = ClientPki->GetError(); return false; } m_store->Requester_RequestSent(currReq.get_id()); //Remove entry m_waitingReqs.erase(m_waitingReqs.begin()); NewpkiDebug(LOG_LEVEL_DEBUG, m_EntityName.c_str(), _sv("Successfully sent request %ld to repository %s (%s:%ld), still %d to go"), currReq.get_id(), RepName.c_str(), RepAddress.c_str(), RepPort, m_waitingReqs.size()); } if(Thread->ShouldStop()) { return true; } // Send each deletion request // A deletion request is sent when a requester has successfully // processed the response, and therefore declares is at not // needed anymore m_waitingDelRespsLock.EnterCS(); for(i=0; !Thread->ShouldStop() && iDeleteResponse(currDel)) { NewpkiDebug(LOG_LEVEL_DEBUG, m_EntityName.c_str(), _sv("Failed to sent deletion request %ld to repository %s (%s:%ld)"), i, RepName.c_str(), RepAddress.c_str(), RepPort); Err = ClientPki->GetError(); return false; } NewpkiDebug(LOG_LEVEL_DEBUG, m_EntityName.c_str(), _sv("Successfully sent deletion request %ld to repository %s (%s:%ld), still %d to go"), i, RepName.c_str(), RepAddress.c_str(), RepPort, m_waitingDelResps.get_transactionids().size()); m_waitingDelRespsLock.EnterCS(); m_waitingDelResps.get_transactionids().erase(m_waitingDelResps.get_transactionids().begin()); } m_waitingDelRespsLock.LeaveCS(); if(Thread->ShouldStop()) { return true; } // Get new responses that were sent to me // Get the list of transactions for which we are waiting for an answer if(m_store->Requester_GetWaitingTIDs(m_transactionIds) && m_transactionIds.get_transactionids().size()) { // Get the responses (if they're available) if(Thread->ShouldStop()) { return true; } if(!ClientPki->GetMyResponses(m_transactionIds, resps)) { NewpkiDebug(LOG_LEVEL_DEBUG, m_EntityName.c_str(), _sv("Failed to get my responses from repository %s (%s:%ld)"), RepName.c_str(), RepAddress.c_str(), RepPort); Err = ClientPki->GetError(); return false; } NewpkiDebug(LOG_LEVEL_DEBUG, m_EntityName.c_str(), _sv("Successfully got my responses from repository %s (%s:%ld)"), RepName.c_str(), RepAddress.c_str(), RepPort); } if(Thread->ShouldStop()) { return true; } } if(m_store && (m_StoreType & NEWPKISTORE_TYPE_RESPONDER) && (what & SYNCH_RESPS)) { //Send each new response while(!Thread->ShouldStop() && m_waitingResps.get_responses().size()) { currResp = m_waitingResps.get_responses()[0]; if(!NewPKIStore::transactionIDtoString(currResp.get_transactionid(), strTID)) { strTID = "Unknown"; } if(!ClientPki->SendResponse(currResp)) { NewpkiDebug(LOG_LEVEL_DEBUG, m_EntityName.c_str(), _sv("Failed to sent response %s to repository %s (%s:%ld)"), strTID.c_str(), RepName.c_str(), RepAddress.c_str(), RepPort); Err = ClientPki->GetError(); return false; } m_store->Responder_ResponseSent(currResp.get_transactionid()); //Remove entry m_waitingResps.get_responses().erase(m_waitingResps.get_responses().begin()); NewpkiDebug(LOG_LEVEL_DEBUG, m_EntityName.c_str(), _sv("Successfully sent response %s to repository %s (%s:%ld), still %d to go"), strTID.c_str(), RepName.c_str(), RepAddress.c_str(), RepPort, m_waitingResps.get_responses().size()); } if(Thread->ShouldStop()) { return true; } // Get the new requests that were sent to me if(m_store->Responder_GetKnownTIDs(m_transactionIds)) { if(Thread->ShouldStop()) { return true; } if(!ClientPki->GetMyRequests(m_transactionIds, reqs)) { NewpkiDebug(LOG_LEVEL_DEBUG, m_EntityName.c_str(), _sv("Failed to get my requests from repository %s (%s:%ld)"), RepName.c_str(), RepAddress.c_str(), RepPort); Err = ClientPki->GetError(); return false; } NewpkiDebug(LOG_LEVEL_DEBUG, m_EntityName.c_str(), _sv("Successfully got my requests from repository %s (%s:%ld)"), RepName.c_str(), RepAddress.c_str(), RepPort); } if(Thread->ShouldStop()) { return false; } } if(what & SYNCH_CONF) { // Get entity conf if(!ClientPki->GetMyConf(myConf)) { Err = ClientPki->GetError(); return false; } } return true; } bool AsynchJobs::SynchronizeWithRepositories(const NewpkiThread * Thread, EntityConfCrypted & myConf, CryptedNewpkiRequests &reqs, CryptedNewpkiResponses &resps, int what) { size_t i; mString RepName; mString RepAddress; unsigned int RepPort; PKI_CERT RepCert; mString Err; PkiClient ClientPki(NULL); resps.Clear(); myConf.Clear(); reqs.Clear(); m_RepositoriesLock.LockRead(); for(i=0; iShouldStop()) { m_RepositoriesLock.UnlockRead(); return true; } //If the destination is me we ignore it if(m_EntityCert == m_Repositories[i].get_repositoryssl()) continue; // We copy the datas, so we can unlock RepName = m_Repositories[i].get_name(); RepAddress = m_Repositories[i].get_address(); RepPort = m_Repositories[i].get_port(); RepCert = m_Repositories[i].get_repositoryssl(); m_RepositoriesLock.UnlockRead(); ERR_clear_error(); if(SynchronizeWithRepository(Thread, myConf, reqs, resps, RepName, RepAddress, RepPort, RepCert, &ClientPki, Err, what)) { NewpkiDebug(LOG_LEVEL_INFO, m_EntityName.c_str(), _sv("Successfully got my conf and my objects from repository %s (%s:%ld)"), RepName.c_str(), RepAddress.c_str(), RepPort); ClientPki.CloseConnection(); return true; } NewpkiDebug(LOG_LEVEL_WARNING, m_EntityName.c_str(), _sv("Failed to get my conf and my objects from repository %s (%s:%ld)\nReason:%s"), RepName.c_str(), RepAddress.c_str(), RepPort, Err.c_str()); m_Logging->LogMessage(LOG_STATUS_TYPE_FAILURE, LOG_MESSAGE_TYPE_ENTITY_GET_MY_CONF, 0, NULL, LOG_NO_OBJECTID, RepName.c_str(), Err.c_str()); ClientPki.CloseConnection(); m_RepositoriesLock.LockRead(); } m_RepositoriesLock.UnlockRead(); return false; } bool AsynchJobs::SetRepositories(const mVector &Repositories) { m_RepositoriesLock.LockWrite(); m_Repositories = Repositories; m_RepositoriesLock.UnlockWrite(); return true; } bool AsynchJobs::InsertRequest(unsigned long priv_attr, const NewpkiRequest & Request, const X509_PUBKEY * Recipient) const { EVP_PKEY * CryptKey; CryptedNewpkiRequest newReq; if(!m_store || !(m_StoreType & NEWPKISTORE_TYPE_REQUESTER)) { NEWPKIerr(PKI_ERROR_TXT, ERROR_NOT_ALLOWED); return false; } if(!newReq.set_recipient(Recipient)) { NEWPKIerr(PKI_ERROR_TXT, ERROR_ABORT); return false; } if(!newReq.set_sender(m_EntityCert.GetX509_PUBKEY())) { NEWPKIerr(PKI_ERROR_TXT, ERROR_MALLOC); return false; } if(!(CryptKey = X509_PUBKEY_get((X509_PUBKEY*)Recipient))) { NEWPKIerr(PKI_ERROR_TXT, ERROR_ABORT); return false; } if(!Request.to_SignEncrypt(newReq.get_request(), m_EntityCert.GetPrivateKey().GetRsaKey(), CryptKey, EVP_sha1(), EVP_des_ede3_cbc())) { NEWPKIerr(PKI_ERROR_TXT, ERROR_ABORT); EVP_PKEY_free(CryptKey); return false; } EVP_PKEY_free(CryptKey); if(!m_store->Requester_InsertRequest(newReq, priv_attr)) { NEWPKIerr(PKI_ERROR_TXT, ERROR_ABORT); return false; } return true; } bool AsynchJobs::InsertResponse(const Asn1OctetString & transactionID, const NewpkiResponse & Response, const X509_PUBKEY * Recipient) const { CryptedNewpkiResponse s_resp; EVP_PKEY * cryptkey; if(!m_store || !(m_StoreType & NEWPKISTORE_TYPE_RESPONDER)) { NEWPKIerr(PKI_ERROR_TXT, ERROR_NOT_ALLOWED); return false; } if(!s_resp.set_transactionid(transactionID)) { NEWPKIerr(PKI_ERROR_TXT, ERROR_ABORT); return false; } if(!s_resp.set_sender(m_EntityCert.GetX509_PUBKEY())) { NEWPKIerr(PKI_ERROR_TXT, ERROR_ABORT); return false; } if(!s_resp.set_recipient(Recipient)) { NEWPKIerr(PKI_ERROR_TXT, ERROR_ABORT); return false; } cryptkey = X509_PUBKEY_get((X509_PUBKEY*)Recipient); if(!cryptkey) { NEWPKIerr(PKI_ERROR_TXT, ERROR_ABORT); return false; } if(!Response.to_SignEncrypt(s_resp.get_response(), m_EntityCert.GetPrivateKey().GetRsaKey(), cryptkey, EVP_sha1(), EVP_des_ede3_cbc())) { EVP_PKEY_free(cryptkey); NEWPKIerr(PKI_ERROR_TXT, ERROR_ABORT); return false; } EVP_PKEY_free(cryptkey); // The response will be removed once it has been sent if(!m_store->Responder_SetResponse(s_resp)) { NEWPKIerr(PKI_ERROR_TXT, ERROR_ABORT); return false; } return true; } void AsynchJobs::ThreadConfigurationPush(const NewpkiThread * Thread, void *param) { AsynchJobs * me_this = (AsynchJobs *)param; long lastVersion; time_t currTime; time_t lastTime; time_t waitTime; time_t lastUpdate; //We do this to always push the configuration at startup //Which forces a full synchronization of all repositories lastVersion = 0; //We wait a bit (2 mins) do { NewpkiThread::Sleep(500); } while(!Thread->ShouldStop() && !me_this->m_Repositories.size()); time(&lastTime); time(&lastUpdate); waitTime = 15; while(!Thread->ShouldStop()) { me_this->ConfPushLock.EnterCS(); //Do we have a conf ? if(!me_this->m_PkiConf) { me_this->ConfPushLock.LeaveCS(); goto wait_next; } // We deploy new configuration on repositories time(&lastUpdate); if(!me_this->Private_PushConfiguration(Thread)) { waitTime = 60; } else { me_this->m_PkiConf.Clear(); waitTime = 15; } me_this->ConfPushLock.LeaveCS(); wait_next: do { NewpkiThread::Sleep(500); time(&currTime); } while( ((currTime - lastTime) < waitTime) && !Thread->ShouldStop()); lastTime = currTime; //Every hour we force an update if( (currTime - lastUpdate) > 3600) lastVersion = 0; } } bool AsynchJobs::Private_PushConfiguration(const NewpkiThread * Thread) { size_t i; mString RepName; mString RepAddress; unsigned int RepPort; PKI_CERT RepCert; mString err; bool Success = false; PkiClient ClientPki(NULL); // We deploy conf on first available repository // the repository will be in charge on deploying it // on all other repositories m_RepositoriesLock.LockRead(); for(i=0; iShouldStop()) { m_RepositoriesLock.UnlockRead(); return true; } // If the current rep is me or is firewalled or aloready // known the conf, we ignore it if(m_EntityCert == m_Repositories[i].get_repositoryssl() || ASN1_BIT_STRING_get_bit(m_Repositories[i].get_flags(), REP_ENTRY_INFO_FLAG_FIREWALLED) || RepositoryKnowsObject(m_PkiConf.get_repPath(), m_Repositories[i].get_repositoryssl())) continue; // We copy the datas, so we can unlock RepName = m_Repositories[i].get_name(); RepAddress = m_Repositories[i].get_address(); RepPort = m_Repositories[i].get_port(); RepCert = m_Repositories[i].get_repositoryssl(); m_RepositoriesLock.UnlockRead(); ERR_clear_error(); m_Logging->LogMessage(LOG_STATUS_TYPE_REQUEST, LOG_MESSAGE_TYPE_CONFIG_PUSH, 0, NULL, LOG_NO_OBJECTID, RepName.c_str()); //We connect to the repository if(!ConnectToRepository(RepName, RepAddress, RepPort, RepCert, &ClientPki, 30, err)) { NewpkiDebug(LOG_LEVEL_WARNING, m_EntityName.c_str(), _sv("Failed to push conf on entity %s (%s:%ld)\nReason:%s"), RepName.c_str(), RepAddress.c_str(), RepPort, err.c_str()); m_Logging->LogMessage(LOG_STATUS_TYPE_FAILURE, LOG_MESSAGE_TYPE_CONFIG_PUSH, 0, NULL, LOG_NO_OBJECTID, RepName.c_str(), err.c_str()); } else { if(!ClientPki.PushConfiguration(m_PkiConf)) { NewpkiDebug(LOG_LEVEL_WARNING, m_EntityName.c_str(), _sv("Failed to push conf on entity %s (%s:%ld)\nReason:%s"), RepName.c_str(), RepAddress.c_str(), RepPort, ClientPki.GetError()); m_Logging->LogMessage(LOG_STATUS_TYPE_FAILURE, LOG_MESSAGE_TYPE_CONFIG_PUSH, 0, NULL, LOG_NO_OBJECTID, RepName.c_str(), ClientPki.GetError()); } else { NewpkiDebug(LOG_LEVEL_INFO, m_EntityName.c_str(), _sv("Successfully pushed conf on entity %s (%s:%ld)"), RepName.c_str(), RepAddress.c_str(), RepPort); m_Logging->LogMessage(LOG_STATUS_TYPE_SUCCESS, LOG_MESSAGE_TYPE_CONFIG_PUSH, 0, NULL, LOG_NO_OBJECTID, RepName.c_str()); if(!m_PushToAllRep) { ClientPki.CloseConnection(); return true; } AddRepositoryToObjectsPath(m_PkiConf.get_repPath(), m_Repositories[i].get_repositoryssl()); Success = true; } } m_RepositoriesLock.LockRead(); } m_RepositoriesLock.UnlockRead(); ClientPki.CloseConnection(); return Success; } bool AsynchJobs::SetPkiConf(const ExportedPkiConf & PkiConf) { ConfPushLock.EnterCS(); if(!(m_PkiConf = PkiConf)) { NEWPKIerr(PKI_ERROR_TXT, ERROR_ABORT); ConfPushLock.LeaveCS(); return false; } ConfPushLock.LeaveCS(); return true; } bool AsynchJobs::ConnectToRepository(const RepEntryInfo & currRep, PkiClient * ClientPki, int TimeOut, mString & err) const { return ConnectToRepository(currRep.get_name(), currRep.get_address(), currRep.get_port(), currRep.get_repositoryssl(), ClientPki, TimeOut, err); } bool AsynchJobs::ConnectToRepository(const mString & RepName, const mString & RepAddress, unsigned int RepPort, const PKI_CERT & RepCert, PkiClient * ClientPki, int TimeOut, mString & err) const { PKI_CERT peerCert; int UserType; long Pos; const char * cn; AdminReqLogin Login; err=""; ClientPki->CloseConnection(); ClientPki->SetReadTimeOut(TimeOut); //We connect to the repository if(!ClientPki->Connect(RepAddress.c_str(), RepPort, m_EntityCert, NULL)) { err = ClientPki->GetError(); return false; } Login.set_entity(RepName); if(!ClientPki->UserLogin(Login, UserType)) { err = ClientPki->GetError(); return false; } //Does the cert correspond to the one expected ? ClientPki->GetEntityCert(peerCert); if(!peerCert) { NEWPKIerr(PKI_ERROR_TXT, ERROR_UNEXPECTED_CERT); ERR_to_mstring(err); return false; } // Is a cert signed by the internal CA ? if(m_AclValidator->ValidateCert(peerCert) != INTERNAL_CA_TYPE_ENTITY) { NEWPKIerr(PKI_ERROR_TXT, ERROR_UNEXPECTED_CERT); ERR_to_mstring(err); return false; } if(peerCert != RepCert) { // Is the CN the same ? Pos = peerCert.GetCertDN().SeekEntryName("commonName", HASHTABLE_NOT_FOUND); if(Pos != HASHTABLE_NOT_FOUND) { cn = peerCert.GetCertDN().Get(Pos); if(cn) { if(RepName == cn) { // Yes it the same name, we issue a warning NewpkiDebug(LOG_LEVEL_WARNING, m_EntityName.c_str(), _sv("Repository certificate changed (%s)!"), RepName.c_str()); return true; } } } NEWPKIerr(PKI_ERROR_TXT, ERROR_UNEXPECTED_CERT); ERR_to_mstring(err); return false; } return true; } void AsynchJobs::SetX509_ACL_Validator(const X509_ACL_Validator *AclValidator) { m_AclValidator = AclValidator; } bool AsynchJobs::CreateTables(SQL_Connection *DbConn) { SQL sql(DbConn); long i; char * CommonCreates[] = {MAIL_JOBS_STACK, NULL}; mString req; //We execute each request for(i=0; CommonCreates[i]; i++) { if(!sql.Execute(CommonCreates[i])) { NEWPKIerr(PKI_ERROR_TXT, ERROR_ABORT); return false; } } return true; } bool AsynchJobs::SetDbConn(SQL_Connection *DbConn) { if(m_DbConn) { delete m_DbConn; m_DbConn = NULL; } try { m_DbConn = DbConn->Clone(); } catch(ExceptionNewPKI e) { NEWPKIerr(PKI_ERROR_TXT, ERROR_ABORT); return false; } if(!m_DbConn) { NEWPKIerr(PKI_ERROR_TXT, ERROR_MALLOC); return false; } return true; } bool AsynchJobs::DeleteResponse(const Asn1OctetString & transationID) const { m_waitingDelRespsLock.EnterCS(); ((AsynchJobs*)this)->m_waitingDelResps.get_transactionids().push_back(transationID); m_waitingDelRespsLock.LeaveCS(); return true; } bool AsynchJobs::OnNewRequest(const CryptedNewpkiRequest & request, mString & SenderName) { TRANSACTION_STATUS TransactionStatus; NewpkiRequest newRequest; EVP_PKEY * signerkey; WorkingRequest w_req; CryptedNewpkiResponse Response; // We first check that I am the recipient ! if( ! (m_EntityCert == request.get_recipient()) ) { NEWPKIerr(PKI_ERROR_TXT, ERROR_NOT_ALLOWED); return false; } // Check if we don't already know it if(!m_store->Responder_GetTransactionStatus(request.get_transactionid(), TransactionStatus)) { NEWPKIerr(PKI_ERROR_TXT, ERROR_ABORT); return false; } switch(TransactionStatus) { case TRANSACTION_STATUS_PROCESSED: case TRANSACTION_STATUS_SENT: // Repository should have the response // we need to resend it if(!m_store->Responder_GetResponse(request.get_transactionid(), Response)) { ERR_clear_error(); return true; } m_waitingResps.get_responses().push_back(Response); return true; break; case TRANSACTION_STATUS_UNKNOWN: break; default: return true; break; } // Ok we now decrypt the request signerkey = X509_PUBKEY_get((X509_PUBKEY*)request.get_sender()); if(!signerkey) { NEWPKIerr(PKI_ERROR_TXT, ERROR_ABORT); return false; } // Decrypt the request if(!newRequest.from_SignEncrypt(request.get_request(), signerkey, m_EntityCert.GetPrivateKey().GetRsaKey())) { EVP_PKEY_free(signerkey); ERR_clear_error(); NEWPKIerr(PKI_ERROR_TXT, ERROR_BAD_DATAS); ERR_to_STORED_NEWPKI_RESPONSE(request.get_transactionid(), request.get_sender()); return false; } EVP_PKEY_free(signerkey); // We first ask the responder to validate the request and the sender if(!m_event->Responder_ValidateRequest(newRequest, request.get_sender(), SenderName)) { NEWPKIerr(PKI_ERROR_TXT, ERROR_ABORT); ERR_to_STORED_NEWPKI_RESPONSE(request.get_transactionid(), request.get_sender()); return false; } // We now create the working request // Copy the sender's name w_req.set_sendername(SenderName); //Copy transaction ID if(!w_req.set_transactionid(request.get_transactionid())) { NEWPKIerr(PKI_ERROR_TXT, ERROR_ABORT); ERR_to_STORED_NEWPKI_RESPONSE(request.get_transactionid(), request.get_sender()); return false; } //Copy the sender public key if(!w_req.set_sender(request.get_sender())) { NEWPKIerr(PKI_ERROR_TXT, ERROR_ABORT); ERR_to_STORED_NEWPKI_RESPONSE(request.get_transactionid(), request.get_sender()); return false; } if(!w_req.set_request(newRequest)) { NEWPKIerr(PKI_ERROR_TXT, ERROR_ABORT); ERR_to_STORED_NEWPKI_RESPONSE(request.get_transactionid(), request.get_sender()); return false; } ERR_clear_error(); if(!m_store->Responder_InsertRequest(w_req)) { NEWPKIerr(PKI_ERROR_TXT, ERROR_ABORT); ERR_to_STORED_NEWPKI_RESPONSE(request.get_transactionid(), request.get_sender()); return false; } return true; } void AsynchJobs::ThreadResponderWorker(const NewpkiThread * Thread, void *param) { AsynchJobs * me_this = (AsynchJobs *)param; NewpkiResponse Response; mVector WorkingReqs; size_t i; while(!Thread->ShouldStop()) { do { NewpkiThread::Sleep(1000); } while(!Thread->ShouldStop() && !me_this->m_store->Responder_GetRequests(WorkingReqs)); //I have a new request for(i=0; !Thread->ShouldStop() && im_event->Responder_TreatRequest(WorkingReqs[i].get_request(), WorkingReqs[i].get_sendername(), Response)) { me_this->ERR_to_STORED_NEWPKI_RESPONSE(WorkingReqs[i].get_transactionid(), WorkingReqs[i].get_sender()); } else { me_this->InsertResponse(WorkingReqs[i].get_transactionid(), Response, WorkingReqs[i].get_sender()); } } WorkingReqs.clear(); } } void AsynchJobs::ERR_to_STORED_NEWPKI_RESPONSE(const Asn1OctetString & transactionID, const X509_PUBKEY * recipient) { NewpkiResponse response; if(!response.set_type(NEWPKI_RESPONSE_TYPE_ERR)) { return; } ERR_to_ERROR_ENTRIES(response.get_errors()); InsertResponse(transactionID, response, recipient); } bool AsynchJobs::OnNewResponse(CryptedNewpkiResponse &response) { TRANSACTION_STATUS TransactionStatus; NewpkiResponse newResponse; EVP_PKEY * signerkey; // We first check that I am the recipient ! if( ! (m_EntityCert == response.get_recipient()) ) { NEWPKIerr(PKI_ERROR_TXT, ERROR_NOT_ALLOWED); return false; } // Check if we don't already know it if(!m_store->Requester_GetTransactionStatus(response.get_transactionid(), TransactionStatus)) { NEWPKIerr(PKI_ERROR_TXT, ERROR_ABORT); return false; } // TRANSACTION_STATUS_SENT is the only right status // at this point switch(TransactionStatus) { case TRANSACTION_STATUS_UNKNOWN: case TRANSACTION_STATUS_WAITING: case TRANSACTION_STATUS_PROCESSED: // We request the deletion of this response // on the respository if(!DeleteResponse(response.get_transactionid())) ERR_clear_error(); return true; break; default: break; } // Ok we now decrypt the request signerkey = X509_PUBKEY_get(response.get_sender()); if(!signerkey) { NEWPKIerr(PKI_ERROR_TXT, ERROR_ABORT); return false; } // Decrypt the request if(!newResponse.from_SignEncrypt(response.get_response(), signerkey, m_EntityCert.GetPrivateKey().GetRsaKey())) { EVP_PKEY_free(signerkey); ERR_clear_error(); NEWPKIerr(PKI_ERROR_TXT, ERROR_BAD_DATAS); return false; } EVP_PKEY_free(signerkey); if(!m_event->Requester_OnNewResponse(response.get_transactionid(), response.get_sender(), newResponse)) { NEWPKIerr(PKI_ERROR_TXT, ERROR_ABORT); return false; } // We mark this response as having been processed if(!m_store->Requester_RequestWasResponded(response.get_transactionid())) { NEWPKIerr(PKI_ERROR_TXT, ERROR_ABORT); return false; } // The response has been successfully been processed // we can delete it from the repository DeleteResponse(response.get_transactionid()); ERR_clear_error(); return true; } bool AsynchJobs::AddRepositoryToObjectsPath(STACK_OF(X509_PUBKEY) * rep_path, const PKI_CERT & Cert) { X509_PUBKEY * currPubkey; if(RepositoryKnowsObject(rep_path, Cert)) return true; currPubkey = (X509_PUBKEY*)ASN1_item_dup(ASN1_ITEM_rptr(X509_PUBKEY), (void*)Cert.GetX509_PUBKEY()); if(!currPubkey) { NEWPKIerr(PKI_ERROR_TXT, ERROR_ABORT); return false; } if(SKM_sk_push(X509_PUBKEY, rep_path, currPubkey) < 0) { X509_PUBKEY_free(currPubkey); NEWPKIerr(PKI_ERROR_TXT, ERROR_ABORT); return false; } return true; } bool AsynchJobs::RepositoryKnowsObject(const STACK_OF(X509_PUBKEY) * rep_path, const PKI_CERT & Cert) { int j; X509_PUBKEY * currPubKey; for(j=0; j