diff --git a/objectstore/RepackQueue.cpp b/objectstore/RepackQueue.cpp index c1f1b133c7d2e8f5dd1b37da505235d9f958e032..dbf336ba9b98bcf0793f435458f7f420e8c00af3 100644 --- a/objectstore/RepackQueue.cpp +++ b/objectstore/RepackQueue.cpp @@ -146,7 +146,7 @@ auto RepackQueue::getCandidateList(uint64_t maxRequests, std::set<std::string> r //------------------------------------------------------------------------------ bool RepackQueue::isEmpty() { checkPayloadReadable(); - return m_payload.repackrequestpointers().size(); + return !m_payload.repackrequestpointers().size(); } //------------------------------------------------------------------------------ diff --git a/objectstore/RepackQueueAlgorithms.hpp b/objectstore/RepackQueueAlgorithms.hpp index 70aa09ad425d8f64ad2441a24b4c61c6bd17105d..e9e16700815c223529aa2f927f2d8a08a1b99e81 100644 --- a/objectstore/RepackQueueAlgorithms.hpp +++ b/objectstore/RepackQueueAlgorithms.hpp @@ -217,7 +217,10 @@ trimContainerIfNeeded(Container& cont, ScopedExclusiveLock & contLock, const ContainerIdentifier & cId, log::LogContext& lc) { // Repack queues are one per status, so we do not need to trim them. - return false; + if(!cont.isEmpty()){ + return false; + } + return true; } template<typename C> @@ -355,8 +358,29 @@ auto ContainerTraits<RepackQueue,C>::switchElementsOwnership(PoppedElementsBatch const ContainerAddress& previousOwnerAddress, log::TimingList& timingList, utils::Timer &t, log::LogContext& lc) -> typename OpFailure<PoppedElement>::list { + std::list<std::unique_ptr<RepackRequest::AsyncOwnerAndStatusUpdater>> updaters; + for(auto &e : poppedElementBatch.elements){ + RepackRequest & repackRequest = *e.repackRequest; + updaters.emplace_back(repackRequest.asyncUpdateOwnerAndStatus(contAddress,previousOwnerAddress,cta::nullopt)); + } + timingList.insertAndReset("asyncUpdateLaunchTime", t); + typename OpFailure<PoppedElement>::list ret; - //TODO : Implement this method + + for(auto el = std::make_pair(updaters.begin(), poppedElementBatch.elements.begin()); + el.first != updaters.end(); ++el.first, ++el.second) + { + auto & updater = *(el.first); + auto & element = *(el.second); + try{ + updater.get()->wait(); + element.repackInfo = updater.get()->getInfo(); + } catch(...) + { + ret.push_back(OpFailure<PoppedElement>(&element, std::current_exception())); + } + } + timingList.insertAndReset("asyncUpdateCompletionTime", t); return ret; } diff --git a/objectstore/RootEntry.hpp b/objectstore/RootEntry.hpp index 58f43b6683b96c11ec4dd14fd5fd0b3f21227140..de1f6f0fc1251dbfa4c1d642009abc7f7d74a7d9 100644 --- a/objectstore/RootEntry.hpp +++ b/objectstore/RootEntry.hpp @@ -151,7 +151,6 @@ private: public: // Dump the root entry std::string dump (); -}; +}; }} - diff --git a/scheduler/CMakeLists.txt b/scheduler/CMakeLists.txt index 036429339604bb54efff9cb46285cff405614e86..6f63371d265dd58db373cf9b0db3ae82884a3c30 100644 --- a/scheduler/CMakeLists.txt +++ b/scheduler/CMakeLists.txt @@ -6,6 +6,7 @@ include_directories (${CMAKE_CURRENT_SOURCE_DIR} set (CTA_SCHEDULER_SRC_FILES ArchiveJob.cpp ArchiveMount.cpp + RepackRequest.cpp TapeMount.cpp LogicalLibrary.cpp MountType.cpp diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index 5fb2150fc25248823da65b54e8181ac73c59f939..2d1f2c6228279eb8232e5c75ed49a7df8577f990 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -1267,10 +1267,8 @@ auto OStoreDB::getRepackStatisticsNoLock() -> std::unique_ptr<SchedulerDatabase: // OStoreDB::getNextRequestToExpand() //------------------------------------------------------------------------------ std::unique_ptr<SchedulerDatabase::RepackRequest> OStoreDB::getNextRequestToExpand() { - typedef objectstore::ContainerAlgorithms<RepackQueue,RepackQueuePending> RQTEAlgo; - RQTEAlgo rqteAlgo(m_objectStore, *m_agentReference); - RootEntry re(m_objectStore); - re.fetchNoLock(); + typedef objectstore::ContainerAlgorithms<RepackQueue,RepackQueueToExpand> RQTEAlgo; + RQTEAlgo rqteAlgo(m_objectStore, *m_agentReference); log::LogContext lc(m_logger); while(true){ RQTEAlgo::PopCriteria criteria; @@ -1278,24 +1276,13 @@ std::unique_ptr<SchedulerDatabase::RepackRequest> OStoreDB::getNextRequestToExpa if(jobs.elements.empty()){ continue; } + auto repackRequest = jobs.elements.front().repackRequest.get(); + auto repackInfo = jobs.elements.front().repackInfo; std::unique_ptr<OStoreDB::RepackRequest> ret; - for(auto &elt : jobs.elements){ - ret.reset(new OStoreDB::RepackRequest(elt.repackRequest->getAddressIfSet(),*this)); - std::cout<<elt.repackInfo.vid<<std::endl; - ret->m_repackRequest.initialize(); - ret->m_repackRequest.getInfo().vid; - } - auto elt = jobs.elements.front().repackRequest.get(); - - ret.reset(new OStoreDB::RepackRequest(elt->getAddressIfSet(),*this)); - ret->repackInfo.vid = elt->getInfo().vid; - std::cout<<jobs.elements.front().repackRequest->dump()<<std::endl; - /*for(auto elt : jobs.elements){ - ret.reset(new OStoreDB::RepackRequest(elt.getAddressIfSet(),*this)); - ret->repackInfo.vid = elt.repackRequest->getInfo().vid; - ret->repackInfo.status = elt.repackRequest->getInfo().status; - ret->repackInfo.type = elt.repackRequest->getInfo().status; - }*/ + ret.reset(new OStoreDB::RepackRequest(repackRequest->getAddressIfSet(),*this)); + ret->repackInfo.vid = repackInfo.vid; + ret->repackInfo.type = repackInfo.type; + ret->repackInfo.status = repackInfo.status; return std::move(ret); } } diff --git a/scheduler/OStoreDB/OStoreDBTest.cpp b/scheduler/OStoreDB/OStoreDBTest.cpp index bd29496026064e1b3f1c1ea099c602b817a83032..630e4065db85f7bc53ab62af17df901b33412df3 100644 --- a/scheduler/OStoreDB/OStoreDBTest.cpp +++ b/scheduler/OStoreDB/OStoreDBTest.cpp @@ -227,37 +227,6 @@ TEST_P(OStoreDBTest, MemQueuesSharedAddToArchiveQueue) { ASSERT_EQ(filesToDo, osdbi.getArchiveJobs("tapepool").size()); } -TEST_P(OStoreDBTest, getNextRequestToExpand) { - using namespace cta::objectstore; - cta::log::StringLogger logger("dummy", "OStoreAbstractTest", cta::log::DEBUG); - cta::log::LogContext lc(logger); - // Get the OStoreBDinterface - OStoreDBWrapperInterface & osdbi = getDb(); - osdbi.queueRepack("testvid","bufferUrl",cta::common::dataStructures::RepackInfo::Type::ExpandOnly,lc); - osdbi.waitSubthreadsComplete(); - - auto expandRequest = osdbi.getNextRequestToExpand(); - - /*RootEntry re(osdbi.getBackend()); - ScopedSharedLock rel(re); - re.fetch(); - std::string rqAddr = re.getRepackQueueAddress(RepackQueueType::ToExpand); - rel.release(); - RepackQueue rq(rqAddr, osdbi.getBackend()); - ScopedSharedLock aql(rq); - rq.fetch(); - aql.release();*/ - - //std::cout<<expandRequest.get()->repackInfo.vid<<std::endl; - - /*RepackQueue rq(rqa, osdbi.getBackend()); - ScopedSharedLock aql(rq); - rq.fetch();*/ - /*auto expandRequest = osdbi.getNextRequestToExpand(); - std::cout<<expandRequest.get()->repackInfo.vid<<std::endl; - ASSERT_EQ(expandRequest.get()->repackInfo.vid,"testvid");*/ -} - static cta::objectstore::BackendVFS osVFS(__LINE__, __FILE__); #ifdef TEST_RADOS static cta::OStoreDBFactory<cta::objectstore::BackendRados> OStoreDBFactoryRados("rados://tapetest@tapetest"); diff --git a/scheduler/RepackRequest.cpp b/scheduler/RepackRequest.cpp index 8b137891791fe96927ad78e64b0aad7bded08bdc..95f6a4123ef4bc3c63a33f1dbf8bac2dc2c5c941 100644 --- a/scheduler/RepackRequest.cpp +++ b/scheduler/RepackRequest.cpp @@ -1 +1,8 @@ +#include "RepackRequest.hpp" +cta::RepackRequest::RepackRequest(){} + +const cta::common::dataStructures::RepackInfo cta::RepackRequest::getRepackInfo() const +{ + return m_dbReq.get()->repackInfo; +} diff --git a/scheduler/RepackRequest.hpp b/scheduler/RepackRequest.hpp index 3f9c3ae27a5293008922a1cc34f641a4ca9fff2d..836069cfc6c84a80c2d6f0f4fba465fd7fb9cf21 100644 --- a/scheduler/RepackRequest.hpp +++ b/scheduler/RepackRequest.hpp @@ -19,6 +19,8 @@ #pragma once #include "common/log/LogContext.hpp" +#include "common/dataStructures/RepackInfo.hpp" +#include "scheduler/SchedulerDatabase.hpp" namespace cta { @@ -26,9 +28,13 @@ namespace cta { * Control structure for the RepackRequest. */ class RepackRequest { + friend class Scheduler; public: + RepackRequest(); void expand(); -private: + const cta::common::dataStructures::RepackInfo getRepackInfo() const; + +protected: std::unique_ptr<cta::SchedulerDatabase::RepackRequest> m_dbReq; }; // class RepackRequest diff --git a/scheduler/Scheduler.cpp b/scheduler/Scheduler.cpp index 958feb9ce4f89da03367ecc952e989d7c6cf55e7..e90ec2d190a7ba1202895900988554d2d16292b4 100644 --- a/scheduler/Scheduler.cpp +++ b/scheduler/Scheduler.cpp @@ -374,9 +374,11 @@ void Scheduler::promoteRepackRequestsToToExpand(log::LogContext & lc) { // getNextRepackRequestToExpand //------------------------------------------------------------------------------ std::unique_ptr<RepackRequest> Scheduler::getNextRepackRequestToExpand() { - auto repackRequest = m_db.getNextRequestToExpand(); - std::unique_ptr<RepackRequest> repReqRet(new RepackRequest()); + std::unique_ptr<cta::SchedulerDatabase::RepackRequest> repackRequest; + repackRequest = m_db.getNextRequestToExpand(); + std::unique_ptr<RepackRequest> ret(new RepackRequest()); + ret->m_dbReq.reset(repackRequest.release()); throw exception::Exception("In Scheduler::getNextRepackRequestToExpand(): not implemented."); } diff --git a/scheduler/SchedulerTest.cpp b/scheduler/SchedulerTest.cpp index 533c13b5fad7eab464bfed6f3227f0e3d7d5b3a0..57565f7150f904614b9d477c23ffd8f42cbc387c 100644 --- a/scheduler/SchedulerTest.cpp +++ b/scheduler/SchedulerTest.cpp @@ -1030,6 +1030,28 @@ TEST_P(SchedulerTest, repack) { } } +TEST_P(SchedulerTest, getNextRepackRequestToExpand) { + using namespace cta; + + setupDefaultCatalogue(); + + Scheduler &scheduler = getScheduler(); + + log::DummyLogger dl("", ""); + log::LogContext lc(dl); + + // Create and then cancel repack + common::dataStructures::SecurityIdentity cliId; + std::string tape1 = "Tape"; + scheduler.queueRepack(cliId, tape1, "root://server/repackDir", common::dataStructures::RepackInfo::Type::RepackOnly, lc); + + scheduler.promoteRepackRequestsToToExpand(lc); + + /*auto repackRequestToExpand = scheduler.getNextRepackRequestToExpand(); + ASSERT_EQ(repackRequestToExpand.get()->getRepackInfo().vid,tape1);*/ + +} + #undef TEST_MOCK_DB #ifdef TEST_MOCK_DB static cta::MockSchedulerDatabaseFactory mockDbFactory;