diff --git a/catalogue/CatalogueTest.cpp b/catalogue/CatalogueTest.cpp
index fa6c2275caaadd92dc09a5c2ce827918de4ca61e..2ba94fc59c3b25c815becccedc01116e3eb9ca21 100644
--- a/catalogue/CatalogueTest.cpp
+++ b/catalogue/CatalogueTest.cpp
@@ -8216,7 +8216,7 @@ TEST_P(cta_catalogue_CatalogueTest, getAllTapes_many_tapes) {
   m_catalogue->createLogicalLibrary(m_admin, logicalLibraryName, "Create logical library");
   m_catalogue->createTapePool(m_admin, tapePoolName, nbPartialTapes, true, "Create tape pool");
 
-  const uint32_t nbTapes = 1000;
+  const uint32_t nbTapes = 10;
 
   for(uint32_t i = 0; i < nbTapes; i++) {
     std::ostringstream vid;
@@ -8238,6 +8238,11 @@ TEST_P(cta_catalogue_CatalogueTest, getAllTapes_many_tapes) {
     ASSERT_NE(vidToTapeMap.end(), tapeItor);
 
     ASSERT_EQ(vid.str(), tapeItor->second.vid);
+    ASSERT_EQ(logicalLibraryName, tapeItor->second.logicalLibraryName);
+    ASSERT_EQ(tapePoolName, tapeItor->second.tapePoolName);
+    ASSERT_EQ(capacityInBytes, tapeItor->second.capacityInBytes);
+    ASSERT_EQ(disabledValue, tapeItor->second.disabled);
+    ASSERT_EQ(fullValue, tapeItor->second.full);
     ASSERT_EQ(tapeComment, tapeItor->second.comment);
   }
 }
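A note on the strengthened test above: the loop now round-trips every creation-time attribute of each tape, not just the VID and the comment. As a minimal standalone sketch of the same check (the Tape struct and checkTape helper below are hypothetical stand-ins for cta::common::dataStructures::Tape and the gtest assertions, not part of this patch):

    #include <cassert>
    #include <cstdint>
    #include <map>
    #include <string>

    // Hypothetical stand-in for cta::common::dataStructures::Tape.
    struct Tape {
      std::string vid, logicalLibraryName, tapePoolName, comment;
      uint64_t capacityInBytes = 0;
      bool disabled = false, full = false;
    };

    // Assert that a tape returned by the catalogue carries all the
    // attributes it was created with, mirroring the new ASSERT_EQs.
    void checkTape(const std::map<std::string, Tape>& vidToTapeMap, const Tape& expected) {
      auto it = vidToTapeMap.find(expected.vid);
      assert(it != vidToTapeMap.end());
      assert(it->second.logicalLibraryName == expected.logicalLibraryName);
      assert(it->second.tapePoolName == expected.tapePoolName);
      assert(it->second.capacityInBytes == expected.capacityInBytes);
      assert(it->second.disabled == expected.disabled);
      assert(it->second.full == expected.full);
      assert(it->second.comment == expected.comment);
    }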
diff --git a/catalogue/DropSqliteCatalogueSchema.before_SQL.cpp b/catalogue/DropSqliteCatalogueSchema.before_SQL.cpp
deleted file mode 100644
index df59da1259118483e28f83010da217b5d6657882..0000000000000000000000000000000000000000
--- a/catalogue/DropSqliteCatalogueSchema.before_SQL.cpp
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * The CERN Tape Archive (CTA) project
- * Copyright (C) 2015 CERN
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#include "catalogue/DropSqliteCatalogueSchema.hpp"
-
-namespace cta {
-namespace catalogue {
-
-//------------------------------------------------------------------------------
-// constructor
-//------------------------------------------------------------------------------
-DropSqliteCatalogueSchema::DropSqliteCatalogueSchema(): sql(
-  // DROP_CTA_SQL_SCHEMA - The contents of drop_sqlite_catalogue_schema.cpp go here
-  ) {
-}
-
-} // namespace catalogue
-} // namespace cta
diff --git a/objectstore/AgentWatchdog.hpp b/objectstore/AgentWatchdog.hpp
index f0201b2ec0865fc2ed0075e51407f1bc684bf944..9f1a157a1913742a15996eeb6c34d488ea7a0a53 100644
--- a/objectstore/AgentWatchdog.hpp
+++ b/objectstore/AgentWatchdog.hpp
@@ -49,6 +49,15 @@ public:
     return true;
   }
 
+  std::list<log::Param> getDeadAgentDetails() {
+    std::list<log::Param> ret;
+    ret.push_back(log::Param("currentHeartbeat", readHeartbeat()));
+    ret.push_back(log::Param("timeout", m_timeout));
+    ret.push_back(log::Param("timer", m_timer.secs()));
+    ret.push_back(log::Param("heartbeatAtTimerStart", m_heartbeatCounter));
+    return ret;
+  }
+
   bool checkExists() {
     return m_agent.exists();
   }
diff --git a/objectstore/ArchiveRequest.cpp b/objectstore/ArchiveRequest.cpp
index 0462e33ac7dd0335981f512c413bc533ecae0228..876259ba95c1b152e86b4db0c3f2f91aa6da13ec 100644
--- a/objectstore/ArchiveRequest.cpp
+++ b/objectstore/ArchiveRequest.cpp
@@ -554,6 +554,21 @@ std::string ArchiveRequest::getJobOwner(uint16_t copyNumber) {
   return j->owner();
 }
 
+std::string ArchiveRequest::statusToString(const serializers::ArchiveJobStatus& status) {
+  switch(status) {
+  case serializers::ArchiveJobStatus::AJS_Complete:
+    return "Complete";
+  case serializers::ArchiveJobStatus::AJS_Failed:
+    return "Failed";
+  case serializers::ArchiveJobStatus::AJS_LinkingToArchiveQueue:
+    return "LinkingToArchiveQueue";
+  case serializers::ArchiveJobStatus::AJS_PendingMount:
+    return "PendingMount";
+  default:
+    return std::string("Unknown (")+std::to_string((uint64_t) status)+")";
+  }
+}
+
 bool ArchiveRequest::finishIfNecessary(log::LogContext & lc) {
   checkPayloadWritable();
@@ -570,6 +585,9 @@ bool ArchiveRequest::finishIfNecessary(log::LogContext & lc) {
   remove();
   log::ScopedParamContainer params(lc);
   params.add("archiveRequestObject", getAddressIfSet());
+  for (auto & j: jl) {
+    params.add(std::string("statusForCopyNb")+std::to_string(j.copynb()), statusToString(j.status()));
+  }
   lc.log(log::INFO, "In ArchiveRequest::finishIfNecessary(): Removed completed request.");
   return true;
 }
diff --git a/objectstore/ArchiveRequest.hpp b/objectstore/ArchiveRequest.hpp
index 3ab81a5be2a9d0dfb7328aa5a1af310387f03938..b20cfd237b7f1062b8e244fed63bc538c6aba497 100644
--- a/objectstore/ArchiveRequest.hpp
+++ b/objectstore/ArchiveRequest.hpp
@@ -50,6 +50,7 @@ public:
   bool setJobSuccessful(uint16_t copyNumber); //< returns true if this is the last job
   bool addJobFailure(uint16_t copyNumber, uint64_t sessionId, log::LogContext &lc); //< returns true the job is failed
   serializers::ArchiveJobStatus getJobStatus(uint16_t copyNumber);
+  std::string statusToString(const serializers::ArchiveJobStatus & status);
   bool finishIfNecessary(log::LogContext & lc);/**< Handling of the consequences of a job status change for the entire request.
                                                  * This function returns true if the request got finished. */
   // Mark all jobs as pending mount (following their linking to a tape pool)
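The two statusToString() helpers added in this patch (ArchiveRequest above, RetrieveRequest further down) follow the same enum-to-string pattern: every known value maps to a readable name, and unknown values degrade to "Unknown (<n>)" so logs stay usable if the protobuf enum grows. A minimal standalone sketch of the pattern, with a hypothetical enum standing in for the generated serializers types:

    #include <cstdint>
    #include <string>

    // Hypothetical stand-in; the real enum and its values live in the
    // protobuf-generated serializers namespace.
    enum class JobStatus : uint64_t { PendingMount, LinkingToArchiveQueue, Complete, Failed };

    std::string statusToString(JobStatus status) {
      switch (status) {
      case JobStatus::PendingMount:          return "PendingMount";
      case JobStatus::LinkingToArchiveQueue: return "LinkingToArchiveQueue";
      case JobStatus::Complete:              return "Complete";
      case JobStatus::Failed:                return "Failed";
      default:
        // Keep logging usable for values added after this code was written.
        return "Unknown (" + std::to_string(static_cast<uint64_t>(status)) + ")";
      }
    }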
diff --git a/objectstore/BackendPopulator.cpp b/objectstore/BackendPopulator.cpp
index 2e4345f8677465ed674d77d4b56a6c4bc081fedb..ea4dd0eac3b2ef1558a9153aa86fe16b03247420 100644
--- a/objectstore/BackendPopulator.cpp
+++ b/objectstore/BackendPopulator.cpp
@@ -68,6 +68,7 @@ BackendPopulator::~BackendPopulator() throw() {
       params.add("agentObject", agent.getAddressIfSet())
             .add("ownedObjectCount", agent.getOwnershipList().size());
       m_lc.log(log::WARNING, "In BackendPopulator::~BackendPopulator(): not deleting non-empty agent object, left for garbage collection.");
+      return;
     }
     agent.removeAndUnregisterSelf(m_lc);
   } catch (cta::exception::Exception & ex) {
diff --git a/objectstore/GarbageCollector.cpp b/objectstore/GarbageCollector.cpp
index 60bb50cca34635c2683741d18c0b87d9ca935303..662fdd29669019e6b59d31fd83acd5a43896af0c 100644
--- a/objectstore/GarbageCollector.cpp
+++ b/objectstore/GarbageCollector.cpp
@@ -119,7 +119,7 @@ void GarbageCollector::checkHeartbeats(log::LogContext & lc) {
     // Get the heartbeat. Clean dead agents and remove references to them
     try {
       if (!wa->second->checkAlive()) {
-        cleanupDeadAgent(wa->first, lc);
+        cleanupDeadAgent(wa->first, wa->second->getDeadAgentDetails(), lc);
         delete wa->second;
         m_watchedAgents.erase(wa++);
       } else {
@@ -137,7 +137,7 @@ void GarbageCollector::checkHeartbeats(log::LogContext & lc) {
   }
 }
 
-void GarbageCollector::cleanupDeadAgent(const std::string & address, log::LogContext & lc) {
+void GarbageCollector::cleanupDeadAgent(const std::string & address, std::list<log::Param> agentDetails, log::LogContext & lc) {
   // We detected a dead agent. Try and take ownership of it. It could already be owned
   // by another garbage collector.
   // To minimize locking, take a lock on the agent and check its ownership first.
@@ -175,7 +175,11 @@ void GarbageCollector::cleanupDeadAgent(const std::string & address, log::LogCon
   m_agentRegister.trackAgent(address);
   m_agentRegister.commit();
   arl.release();
-  lc.log(log::INFO, "In GarbageCollector::cleanupDeadAgent(): will cleanup dead agent.");
+  {
+    log::ScopedParamContainer params2(lc);
+    for (auto p: agentDetails) params2.add(p.getName(), p.getValue());
+    lc.log(log::INFO, "In GarbageCollector::cleanupDeadAgent(): will cleanup dead agent.");
+  }
   // Return all objects owned by the agent to their respective backup owners
   OwnedObjectSorter ownedObjectSorter;
@@ -217,7 +221,7 @@ void GarbageCollector::OwnedObjectSorter::fetchOwnedObjects(Agent& agent, std::l
       log::ScopedParamContainer params(lc);
       params.add("objectAddress", obj)
             .add("exceptionMessage", ex.getMessageValue());
-      lc.log(log::ERR, "In GarbageCollector::cleanupDeadAgent(): failed to asyncLockfreeFetch(): skipping object. Garbage collection will be incomplete.");
+      lc.log(log::ERR, "In GarbageCollector::OwnedObjectSorter::fetchOwnedObjects(): failed to asyncLockfreeFetch(): skipping object. Garbage collection will be incomplete.");
     }
   }
 
@@ -230,7 +234,7 @@ void GarbageCollector::OwnedObjectSorter::fetchOwnedObjects(Agent& agent, std::l
       ownedObjectsFetchers.at(obj.get())->wait();
     } catch (Backend::NoSuchObject & ex) {
       goneObjects.push_back(obj->getAddressIfSet());
-      lc.log(log::INFO, "In GarbageCollector::cleanupDeadAgent(): skipping garbage collection of now gone object.");
+      lc.log(log::INFO, "In GarbageCollector::OwnedObjectSorter::fetchOwnedObjects(): skipping garbage collection of now gone object.");
       ownedObjectsFetchers.erase(obj.get());
       agent.removeFromOwnership(obj->getAddressIfSet());
       ownershipUdated=true;
@@ -239,7 +243,7 @@ void GarbageCollector::OwnedObjectSorter::fetchOwnedObjects(Agent& agent, std::l
       // Again, we have a problem. We will skip the object and have an incomplete GC.
       skippedObjects.push_back(obj->getAddressIfSet());
       params2.add("exceptionMessage", ex.getMessageValue());
-      lc.log(log::ERR, "In GarbageCollector::cleanupDeadAgent(): failed to AsyncLockfreeFetch::wait(): skipping object. Garbage collection will be incomplete.");
+      lc.log(log::ERR, "In GarbageCollector::OwnedObjectSorter::fetchOwnedObjects(): failed to AsyncLockfreeFetch::wait(): skipping object. Garbage collection will be incomplete.");
       ownedObjectsFetchers.erase(obj.get());
       continue;
     }
@@ -272,7 +276,7 @@ void GarbageCollector::OwnedObjectSorter::sortFetchedObjects(Agent& agent, std::
         // Log the owner (except for archiveRequests which can have several owners).
         params2.add("actualOwner", obj->getAddressIfSet());
       }
-      lc.log(log::WARNING, "In GarbageCollector::cleanupDeadAgent(): skipping object which is not owned by this agent");
+      lc.log(log::WARNING, "In GarbageCollector::OwnedObjectSorter::sortFetchedObjects(): skipping object which is not owned by this agent");
       agent.removeFromOwnership(obj->getAddressIfSet());
       ownershipUdated=true;
       continue;
@@ -454,7 +458,7 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(Agent& a
           .add("tapepool", tapepool.first)
          .add("archiveQueueObject", aq.getAddressIfSet())
           .add("garbageCollectedPreviousOwner", agent.getAddressIfSet());
-        lc.log(log::INFO, "In GarbageCollector::cleanupDeadAgent(): requeued archive job.");
+        lc.log(log::INFO, "In GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(): requeued archive job.");
       } catch (cta::exception::Exception & e) {
         // Update did not go through. It could be benign
         std::string debugType=typeid(e).name();
@@ -466,7 +470,7 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(Agent& a
             .add("copyNb", arup.copyNb)
             .add("fileId", arup.archiveRequest->getArchiveFile().archiveFileID)
             .add("exceptionType", debugType);
-          lc.log(log::ERR, "In GarbageCollector::cleanupDeadAgent(): failed to requeue gone/not owned archive job. Removing from queue.");
+          lc.log(log::ERR, "In GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(): failed to requeue gone/not owned archive job. Removing from queue.");
         } else {
           // We have an unexpected error. We will handle this with the request-by-request garbage collection.
           log::ScopedParamContainer params(lc);
@@ -475,7 +479,7 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(Agent& a
            .add("copyNb", arup.copyNb)
            .add("fileId", arup.archiveRequest->getArchiveFile().archiveFileID)
            .add("exceptionType", debugType)
            .add("exceptionMessage", e.getMessageValue());
-          lc.log(log::ERR, "In GarbageCollector::cleanupDeadAgent(): failed to requeue archive job with unexpected error. Removing from queue and will re-run individual garbage collection.");
+          lc.log(log::ERR, "In GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(): failed to requeue archive job with unexpected error. Removing from queue and will re-run individual garbage collection.");
           // We will re-run the individual GC for this one.
           jobsIndividuallyGCed.insert(arup.archiveRequest->getAddressIfSet());
           otherObjects.emplace_back(new GenericObject(arup.archiveRequest->getAddressIfSet(), objectStore));
@@ -491,7 +495,7 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(Agent& a
       aq.removeJobsAndCommit(requestsToDequeue);
       log::ScopedParamContainer params(lc);
       params.add("archiveQueueObject", aq.getAddressIfSet());
-      lc.log(log::INFO, "In GarbageCollector::cleanupDeadAgent(): Cleaned up and re-committed archive queue after error handling.");
+      lc.log(log::INFO, "In GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(): Cleaned up and re-committed archive queue after error handling.");
       queueRecommitTime = t.secs(utils::Timer::resetCounter);
     }
   }
@@ -515,7 +519,7 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(Agent& a
         .add("requestsUpdatePreparationTime", requestsUpdatePreparationTime)
         .add("requestsUpdatingTime", requestsUpdatingTime)
         .add("queueRecommitTime", queueRecommitTime);
-      lc.log(log::INFO, "In GarbageCollector::cleanupDeadAgent(): Requeued a batch of archive requests.");
+      lc.log(log::INFO, "In GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(): Requeued a batch of archive requests.");
     }
     // We can now forget pool level list. But before that, we can remove the objects
     // from agent ownership if this was the last reference to it.
@@ -625,7 +629,7 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(Agent&
           .add("vid", tape.first)
           .add("retreveQueueObject", rq.getAddressIfSet())
           .add("garbageCollectedPreviousOwner", agent.getAddressIfSet());
-        lc.log(log::INFO, "In GarbageCollector::cleanupDeadAgent(): requeued retrieve job.");
+        lc.log(log::INFO, "In GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(): requeued retrieve job.");
       } catch (cta::exception::Exception & e) {
         // Update did not go through. It could be benign
         std::string debugType=typeid(e).name();
@@ -638,7 +642,7 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(Agent&
            .add("copyNb", rrup.copyNb)
            .add("fileId", rrup.retrieveRequest->getArchiveFile().archiveFileID)
            .add("exceptionType", debugType);
-          lc.log(log::ERR, "In GarbageCollector::cleanupDeadAgent(): failed to requeue gone/not owned retrieve job. Removing from queue.");
+          lc.log(log::ERR, "In GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(): failed to requeue gone/not owned retrieve job. Removing from queue.");
         } else {
           // We have an unexpected error. Log it, and remove form queue. Not much we can
           // do at this point.
@@ -648,7 +652,7 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(Agent&
            .add("fileId", rrup.retrieveRequest->getArchiveFile().archiveFileID)
            .add("exceptionType", debugType)
            .add("exceptionMessage", e.getMessageValue());
-          lc.log(log::ERR, "In GarbageCollector::cleanupDeadAgent(): failed to requeue retrieve job with unexpected error. Removing from queue and will re-run individual garbage collection.");
+          lc.log(log::ERR, "In GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(): failed to requeue retrieve job with unexpected error. Removing from queue and will re-run individual garbage collection.");
           // We will re-run the individual GC for this one.
           jobsIndividuallyGCed.insert(rrup.retrieveRequest->getAddressIfSet());
           otherObjects.emplace_back(new GenericObject(rrup.retrieveRequest->getAddressIfSet(), objectStore));
@@ -664,7 +668,7 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(Agent&
       rq.removeJobsAndCommit(requestsToDequeue);
       log::ScopedParamContainer params(lc);
       params.add("retreveQueueObject", rq.getAddressIfSet());
-      lc.log(log::INFO, "In GarbageCollector::cleanupDeadAgent(): Cleaned up and re-committed retrieve queue after error handling.");
+      lc.log(log::INFO, "In GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(): Cleaned up and re-committed retrieve queue after error handling.");
       queueRecommitTime = t.secs(utils::Timer::resetCounter);
     }
   }
@@ -688,7 +692,7 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(Agent&
         .add("requestsUpdatePreparationTime", requestsUpdatePreparationTime)
         .add("requestsUpdatingTime", requestsUpdatingTime)
         .add("queueRecommitTime", queueRecommitTime);
-      lc.log(log::INFO, "In GarbageCollector::cleanupDeadAgent(): Requeued a batch of retrieve requests.");
+      lc.log(log::INFO, "In GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(): Requeued a batch of retrieve requests.");
     }
     // We can now forget pool level list. But before that, we can remove the objects
     // from agent ownership if this was the last reference to it.
@@ -726,9 +730,9 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateOtherObjects(Agent&
       // Call GenericOpbject's garbage collect method, which in turn will
       // delegate to the object type's garbage collector.
       go->garbageCollectDispatcher(goLock, agent.getAddressIfSet(), agentReference, lc, catalogue);
-      lc.log(log::INFO, "In GarbageCollector::cleanupDeadAgent(): garbage collected owned object.");
+      lc.log(log::INFO, "In GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateOtherObjects(): garbage collected owned object.");
     } else {
-      lc.log(log::INFO, "In GarbageCollector::cleanupDeadAgent(): skipping garbage collection of now gone object.");
+      lc.log(log::INFO, "In GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateOtherObjects(): skipping garbage collection of now gone object.");
     }
     // In all cases, relinquish ownership for this object
     agent.removeFromOwnership(go->getAddressIfSet());
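One point worth noting in cleanupDeadAgent() above: the extra braces around params2 are deliberate. A log::ScopedParamContainer attaches its parameters to the LogContext for its own lifetime, so scoping it to a block limits the dead agent's heartbeat details to the single "will cleanup dead agent" line instead of decorating every log line emitted during the rest of the cleanup. Reduced from the hunk above:

      {
        log::ScopedParamContainer params2(lc);
        for (auto p: agentDetails) params2.add(p.getName(), p.getValue());
        lc.log(log::INFO, "In GarbageCollector::cleanupDeadAgent(): will cleanup dead agent.");
      } // params2 goes out of scope: later log lines no longer carry these fields.

This is also why cleanupDeadAgent() takes the details by value: the caller in checkHeartbeats() snapshots them via getDeadAgentDetails() and then deletes the watchdog immediately afterwards.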
diff --git a/objectstore/GarbageCollector.hpp b/objectstore/GarbageCollector.hpp
index 70f8c4deae81df2db615ae5d9fcfd0902d75ac4d..603d1d47fb224404511ecad51570523f486b1cc9 100644
--- a/objectstore/GarbageCollector.hpp
+++ b/objectstore/GarbageCollector.hpp
@@ -49,7 +49,7 @@ public:
 
   void checkHeartbeats(log::LogContext & lc);
 
-  void cleanupDeadAgent(const std::string & name, log::LogContext & lc);
+  void cleanupDeadAgent(const std::string & name, std::list<log::Param> agentDetails, log::LogContext & lc);
 
   /** Structure allowing the sorting of owned objects, so they can be requeued in batches,
    * one batch per queue. */
diff --git a/objectstore/RetrieveQueue.cpp b/objectstore/RetrieveQueue.cpp
index 5f19a47d0f96e087785c7bcc58072e82cf444f51..14f94447b80dc89e031d4ebcf5e7a5bae842953c 100644
--- a/objectstore/RetrieveQueue.cpp
+++ b/objectstore/RetrieveQueue.cpp
@@ -459,6 +459,13 @@ void RetrieveQueue::addJobsAndCommit(std::list<JobToAdd> & jobsToAdd, AgentRefer
     maxDriveAllowedMap.incCount(j.policy.maxDrivesAllowed);
     priorityMap.incCount(j.policy.retrievePriority);
     minRetrieveRequestAgeMap.incCount(j.policy.retrieveMinRequestAge);
+    // oldestjobcreationtime is initialized to 0 when the queue is created, so 0 means unset.
+    if (m_payload.oldestjobcreationtime()) {
+      if ((uint64_t)j.startTime < m_payload.oldestjobcreationtime())
+        m_payload.set_oldestjobcreationtime(j.startTime);
+    } else {
+      m_payload.set_oldestjobcreationtime(j.startTime);
+    }
   }
   // ... update the shard pointer
   auto shardSummary = rqs.getJobsSummary();
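The running-minimum logic added to addJobsAndCommit() relies on the protobuf default of 0 meaning "unset": the first job initializes the field, and later jobs can only lower it. A standalone sketch of the invariant (updateOldest is a hypothetical helper, not part of the patch):

    #include <cassert>
    #include <cstdint>

    // First job initializes the field; later jobs can only lower it.
    void updateOldest(uint64_t& oldestJobCreationTime, uint64_t startTime) {
      if (oldestJobCreationTime == 0 || startTime < oldestJobCreationTime)
        oldestJobCreationTime = startTime;
    }

    int main() {
      uint64_t oldest = 0;          // unset, as in a freshly created queue
      updateOldest(oldest, 1500);   // first job sets it
      updateOldest(oldest, 2000);   // a later job does not raise it
      updateOldest(oldest, 1000);   // an earlier job lowers it
      assert(oldest == 1000);
    }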
diff --git a/objectstore/RetrieveQueueTest.cpp b/objectstore/RetrieveQueueTest.cpp
index 24b4ac1ecf97d1b056bb9dd52501701f86e5faf7..1701acda04966ef85d176e3cfba0e0c36dc1f92f 100644
--- a/objectstore/RetrieveQueueTest.cpp
+++ b/objectstore/RetrieveQueueTest.cpp
@@ -76,6 +76,8 @@ TEST(ObjectStore, RetrieveQueueShardingAndOrderingTest) {
     jta.retrieveRequestAddress = address.str();
     jobsToAdd.push_back(jta);
   }
+  // By construction, first job has lowest start time.
+  auto minStartTime=jobsToAdd.front().startTime;
   std::string retrieveQueueAddress = agentRef.nextId("RetrieveQueue");
   {
     // Try to create the retrieve queue
@@ -119,6 +121,7 @@ TEST(ObjectStore, RetrieveQueueShardingAndOrderingTest) {
     ASSERT_NO_THROW(rq.fetch());
     // Pop jobs while we can. They should come out in fseq order as there is
     // no interleaved push and pop.
+    ASSERT_EQ(minStartTime, rq.getJobsSummary().oldestJobStartTime);
     uint64_t nextExpectedFseq=0;
     while (rq.getJobsSummary().files) {
       auto candidateJobs = rq.getCandidateList(std::numeric_limits<uint64_t>::max(), 50, std::set<std::string>());
diff --git a/objectstore/RetrieveRequest.cpp b/objectstore/RetrieveRequest.cpp
index 5cd915f6b2f43f0efecfbd668b6b5b756e1caa8c..2f86dc76f251b73b4a05d7897d9961cf6dfca391 100644
--- a/objectstore/RetrieveRequest.cpp
+++ b/objectstore/RetrieveRequest.cpp
@@ -339,6 +339,24 @@ bool RetrieveRequest::addJobFailure(uint16_t copyNumber, uint64_t mountId, log::
   throw NoSuchJob ("In RetrieveRequest::addJobFailure(): could not find job");
 }
 
+std::string RetrieveRequest::statusToString(const serializers::RetrieveJobStatus& status) {
+  switch(status) {
+  case serializers::RetrieveJobStatus::RJS_Complete:
+    return "Complete";
+  case serializers::RetrieveJobStatus::RJS_Failed:
+    return "Failed";
+  case serializers::RetrieveJobStatus::RJS_LinkingToTape:
+    return "LinkingToTape";
+  case serializers::RetrieveJobStatus::RJS_Pending:
+    return "Pending";
+  case serializers::RetrieveJobStatus::RJS_Selected:
+    return "Selected";
+  default:
+    return std::string("Unknown (")+std::to_string((uint64_t) status)+")";
+  }
+}
+
+
 //------------------------------------------------------------------------------
 // RetrieveRequest::finishIfNecessary()
 //------------------------------------------------------------------------------
@@ -357,6 +375,9 @@ bool RetrieveRequest::finishIfNecessary(log::LogContext & lc) {
   remove();
   log::ScopedParamContainer params(lc);
   params.add("retrieveRequestObject", getAddressIfSet());
+  for (auto & j: jl) {
+    params.add(std::string("statusForCopyNb")+std::to_string(j.copynb()), statusToString(j.status()));
+  }
   lc.log(log::INFO, "In RetrieveRequest::finishIfNecessary(): removed finished retrieve request.");
   return true;
 }
retrieve request."); return true; } diff --git a/objectstore/RetrieveRequest.hpp b/objectstore/RetrieveRequest.hpp index 6090221e7938bbf791330e5a557e14d815e09dd1..f008170b9c923f15c0b2fbb5fe58c5edd7d3cc6d 100644 --- a/objectstore/RetrieveRequest.hpp +++ b/objectstore/RetrieveRequest.hpp @@ -66,6 +66,7 @@ public: std::list<JobDump> getJobs(); bool addJobFailure(uint16_t copyNumber, uint64_t mountId, log::LogContext & lc); /**< Returns true is the request is completely failed (telling wheather we should requeue or not). */ + std::string statusToString(const serializers::RetrieveJobStatus & status); bool finishIfNecessary(log::LogContext & lc); /**< Handling of the consequences of a job status change for the entire request. * This function returns true if the request got finished. */ serializers::RetrieveJobStatus getJobStatus(uint16_t copyNumber); diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index f11b8aee95a3bc5e226789d157cec065b86413a4..10d5a96483b7f151a675988ab38a98de486e2543 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -454,6 +454,7 @@ void OStoreDB::queueArchive(const std::string &instanceName, const cta::common:: // The request is now fully set. As it's multi-owned, we do not set the owner, // just to disown it from the agent. aReq.setOwner(""); + auto archiveFile = aReq.getArchiveFile(); aReq.commit(); double arOwnerResetTime = timer.secs(cta::utils::Timer::reset_t::resetCounter); arl.release(); @@ -462,6 +463,10 @@ void OStoreDB::queueArchive(const std::string &instanceName, const cta::common:: m_agentReference->removeFromOwnership(aReq.getAddressIfSet(), m_objectStore); log::ScopedParamContainer params(logContext); params.add("jobObject", aReq.getAddressIfSet()) + .add("fileId", archiveFile.archiveFileID) + .add("diskInstance", archiveFile.diskInstance) + .add("diskFilePath", archiveFile.diskFileInfo.path) + .add("diskFileId", archiveFile.diskFileId) .add("creationAndRelockTime", arCreationAndRelock) .add("totalQueueingTime", arTotalQueueingTime) .add("totalCommitTime", arTotalCommitTime) @@ -469,7 +474,7 @@ void OStoreDB::queueArchive(const std::string &instanceName, const cta::common:: .add("ownerResetTime", arOwnerResetTime) .add("lockReleaseTime", arLockRelease) .add("agentOwnershipResetTime", timer.secs()); - logContext.log(log::INFO, "In OStoreDB::queueArchive(): Finished enqueuing request."); + logContext.log(log::INFO, "In OStoreDB::queueArchive(): Finished enqueueing request."); } //------------------------------------------------------------------------------ @@ -623,6 +628,7 @@ std::list<SchedulerDatabase::RetrieveQueueStatistics> OStoreDB::getRetrieveQueue std::string OStoreDB::queueRetrieve(const cta::common::dataStructures::RetrieveRequest& rqst, const cta::common::dataStructures::RetrieveFileQueueCriteria& criteria, log::LogContext &logContext) { assertAgentAddressSet(); + cta::utils::Timer timer; // Get the best vid from the cache std::set<std::string> candidateVids; for (auto & tf:criteria.archiveFile.tapeFiles) candidateVids.insert(tf.second.vid); @@ -647,13 +653,6 @@ std::string OStoreDB::queueRetrieve(const cta::common::dataStructures::RetrieveR rReq.initialize(); rReq.setSchedulerRequest(rqst); rReq.setRetrieveFileQueueCriteria(criteria); - // Point to the request in the agent - m_agentReference->addToOwnership(rReq.getAddressIfSet(), m_objectStore); - // Set an arbitrary copy number so we can serialize. 
diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp
index f11b8aee95a3bc5e226789d157cec065b86413a4..10d5a96483b7f151a675988ab38a98de486e2543 100644
--- a/scheduler/OStoreDB/OStoreDB.cpp
+++ b/scheduler/OStoreDB/OStoreDB.cpp
@@ -454,6 +454,7 @@ void OStoreDB::queueArchive(const std::string &instanceName, const cta::common::
   // The request is now fully set. As it's multi-owned, we do not set the owner,
   // just to disown it from the agent.
   aReq.setOwner("");
+  auto archiveFile = aReq.getArchiveFile();
   aReq.commit();
   double arOwnerResetTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
   arl.release();
@@ -462,6 +463,10 @@ void OStoreDB::queueArchive(const std::string &instanceName, const cta::common::
   m_agentReference->removeFromOwnership(aReq.getAddressIfSet(), m_objectStore);
   log::ScopedParamContainer params(logContext);
   params.add("jobObject", aReq.getAddressIfSet())
+    .add("fileId", archiveFile.archiveFileID)
+    .add("diskInstance", archiveFile.diskInstance)
+    .add("diskFilePath", archiveFile.diskFileInfo.path)
+    .add("diskFileId", archiveFile.diskFileId)
     .add("creationAndRelockTime", arCreationAndRelock)
     .add("totalQueueingTime", arTotalQueueingTime)
     .add("totalCommitTime", arTotalCommitTime)
@@ -469,7 +474,7 @@ void OStoreDB::queueArchive(const std::string &instanceName, const cta::common::
     .add("ownerResetTime", arOwnerResetTime)
     .add("lockReleaseTime", arLockRelease)
     .add("agentOwnershipResetTime", timer.secs());
-  logContext.log(log::INFO, "In OStoreDB::queueArchive(): Finished enqueuing request.");
+  logContext.log(log::INFO, "In OStoreDB::queueArchive(): Finished enqueueing request.");
 }
 
 //------------------------------------------------------------------------------
@@ -623,6 +628,7 @@ std::list<SchedulerDatabase::RetrieveQueueStatistics> OStoreDB::getRetrieveQueue
 std::string OStoreDB::queueRetrieve(const cta::common::dataStructures::RetrieveRequest& rqst,
   const cta::common::dataStructures::RetrieveFileQueueCriteria& criteria, log::LogContext &logContext) {
   assertAgentAddressSet();
+  cta::utils::Timer timer;
   // Get the best vid from the cache
   std::set<std::string> candidateVids;
   for (auto & tf:criteria.archiveFile.tapeFiles) candidateVids.insert(tf.second.vid);
@@ -647,13 +653,6 @@ std::string OStoreDB::queueRetrieve(const cta::common::dataStructures::RetrieveR
   rReq.initialize();
   rReq.setSchedulerRequest(rqst);
   rReq.setRetrieveFileQueueCriteria(criteria);
-  // Point to the request in the agent
-  m_agentReference->addToOwnership(rReq.getAddressIfSet(), m_objectStore);
-  // Set an arbitrary copy number so we can serialize. Garbage collection we re-choose
-  // the tape file number and override it in case of problem (and we will set it further).
-  rReq.setActiveCopyNumber(1);
-  rReq.insert();
-  ScopedExclusiveLock rrl(rReq);
   // Find the job corresponding to the vid (and check we indeed have one).
   auto jobs = rReq.getJobs();
   objectstore::RetrieveRequest::JobDump * job = nullptr;
@@ -673,13 +672,28 @@ std::string OStoreDB::queueRetrieve(const cta::common::dataStructures::RetrieveR
   {
     // Add the request to the queue (with a shared access).
     auto sharedLock = ostoredb::MemRetrieveQueue::sharedAddToQueue(*job, bestVid, rReq, *this, logContext);
+    double qTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
     // The object ownership was set in SharedAdd.
-    rReq.commit();
+    // We need to extract the owner before inserting. After, we would need to hold a lock.
+    auto owner = rReq.getOwner();
+    rReq.insert();
+    double iTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
     // The lock on the queue is released here (has to be after the request commit for consistency.
+    sharedLock.reset();
+    double qUnlockTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
+    log::ScopedParamContainer params(logContext);
+    params.add("vid", bestVid)
+      .add("queueObject", owner)
+      .add("jobObject", rReq.getAddressIfSet())
+      .add("fileId", rReq.getArchiveFile().archiveFileID)
+      .add("diskInstance", rReq.getArchiveFile().diskInstance)
+      .add("diskFilePath", rReq.getArchiveFile().diskFileInfo.path)
+      .add("diskFileId", rReq.getArchiveFile().diskFileId)
+      .add("queueingTime", qTime)
+      .add("insertTime", iTime)
+      .add("queueUnlockTime", qUnlockTime);
+    logContext.log(log::INFO, "In OStoreDB::queueRetrieve(): added job to queue (enqueueing finished).");
   }
-  rrl.release();
-  // And relinquish ownership form agent
-  m_agentReference->removeFromOwnership(rReq.getAddressIfSet(), m_objectStore);
   return bestVid;
 }
 
@@ -1995,7 +2009,7 @@ std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob> > OStoreDB::RetrieveMo
   // We now have the queue.
   {
     log::ScopedParamContainer params(logContext);
-    params.add("vid", mountInfo.tapePool)
+    params.add("vid", mountInfo.vid)
       .add("queueObject", rq.getAddressIfSet())
      .add("queueSize", rq.getJobsSummary().files);
     logContext.log(log::INFO, "In RetrieveMount::getNextJobBatch(): retrieve queue found.");
@@ -2071,7 +2085,7 @@ std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob> > OStoreDB::RetrieveMo
         jobsToDequeue.emplace_back((*j)->m_retrieveRequest.getAddressIfSet());
         // Log the event.
         log::ScopedParamContainer params(logContext);
-        params.add("tapepool", mountInfo.tapePool)
+        params.add("vid", mountInfo.vid)
           .add("queueObject", rq.getAddressIfSet())
           .add("requestObject", (*j)->m_retrieveRequest.getAddressIfSet());
         logContext.log(log::WARNING, "In RetrieveMount::getNextJobBatch(): skipped job not owned or not present.");
@@ -2082,7 +2096,7 @@ std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob> > OStoreDB::RetrieveMo
         log::ScopedParamContainer params(logContext);
         int demangleStatus;
         char * exceptionTypeStr = abi::__cxa_demangle(typeid(e).name(), nullptr, nullptr, &demangleStatus);
-        params.add("tapepool", mountInfo.tapePool)
+        params.add("vid", mountInfo.vid)
          .add("queueObject", rq.getAddressIfSet())
          .add("requestObject", (*j)->m_retrieveRequest.getAddressIfSet());
         if (!demangleStatus) {
@@ -2101,7 +2115,7 @@ std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob> > OStoreDB::RetrieveMo
       log::ScopedParamContainer params(logContext);
       int demangleStatus;
       char * exceptionTypeStr = abi::__cxa_demangle(typeid(e).name(), nullptr, nullptr, &demangleStatus);
-      params.add("tapepool", mountInfo.tapePool)
+      params.add("vid", mountInfo.vid)
        .add("queueObject", rq.getAddressIfSet())
        .add("requestObject", (*j)->m_retrieveRequest.getAddressIfSet());
      if (!demangleStatus) {
@@ -2149,14 +2163,14 @@ std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob> > OStoreDB::RetrieveMo
       // The queue should be removed as it is empty.
       ScopedExclusiveLock rexl(re);
       re.fetch();
-      re.removeArchiveQueueAndCommit(mountInfo.tapePool, logContext);
+      re.removeRetrieveQueueAndCommit(mountInfo.vid, logContext);
       log::ScopedParamContainer params(logContext);
-      params.add("tapepool", mountInfo.tapePool)
+      params.add("vid", mountInfo.vid)
         .add("queueObject", rq.getAddressIfSet());
       logContext.log(log::INFO, "In RetrieveMount::getNextJobBatch(): deleted empty queue");
     } catch (cta::exception::Exception &ex) {
       log::ScopedParamContainer params(logContext);
-      params.add("tapepool", mountInfo.tapePool)
+      params.add("vid", mountInfo.vid)
         .add("queueObject", rq.getAddressIfSet())
         .add("Message", ex.getMessageValue());
       logContext.log(log::INFO, "In RetrieveMount::getNextJobBatch(): could not delete a presumably empty queue");
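Finally, a note on the timing instrumentation added to queueRetrieve(): a single cta::utils::Timer is reset after each phase, so each secs(resetCounter) call returns the duration of the phase that just completed. The skeleton of the pattern, reduced from the hunk above (the comments mark where the real work happens):

      cta::utils::Timer timer;
      // ... add the job to the in-memory queue under the shared lock ...
      double qTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
      // ... insert the request object into the object store ...
      double iTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
      // ... release the queue lock ...
      double qUnlockTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
      // qTime, iTime and qUnlockTime are then attached to the final INFO log
      // line as "queueingTime", "insertTime" and "queueUnlockTime".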