diff --git a/common/dataStructures/RepackInfo.cpp b/common/dataStructures/RepackInfo.cpp index 2f8700cc1ac877fe96f52e2d2deba4b3abbd8064..3aea42563f823ff09becac2d543e2b03ec6511c0 100644 --- a/common/dataStructures/RepackInfo.cpp +++ b/common/dataStructures/RepackInfo.cpp @@ -17,6 +17,7 @@ */ #include "common/dataStructures/RepackInfo.hpp" +#include "common/exception/Exception.hpp" namespace cta { namespace common { @@ -54,6 +55,22 @@ std::string toString(RepackInfo::Status status) { } } +cta::objectstore::RepackQueueType RepackInfo::getQueueType(){ + switch(status){ + case RepackInfo::Status::Pending: + return cta::objectstore::RepackQueueType::Pending; + case RepackInfo::Status::ToExpand: + return cta::objectstore::RepackQueueType::ToExpand; + case RepackInfo::Status::Running: + case RepackInfo::Status::Starting: + if(!isExpandFinished){ + return cta::objectstore::RepackQueueType::ToExpand; + } + default: + throw cta::exception::Exception("The status "+toString(status)+" have no corresponding queue."); + } +} + } // namespace dataStructures } // namespace common } // namespace cta diff --git a/common/dataStructures/RepackInfo.hpp b/common/dataStructures/RepackInfo.hpp index 1d188ca172352bc50d68cea9dfbfa1d6826bddbb..e94e87246db7c47cecc94bb74aaddb237567561b 100644 --- a/common/dataStructures/RepackInfo.hpp +++ b/common/dataStructures/RepackInfo.hpp @@ -19,6 +19,7 @@ #pragma once #include <string> +#include "objectstore/RepackQueueType.hpp" namespace cta { namespace common { @@ -57,6 +58,7 @@ struct RepackInfo { uint64_t failedBytesToRetrieve; uint64_t lastExpandedFseq; uint64_t userProvidedFiles; + bool isExpandFinished; // std::string tag; // uint64_t totalFiles; // uint64_t totalSize; @@ -68,7 +70,7 @@ struct RepackInfo { // std::string repackStatus; // std::map<uint64_t,std::string> errors; // EntryLog creationLog; - +cta::objectstore::RepackQueueType getQueueType(); }; // struct RepackInfo std::string toString(RepackInfo::Type type); diff --git a/objectstore/GarbageCollectorTest.cpp b/objectstore/GarbageCollectorTest.cpp index 82b2348ac59ca17bacaeae22aaa5531189d20d48..0f786e45579ee25c495ac7fcee596fa5480158d6 100644 --- a/objectstore/GarbageCollectorTest.cpp +++ b/objectstore/GarbageCollectorTest.cpp @@ -22,6 +22,8 @@ #include "common/exception/Exception.hpp" #include "common/dataStructures/ArchiveFile.hpp" #include "common/log/DummyLogger.hpp" +#include "common/log/StdoutLogger.hpp" +#include "common/log/StringLogger.hpp" #include "tests/TestsCompileTimeSwitches.hpp" #ifdef STDOUT_LOGGING #include "common/log/StdoutLogger.hpp" @@ -672,4 +674,418 @@ TEST(ObjectStore, GarbageCollectorRetrieveRequest) { // TODO: this unit test still leaks tape pools and requests } +TEST(ObjectStore, GarbageCollectorRepackRequestPending) { +// We will need a log object +#ifdef STDOUT_LOGGING + cta::log::StdoutLogger dl("dummy", "unitTest"); +#else + cta::log::DummyLogger dl("dummy", "unitTest"); +#endif + cta::log::LogContext lc(dl); + // We need a dummy catalogue + cta::catalogue::DummyCatalogue catalogue; + // Here we check that can successfully call RetrieveRequests's garbage collector + cta::objectstore::BackendVFS be; + // Create the root entry + cta::objectstore::RootEntry re(be); + re.initialize(); + re.insert(); + // Create the agent register + cta::objectstore::EntryLogSerDeser el("user0", + "unittesthost", time(NULL)); + cta::objectstore::ScopedExclusiveLock rel(re); + // Create the agent for objects creation + cta::objectstore::AgentReference agentRef("unitTestCreateEnv", dl); + // Finish root creation. + re.addOrGetAgentRegisterPointerAndCommit(agentRef, el, lc); + rel.release(); + // continue agent creation. + cta::objectstore::Agent agent(agentRef.getAgentAddress(), be); + agent.initialize(); + agent.setTimeout_us(0); + agent.insertAndRegisterSelf(lc); + { + // Create an agent to be garbage collected + cta::objectstore::AgentReference agentReferenceRepackRequest("AgentReferenceRepackRequest", dl); + cta::objectstore::Agent agentRepackRequest(agentReferenceRepackRequest.getAgentAddress(), be); + agentRepackRequest.initialize(); + agentRepackRequest.setTimeout_us(0); + agentRepackRequest.insertAndRegisterSelf(lc); + //Create a RepackQueue and insert a RepackRequest with status "Pending" in it + cta::objectstore::RootEntry re(be); + cta::objectstore::ScopedExclusiveLock rel(re); + re.fetch(); + + //Create the RepackRequest + std::string repackRequestAddr = agentReferenceRepackRequest.nextId("RepackRequest"); + agentReferenceRepackRequest.addToOwnership(repackRequestAddr, be); + cta::objectstore::RepackRequest repackRequest(repackRequestAddr,be); + repackRequest.initialize(); + repackRequest.setStatus(cta::common::dataStructures::RepackInfo::Status::Pending); + repackRequest.setVid("VIDTest"); + repackRequest.setBufferURL("test/buffer/url"); + repackRequest.setOwner(agentReferenceRepackRequest.getAgentAddress()); + repackRequest.insert(); + } + { + //Now we garbage collect the RepackRequest + + // Create the garbage collector and run it once. + cta::objectstore::AgentReference gcAgentRef("unitTestGarbageCollector", dl); + cta::objectstore::Agent gcAgent(gcAgentRef.getAgentAddress(), be); + gcAgent.initialize(); + gcAgent.setTimeout_us(0); + gcAgent.insertAndRegisterSelf(lc); + { + cta::objectstore::GarbageCollector gc(be, gcAgentRef, catalogue); + gc.runOnePass(lc); + } + } + //The repack request should have been requeued in the RepackQueuePending + { + cta::objectstore::RootEntry re(be); + cta::objectstore::ScopedExclusiveLock rel(re); + re.fetch(); + std::string repackQueueAddr = re.addOrGetRepackQueueAndCommit(agentRef,cta::objectstore::RepackQueueType::Pending); + cta::objectstore::RepackQueue rq(repackQueueAddr,be); + cta::objectstore::ScopedExclusiveLock rql(rq); + rq.fetch(); + ASSERT_EQ(1,rq.getRequestsSummary().requests); + } +} + +TEST(ObjectStore, GarbageCollectorRepackRequestToExpand) { +// We will need a log object +#ifdef STDOUT_LOGGING + cta::log::StdoutLogger dl("dummy", "unitTest"); +#else + cta::log::DummyLogger dl("dummy", "unitTest"); +#endif + cta::log::LogContext lc(dl); + // We need a dummy catalogue + cta::catalogue::DummyCatalogue catalogue; + // Here we check that can successfully call RetrieveRequests's garbage collector + cta::objectstore::BackendVFS be; + // Create the root entry + cta::objectstore::RootEntry re(be); + re.initialize(); + re.insert(); + // Create the agent register + cta::objectstore::EntryLogSerDeser el("user0", + "unittesthost", time(NULL)); + cta::objectstore::ScopedExclusiveLock rel(re); + // Create the agent for objects creation + cta::objectstore::AgentReference agentRef("unitTestCreateEnv", dl); + // Finish root creation. + re.addOrGetAgentRegisterPointerAndCommit(agentRef, el, lc); + rel.release(); + // continue agent creation. + cta::objectstore::Agent agent(agentRef.getAgentAddress(), be); + agent.initialize(); + agent.setTimeout_us(0); + agent.insertAndRegisterSelf(lc); + { + // Create an agent to be garbage collected + cta::objectstore::AgentReference agentReferenceRepackRequest("AgentReferenceRepackRequest", dl); + cta::objectstore::Agent agentRepackRequest(agentReferenceRepackRequest.getAgentAddress(), be); + agentRepackRequest.initialize(); + agentRepackRequest.setTimeout_us(0); + agentRepackRequest.insertAndRegisterSelf(lc); + //Create a RepackQueue and insert a RepackRequest with status "ToExpand" in it + cta::objectstore::RootEntry re(be); + cta::objectstore::ScopedExclusiveLock rel(re); + re.fetch(); + + //Create the RepackRequest + std::string repackRequestAddr = agentReferenceRepackRequest.nextId("RepackRequest"); + agentReferenceRepackRequest.addToOwnership(repackRequestAddr, be); + cta::objectstore::RepackRequest repackRequest(repackRequestAddr,be); + repackRequest.initialize(); + repackRequest.setStatus(cta::common::dataStructures::RepackInfo::Status::ToExpand); + repackRequest.setVid("VID2Test"); + repackRequest.setBufferURL("test/buffer/url"); + repackRequest.setOwner(agentReferenceRepackRequest.getAgentAddress()); + repackRequest.insert(); + } + { + // Now we garbage collect the RepackRequest + + // Create the garbage collector and run it once. + cta::objectstore::AgentReference gcAgentRef("unitTestGarbageCollector", dl); + cta::objectstore::Agent gcAgent(gcAgentRef.getAgentAddress(), be); + gcAgent.initialize(); + gcAgent.setTimeout_us(0); + gcAgent.insertAndRegisterSelf(lc); + { + cta::objectstore::GarbageCollector gc(be, gcAgentRef, catalogue); + gc.runOnePass(lc); + } + } + { + cta::objectstore::RootEntry re(be); + cta::objectstore::ScopedExclusiveLock rel(re); + re.fetch(); + std::string repackQueueAddr = re.addOrGetRepackQueueAndCommit(agentRef,cta::objectstore::RepackQueueType::ToExpand); + cta::objectstore::RepackQueue rq(repackQueueAddr,be); + cta::objectstore::ScopedExclusiveLock rql(rq); + rq.fetch(); + ASSERT_EQ(1,rq.getRequestsSummary().requests); + } +} + +TEST(ObjectStore, GarbageCollectorRepackRequestRunningExpandNotFinished) { + // We will need a log object +#ifdef STDOUT_LOGGING + cta::log::StdoutLogger dl("dummy", "unitTest"); +#else + cta::log::DummyLogger dl("dummy", "unitTest"); +#endif + cta::log::LogContext lc(dl); + // We need a dummy catalogue + cta::catalogue::DummyCatalogue catalogue; + // Here we check that can successfully call RetrieveRequests's garbage collector + cta::objectstore::BackendVFS be; + // Create the root entry + cta::objectstore::RootEntry re(be); + re.initialize(); + re.insert(); + // Create the agent register + cta::objectstore::EntryLogSerDeser el("user0", + "unittesthost", time(NULL)); + cta::objectstore::ScopedExclusiveLock rel(re); + // Create the agent for objects creation + cta::objectstore::AgentReference agentRef("unitTestCreateEnv", dl); + // Finish root creation. + re.addOrGetAgentRegisterPointerAndCommit(agentRef, el, lc); + rel.release(); + // continue agent creation. + cta::objectstore::Agent agent(agentRef.getAgentAddress(), be); + agent.initialize(); + agent.setTimeout_us(0); + agent.insertAndRegisterSelf(lc); + { + // Create an agent to be garbage collected + cta::objectstore::AgentReference agentReferenceRepackRequest("AgentReferenceRepackRequest", dl); + cta::objectstore::Agent agentRepackRequest(agentReferenceRepackRequest.getAgentAddress(), be); + agentRepackRequest.initialize(); + agentRepackRequest.setTimeout_us(0); + agentRepackRequest.insertAndRegisterSelf(lc); + //Create a RepackQueue and insert a RepackRequest with status "ToExpand" in it + cta::objectstore::RootEntry re(be); + cta::objectstore::ScopedExclusiveLock rel(re); + re.fetch(); + + //Create the RepackRequest + std::string repackRequestAddr = agentReferenceRepackRequest.nextId("RepackRequest"); + agentReferenceRepackRequest.addToOwnership(repackRequestAddr, be); + cta::objectstore::RepackRequest repackRequest(repackRequestAddr,be); + repackRequest.initialize(); + repackRequest.setStatus(cta::common::dataStructures::RepackInfo::Status::Running); + repackRequest.setVid("VIDTest"); + repackRequest.setBufferURL("test/buffer/url"); + repackRequest.setOwner(agentReferenceRepackRequest.getAgentAddress()); + repackRequest.setExpandFinished(false); + repackRequest.insert(); + } + { + // Now we garbage collect the RepackRequest + + // Create the garbage collector and run it once. + cta::objectstore::AgentReference gcAgentRef("unitTestGarbageCollector", dl); + cta::objectstore::Agent gcAgent(gcAgentRef.getAgentAddress(), be); + gcAgent.initialize(); + gcAgent.setTimeout_us(0); + gcAgent.insertAndRegisterSelf(lc); + { + cta::objectstore::GarbageCollector gc(be, gcAgentRef, catalogue); + gc.runOnePass(lc); + } + } + { + //The request should be requeued in the ToExpand as it has not finished to expand + cta::objectstore::RootEntry re(be); + cta::objectstore::ScopedExclusiveLock rel(re); + re.fetch(); + std::string repackQueueAddr = re.addOrGetRepackQueueAndCommit(agentRef,cta::objectstore::RepackQueueType::ToExpand); + cta::objectstore::RepackQueue rq(repackQueueAddr,be); + cta::objectstore::ScopedExclusiveLock rql(rq); + rq.fetch(); + ASSERT_EQ(1,rq.getRequestsSummary().requests); + } +} + +TEST(ObjectStore, GarbageCollectorRepackRequestRunningExpandFinished) { + // We will need a log object +#ifdef STDOUT_LOGGING + cta::log::StdoutLogger dl("dummy", "unitTest"); +#else + cta::log::DummyLogger dl("dummy", "unitTest"); +#endif + cta::log::LogContext lc(dl); + // We need a dummy catalogue + cta::catalogue::DummyCatalogue catalogue; + // Here we check that can successfully call RetrieveRequests's garbage collector + cta::objectstore::BackendVFS be; + // Create the root entry + cta::objectstore::RootEntry re(be); + re.initialize(); + re.insert(); + // Create the agent register + cta::objectstore::EntryLogSerDeser el("user0", + "unittesthost", time(NULL)); + cta::objectstore::ScopedExclusiveLock rel(re); + // Create the agent for objects creation + cta::objectstore::AgentReference agentRef("unitTestCreateEnv", dl); + // Finish root creation. + re.addOrGetAgentRegisterPointerAndCommit(agentRef, el, lc); + rel.release(); + // continue agent creation. + cta::objectstore::Agent agent(agentRef.getAgentAddress(), be); + agent.initialize(); + agent.setTimeout_us(0); + agent.insertAndRegisterSelf(lc); + { + // Create an agent to be garbage collected + cta::objectstore::AgentReference agentReferenceRepackRequest("AgentReferenceRepackRequest", dl); + cta::objectstore::Agent agentRepackRequest(agentReferenceRepackRequest.getAgentAddress(), be); + agentRepackRequest.initialize(); + agentRepackRequest.setTimeout_us(0); + agentRepackRequest.insertAndRegisterSelf(lc); + //Create a RepackQueue and insert a RepackRequest with status "ToExpand" in it + cta::objectstore::RootEntry re(be); + cta::objectstore::ScopedExclusiveLock rel(re); + re.fetch(); + + //Create the RepackRequest + std::string repackRequestAddr = agentReferenceRepackRequest.nextId("RepackRequest"); + agentReferenceRepackRequest.addToOwnership(repackRequestAddr, be); + cta::objectstore::RepackRequest repackRequest(repackRequestAddr,be); + repackRequest.initialize(); + repackRequest.setStatus(cta::common::dataStructures::RepackInfo::Status::Running); + repackRequest.setVid("VIDTest"); + repackRequest.setBufferURL("test/buffer/url"); + repackRequest.setOwner(agentReferenceRepackRequest.getAgentAddress()); + repackRequest.setExpandFinished(true); + repackRequest.insert(); + } + cta::log::StringLogger strLogger("dummy", "dummy", cta::log::DEBUG); + cta::log::LogContext lc2(strLogger); + { + // Now we garbage collect the RepackRequest + + // Create the garbage collector and run it once. + cta::objectstore::AgentReference gcAgentRef("unitTestGarbageCollector", strLogger); + cta::objectstore::Agent gcAgent(gcAgentRef.getAgentAddress(), be); + gcAgent.initialize(); + gcAgent.setTimeout_us(0); + gcAgent.insertAndRegisterSelf(lc2); + { + cta::objectstore::GarbageCollector gc(be, gcAgentRef, catalogue); + gc.runOnePass(lc2); + } + } + { + //The request should not be requeued in the ToExpand as it has finished to expand + cta::objectstore::RootEntry re(be); + cta::objectstore::ScopedExclusiveLock rel(re); + re.fetch(); + std::string repackQueueAddr = re.addOrGetRepackQueueAndCommit(agentRef,cta::objectstore::RepackQueueType::ToExpand); + cta::objectstore::RepackQueue rq(repackQueueAddr,be); + cta::objectstore::ScopedExclusiveLock rql(rq); + rq.fetch(); + ASSERT_EQ(0,rq.getRequestsSummary().requests); + } + { + //The request should not be requeued in the ToExpand as it has not finished to expand + cta::objectstore::RootEntry re(be); + cta::objectstore::ScopedExclusiveLock rel(re); + re.fetch(); + std::string repackQueueAddr = re.addOrGetRepackQueueAndCommit(agentRef,cta::objectstore::RepackQueueType::Pending); + cta::objectstore::RepackQueue rq(repackQueueAddr,be); + cta::objectstore::ScopedExclusiveLock rql(rq); + rq.fetch(); + ASSERT_EQ(0,rq.getRequestsSummary().requests); + } + //Check the logs contains the failed to requeue message + std::string logToCheck = strLogger.getLog(); + logToCheck += ""; + ASSERT_NE(std::string::npos,logToCheck.find("MSG=\"In RepackRequest::garbageCollect(): failed to requeue the RepackRequest (leaving it as it is) : The status Running have no corresponding queue.\"")); +} + +TEST(ObjectStore, GarbageCollectorRepackRequestStarting) { +// We will need a log object +#ifdef STDOUT_LOGGING + cta::log::StdoutLogger dl("dummy", "unitTest"); +#else + cta::log::DummyLogger dl("dummy", "unitTest"); +#endif + cta::log::LogContext lc(dl); + // We need a dummy catalogue + cta::catalogue::DummyCatalogue catalogue; + // Here we check that can successfully call RetrieveRequests's garbage collector + cta::objectstore::BackendVFS be; + // Create the root entry + cta::objectstore::RootEntry re(be); + re.initialize(); + re.insert(); + // Create the agent register + cta::objectstore::EntryLogSerDeser el("user0", + "unittesthost", time(NULL)); + cta::objectstore::ScopedExclusiveLock rel(re); + // Create the agent for objects creation + cta::objectstore::AgentReference agentRef("unitTestCreateEnv", dl); + // Finish root creation. + re.addOrGetAgentRegisterPointerAndCommit(agentRef, el, lc); + rel.release(); + // continue agent creation. + cta::objectstore::Agent agent(agentRef.getAgentAddress(), be); + agent.initialize(); + agent.setTimeout_us(0); + agent.insertAndRegisterSelf(lc); + { + // Create an agent to be garbage collected + cta::objectstore::AgentReference agentReferenceRepackRequest("AgentReferenceRepackRequest", dl); + cta::objectstore::Agent agentRepackRequest(agentReferenceRepackRequest.getAgentAddress(), be); + agentRepackRequest.initialize(); + agentRepackRequest.setTimeout_us(0); + agentRepackRequest.insertAndRegisterSelf(lc); + //Create a RepackQueue and insert a RepackRequest with status "ToExpand" in it + cta::objectstore::RootEntry re(be); + cta::objectstore::ScopedExclusiveLock rel(re); + re.fetch(); + + //Create the RepackRequest + std::string repackRequestAddr = agentReferenceRepackRequest.nextId("RepackRequest"); + agentReferenceRepackRequest.addToOwnership(repackRequestAddr, be); + cta::objectstore::RepackRequest repackRequest(repackRequestAddr,be); + repackRequest.initialize(); + repackRequest.setStatus(cta::common::dataStructures::RepackInfo::Status::Starting); + repackRequest.setVid("VIDTest"); + repackRequest.setBufferURL("test/buffer/url"); + repackRequest.setOwner(agentReferenceRepackRequest.getAgentAddress()); + repackRequest.setExpandFinished(true); + repackRequest.insert(); + } + cta::log::StringLogger strLogger("dummy", "dummy", cta::log::DEBUG); + cta::log::LogContext lc2(strLogger); + { + // Now we garbage collect the RepackRequest + + // Create the garbage collector and run it once. + cta::objectstore::AgentReference gcAgentRef("unitTestGarbageCollector", strLogger); + cta::objectstore::Agent gcAgent(gcAgentRef.getAgentAddress(), be); + gcAgent.initialize(); + gcAgent.setTimeout_us(0); + gcAgent.insertAndRegisterSelf(lc2); + { + cta::objectstore::GarbageCollector gc(be, gcAgentRef, catalogue); + gc.runOnePass(lc2); + } + } + //Check the logs contains the failed to requeue message + std::string logToCheck = strLogger.getLog(); + logToCheck += ""; + ASSERT_NE(std::string::npos,logToCheck.find("MSG=\"In RepackRequest::garbageCollect(): failed to requeue the RepackRequest (leaving it as it is) : The status Starting have no corresponding queue.\"")); +} + } diff --git a/objectstore/GenericObject.cpp b/objectstore/GenericObject.cpp index f02609264773e13390ac34642e5853716255c530..b0cd9a98d0f5ec00e20a931746cd785785417e5e 100644 --- a/objectstore/GenericObject.cpp +++ b/objectstore/GenericObject.cpp @@ -140,6 +140,9 @@ void GenericObject::garbageCollectDispatcher(ScopedExclusiveLock& lock, case serializers::RetrieveQueue_t: garbageCollectWithType<RetrieveQueue>(this, lock, presumedOwner, agentReference, lc, catalogue); break; + case serializers::RepackRequest_t: + garbageCollectWithType<RepackRequest>(this, lock, presumedOwner, agentReference, lc, catalogue); + break; default: { std::stringstream err; err << "In GenericObject::garbageCollect, unsupported type: " diff --git a/objectstore/RepackRequest.cpp b/objectstore/RepackRequest.cpp index d20143bf945a9b332db6a42eacea280726d040d4..bfbe311b1b17803aa25d9dc883b5c010234d5d0a 100644 --- a/objectstore/RepackRequest.cpp +++ b/objectstore/RepackRequest.cpp @@ -19,7 +19,10 @@ #include "RepackRequest.hpp" #include "GenericObject.hpp" #include "AgentReference.hpp" +#include "RepackQueueAlgorithms.hpp" +#include "Algorithms.hpp" #include <google/protobuf/util/json_util.h> +#include <iostream> namespace cta { namespace objectstore { @@ -71,6 +74,7 @@ void RepackRequest::initialize() { m_payload.set_failedtoarchivefiles(0); m_payload.set_failedtoarchivebytes(0); m_payload.set_lastexpandedfseq(0); + m_payload.set_is_expand_finished(false); // This object is good to go (to storage) m_payloadInterpreted = true; } @@ -135,6 +139,7 @@ common::dataStructures::RepackInfo RepackRequest::getInfo() { ret.failedBytesToRetrieve = m_payload.failedtoretrievebytes(); ret.lastExpandedFseq = m_payload.lastexpandedfseq(); ret.userProvidedFiles = m_payload.userprovidedfiles(); + ret.isExpandFinished = m_payload.is_expand_finished(); if (m_payload.move_mode()) { if (m_payload.add_copies_mode()) { ret.type = RepackInfo::Type::MoveAndAddCopies; @@ -149,6 +154,22 @@ common::dataStructures::RepackInfo RepackRequest::getInfo() { return ret; } +//------------------------------------------------------------------------------ +// RepackRequest::setExpandFinished() +//------------------------------------------------------------------------------ +void RepackRequest::setExpandFinished(const bool expandFinished){ + checkPayloadWritable(); + m_payload.set_is_expand_finished(expandFinished); +} + +//------------------------------------------------------------------------------ +// RepackRequest::getExpandFinished() +//------------------------------------------------------------------------------ +bool RepackRequest::isExpandFinished(){ + checkPayloadReadable(); + return m_payload.is_expand_finished(); +} + //------------------------------------------------------------------------------ // RepackRequest::setBufferURL() //------------------------------------------------------------------------------ @@ -441,7 +462,35 @@ auto RepackRequest::getStats() -> std::map<StatsType, StatsValues> { //------------------------------------------------------------------------------ void RepackRequest::garbageCollect(const std::string& presumedOwner, AgentReference& agentReference, log::LogContext& lc, cta::catalogue::Catalogue& catalogue) { - throw exception::Exception("In RepackRequest::garbageCollect(): not implemented."); + //Let's requeue the RepackRequest if its status is ToExpand or Pending + agentReference.addToOwnership(this->getAddressIfSet(), m_objectStore); + cta::utils::Timer t; + RepackQueue rq(m_objectStore); + ScopedExclusiveLock rql; + try{ + Helpers::getLockedAndFetchedRepackQueue(rq, rql, agentReference, this->getInfo().getQueueType(), lc); + } catch(const cta::exception::Exception &e){ + lc.log(log::INFO,"In RepackRequest::garbageCollect(): failed to requeue the RepackRequest (leaving it as it is) : "+e.getMessage().str()); + return; + } + double queueLockFetchTime = t.secs(utils::Timer::resetCounter); + auto jobsSummary = rq.getRequestsSummary(); + uint64_t requestsBefore = jobsSummary.requests; + std::list<std::string> requestsToAdd; + requestsToAdd.push_back(this->getAddressIfSet()); + try{ + rq.addRequestsAndCommit(requestsToAdd,lc); + jobsSummary = rq.getRequestsSummary(); + uint64_t requestsAfter = jobsSummary.requests; + log::ScopedParamContainer params(lc); + params.add("queueLockFetchTime",queueLockFetchTime) + .add("queueAddress",rq.getAddressIfSet()) + .add("requestsBefore",requestsBefore) + .add("requestsAfter",requestsAfter); + lc.log(log::INFO,"In RepackRequest::garbageCollect() succesfully requeued the RepackRequest."); + } catch(const cta::exception::Exception &e){ + lc.log(log::INFO,"In RepackRequest::garbageCollect() failed to requeue the RepackRequest. Leaving it as it is."); + } } //------------------------------------------------------------------------------ diff --git a/objectstore/RepackRequest.hpp b/objectstore/RepackRequest.hpp index 6fac2ea6df0a21edbf0db16802f94336cee871ed..bc8de1d2b69e6f6fb56da5a0fc3443a0c127b51b 100644 --- a/objectstore/RepackRequest.hpp +++ b/objectstore/RepackRequest.hpp @@ -42,7 +42,8 @@ public: void setStatus(common::dataStructures::RepackInfo::Status repackStatus); common::dataStructures::RepackInfo getInfo(); void setBufferURL(const std::string & bufferURL); - + void setExpandFinished(const bool expandFinished); + bool isExpandFinished(); // Sub request management struct SubrequestInfo { std::string address; diff --git a/objectstore/cta.proto b/objectstore/cta.proto index e08ac8568a22aed300d16c880739bf23ad1d5490..fa14344739c77ce52e1b445369b9f735877345bc 100644 --- a/objectstore/cta.proto +++ b/objectstore/cta.proto @@ -541,6 +541,10 @@ message RepackRequest { required uint64 failedtoarchivefiles = 11540; required uint64 failedtoarchivebytes = 11550; required uint64 lastexpandedfseq = 11560; + //A RepackRequest can be in status Running even if the expansion + //is not finished. This boolean is used to indicate wether + //the expansion of the RepackRequest is done or not + required bool is_expand_finished = 11561; repeated RepackSubRequestPointer subrequests = 11570; } diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index fd445943c84a690c6b4c90b2a5de1609fa6dd890..f61b74c340c16b191988dd6d348ee6e33fb8d1d2 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -2305,6 +2305,9 @@ void OStoreDB::RepackRequest::expandDone() { break; } } + if(stats.at(StatsType::RetrieveTotal).files == m_repackRequest.getInfo().totalFilesToRetrieve){ + m_repackRequest.setExpandFinished(true); + } typedef common::dataStructures::RepackInfo::Status Status; m_repackRequest.setStatus(running? Status::Running: Status::Starting); m_repackRequest.commit();