diff --git a/objectstore/ArchiveRequest.cpp b/objectstore/ArchiveRequest.cpp index 1a23bd6dbafe73fa7831702a90214391d9ffe8b9..c5501a8400c521c1d7b0400dc11fddf181933cc6 100644 --- a/objectstore/ArchiveRequest.cpp +++ b/objectstore/ArchiveRequest.cpp @@ -65,6 +65,18 @@ void cta::objectstore::ArchiveRequest::addJob(uint16_t copyNumber, j->set_maxtotalretries(maxTotalRetries); } +void ArchiveRequest::setJobArchiveQueueAddress(uint16_t copyNumber, const std::string& queueAddress) { + checkPayloadWritable(); + auto * jl = m_payload.mutable_jobs(); + for (auto j=jl->begin(); j!=jl->end(); j++) { + if (j->copynb() == copyNumber) { + j->set_archivequeueaddress(queueAddress); + return; + } + } + throw NoSuchJob("In ArchiveRequest::setJobArchiveQueueAddress(): job not found"); +} + bool cta::objectstore::ArchiveRequest::setJobSuccessful(uint16_t copyNumber) { checkPayloadWritable(); auto * jl = m_payload.mutable_jobs(); diff --git a/objectstore/ArchiveRequest.hpp b/objectstore/ArchiveRequest.hpp index af997ad1b0ef0a93a0d4dd314405e231ef2a098b..b4d69460058546c18b64317b585d72b6f75810f0 100644 --- a/objectstore/ArchiveRequest.hpp +++ b/objectstore/ArchiveRequest.hpp @@ -44,6 +44,7 @@ public: // Job management ============================================================ void addJob(uint16_t copyNumber, const std::string & tapepool, const std::string & archivequeueaddress, uint16_t maxRetiesWithinMount, uint16_t maxTotalRetries); + void setJobArchiveQueueAddress(uint16_t copyNumber, const std::string & queueAddress); void setJobSelected(uint16_t copyNumber, const std::string & owner); void setJobPending(uint16_t copyNumber); bool setJobSuccessful(uint16_t copyNumber); //< returns true if this is the last job diff --git a/scheduler/CMakeLists.txt b/scheduler/CMakeLists.txt index 28eab7b094816e937dd84f974ec2e568b808b85f..56c2673575728786b9accc47bb8a9686bb723197 100644 --- a/scheduler/CMakeLists.txt +++ b/scheduler/CMakeLists.txt @@ -16,6 +16,7 @@ set (CTA_SCHEDULER_SRC_FILES SchedulerDatabase.cpp SchedulerDatabaseFactory.cpp MountType.cpp + OStoreDB/MemQueues.cpp OStoreDB/OStoreDB.cpp OStoreDB/OStoreDBWithAgent.cpp LabelMount.cpp) diff --git a/scheduler/OStoreDB/MemQueues.cpp b/scheduler/OStoreDB/MemQueues.cpp new file mode 100644 index 0000000000000000000000000000000000000000..7253eacd2c321d7b01bf733a8ffa8c7a3b368c72 --- /dev/null +++ b/scheduler/OStoreDB/MemQueues.cpp @@ -0,0 +1,139 @@ +/* + * The CERN Tape Archive (CTA) project + * Copyright (C) 2015 CERN + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + + +#include "MemQueues.hpp" +#include "OStoreDB.hpp" + +namespace cta { namespace ostoredb { + +std::mutex MemArchiveQueue::g_mutex; + +std::map<std::string, MemArchiveQueue *> MemArchiveQueue::g_queues; + +void MemArchiveQueue::sharedAddToArchiveQueue(objectstore::ArchiveRequest::JobDump& job, + objectstore::ArchiveRequest& archiveRequest, OStoreDB & oStoreDB) { + // 1) Take the global lock (implicit in the constructor) + std::unique_lock<std::mutex> ul(g_mutex); + try { + // 2) Determine if the queue exists already or not + auto & q = *g_queues.at(job.tapePool); + // It does: we just ride the train: queue ourselves + std::unique_lock<std::mutex> ulq(q.m_mutex); + MemArchiveQueueRequest maqr(job, archiveRequest); + q.add(maqr); + // If there are already enough elements, signal to the initiating thread + if (q.m_requests.size() + 1 >= g_maxQueuedElements) { + // signal the initiating thread + q.m_promise.set_value(); + // Unreference the queue so no new request gets added to it + g_queues.erase(job.tapePool); + } + // Release the locks + ulq.unlock(); + ul.unlock(); + // Wait for our request completion (this could throw, if there was a problem) + maqr.m_promise.get_future().get(); + } catch (std::out_of_range &) { + // The queue for our tape pool does not exist. We will create it, wait for + // the necessary amount of time or requests and release it. + // Create the queue + MemArchiveQueue maq; + // Reference it + g_queues[job.tapePool] = &maq; + // Release the global list + ul.unlock(); + // Wait for timeout or enough jobs. + maq.m_promise.get_future().wait_for(std::chrono::milliseconds(100)); + // Re-take the global lock to make sure the queue is not referenced anymore, + // and the queue as well, to make sure the last user is gone. + ul.lock(); + std::unique_lock<std::mutex> ulq(maq.m_mutex); + // Remove the entry for our tape pool iff it also has our pointer (a new + // queue could have been created in the mean time. + auto i = g_queues.find(job.tapePool); + if (i != g_queues.end() && (&maq == i->second)) + g_queues.erase(i); + // Our mem queue is now unreachable so we can let the global part go + ul.unlock(); + // We can now proceed with the queuing of the jobs. + try { + objectstore::ArchiveQueue aq(oStoreDB.m_objectStore); + objectstore::ScopedExclusiveLock aql; + oStoreDB.getLockedAndFetchedArchiveQueue(aq, aql, job.tapePool); + // First add the job for this thread + { + auto af = archiveRequest.getArchiveFile(); + aq.addJob(job, archiveRequest.getAddressIfSet(), af.archiveFileID, + af.fileSize, archiveRequest.getMountPolicy(), archiveRequest.getEntryLog().time); + // Back reference the queue in the job and archive request + job.ArchiveQueueAddress = aq.getAddressIfSet(); + archiveRequest.setJobArchiveQueueAddress(job.copyNb, aq.getAddressIfSet()); + } + // The do the same for all the queued requests + for (auto &maqr: maq.m_requests) { + // Add the job + auto af = maqr->m_archiveRequest.getArchiveFile(); + aq.addJob(job, archiveRequest.getAddressIfSet(), af.archiveFileID, + af.fileSize, maqr->m_archiveRequest.getMountPolicy(), + maqr->m_archiveRequest.getEntryLog().time); + // Back reference the queue in the job and archive request + maqr->m_job.ArchiveQueueAddress = aq.getAddressIfSet(); + archiveRequest.setJobArchiveQueueAddress(maqr->m_job.copyNb, aq.getAddressIfSet()); + } + // We can now commit the multi-request addition to the object store + aq.commit(); + // And finally release all the user threads + for (auto &maqr: maq.m_requests) { + maqr->m_promise.set_value(); + } + // Done! + } catch (...) { + size_t exceptionsNotPassed = 0; + // Something went wrong. We should inform the other threads + for (auto & maqr: maq.m_requests) { + try { + maqr->m_promise.set_exception(std::current_exception()); + } catch (...) { + exceptionsNotPassed++; + } + } + // And we inform the caller in our thread too + if (exceptionsNotPassed) { + try { + std::rethrow_exception(std::current_exception()); + } catch (std::exception & ex) { + std::stringstream err; + err << "In MemArchiveQueue::sharedAddToArchiveQueue(), in main thread, failed to notify " + << exceptionsNotPassed << " other threads out of " << maq.m_requests.size() + << " : " << ex.what(); + throw cta::exception::Exception(err.str()); + } + } else + throw; + } + } +} + + +void MemArchiveQueue::add(MemArchiveQueueRequest& request) { + m_requests.emplace_back(&request); +} + + +}} // namespac ecta::ostoredb diff --git a/scheduler/OStoreDB/MemQueues.hpp b/scheduler/OStoreDB/MemQueues.hpp new file mode 100644 index 0000000000000000000000000000000000000000..5b1ab4f74a7f4ca1a13c835e33f49aa9fe386b7c --- /dev/null +++ b/scheduler/OStoreDB/MemQueues.hpp @@ -0,0 +1,78 @@ + +/* + * The CERN Tape Archive (CTA) project + * Copyright (C) 2015 CERN + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include "objectstore/ArchiveRequest.hpp" +#include "objectstore/ArchiveQueue.hpp" + +#include <future> + +namespace cta { +// Forward declaration +class OStoreDB; + namespace ostoredb { + +class MemArchiveQueueRequest { + friend class MemArchiveQueue; +public: + MemArchiveQueueRequest(objectstore::ArchiveRequest::JobDump & job, + objectstore::ArchiveRequest & archiveRequest): m_job(job), m_archiveRequest(archiveRequest) {} +private: + objectstore::ArchiveRequest::JobDump & m_job; + objectstore::ArchiveRequest & m_archiveRequest; + std::promise<void> m_promise; +}; + +class MemArchiveQueue { +public: + /** + * This function adds ArchiveRequeuest to an ArchiveQueue in batch. + * A static function that will either create the shared queue for a given + * tape pool if none exist, of add the job to it otherwise. When adding + * the job, the first calling thread will be woken up if enough jobs have been + * accumulated. + * The creation and action are done using the global lock, which should be + * sufficiently fast as we work in memory. + * All calls sharing the same batch will either succeed of throw the same + * exception. + * The address of the archive queue object will be updated in both parameters + * (job and archiveRequest). + * + * @param job to be added to the ArchiveQueue (contains the tape pool name) + * @param archiveRequest the request itself. + */ + static void sharedAddToArchiveQueue(objectstore::ArchiveRequest::JobDump & job, + objectstore::ArchiveRequest & archiveRequest, OStoreDB & oStoreDB); + +private: + /** Mutex that should be locked before attempting any operation */ + std::mutex m_mutex; + /** Add the object */ + void add(MemArchiveQueueRequest & request); + /** Static function implementing the shared addition of archive requests to + * the object store queue */ + static const size_t g_maxQueuedElements = 100; + std::list<MemArchiveQueueRequest *> m_requests; + std::promise<void> m_promise; + static std::mutex g_mutex; + static std::map<std::string, MemArchiveQueue *> g_queues; +}; + +}} // namespace cta::ostoreDBUtils \ No newline at end of file diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index d0c3926b27371c9165147ac8776d37505f542ba5..ba3f0f8eeae690ec949ca5c0b02aeb9dde6fd130 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -17,6 +17,7 @@ */ #include "OStoreDB.hpp" +#include "MemQueues.hpp" #include "common/dataStructures/SecurityIdentity.hpp" #include "objectstore/RootEntry.hpp" #include "objectstore/ArchiveQueue.hpp" @@ -345,16 +346,12 @@ void OStoreDB::queueArchive(const std::string &instanceName, const cta::common:: aReq.setEntryLog(request.creationLog); std::list<cta::objectstore::ArchiveRequest::JobDump> jl; for (auto & copy:criteria.copyToPoolMap) { - ArchiveQueue aq(m_objectStore); - ScopedExclusiveLock aql; - getLockedAndFetchedArchiveQueue(aq, aql, copy.second); const uint32_t hardcodedRetriesWithinMount = 3; const uint32_t hardcodedTotalRetries = 6; - aReq.addJob(copy.first, copy.second, aq.getAddressIfSet(), hardcodedRetriesWithinMount, hardcodedTotalRetries); + aReq.addJob(copy.first, copy.second, "archive queue address to be set later", hardcodedRetriesWithinMount, hardcodedTotalRetries); jl.push_back(cta::objectstore::ArchiveRequest::JobDump()); jl.back().copyNb = copy.first; jl.back().tapePool = copy.second; - jl.back().ArchiveQueueAddress = aq.getAddressIfSet(); } if (jl.empty()) { throw ArchiveRequestHasNoCopies("In OStoreDB::queue: the archive to file request has no copy"); @@ -374,20 +371,9 @@ void OStoreDB::queueArchive(const std::string &instanceName, const cta::common:: std::list<std::string> linkedTapePools; try { for (auto &j: aReq.dumpJobs()) { - objectstore::ArchiveQueue aq(j.ArchiveQueueAddress, m_objectStore); - ScopedExclusiveLock aql(aq); - aq.fetch(); - if (aq.getOwner() != RootEntry::address) - throw NoSuchArchiveQueue("In OStoreDB::queue: non-existing archive queue found " - "(dangling pointer): canceling request creation."); - aq.addJob(j, aReq.getAddressIfSet(), aReq.getArchiveFile().archiveFileID, - aReq.getArchiveFile().fileSize, aReq.getMountPolicy(), - aReq.getEntryLog().time); - // Now that we have the tape pool handy, get the retry limits from it and - // assign them to the job - aq.commit(); + ostoredb::MemArchiveQueue::sharedAddToArchiveQueue(j, aReq, *this); linkedTapePools.push_back(j.ArchiveQueueAddress); - aReq.setJobOwner(j.copyNb, aq.getAddressIfSet()); + aReq.setJobOwner(j.copyNb, j.ArchiveQueueAddress); } } catch (NoSuchArchiveQueue &) { // Unlink the request from already connected tape pools diff --git a/scheduler/OStoreDB/OStoreDB.hpp b/scheduler/OStoreDB/OStoreDB.hpp index abb3a210a8474f9c63a4d412ddd6b9a4d3a3e0e3..91181d7aa7d922adf8cf37fef5c1cafed9e70be0 100644 --- a/scheduler/OStoreDB/OStoreDB.hpp +++ b/scheduler/OStoreDB/OStoreDB.hpp @@ -34,8 +34,13 @@ namespace objectstore { class Backend; class Agent; } + +namespace ostoredb { + class MemArchiveQueue; +} class OStoreDB: public SchedulerDatabase { + friend class cta::ostoredb::MemArchiveQueue; public: OStoreDB(objectstore::Backend & be); virtual ~OStoreDB() throw(); @@ -157,7 +162,8 @@ public: void queueArchive(const std::string &instanceName, const cta::common::dataStructures::ArchiveRequest &request, const cta::common::dataStructures::ArchiveFileQueueCriteria &criteria) override; - + +private: /** * Find or create an archive queue, and return it locked and fetched to the caller * (ArchiveQueue and ScopedExclusiveLock objects are provided empty) @@ -168,7 +174,8 @@ public: void getLockedAndFetchedArchiveQueue(cta::objectstore::ArchiveQueue & archiveQueue, cta::objectstore::ScopedExclusiveLock & archiveQueueLock, const std::string & tapePool); - + +public: CTA_GENERATE_EXCEPTION_CLASS(NoSuchArchiveRequest); CTA_GENERATE_EXCEPTION_CLASS(ArchiveRequestAlreadyDeleted); virtual void deleteArchiveRequest(const std::string &diskInstanceName, uint64_t fileId) override;