diff --git a/common/dataStructures/MountPolicy.cpp b/common/dataStructures/MountPolicy.cpp index 347894ff657f2d82dff2745aa0640ea2fcadf6d3..eaf612cf90a82ab66be4889597f2ee28e10d961a 100644 --- a/common/dataStructures/MountPolicy.cpp +++ b/common/dataStructures/MountPolicy.cpp @@ -19,6 +19,7 @@ #include "common/dataStructures/MountPolicy.hpp" #include "common/dataStructures/utils.hpp" #include "common/exception/Exception.hpp" +#include "MountPolicy.hpp" namespace cta { namespace common { @@ -60,6 +61,20 @@ bool MountPolicy::operator!=(const MountPolicy &rhs) const { return !operator==(rhs); } +MountPolicy& MountPolicy::operator=(const MountPolicy& other){ + if(this != &other){ + this->archiveMinRequestAge = other.archiveMinRequestAge; + this->archivePriority = other.archivePriority; + this->comment = other.comment; + this->creationLog = other.creationLog; + this->lastModificationLog = other.lastModificationLog; + this->maxDrivesAllowed = other.maxDrivesAllowed; + this->name = other.name; + this->retrieveMinRequestAge = other.retrieveMinRequestAge; + this->retrievePriority = other.retrievePriority; + } + return *this; +} //------------------------------------------------------------------------------ // operator<< //------------------------------------------------------------------------------ diff --git a/common/dataStructures/MountPolicy.hpp b/common/dataStructures/MountPolicy.hpp index cb9eef63566abc87f96eb4cff7d088f9663fc07b..b8e85b34578d9453ec047abde73f7747bfa4104d 100644 --- a/common/dataStructures/MountPolicy.hpp +++ b/common/dataStructures/MountPolicy.hpp @@ -39,6 +39,8 @@ struct MountPolicy { bool operator==(const MountPolicy &rhs) const; bool operator!=(const MountPolicy &rhs) const; + + MountPolicy& operator=(const MountPolicy& other); std::string name; uint64_t archivePriority; diff --git a/objectstore/Agent.hpp b/objectstore/Agent.hpp index daff010e46f43b89d3bdc01783c5346f55ced479..0d9fd4161c73d5378307dfffe1487767ae89becd 100644 --- a/objectstore/Agent.hpp 
+++ b/objectstore/Agent.hpp @@ -29,6 +29,7 @@ namespace cta { namespace objectstore { class GenericObject; class AgentReference; class GarbageCollector; +class Sorter; /** * Class containing agent information and managing the update of the @@ -42,6 +43,7 @@ class GarbageCollector; class Agent: public ObjectOps<serializers::Agent, serializers::Agent_t> { friend class AgentReference; friend class GarbageCollector; + friend class Sorter; public: CTA_GENERATE_EXCEPTION_CLASS(AgentStillOwnsObjects); Agent(GenericObject & go); diff --git a/objectstore/Algorithms.hpp b/objectstore/Algorithms.hpp index 3883c84d206240de8ba95f705a6b088ec52f1fb9..af2a67c9352fc44738629808fce0d3001f366804 100644 --- a/objectstore/Algorithms.hpp +++ b/objectstore/Algorithms.hpp @@ -116,7 +116,7 @@ public: log::TimingList timingList; utils::Timer t; ContainerTraits<Q,C>::getLockedAndFetched(cont, contLock, m_agentReference, contId, lc); - contAddress = cont.getAddressIfSet(); + contAddress = cont.getAddressIfSet();//TODO : It would be better to return this value auto contSummaryBefore = ContainerTraits<Q,C>::getContainerSummary(cont); timingList.insertAndReset("queueLockFetchTime", t); ContainerTraits<Q,C>::addReferencesIfNecessaryAndCommit(cont, elements, m_agentReference, lc); diff --git a/objectstore/ArchiveQueueAlgorithms.hpp b/objectstore/ArchiveQueueAlgorithms.hpp index 1da9acfa4043a25e19208086c53e07f37852dc42..682baedde0c2137c3e38dcbc1b95b28bb5d317b0 100644 --- a/objectstore/ArchiveQueueAlgorithms.hpp +++ b/objectstore/ArchiveQueueAlgorithms.hpp @@ -495,7 +495,7 @@ struct ContainerTraits<ArchiveQueue,ArchiveQueueToReport>::QueueType { template<> struct ContainerTraits<ArchiveQueue, ArchiveQueueToTransferForRepack>::QueueType{ - objectstore::JobQueueType value = objectstore::JobQueueType::JobsToTransfer; + objectstore::JobQueueType value = objectstore::JobQueueType::JobsToTransferForRepack; }; template<> diff --git a/objectstore/ArchiveRequest.cpp b/objectstore/ArchiveRequest.cpp index 
2e0b3335ef5fbb308d9e87b7792f468a907e2977..986998bb66ea694b0e69e8d1218dac3e3366f083 100644 --- a/objectstore/ArchiveRequest.cpp +++ b/objectstore/ArchiveRequest.cpp @@ -96,6 +96,8 @@ JobQueueType ArchiveRequest::getJobQueueType(uint16_t copyNumber) { switch (j.status()) { case serializers::ArchiveJobStatus::AJS_ToTransfer: return JobQueueType::JobsToTransfer; + case serializers::ArchiveJobStatus::AJS_ToTransferForRepack: + return JobQueueType::JobsToTransferForRepack; case serializers::ArchiveJobStatus::AJS_Complete: throw JobNotQueueable("In ArchiveRequest::getJobQueueType(): Complete jobs are not queueable. They are finished and pend siblings completion."); case serializers::ArchiveJobStatus::AJS_ToReportForTransfer: @@ -534,8 +536,8 @@ ArchiveRequest::AsyncJobOwnerUpdater* ArchiveRequest::asyncUpdateJobOwner(uint16 // If a status change was requested, do it. if (newStatus) j->set_status(*newStatus); // We also need to gather all the job content for the user to get in-memory - // representation. - // TODO this is an unfortunate duplication of the getXXX() members of ArchiveRequest. + // representation. + // TODO this is an unfortunate duplication of the getXXX() members of ArchiveRequest. // We could try and refactor this. 
retRef.m_archiveFile.archiveFileID = payload.archivefileid(); retRef.m_archiveFile.checksumType = payload.checksumtype(); diff --git a/objectstore/BackendVFS.cpp b/objectstore/BackendVFS.cpp index 60d3e3e38ef5c6db7219fb681bf5a1f79b87165a..8b49bc2d0e93f5e16dfc63a3e7dd3ca2df02aa28 100644 --- a/objectstore/BackendVFS.cpp +++ b/objectstore/BackendVFS.cpp @@ -41,6 +41,7 @@ #include <iostream> #endif #include <valgrind/helgrind.h> +#include <iostream> namespace cta { namespace objectstore { diff --git a/objectstore/CMakeLists.txt b/objectstore/CMakeLists.txt index f7f38e39c1ca08c33bdc5b8c5c7d97cda7457354..7f4a947f50410451b2e843e1d31a738c4e769aa4 100644 --- a/objectstore/CMakeLists.txt +++ b/objectstore/CMakeLists.txt @@ -75,6 +75,7 @@ add_library (ctaobjectstore SHARED RetrieveQueueFailedAlgorithms.cpp RetrieveQueueToReportToRepackForSuccessAlgorithms.cpp JobQueueType.cpp + Sorter.cpp ArchiveRequest.cpp RetrieveRequest.cpp DriveRegister.cpp @@ -108,6 +109,7 @@ set(ObjectStoreUnitTests RetrieveQueueTest.cpp GarbageCollectorTest.cpp AlgorithmsTest.cpp + SorterTest.cpp ) add_library(ctaobjectstoreunittests SHARED ${ObjectStoreUnitTests}) diff --git a/objectstore/GarbageCollector.cpp b/objectstore/GarbageCollector.cpp index f664d4571243063ea458be06280e236fa6260e84..7537cf9ec7d2bb63430de40b12a45627b9f7a831 100644 --- a/objectstore/GarbageCollector.cpp +++ b/objectstore/GarbageCollector.cpp @@ -385,6 +385,8 @@ void GarbageCollector::OwnedObjectSorter::sortFetchedObjects(Agent& agent, std:: fetchedObjects.clear(); } +//TODO : We should record the VID in the ArchiveRequest object to allow the requeueing in the proper report queue (currently, the report queue is selected +//by tapepool, which works but is not the most efficient way to report the request (contention problem) void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(Agent& agent, AgentReference& agentReference, Backend & objectStore, log::LogContext & lc) { // We can now start updating the objects 
efficiently. We still need to re-fetch them locked diff --git a/objectstore/GarbageCollector.hpp b/objectstore/GarbageCollector.hpp index a7a55c6099b5a48e827d27e27debe3f22e18ecbe..f9eedf80f1ded862309dd40bfa573749ddac5de5 100644 --- a/objectstore/GarbageCollector.hpp +++ b/objectstore/GarbageCollector.hpp @@ -24,6 +24,7 @@ #include "AgentRegister.hpp" #include "JobQueueType.hpp" #include "common/log/LogContext.hpp" +#include "Sorter.hpp" /** * Plan => Garbage collector keeps track of the agents. @@ -58,6 +59,7 @@ public: std::map<std::tuple<std::string, JobQueueType>, std::list<std::shared_ptr <ArchiveRequest>>> archiveQueuesAndRequests; std::map<std::tuple<std::string, JobQueueType>, std::list<std::shared_ptr <RetrieveRequest>>> retrieveQueuesAndRequests; std::list<std::shared_ptr<GenericObject>> otherObjects; + //Sorter m_sorter; /// Fill up the fetchedObjects with objects of interest. void fetchOwnedObjects(Agent & agent, std::list<std::shared_ptr<GenericObject>> & fetchedObjects, Backend & objectStore, log::LogContext & lc); @@ -71,13 +73,16 @@ public: // Lock, fetch and update other objects void lockFetchAndUpdateOtherObjects(Agent & agent, AgentReference & agentReference, Backend & objectStore, cta::catalogue::Catalogue & catalogue, log::LogContext & lc); + //Sorter& getSorter(); }; + private: Backend & m_objectStore; catalogue::Catalogue & m_catalogue; AgentReference & m_ourAgentReference; AgentRegister m_agentRegister; std::map<std::string, AgentWatchdog * > m_watchedAgents; + //void garbageCollectArchiveRequests(Agent& agent, OwnedObjectSorter &ownedObjectSorter,log::LogContext & lc); }; }} diff --git a/objectstore/JobQueueType.hpp b/objectstore/JobQueueType.hpp index 725b5ede17760318a09d1a117fd76590361ceee8..df7108ca2a5974c3a53b88c4ddebaf81b99e7fa4 100644 --- a/objectstore/JobQueueType.hpp +++ b/objectstore/JobQueueType.hpp @@ -21,6 +21,6 @@ #include <string> namespace cta { namespace objectstore { -enum class JobQueueType { JobsToTransfer, FailedJobs, 
JobsToReportToUser, JobsToReportToRepackForSuccess, JobsToReportToRepackForFailure }; +enum class JobQueueType { JobsToTransfer, FailedJobs, JobsToReportToUser, JobsToReportToRepackForSuccess, JobsToReportToRepackForFailure, JobsToTransferForRepack }; std::string toString(JobQueueType queueType); }} // namespace cta::objectstore \ No newline at end of file diff --git a/objectstore/RetrieveRequest.cpp b/objectstore/RetrieveRequest.cpp index 22a81b50a1dd46f107cf4e0cf34a4b1047c6661c..f4c70dd8d3d64ef961bb63fcce0b88299bbad416 100644 --- a/objectstore/RetrieveRequest.cpp +++ b/objectstore/RetrieveRequest.cpp @@ -548,7 +548,7 @@ JobQueueType RetrieveRequest::getQueueType() { case serializers::RetrieveJobStatus::RJS_ToTransfer: return JobQueueType::JobsToTransfer; break; - case serializers::RetrieveJobStatus::RJS_Succeeded: + case serializers::RetrieveJobStatus::RJS_ToReportToRepackForSuccess: return JobQueueType::JobsToReportToRepackForSuccess; break; case serializers::RetrieveJobStatus::RJS_ToReportForFailure: @@ -860,7 +860,7 @@ RetrieveRequest::AsyncJobSucceedForRepackReporter * RetrieveRequest::asyncReport if(job.copynb() == copyNb) { //Change the status to RJS_Succeed - job.set_status(serializers::RetrieveJobStatus::RJS_Succeeded); + job.set_status(serializers::RetrieveJobStatus::RJS_ToReportToRepackForSuccess); oh.set_payload(payload.SerializePartialAsString()); return oh.SerializeAsString(); } diff --git a/objectstore/RootEntry.cpp b/objectstore/RootEntry.cpp index 0257c46a9608b925235bd6c789539679c5efb778..0a859f93dda372ba5bcb689c32e9b5528ace0994 100644 --- a/objectstore/RootEntry.cpp +++ b/objectstore/RootEntry.cpp @@ -183,6 +183,7 @@ std::string RootEntry::addOrGetArchiveQueueAndCommit(const std::string& tapePool case JobQueueType::JobsToTransfer: archiveQueueNameHeader+="ToTransfer"; break; case JobQueueType::JobsToReportToUser: archiveQueueNameHeader+="ToReport"; break; case JobQueueType::FailedJobs: archiveQueueNameHeader+="Failed"; break; + case 
JobQueueType::JobsToTransferForRepack: archiveQueueNameHeader+="ToTransferForRepack"; break; case JobQueueType::JobsToReportToRepackForSuccess: archiveQueueNameHeader+="ToReportToRepackForSuccess"; break; default: break; } diff --git a/objectstore/Sorter.cpp b/objectstore/Sorter.cpp new file mode 100644 index 0000000000000000000000000000000000000000..c11311e7b4f515abd36688bbfa89b2c55f907dde --- /dev/null +++ b/objectstore/Sorter.cpp @@ -0,0 +1,304 @@ +/** + * The CERN Tape Archive (CTA) project + * Copyright © 2018 CERN + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ */ +#include "Sorter.hpp" +#include "Helpers.hpp" +#include "common/threading/MutexLocker.hpp" +#include <iostream> + +namespace cta { namespace objectstore { + +Sorter::Sorter(AgentReference& agentReference, Backend& objectstore, catalogue::Catalogue& catalogue):m_agentReference(agentReference),m_objectstore(objectstore),m_catalogue(catalogue){} + +Sorter::~Sorter() { +} + +void Sorter::insertArchiveJob(std::shared_ptr<ArchiveRequest> archiveRequest, ArchiveRequest::JobDump& jobToInsert, log::LogContext & lc){ + std::shared_ptr<ArchiveJobQueueInfo> ajqi = std::make_shared<ArchiveJobQueueInfo>(ArchiveJobQueueInfo()); + ajqi->archiveRequest = archiveRequest; + Sorter::ArchiveJob jobToAdd; + jobToAdd.archiveRequest = archiveRequest; + jobToAdd.archiveFileId = archiveRequest->getArchiveFile().archiveFileID; + jobToAdd.jobDump.copyNb = jobToInsert.copyNb; + jobToAdd.fileSize = archiveRequest->getArchiveFile().fileSize; + jobToAdd.mountPolicy = archiveRequest->getMountPolicy(); + jobToAdd.jobDump.owner = jobToInsert.owner;//TODO : Maybe should be passed in parameter of this method + jobToAdd.startTime = archiveRequest->getEntryLog().time; + jobToAdd.jobDump.tapePool = jobToInsert.tapePool; + ajqi->jobToQueue = std::make_tuple(jobToAdd,std::promise<void>()); + threading::MutexLocker mapLocker(m_mutex); + m_archiveQueuesAndRequests[std::make_tuple(jobToInsert.tapePool, archiveRequest->getJobQueueType(jobToInsert.copyNb))].emplace_back(ajqi); +} + +void Sorter::insertRetrieveRequest(std::shared_ptr<RetrieveRequest> retrieveRequest, log::LogContext & lc){ + std::set<std::string> candidateVids = getCandidateVids(*retrieveRequest); + //We need to select the best VID to queue the RetrieveJob in the best queue + if(candidateVids.empty()){ + std::shared_ptr<RetrieveJobQueueInfo> rjqi = std::make_shared<RetrieveJobQueueInfo>(RetrieveJobQueueInfo()); + rjqi->retrieveRequest = retrieveRequest; + //The first copy of the ArchiveFile will be queued + 
cta::common::dataStructures::TapeFile jobTapeFile = retrieveRequest->getArchiveFile().tapeFiles.begin()->second; + Sorter::RetrieveJob jobToAdd; + jobToAdd.jobDump.copyNb = jobTapeFile.copyNb; + jobToAdd.fSeq = jobTapeFile.fSeq; + jobToAdd.fileSize = retrieveRequest->getArchiveFile().fileSize; + jobToAdd.mountPolicy = retrieveRequest->getRetrieveFileQueueCriteria().mountPolicy; + jobToAdd.retrieveRequest = retrieveRequest; + jobToAdd.startTime = retrieveRequest->getEntryLog().time; + rjqi->jobToQueue = std::make_tuple(jobToAdd,std::promise<void>()); + try{ + threading::MutexLocker mapLocker(m_mutex); + m_retrieveQueuesAndRequests[std::make_tuple(retrieveRequest->getArchiveFile().tapeFiles.begin()->second.vid, retrieveRequest->getQueueType())].emplace_back(rjqi); + } catch (cta::exception::Exception &ex){ + log::ScopedParamContainer params(lc); + params.add("fileId", retrieveRequest->getArchiveFile().archiveFileID) + .add("exceptionMessage", ex.getMessageValue()); + lc.log(log::ERR, "In Sorter::insertRetrieveRequest() Failed to determine destination queue for retrieve request."); + throw ex; + } + } + std::string bestVid = getBestVidForQueueingRetrieveRequest(*retrieveRequest, candidateVids ,lc); + + for (auto & tf: retrieveRequest->getArchiveFile().tapeFiles) { + if (tf.second.vid == bestVid) { + goto vidFound; + } + } + { + std::stringstream err; + err << "In Sorter::insertRetrieveRequest(): no tape file for requested vid. 
archiveId=" << retrieveRequest->getArchiveFile().archiveFileID + << " vid=" << bestVid; + throw RetrieveRequestHasNoCopies(err.str()); + } + vidFound: + std::shared_ptr<RetrieveJobQueueInfo> rjqi = std::make_shared<RetrieveJobQueueInfo>(RetrieveJobQueueInfo()); + rjqi->retrieveRequest = retrieveRequest; + log::ScopedParamContainer params(lc); + size_t copyNb = std::numeric_limits<size_t>::max(); + uint64_t fSeq = std::numeric_limits<uint64_t>::max(); + for (auto & tc: retrieveRequest->getArchiveFile().tapeFiles) { if (tc.second.vid==bestVid) { copyNb=tc.first; fSeq=tc.second.fSeq; } } + Sorter::RetrieveJob jobToAdd; + jobToAdd.jobDump.copyNb = copyNb; + jobToAdd.fSeq = fSeq; + jobToAdd.fileSize = retrieveRequest->getArchiveFile().fileSize; + jobToAdd.mountPolicy = retrieveRequest->getRetrieveFileQueueCriteria().mountPolicy; + jobToAdd.retrieveRequest = retrieveRequest; + jobToAdd.startTime = retrieveRequest->getEntryLog().time; + + threading::MutexLocker mapLocker(m_mutex); + //We are sure that we want to transfer jobs + rjqi->jobToQueue = std::make_tuple(jobToAdd,std::promise<void>()); + m_retrieveQueuesAndRequests[std::make_tuple(bestVid,JobQueueType::JobsToTransfer)].emplace_back(rjqi); + params.add("fileId", retrieveRequest->getArchiveFile().archiveFileID) + .add("copyNb", copyNb) + .add("tapeVid", bestVid) + .add("fSeq", fSeq); + lc.log(log::INFO, "Selected vid to be queued for retrieve request."); +} + +std::set<std::string> Sorter::getCandidateVids(RetrieveRequest &request){ + using serializers::RetrieveJobStatus; + std::set<std::string> candidateVids; + for (auto & j: request.dumpJobs()) { + if(j.status == RetrieveJobStatus::RJS_ToTransfer) { + candidateVids.insert(request.getArchiveFile().tapeFiles.at(j.copyNb).vid); + } + } + return candidateVids; +} + +std::string Sorter::getBestVidForQueueingRetrieveRequest(RetrieveRequest& retrieveRequest, std::set<std::string>& candidateVids, log::LogContext &lc){ + std::string vid; + try{ + vid = 
Helpers::selectBestRetrieveQueue(candidateVids,m_catalogue,m_objectstore); + } catch (Helpers::NoTapeAvailableForRetrieve & ex) { + log::ScopedParamContainer params(lc); + params.add("fileId", retrieveRequest.getArchiveFile().archiveFileID); + lc.log(log::INFO, "In Sorter::getVidForQueueingRetrieveRequest(): No available tape found."); + throw ex; + } + return vid; +} + +bool Sorter::flushOneArchive(log::LogContext &lc) { + threading::MutexLocker locker(m_mutex); + for(auto & kv: m_archiveQueuesAndRequests){ + if(!kv.second.empty()){ + queueArchiveRequests(std::get<0>(kv.first),std::get<1>(kv.first),kv.second,lc); + return true; + } + } + return false; +} + +bool Sorter::flushOneRetrieve(log::LogContext &lc){ + return true; +} + +Sorter::MapArchive Sorter::getAllArchive(){ + return m_archiveQueuesAndRequests; +} + +Sorter::MapRetrieve Sorter::getAllRetrieve(){ + return m_retrieveQueuesAndRequests; +} + +void Sorter::queueArchiveRequests(const std::string tapePool, const JobQueueType jobQueueType, std::list<std::shared_ptr<ArchiveJobQueueInfo>>& archiveJobInfos, log::LogContext &lc){ + for(auto& archiveJobInfo: archiveJobInfos){ + double queueLockFetchTime=0; + double queueProcessAndCommitTime=0; + double requestsUpdatePreparationTime=0; + double requestsUpdatingTime = 0; + utils::Timer t; + uint64_t filesBefore=0; + uint64_t bytesBefore=0; + + ArchiveQueue aq(m_objectstore); + ScopedExclusiveLock rql; + Helpers::getLockedAndFetchedJobQueue<ArchiveQueue>(aq,rql, m_agentReference, tapePool, jobQueueType, lc); + queueLockFetchTime = t.secs(utils::Timer::resetCounter); + auto jobsSummary=aq.getJobsSummary(); + filesBefore=jobsSummary.jobs; + bytesBefore=jobsSummary.bytes; + // Prepare the list of requests to add to the queue (if needed). + std::list<ArchiveQueue::JobToAdd> jta; + // We have the queue. We will loop on the requests, add them to the list. We will launch their updates + // after committing the queue. 
+ Sorter::ArchiveJob jobToQueue = std::get<0>(archiveJobInfo->jobToQueue); + std::promise<void>& jobPromise = std::get<1>(archiveJobInfo->jobToQueue); + + jta.push_back({jobToQueue.jobDump,jobToQueue.archiveRequest->getAddressIfSet(),jobToQueue.archiveFileId,jobToQueue.fileSize,jobToQueue.mountPolicy,jobToQueue.startTime}); + auto addedJobs = aq.addJobsIfNecessaryAndCommit(jta,m_agentReference,lc); + queueProcessAndCommitTime = t.secs(utils::Timer::resetCounter); + if(!addedJobs.files){ + try{ + throw cta::exception::Exception("In Sorter::queueArchiveRequests, no job have been added with addJobsIfNecessaryAndCommit()."); + } catch (cta::exception::Exception &e){ + jobPromise.set_exception(std::current_exception()); + continue; + } + } + // We will keep individual references for the job update we launch so that we make + // our life easier downstream. + struct ARUpdatedParams { + std::unique_ptr<ArchiveRequest::AsyncJobOwnerUpdater> updater; + std::shared_ptr<ArchiveRequest> archiveRequest; + uint16_t copyNb; + }; + + ARUpdatedParams arUpdaterParams; + arUpdaterParams.archiveRequest = archiveJobInfo->archiveRequest; + arUpdaterParams.copyNb = jobToQueue.jobDump.copyNb; + //Here the new owner is the agentReference of the process that runs the sorter, ArchiveRequest has no owner, the jobs have + arUpdaterParams.updater.reset(archiveJobInfo->archiveRequest->asyncUpdateJobOwner(jobToQueue.jobDump.copyNb,m_agentReference.getAgentAddress(),jobToQueue.jobDump.owner,cta::nullopt)); + + requestsUpdatePreparationTime = t.secs(utils::Timer::resetCounter); + try{ + arUpdaterParams.updater->wait(); + //No problem, the job has been inserted into the queue, log it. 
+ jobPromise.set_value(); + log::ScopedParamContainer params(lc); + params.add("archiveRequestObject", archiveJobInfo->archiveRequest->getAddressIfSet()) + .add("copyNb", arUpdaterParams.copyNb) + .add("fileId",arUpdaterParams.updater->getArchiveFile().archiveFileID) + .add("tapePool",tapePool) + .add("archiveQueueObject",aq.getAddressIfSet()) + .add("previousOwner",jobToQueue.jobDump.owner); + lc.log(log::INFO, "In Sorter::queueArchiveRequests(): queued archive job."); + } catch (cta::exception::Exception &e){ + jobPromise.set_exception(std::current_exception()); + continue; + } + + requestsUpdatingTime = t.secs(utils::Timer::resetCounter); + { + log::ScopedParamContainer params(lc); + auto jobsSummary = aq.getJobsSummary(); + params.add("tapePool", tapePool) + .add("archiveQueueObject", aq.getAddressIfSet()) + /*.add("filesAdded", filesQueued) + .add("bytesAdded", bytesQueued) + .add("filesAddedInitially", filesQueued) + .add("bytesAddedInitially", bytesQueued)*/ + /*.add("filesDequeuedAfterErrors", filesDequeued) + .add("bytesDequeuedAfterErrors", bytesDequeued)*/ + .add("filesBefore", filesBefore) + .add("bytesBefore", bytesBefore) + .add("filesAfter", jobsSummary.jobs) + .add("bytesAfter", jobsSummary.bytes) + .add("queueLockFetchTime", queueLockFetchTime) + .add("queuePreparationTime", queueProcessAndCommitTime) + .add("requestsUpdatePreparationTime", requestsUpdatePreparationTime) + .add("requestsUpdatingTime", requestsUpdatingTime); + //.add("queueRecommitTime", queueRecommitTime); + lc.log(log::INFO, "In Sorter::queueArchiveRequests(): " + "Queued an archiveRequest"); + } + } + archiveJobInfos.clear(); +} + +void Sorter::queueRetrieveRequests(const std::string vid, const JobQueueType jobQueueType, std::list<std::shared_ptr<RetrieveJobQueueInfo>>& retrieveJobsInfo, log::LogContext &lc){ + /*for(auto& retrieveJobInfo: retrieveJobsInfo){ + double queueLockFetchTime=0; + double queueProcessAndCommitTime=0; + //double requestsUpdatePreparationTime=0; + //double 
requestsUpdatingTime = 0; + utils::Timer t; + uint64_t filesBefore=0; + uint64_t bytesBefore=0; + + RetrieveQueue rq(m_objectstore); + ScopedExclusiveLock rql; + Helpers::getLockedAndFetchedJobQueue<RetrieveQueue>(rq,rql,m_agentReference,vid,jobQueueType,lc); + queueLockFetchTime = t.secs(utils::Timer::resetCounter); + auto jobsSummary=rq.getJobsSummary(); + filesBefore=jobsSummary.jobs; + bytesBefore=jobsSummary.bytes; + + Sorter::RetrieveJob jobToQueue = std::get<0>(retrieveJobInfo->jobToQueue); + std::promise<void>& jobPromise = std::get<1>(retrieveJobInfo->jobToQueue); + + std::list<RetrieveQueue::JobToAdd> jta; + jta.push_back({jobToQueue.jobDump.copyNb,jobToQueue.fSeq,jobToQueue.retrieveRequest->getAddressIfSet(),jobToQueue.fileSize,jobToQueue.mountPolicy,jobToQueue.startTime}); + + auto addedJobs = rq.addJobsIfNecessaryAndCommit(jta, m_agentReference, lc); + queueProcessAndCommitTime = t.secs(utils::Timer::resetCounter); + + if(!addedJobs.files){ + throw cta::exception::Exception("In Sorter::queueRetrieveRequests(), failed of adding a job to the retrieve queue through addJobsIfNecessaryAndCommit()"); + } + + // We will keep individual references for each job update we launch so that we make + // our life easier downstream. 
+ struct RRUpdatedParams { + std::unique_ptr<RetrieveRequest::AsyncJobOwnerUpdater> updater; + std::shared_ptr<RetrieveRequest> retrieveRequest; + uint64_t copyNb; + }; + + { + std::list<RRUpdatedParams> rrUpdatersParams; + + } + + if(queueLockFetchTime && filesBefore && bytesBefore &&queueProcessAndCommitTime){} + + }*/ +} + +}} diff --git a/objectstore/Sorter.hpp b/objectstore/Sorter.hpp new file mode 100644 index 0000000000000000000000000000000000000000..e36baeeb344c2ec5d0d50d08054762c0ed6f366c --- /dev/null +++ b/objectstore/Sorter.hpp @@ -0,0 +1,109 @@ +/** + * The CERN Tape Archive (CTA) project + * Copyright © 2018 CERN + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ */ + +#ifndef SORTER_HPP +#define SORTER_HPP +#include <map> +#include <tuple> +#include "JobQueueType.hpp" +#include <memory> +#include "ArchiveRequest.hpp" +#include "RetrieveRequest.hpp" +#include "common/log/LogContext.hpp" +#include "Agent.hpp" +#include <future> +#include "common/threading/Mutex.hpp" +#include "GenericObject.hpp" +#include "catalogue/Catalogue.hpp" +#include "common/dataStructures/ArchiveJob.hpp" +#include "RetrieveQueue.hpp" +#include "ArchiveQueue.hpp" + +namespace cta { namespace objectstore { + + struct ArchiveJobQueueInfo; + struct RetrieveJobQueueInfo; + +class Sorter { +public: + CTA_GENERATE_EXCEPTION_CLASS(RetrieveRequestHasNoCopies); + Sorter(AgentReference& agentReference,Backend &objectstore, catalogue::Catalogue& catalogue); + virtual ~Sorter(); + void insertArchiveJob(std::shared_ptr<ArchiveRequest> archiveRequest, ArchiveRequest::JobDump& jobToInsert,log::LogContext & lc); + /** + * + * @param retrieveRequest + * @param lc + * @throws TODO : explain what is the exception thrown by this method + */ + void insertRetrieveRequest(std::shared_ptr<RetrieveRequest> retrieveRequest, log::LogContext & lc); + typedef std::map<std::tuple<std::string, JobQueueType>, std::list<std::shared_ptr<ArchiveJobQueueInfo>>> MapArchive; + typedef std::map<std::tuple<std::string, JobQueueType>, std::list<std::shared_ptr<RetrieveJobQueueInfo>>> MapRetrieve; + bool flushOneRetrieve(log::LogContext &lc); + bool flushOneArchive(log::LogContext &lc); + MapArchive getAllArchive(); + MapRetrieve getAllRetrieve(); + + struct ArchiveJob{ + std::shared_ptr<ArchiveRequest> archiveRequest; + ArchiveRequest::JobDump jobDump; + uint64_t archiveFileId; + time_t startTime; + uint64_t fileSize; + common::dataStructures::MountPolicy mountPolicy; + }; + + struct RetrieveJob{ + std::shared_ptr<RetrieveRequest> retrieveRequest; + RetrieveRequest::JobDump jobDump; + uint64_t archiveFileId; + time_t startTime; + uint64_t fileSize; + uint64_t fSeq; + 
common::dataStructures::MountPolicy mountPolicy; + }; + +private: + AgentReference &m_agentReference; + Backend &m_objectstore; + catalogue::Catalogue &m_catalogue; + MapArchive m_archiveQueuesAndRequests; + MapRetrieve m_retrieveQueuesAndRequests; + threading::Mutex m_mutex; + const unsigned int c_maxBatchSize = 500; + std::set<std::string> getCandidateVids(RetrieveRequest &request); + std::string getBestVidForQueueingRetrieveRequest(RetrieveRequest& retrieveRequest, std::set<std::string>& candidateVids, log::LogContext &lc); + void queueArchiveRequests(const std::string tapePool, const JobQueueType jobQueueType, std::list<std::shared_ptr<ArchiveJobQueueInfo>>& requests, log::LogContext &lc); + void queueRetrieveRequests(const std::string vid, const JobQueueType jobQueueType, std::list<std::shared_ptr<RetrieveJobQueueInfo>>& archiveJobInfos, log::LogContext &lc); +}; + +struct ArchiveJobQueueInfo{ + std::shared_ptr<ArchiveRequest> archiveRequest; + std::tuple<Sorter::ArchiveJob,std::promise<void>> jobToQueue; + //TODO : Job reporting +}; + +struct RetrieveJobQueueInfo{ + std::shared_ptr<RetrieveRequest> retrieveRequest; + std::tuple<Sorter::RetrieveJob,std::promise<void>> jobToQueue; + //TODO : Job reporting +}; + +}} +#endif /* SORTER_HPP */ + diff --git a/objectstore/SorterTest.cpp b/objectstore/SorterTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..58c67f9f82299a407a32e26277c523615ed0371e --- /dev/null +++ b/objectstore/SorterTest.cpp @@ -0,0 +1,193 @@ +/** + * The CERN Tape Archive (CTA) project + * Copyright © 2018 CERN + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. 
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include <gtest/gtest.h>
#include "BackendVFS.hpp"
#include "common/exception/Exception.hpp"
#include "common/dataStructures/ArchiveFile.hpp"
#include "common/log/DummyLogger.hpp"
#include "tests/TestsCompileTimeSwitches.hpp"
#include "common/log/StdoutLogger.hpp"
#include "GarbageCollector.hpp"
#include "RootEntry.hpp"
#include "Agent.hpp"
#include "AgentReference.hpp"
#include "AgentRegister.hpp"
#include "DriveRegister.hpp"
#include "ArchiveRequest.hpp"
#include "RetrieveRequest.hpp"
#include "ArchiveQueue.hpp"
#include "RetrieveQueue.hpp"
#include "EntryLogSerDeser.hpp"
#include "catalogue/DummyCatalogue.hpp"
#include "Sorter.hpp"
#include <memory>

namespace unitTests {

  // Checks that the Sorter dispatches the jobs of a single ArchiveRequest to the
  // correct archive queues: a job flagged for reporting must end up in the
  // JobsToReportToUser queue of its tape pool, while a pending job must end up
  // in the JobsToTransfer queue of its tape pool.
  TEST(ObjectStore,SorterInsertArchiveRequest){
    cta::log::StdoutLogger dl("dummy", "unitTest");
    //cta::log::DummyLogger dl("dummy", "unitTest");
    cta::log::LogContext lc(dl);
    // We need a dummy catalogue
    cta::catalogue::DummyCatalogue catalogue;
    // In-memory VFS backend: the whole object store lives in this test process.
    cta::objectstore::BackendVFS be;
    // Create the root entry
    cta::objectstore::RootEntry re(be);
    re.initialize();
    re.insert();
    // Create the agent register
    cta::objectstore::EntryLogSerDeser el("user0",
      "unittesthost", time(NULL));
    cta::objectstore::ScopedExclusiveLock rel(re);
    // Create the agent for objects creation
    cta::objectstore::AgentReference agentRef("unitTestCreateEnv", dl);
    // Finish root creation.
    re.addOrGetAgentRegisterPointerAndCommit(agentRef, el, lc);
    rel.release();
    // continue agent creation.
    cta::objectstore::Agent agent(agentRef.getAgentAddress(), be);
    agent.initialize();
    agent.setTimeout_us(0);
    agent.insertAndRegisterSelf(lc);

    // Create the agent of the Sorter: the Sorter takes (temporary) ownership of
    // the jobs it queues under this separate agent.
    cta::objectstore::AgentReference agentRefSorter("agentRefSorter", dl);
    cta::objectstore::Agent agentSorter(agentRefSorter.getAgentAddress(), be);
    agentSorter.initialize();
    agentSorter.setTimeout_us(0);
    agentSorter.insertAndRegisterSelf(lc);

    // Build an ArchiveRequest with two jobs on two different tape pools.
    std::string archiveRequestID = agentRef.nextId("ArchiveRequest");
    agentRef.addToOwnership(archiveRequestID,be);
    cta::objectstore::ArchiveRequest ar(archiveRequestID,be);
    ar.initialize();
    cta::common::dataStructures::ArchiveFile aFile;
    aFile.archiveFileID = 123456789L;
    aFile.diskFileId = "eos://diskFile";
    aFile.checksumType = "checksumType";
    aFile.checksumValue = "checksumValue";
    aFile.creationTime = 0;
    aFile.reconciliationTime = 0;
    aFile.diskFileInfo = cta::common::dataStructures::DiskFileInfo();
    aFile.diskInstance = "eoseos";
    aFile.fileSize = 667;
    aFile.storageClass = "sc";
    ar.setArchiveFile(aFile);
    ar.addJob(1, "TapePool0", agentRef.getAgentAddress(), 1, 1, 1);
    ar.addJob(2, "TapePool1", agentRef.getAgentAddress(), 1, 1, 1);
    // Job 1 is marked as to-be-reported: the Sorter should route it to the
    // JobsToReportToUser queue. Job 2 keeps its initial status and should be
    // routed to the JobsToTransfer queue (asserted below).
    ar.setJobStatus(1,cta::objectstore::serializers::ArchiveJobStatus::AJS_ToReportForTransfer);
    cta::common::dataStructures::MountPolicy mp;
    ar.setMountPolicy(mp);
    ar.setArchiveReportURL("");
    ar.setArchiveErrorReportURL("");
    ar.setRequester(cta::common::dataStructures::UserIdentity("user0", "group0"));
    ar.setSrcURL("root://eoseos/myFile");
    ar.setEntryLog(cta::common::dataStructures::EntryLog("user0", "host0", time(nullptr)));
    ar.insert();
    cta::objectstore::ScopedExclusiveLock atfrl(ar);
    ar.fetch();
    auto jobs = ar.dumpJobs();
    // Insert every job of the request into the Sorter.
    cta::objectstore::Sorter sorter(agentRefSorter,be,catalogue);
    std::shared_ptr<cta::objectstore::ArchiveRequest> arPtr = std::make_shared<cta::objectstore::ArchiveRequest>(ar);
    for(auto& j: jobs){
      sorter.insertArchiveJob(arPtr,j,lc);
    }
    atfrl.release();
    // Collect one future per inserted job so that we can wait for (and assert
    // on) the outcome of the flush of every job.
    cta::objectstore::Sorter::MapArchive allArchiveJobs = sorter.getAllArchive();
    std::list<std::tuple<cta::objectstore::Sorter::ArchiveJob,std::future<void>>> allFutures;
    for(auto& kv: allArchiveJobs){
      for(auto& job: kv.second){
        allFutures.emplace_back(std::make_tuple(std::get<0>(job->jobToQueue),std::get<1>(job->jobToQueue).get_future()));
      }
    }
    // Two (tapePool, queueType) destinations are involved, hence two flushes.
    sorter.flushOneArchive(lc);
    sorter.flushOneArchive(lc);
    for(auto& future: allFutures){
      ASSERT_NO_THROW(std::get<1>(future).get());
    }

    cta::objectstore::ScopedExclusiveLock sel(re);
    re.fetch();

    {
      //Get the archiveQueueToReport
      std::string archiveQueueToReport = re.getArchiveQueueAddress("TapePool0",cta::objectstore::JobQueueType::JobsToReportToUser);
      cta::objectstore::ArchiveQueue aq(archiveQueueToReport,be);

      //Fetch the queue so that we can get the archiveRequests from it
      cta::objectstore::ScopedExclusiveLock aql(aq);
      aq.fetch();
      ASSERT_EQ(aq.dumpJobs().size(),1);
      ASSERT_EQ(aq.getTapePool(),"TapePool0");
      for(auto &job: aq.dumpJobs()){
        ASSERT_EQ(job.copyNb,1);
        ASSERT_EQ(job.size,667);
        // Follow the queue entry back to the request and check it still carries
        // the archive file we created above.
        cta::objectstore::ArchiveRequest archiveRequest(job.address,be);
        archiveRequest.fetchNoLock();
        cta::common::dataStructures::ArchiveFile archiveFile = archiveRequest.getArchiveFile();

        ASSERT_EQ(archiveFile.archiveFileID,aFile.archiveFileID);

        ASSERT_EQ(archiveFile.diskFileId,aFile.diskFileId);
        ASSERT_EQ(archiveFile.checksumType,aFile.checksumType);
        ASSERT_EQ(archiveFile.checksumValue,aFile.checksumValue);
        ASSERT_EQ(archiveFile.creationTime,aFile.creationTime);
        ASSERT_EQ(archiveFile.reconciliationTime,aFile.reconciliationTime);
        ASSERT_EQ(archiveFile.diskFileInfo,aFile.diskFileInfo);
        ASSERT_EQ(archiveFile.fileSize,aFile.fileSize);
        ASSERT_EQ(archiveFile.storageClass,aFile.storageClass);
      }
    }

    {
      //Get the archiveQueueToTransfer
      std::string archiveQueueToTransfer = re.getArchiveQueueAddress("TapePool1",cta::objectstore::JobQueueType::JobsToTransfer);
      cta::objectstore::ArchiveQueue aq(archiveQueueToTransfer,be);

      //Fetch the queue so that we can get the archiveRequests from it
      cta::objectstore::ScopedExclusiveLock aql(aq);
      aq.fetch();
      ASSERT_EQ(aq.dumpJobs().size(),1);
      ASSERT_EQ(aq.getTapePool(),"TapePool1");
      for(auto &job: aq.dumpJobs()){
        ASSERT_EQ(job.copyNb,2);
        ASSERT_EQ(job.size,667);
        cta::objectstore::ArchiveRequest archiveRequest(job.address,be);
        archiveRequest.fetchNoLock();
        cta::common::dataStructures::ArchiveFile archiveFile = archiveRequest.getArchiveFile();

        ASSERT_EQ(archiveFile.archiveFileID,aFile.archiveFileID);

        ASSERT_EQ(archiveFile.diskFileId,aFile.diskFileId);
        ASSERT_EQ(archiveFile.checksumType,aFile.checksumType);
        ASSERT_EQ(archiveFile.checksumValue,aFile.checksumValue);
        ASSERT_EQ(archiveFile.creationTime,aFile.creationTime);
        ASSERT_EQ(archiveFile.reconciliationTime,aFile.reconciliationTime);
        ASSERT_EQ(archiveFile.diskFileInfo,aFile.diskFileInfo);
        ASSERT_EQ(archiveFile.fileSize,aFile.fileSize);
        ASSERT_EQ(archiveFile.storageClass,aFile.storageClass);
      }
    }
  }

}
df278b8de3e4063ab1ad944cc83b4b9a90af1d46..b8d87d2e6f587d27684b4466759a0edbb40cd56f 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -95,7 +95,7 @@ void OStoreDB::waitSubthreadsComplete() { } //------------------------------------------------------------------------------ -// OStoreDB::waitSubthreadsComplete() +// OStoreDB::setThreadNumber() //------------------------------------------------------------------------------ void OStoreDB::setThreadNumber(uint64_t threadNumber) { // Clear all threads. @@ -641,23 +641,130 @@ void OStoreDB::queueArchive(const std::string &instanceName, const cta::common:: logContext.log(log::INFO, "In OStoreDB::queueArchive(): recorded request for queueing (enqueueing posted to thread pool)."); } -void OStoreDB::queueArchiveForRepack(cta::objectstore::ArchiveRequest& request, log::LogContext& logContext){ -// objectstore::ScopedExclusiveLock rReqL(request); -// request.fetch(); +void OStoreDB::queueArchiveForRepack(std::unique_ptr<cta::objectstore::ArchiveRequest> request, log::LogContext& logContext){ + assertAgentAddressSet(); auto mutexForHelgrind = cta::make_unique<cta::threading::Mutex>(); cta::threading::MutexLocker mlForHelgrind(*mutexForHelgrind); auto * mutexForHelgrindAddr = mutexForHelgrind.release(); - std::unique_ptr<cta::objectstore::ArchiveRequest> aReqUniqPtr; - aReqUniqPtr.reset(&request); + std::unique_ptr<cta::objectstore::ArchiveRequest> aReqUniqPtr(request.release()); + objectstore::ScopedExclusiveLock rReqL(*aReqUniqPtr); + aReqUniqPtr->fetch(); + uint64_t taskQueueSize = m_taskQueueSize; + // We create the object here + //m_agentReference->addToOwnership(aReqUniqPtr->getAddressIfSet(), m_objectStore); + utils::Timer timer; + double agentReferencingTime = timer.secs(cta::utils::Timer::reset_t::resetCounter); + // Prepare the logs to avoid multithread access on the object. 
+ log::ScopedParamContainer params(logContext); + cta::common::dataStructures::ArchiveFile aFile = aReqUniqPtr->getArchiveFile(); + params.add("jobObject", aReqUniqPtr->getAddressIfSet()) + .add("fileId", aFile.archiveFileID) + .add("diskInstance", aFile.diskInstance) + .add("diskFilePath", aFile.diskFileInfo.path) + .add("diskFileId", aFile.diskFileId) + .add("agentReferencingTime", agentReferencingTime); + delayIfNecessary(logContext); cta::objectstore::ArchiveRequest *aReqPtr = aReqUniqPtr.release(); + + m_taskQueueSize++; + auto * et = new EnqueueingTask([aReqPtr, mutexForHelgrindAddr, this]{ - //TODO : queue the ArchiveRequest + std::unique_ptr<cta::threading::Mutex> mutexForHelgrind(mutexForHelgrindAddr); + std::unique_ptr<cta::objectstore::ArchiveRequest> aReq(aReqPtr); + // This unique_ptr's destructor will ensure the OStoreDB object is not deleted before the thread exits. + auto scopedCounterDecrement = [this](void *){ + m_taskQueueSize--; + m_taskPostingSemaphore.release(); + }; + // A bit ugly, but we need a non-null pointer for the "deleter" to be called at the end of the thread execution + std::unique_ptr<void, decltype(scopedCounterDecrement)> scopedCounterDecrementerInstance((void *)1, scopedCounterDecrement); + + log::LogContext logContext(m_logger); + utils::Timer timer; + ScopedExclusiveLock arl(*aReq); + aReq->fetch(); + timer.secs(cta::utils::Timer::reset_t::resetCounter); + double arTotalQueueUnlockTime = 0; + double arTotalQueueingTime = timer.secs(cta::utils::Timer::reset_t::resetCounter); + // We can now enqueue the requests + std::list<std::string> linkedTapePools; + std::string currentTapepool; + try { + for (auto &j: aReq->dumpJobs()) { + //Queue each job into the ArchiveQueue + double qTime = timer.secs(cta::utils::Timer::reset_t::resetCounter); + currentTapepool = j.tapePool; + linkedTapePools.push_back(j.owner); + auto shareLock = ostoredb::MemArchiveQueue::sharedAddToQueue(j, j.tapePool, *aReq, *this, logContext); + 
arTotalQueueingTime += qTime; + aReq->commit(); + // Now we can let go off the queue. + shareLock.reset(); + double qUnlockTime = timer.secs(cta::utils::Timer::reset_t::resetCounter); + arTotalQueueUnlockTime += qUnlockTime; + linkedTapePools.push_back(j.owner); + log::ScopedParamContainer params(logContext); + params.add("tapePool", j.tapePool) + .add("queueObject", j.owner) + .add("jobObject", aReq->getAddressIfSet()) + .add("queueingTime", qTime) + .add("queueUnlockTime", qUnlockTime); + logContext.log(log::INFO, "In OStoreDB::queueArchiveForRepack(): added job to queue."); + } + } catch (NoSuchArchiveQueue &ex) { + // Unlink the request from already connected tape pools + for (auto tpa=linkedTapePools.begin(); tpa!=linkedTapePools.end(); tpa++) { + objectstore::ArchiveQueue aq(*tpa, m_objectStore); + ScopedExclusiveLock aql(aq); + aq.fetch(); + aq.removeJobsAndCommit({aReq->getAddressIfSet()}); + } + aReq->remove(); + log::ScopedParamContainer params(logContext); + params.add("tapePool", currentTapepool) + .add("archiveRequestObject", aReq->getAddressIfSet()) + .add("exceptionMessage", ex.getMessageValue()) + .add("jobObject", aReq->getAddressIfSet()); + logContext.log(log::ERR, "In OStoreDB::queueArchiveForRepack(): failed to enqueue job"); + return; + } + // The request is now fully set. 
+ double arOwnerResetTime = timer.secs(cta::utils::Timer::reset_t::resetCounter); + + double arLockRelease = timer.secs(cta::utils::Timer::reset_t::resetCounter); + + // And remove reference from the agent + m_agentReference->removeFromOwnership(aReq->getAddressIfSet(), m_objectStore); + double agOwnershipResetTime = timer.secs(cta::utils::Timer::reset_t::resetCounter); + + auto archiveFile = aReq->getArchiveFile(); + log::ScopedParamContainer params(logContext); + params.add("jobObject", aReq->getAddressIfSet()) + .add("fileId", archiveFile.archiveFileID) + .add("diskInstance", archiveFile.diskInstance) + .add("diskFilePath", archiveFile.diskFileInfo.path) + .add("diskFileId", archiveFile.diskFileId) + .add("totalQueueingTime", arTotalQueueingTime) + .add("totalQueueUnlockTime", arTotalQueueUnlockTime) + .add("ownerResetTime", arOwnerResetTime) + .add("lockReleaseTime", arLockRelease) + .add("agentOwnershipResetTime", agOwnershipResetTime) + .add("totalTime", arTotalQueueingTime + + arTotalQueueUnlockTime + arOwnerResetTime + arLockRelease + + agOwnershipResetTime); + + logContext.log(log::INFO, "In OStoreDB::queueArchiveForRepack(): Finished enqueueing request."); }); ANNOTATE_HAPPENS_BEFORE(et); mlForHelgrind.unlock(); + rReqL.release(); m_enqueueingTasksQueue.push(et); //TODO Time measurement - logContext.log(log::INFO, "In OStoreDB::queueArchiveForRepack(): recorded request for queueing (enqueueing posted to thread pool)."); + double taskPostingTime = timer.secs(cta::utils::Timer::reset_t::resetCounter); + params.add("taskPostingTime", taskPostingTime) + .add("taskQueueSize", taskQueueSize) + .add("totalTime", agentReferencingTime + taskPostingTime); + logContext.log(log::INFO, "In OStoreDB::queueArchiveForRepack(): recorded request for queueing (enqueueing posted to thread pool)."); } //------------------------------------------------------------------------------ @@ -2508,7 +2615,7 @@ std::set<cta::SchedulerDatabase::RetrieveJob *> 
OStoreDB::RetrieveMount::batchSu osdbJob->checkReportSucceedForRepack(); auto & tapeFile = osdbJob->archiveFile.tapeFiles[osdbJob->selectedCopyNb]; vid = osdbJob->m_retrieveMount->mountInfo.vid; - insertedElementsLists.push_back(AqtrtrfsCa::InsertedElement{&osdbJob->m_retrieveRequest, (uint16_t)osdbJob->selectedCopyNb, tapeFile.fSeq,osdbJob->archiveFile.fileSize,cta::common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack,serializers::RetrieveJobStatus::RJS_Succeeded}); + insertedElementsLists.push_back(AqtrtrfsCa::InsertedElement{&osdbJob->m_retrieveRequest, (uint16_t)osdbJob->selectedCopyNb, tapeFile.fSeq,osdbJob->archiveFile.fileSize,cta::common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack,serializers::RetrieveJobStatus::RJS_ToReportToRepackForSuccess}); } aqtrtrfsCa.referenceAndSwitchOwnership(vid,insertedElementsLists,lc); return ret; diff --git a/scheduler/OStoreDB/OStoreDB.hpp b/scheduler/OStoreDB/OStoreDB.hpp index 718933e97d738a5bf9ab6419e83ab0e18c61c0d8..6462953be41ed160606bd739eba4aa61e3a836a8 100644 --- a/scheduler/OStoreDB/OStoreDB.hpp +++ b/scheduler/OStoreDB/OStoreDB.hpp @@ -267,7 +267,7 @@ public: void queueArchive(const std::string &instanceName, const cta::common::dataStructures::ArchiveRequest &request, const cta::common::dataStructures::ArchiveFileQueueCriteriaAndFileId &criteria, log::LogContext &logContext) override; - void queueArchiveForRepack(cta::objectstore::ArchiveRequest& request, log::LogContext& logContext) override; + void queueArchiveForRepack(std::unique_ptr<cta::objectstore::ArchiveRequest> request, log::LogContext& logContext) override; std::map<std::string, std::list<common::dataStructures::ArchiveJob>> getArchiveJobs() const override; diff --git a/scheduler/OStoreDB/OStoreDBFactory.hpp b/scheduler/OStoreDB/OStoreDBFactory.hpp index c53a254c5c48151dad2ac43abac6107aca7dd95e..7b43fc30354d7c00d0341c87eba93a83584358c9 100644 --- a/scheduler/OStoreDB/OStoreDBFactory.hpp +++ 
b/scheduler/OStoreDB/OStoreDBFactory.hpp @@ -110,8 +110,8 @@ public: return m_OStoreDB.queueArchive(instanceName, request, criteria, logContext); } - void queueArchiveForRepack(cta::objectstore::ArchiveRequest& request, log::LogContext& lc) override { - return m_OStoreDB.queueArchiveForRepack(request,lc); + void queueArchiveForRepack(std::unique_ptr<cta::objectstore::ArchiveRequest> request, log::LogContext& lc) override { + return m_OStoreDB.queueArchiveForRepack(std::move(request),lc); } void deleteRetrieveRequest(const common::dataStructures::SecurityIdentity& cliIdentity, const std::string& remoteFile) override { diff --git a/scheduler/Scheduler.cpp b/scheduler/Scheduler.cpp index 488a0620256e22e423ca82627495dcfa1f0db807..7057a6b30939b34891fba046ec373dbf256ae73c 100644 --- a/scheduler/Scheduler.cpp +++ b/scheduler/Scheduler.cpp @@ -179,8 +179,8 @@ void Scheduler::queueArchiveRequestForRepackBatch(std::list<cta::objectstore::Ar archiveRequest.fetch(); cta::common::dataStructures::ArchiveFile archiveFile = archiveRequest.getArchiveFile(); rReqL.release(); - this->m_db.queueArchiveForRepack(archiveRequest,lc); - + std::unique_ptr<cta::objectstore::ArchiveRequest> arUniqPtr = cta::make_unique<cta::objectstore::ArchiveRequest>(archiveRequest); + this->m_db.queueArchiveForRepack(std::move(arUniqPtr),lc); cta::log::TimingList tl; utils::Timer t; tl.insOrIncAndReset("schedulerDbTime", t); diff --git a/scheduler/SchedulerDatabase.hpp b/scheduler/SchedulerDatabase.hpp index 2b8ea4c9931148ea8e08da14585362f47d34d60c..d6c9457cf969127ab80c0fa0325a864ddf194d48 100644 --- a/scheduler/SchedulerDatabase.hpp +++ b/scheduler/SchedulerDatabase.hpp @@ -118,7 +118,7 @@ public: const cta::common::dataStructures::ArchiveFileQueueCriteriaAndFileId &criteria, log::LogContext &logContext) = 0; - virtual void queueArchiveForRepack(cta::objectstore::ArchiveRequest &request,log::LogContext &logContext) = 0; + virtual void 
queueArchiveForRepack(std::unique_ptr<cta::objectstore::ArchiveRequest> request,log::LogContext &logContext) = 0; /** * Returns all of the queued archive jobs. The returned jobs are diff --git a/scheduler/SchedulerTest.cpp b/scheduler/SchedulerTest.cpp index 635bf252aea1c9107612b8e4a7a919945e2babbc..9aa4e3296a84c5b218e9cd6810a0970d6adf03cd 100644 --- a/scheduler/SchedulerTest.cpp +++ b/scheduler/SchedulerTest.cpp @@ -1588,7 +1588,7 @@ TEST_P(SchedulerTest, expandRepackRequest) { ASSERT_EQ(retrieveRequest.getQueueType(),cta::objectstore::JobQueueType::JobsToReportToRepackForSuccess); ASSERT_EQ(retrieveRequest.getRetrieveFileQueueCriteria().mountPolicy,cta::common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack); ASSERT_EQ(retrieveRequest.getActiveCopyNumber(),1); - ASSERT_EQ(retrieveRequest.getJobStatus(job.copyNb),cta::objectstore::serializers::RetrieveJobStatus::RJS_Succeeded); + ASSERT_EQ(retrieveRequest.getJobStatus(job.copyNb),cta::objectstore::serializers::RetrieveJobStatus::RJS_ToReportToRepackForSuccess); ASSERT_EQ(retrieveRequest.getJobs().size(),1); //Testing the archive file associated to the retrieve request @@ -1715,8 +1715,22 @@ TEST_P(SchedulerTest, expandRepackRequest) { //queue all the ArchiveRequests into the ArchiveQueueToTransferForRepack queue. 
for(uint64_t i = 1; i <= nbTapesForTest; ++i){ scheduler.queueArchiveRequestForRepackBatch(archiveRequestsPerTape[i],lc); - scheduler.waitSchedulerDbSubthreadsComplete(); } + scheduler.waitSchedulerDbSubthreadsComplete(); + + //Test that the ArchiveRequests are in the ArchiveQueueToTransferForRepack queue + /*cta::objectstore::RootEntry re(schedulerDB.getBackend()); + cta::objectstore::ScopedExclusiveLock sel(re); + re.fetch(); + + //Get the retrieveQueueToReportToRepackForSuccess + std::string archiveQueueToTransferForRepack = re.getArchiveQueueAddress(s_tapePoolName,cta::objectstore::JobQueueType::JobsToTransfer); + cta::objectstore::ArchiveQueue aq(archiveQueueToTransferForRepack,schedulerDB.getBackend()); + + //Fetch the queue so that we can get the retrieveRequests from it + cta::objectstore::ScopedExclusiveLock rql(aq); + aq.fetch();*/ + } }