diff --git a/objectstore/RetrieveRequest.cpp b/objectstore/RetrieveRequest.cpp index ac706ab3d24cb3254a1b9382c40e2aecf39be2c2..035a4e09f68879f2bff0d3a375e15a34e09aa5d1 100644 --- a/objectstore/RetrieveRequest.cpp +++ b/objectstore/RetrieveRequest.cpp @@ -308,13 +308,12 @@ auto RetrieveRequest::addTransferFailure(uint32_t copyNumber, uint64_t mountId, j.set_totalretries(j.totalretries() + 1); *j.mutable_failurelogs()->Add() = failureReason; } - if(j.totalretries() < j.maxtotalretries()) { EnqueueingNextStep ret; - ret.nextStatus = serializers::RetrieveJobStatus::RJS_ToTransferForUser; + ret.nextStatus = serializers::RetrieveJobStatus::RJS_ToTransferForUser; if(j.retrieswithinmount() < j.maxretrieswithinmount()) // Job can try again within this mount - ret.nextStep = EnqueueingNextStep::NextStep::EnqueueForTransfer; + ret.nextStep = EnqueueingNextStep::NextStep::EnqueueForTransferForUser; else // No more retries within this mount: job remains owned by this session and will be garbage collected ret.nextStep = EnqueueingNextStep::NextStep::Nothing; @@ -350,7 +349,7 @@ auto RetrieveRequest::addReportFailure(uint32_t copyNumber, uint64_t sessionId, } else { // Status is unchanged ret.nextStatus = j.status(); - ret.nextStep = EnqueueingNextStep::NextStep::EnqueueForReport; + ret.nextStep = EnqueueingNextStep::NextStep::EnqueueForReportForUser; } return ret; } @@ -696,8 +695,13 @@ auto RetrieveRequest::determineNextStep(uint32_t copyNumberUpdated, JobEvent job switch(jobEvent) { case JobEvent::TransferFailed: - ret.nextStep = EnqueueingNextStep::NextStep::EnqueueForReport; - ret.nextStatus = serializers::RetrieveJobStatus::RJS_ToReportToUserForFailure; + if(m_payload.isrepack()){ + ret.nextStep = EnqueueingNextStep::NextStep::EnqueueForReportForRepack; + ret.nextStatus = serializers::RetrieveJobStatus::RJS_ToReportToRepackForFailure; + } else { + ret.nextStep = EnqueueingNextStep::NextStep::EnqueueForReportForUser; + ret.nextStatus = 
serializers::RetrieveJobStatus::RJS_ToReportToUserForFailure; + } break; case JobEvent::ReportFailed: diff --git a/objectstore/RetrieveRequest.hpp b/objectstore/RetrieveRequest.hpp index 003af3fe3fca815d0e55504b24169870a29f4c7f..bf3a60afcdd20c3953e7cdd439ea04a0df054fbc 100644 --- a/objectstore/RetrieveRequest.hpp +++ b/objectstore/RetrieveRequest.hpp @@ -135,8 +135,9 @@ public: struct EnqueueingNextStep { enum class NextStep { Nothing, - EnqueueForTransfer, - EnqueueForReport, + EnqueueForTransferForUser, + EnqueueForReportForUser, + EnqueueForReportForRepack, StoreInFailedJobsContainer, Delete } nextStep = NextStep::Nothing; diff --git a/objectstore/Sorter.cpp b/objectstore/Sorter.cpp index 5fae0a79e7b20d17babee930ada0c7fb185963f6..e668121da69e52b69d5dc1cc39d982a2f490a2e4 100644 --- a/objectstore/Sorter.cpp +++ b/objectstore/Sorter.cpp @@ -221,7 +221,7 @@ Sorter::RetrieveJob Sorter::createRetrieveJob(std::shared_ptr<RetrieveRequest> r void Sorter::insertRetrieveRequest(std::shared_ptr<RetrieveRequest> retrieveRequest, AgentReferenceInterface &previousOwner, cta::optional<uint32_t> copyNb, log::LogContext & lc){ OStoreRetrieveRequestAccessor requestAccessor(retrieveRequest); - insertRetrieveRequest(requestAccessor, previousOwner, copyNb, lc); + this->insertRetrieveRequest(requestAccessor, previousOwner, copyNb, lc); } void Sorter::insertRetrieveRequest(SorterRetrieveRequest& retrieveRequest, AgentReferenceInterface &previousOwner,cta::optional<uint32_t> copyNb, log::LogContext& lc){ @@ -286,8 +286,9 @@ void Sorter::insertRetrieveRequest(RetrieveRequestInfosAccessorInterface& access try{ Sorter::RetrieveJob jobToAdd = accessor.createRetrieveJob(archiveFile,jobTapeFile.copyNb,jobTapeFile.fSeq,&previousOwner); rjqi->jobToQueue = std::make_tuple(jobToAdd,std::promise<void>()); + std::string containerId = getContainerID(accessor,jobTapeFile.vid, copyNb.value()); threading::MutexLocker mapLocker(m_mutex); - m_retrieveQueuesAndRequests[std::make_tuple(jobTapeFile.vid, 
jobToAdd.jobQueueType)].emplace_back(rjqi); + m_retrieveQueuesAndRequests[std::make_tuple(containerId, jobToAdd.jobQueueType)].emplace_back(rjqi); params.add("fileId", accessor.getArchiveFile().archiveFileID) .add("copyNb", copyNb.value()) .add("tapeVid", jobTapeFile.vid) @@ -303,28 +304,6 @@ void Sorter::insertRetrieveRequest(RetrieveRequestInfosAccessorInterface& access } } -std::set<std::string> Sorter::getCandidateVidsToTransfer(RetrieveRequest& request){ - using serializers::RetrieveJobStatus; - std::set<std::string> candidateVids; - for (auto & j: request.dumpJobs()) { - if(j.status == RetrieveJobStatus::RJS_ToTransferForUser) { - candidateVids.insert(request.getArchiveFile().tapeFiles.at(j.copyNb).vid); - } - } - return candidateVids; -} - -std::set<std::string> Sorter::getCandidateVidsToTransfer(const SorterRetrieveRequest& request){ - using serializers::RetrieveJobStatus; - std::set<std::string> candidateVids; - for (auto & j: request.retrieveJobs) { - if(j.second.jobDump.status == RetrieveJobStatus::RJS_ToTransferForUser) { - candidateVids.insert(request.archiveFile.tapeFiles.at(j.second.jobDump.copyNb).vid); - } - } - return candidateVids; -} - std::set<std::string> Sorter::getCandidateVidsToTransfer(RetrieveRequestInfosAccessorInterface &requestAccessor){ using serializers::RetrieveJobStatus; std::set<std::string> candidateVids; @@ -336,42 +315,23 @@ std::set<std::string> Sorter::getCandidateVidsToTransfer(RetrieveRequestInfosAcc return candidateVids; } -std::string Sorter::getBestVidForQueueingRetrieveRequest(RetrieveRequest& retrieveRequest, std::set<std::string>& candidateVids, log::LogContext &lc){ - std::string vid; - try{ - vid = Helpers::selectBestRetrieveQueue(candidateVids,m_catalogue,m_objectstore); - } catch (Helpers::NoTapeAvailableForRetrieve & ex) { - log::ScopedParamContainer params(lc); - params.add("fileId", retrieveRequest.getArchiveFile().archiveFileID); - lc.log(log::INFO, "In Sorter::getVidForQueueingRetrieveRequest(): No available 
tape found."); - throw ex; - } - return vid; -} - -std::string Sorter::getBestVidForQueueingRetrieveRequest(const SorterRetrieveRequest& request, std::set<std::string>& candidateVids, log::LogContext &lc){ +std::string Sorter::getBestVidForQueueingRetrieveRequest(RetrieveRequestInfosAccessorInterface &requestAccessor, std::set<std::string>& candidateVids, log::LogContext &lc){ std::string vid; try{ vid = Helpers::selectBestRetrieveQueue(candidateVids,m_catalogue,m_objectstore); } catch (Helpers::NoTapeAvailableForRetrieve & ex) { log::ScopedParamContainer params(lc); - params.add("fileId", request.archiveFile.archiveFileID); + params.add("fileId", requestAccessor.getArchiveFile().archiveFileID); lc.log(log::INFO, "In Sorter::getVidForQueueingRetrieveRequest(): No available tape found."); throw ex; } return vid; } -std::string Sorter::getBestVidForQueueingRetrieveRequest(RetrieveRequestInfosAccessorInterface &requestAccessor, std::set<std::string>& candidateVids, log::LogContext &lc){ - std::string vid; - try{ - vid = Helpers::selectBestRetrieveQueue(candidateVids,m_catalogue,m_objectstore); - } catch (Helpers::NoTapeAvailableForRetrieve & ex) { - log::ScopedParamContainer params(lc); - params.add("fileId", requestAccessor.getArchiveFile().archiveFileID); - lc.log(log::INFO, "In Sorter::getVidForQueueingRetrieveRequest(): No available tape found."); - throw ex; - } +std::string Sorter::getContainerID(RetrieveRequestInfosAccessorInterface& requestAccessor, const std::string& vid, const uint32_t copyNb){ + serializers::RetrieveJobStatus rjs = requestAccessor.getJobStatus(copyNb); + if(rjs == serializers::RetrieveJobStatus::RJS_ToReportToRepackForSuccess || rjs == serializers::RetrieveJobStatus::RJS_ToReportToRepackForFailure) + return requestAccessor.getRepackAddress(); return vid; } @@ -444,6 +404,14 @@ Sorter::RetrieveJob OStoreRetrieveRequestAccessor::createRetrieveJob(const cta:: return ret; } +serializers::RetrieveJobStatus 
OStoreRetrieveRequestAccessor::getJobStatus(const uint32_t copyNb){ + return m_retrieveRequest->getJobStatus(copyNb); +} + +std::string OStoreRetrieveRequestAccessor::getRepackAddress(){ + return m_retrieveRequest->getRepackInfo().repackRequestAddress; +} + /* END OF RetrieveRequestAccessor CLASS */ @@ -470,6 +438,14 @@ Sorter::RetrieveJob SorterRetrieveRequestAccessor::createRetrieveJob(const cta:: return m_retrieveRequest.retrieveJobs.at(copyNb); } +serializers::RetrieveJobStatus SorterRetrieveRequestAccessor::getJobStatus(const uint32_t copyNb){ + return m_retrieveRequest.retrieveJobs.at(copyNb).jobDump.status; +} + +std::string SorterRetrieveRequestAccessor::getRepackAddress(){ + return m_retrieveRequest.repackRequestAddress; +} + /* END OF SorterRetrieveRequestAccessor CLASS*/ }} diff --git a/objectstore/Sorter.hpp b/objectstore/Sorter.hpp index 0b4510182462261ee4f43e4f831990738c45aaa5..ddf8d57c58b71f25683fcd6b87bd2d353fe163c1 100644 --- a/objectstore/Sorter.hpp +++ b/objectstore/Sorter.hpp @@ -53,6 +53,7 @@ public: Sorter(AgentReference& agentReference,Backend &objectstore, catalogue::Catalogue& catalogue); ~Sorter(); + //std::string = containerIdentifier typedef std::map<std::tuple<std::string, JobQueueType>, std::list<std::shared_ptr<ArchiveJobQueueInfo>>> MapArchive; typedef std::map<std::tuple<std::string, JobQueueType>, std::list<std::shared_ptr<RetrieveJobQueueInfo>>> MapRetrieve; @@ -133,6 +134,8 @@ public: struct SorterRetrieveRequest{ common::dataStructures::ArchiveFile archiveFile; std::map<uint32_t, RetrieveJob> retrieveJobs; + std::string repackRequestAddress; + bool isRepack; }; /* Retrieve-related methods */ @@ -201,12 +204,9 @@ private: threading::Mutex m_mutex; /* Retrieve-related methods */ - std::set<std::string> getCandidateVidsToTransfer(RetrieveRequest &request); - std::set<std::string> getCandidateVidsToTransfer(const SorterRetrieveRequest& request); std::set<std::string> getCandidateVidsToTransfer(RetrieveRequestInfosAccessorInterface 
&requestAccessor); - std::string getBestVidForQueueingRetrieveRequest(RetrieveRequest& retrieveRequest, std::set<std::string>& candidateVids, log::LogContext &lc); - std::string getBestVidForQueueingRetrieveRequest(const SorterRetrieveRequest& request, std::set<std::string>& candidateVids, log::LogContext &lc); std::string getBestVidForQueueingRetrieveRequest(RetrieveRequestInfosAccessorInterface &requestAccessor, std::set<std::string>& candidateVids, log::LogContext &lc); + std::string getContainerID(RetrieveRequestInfosAccessorInterface& requestAccessor, const std::string& vid, const uint32_t copyNb); void queueRetrieveRequests(const std::string vid, const JobQueueType jobQueueType, std::list<std::shared_ptr<RetrieveJobQueueInfo>>& archiveJobInfos, log::LogContext &lc); void dispatchRetrieveAlgorithm(const std::string vid, const JobQueueType jobQueueType, std::string& queueAddress, std::list<std::shared_ptr<RetrieveJobQueueInfo>>& retrieveJobsInfos, log::LogContext &lc); Sorter::RetrieveJob createRetrieveJob(std::shared_ptr<RetrieveRequest> retrieveRequest, const cta::common::dataStructures::ArchiveFile archiveFile, const uint32_t copyNb, const uint64_t fSeq, AgentReferenceInterface *previousOwner); @@ -246,6 +246,8 @@ class RetrieveRequestInfosAccessorInterface{ virtual Sorter::RetrieveJob createRetrieveJob(const cta::common::dataStructures::ArchiveFile archiveFile, const uint32_t copyNb, const uint64_t fSeq, AgentReferenceInterface* previousOwner) = 0; virtual ~RetrieveRequestInfosAccessorInterface(); + virtual serializers::RetrieveJobStatus getJobStatus(const uint32_t copyNb) = 0; + virtual std::string getRepackAddress() = 0; }; class OStoreRetrieveRequestAccessor: public RetrieveRequestInfosAccessorInterface{ @@ -256,6 +258,8 @@ class OStoreRetrieveRequestAccessor: public RetrieveRequestInfosAccessorInterfac common::dataStructures::ArchiveFile getArchiveFile(); Sorter::RetrieveJob createRetrieveJob(const cta::common::dataStructures::ArchiveFile archiveFile, 
const uint32_t copyNb, const uint64_t fSeq, AgentReferenceInterface* previousOwner); + serializers::RetrieveJobStatus getJobStatus(const uint32_t copyNb); + std::string getRepackAddress(); private: std::shared_ptr<RetrieveRequest> m_retrieveRequest; }; @@ -268,6 +272,8 @@ class SorterRetrieveRequestAccessor: public RetrieveRequestInfosAccessorInterfac common::dataStructures::ArchiveFile getArchiveFile(); Sorter::RetrieveJob createRetrieveJob(const cta::common::dataStructures::ArchiveFile archiveFile, const uint32_t copyNb, const uint64_t fSeq, AgentReferenceInterface* previousOwner); + serializers::RetrieveJobStatus getJobStatus(const uint32_t copyNb); + std::string getRepackAddress(); private: Sorter::SorterRetrieveRequest& m_retrieveRequest; }; diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index 20fb66ec2876d9124f7e340ad75b1281472e5f14..35ad70b4adfaeb8df97e04da8700b8676d9fbba7 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -3731,7 +3731,7 @@ void OStoreDB::RetrieveJob::failTransfer(const std::string &failureReason, log:: return; } - case NextStep::EnqueueForReport: { + case NextStep::EnqueueForReportForUser: { typedef objectstore::ContainerAlgorithms<RetrieveQueue,RetrieveQueueToReportForUser> CaRqtr; // Algorithms suppose the objects are not locked @@ -3785,8 +3785,17 @@ void OStoreDB::RetrieveJob::failTransfer(const std::string &failureReason, log:: lc.log(log::INFO, "In RetrieveJob::failTransfer(): enqueued job for reporting"); return; } + + case NextStep::EnqueueForReportForRepack:{ + Sorter sorter(*m_oStoreDB.m_agentReference,m_oStoreDB.m_objectStore,m_oStoreDB.m_catalogue); + std::shared_ptr<objectstore::RetrieveRequest> rr = std::make_shared<objectstore::RetrieveRequest>(m_retrieveRequest); + sorter.insertRetrieveRequest(rr,*this->m_oStoreDB.m_agentReference,cta::optional<uint32_t>(selectedCopyNb),lc); + rel.release(); + sorter.flushOneRetrieve(lc); + return; + } - case 
NextStep::EnqueueForTransfer: { + case NextStep::EnqueueForTransferForUser: { typedef objectstore::ContainerAlgorithms<RetrieveQueue,RetrieveQueueToTransferForUser> CaRqtr; // Algorithms suppose the objects are not locked @@ -3840,7 +3849,7 @@ void OStoreDB::RetrieveJob::failTransfer(const std::string &failureReason, log:: lc.log(log::INFO, "In RetrieveJob::failTransfer(): requeued job for (potentially in-mount) retry."); return; } - + case NextStep::StoreInFailedJobsContainer: // For retrieve queues, once the job has been queued for report, we don't retry any more, so we // can never arrive at this case @@ -3883,7 +3892,7 @@ void OStoreDB::RetrieveJob::failReport(const std::string &failureReason, log::Lo // Apply the decision switch (enQueueingNextStep.nextStep) { // We have a reduced set of supported next steps as some are not compatible with this event - case NextStep::EnqueueForReport: { + case NextStep::EnqueueForReportForUser: { typedef objectstore::ContainerAlgorithms<RetrieveQueue,RetrieveQueueToReportForUser> CaRqtr; CaRqtr caRqtr(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference); CaRqtr::InsertedElement::list insertedElements; diff --git a/scheduler/RetrieveJob.hpp b/scheduler/RetrieveJob.hpp index 8db9586fb09fde3d2ec7e2f70de73d18bd0501f0..d85f48bdc13371370cdfe683074a3a70a656e23f 100644 --- a/scheduler/RetrieveJob.hpp +++ b/scheduler/RetrieveJob.hpp @@ -82,7 +82,7 @@ public: * Completion will be checked implicitly in RetrieveMount::flushAsyncSuccessReports() */ virtual void asyncSetSuccessful(); - + /** * Triggers a scheduler update following the failure of the job. Retry policy will * be applied by the scheduler. 
diff --git a/scheduler/SchedulerTest.cpp b/scheduler/SchedulerTest.cpp index 0689fa4ad6f2f6b59ed55247f5cdc589d90ca54b..89675c42835ed40b9a8b57d263d6dc1fd6ac828d 100644 --- a/scheduler/SchedulerTest.cpp +++ b/scheduler/SchedulerTest.cpp @@ -1361,7 +1361,7 @@ TEST_P(SchedulerTest, expandRepackRequest) { //Create an agent to represent this test process std::string agentReferenceName = "expandRepackRequestTest"; std::unique_ptr<objectstore::AgentReference> agentReference(new objectstore::AgentReference(agentReferenceName, dl)); - + const uint64_t capacityInBytes = (uint64_t)10 * 1000 * 1000 * 1000 * 1000; const bool disabledValue = false; @@ -1523,23 +1523,23 @@ TEST_P(SchedulerTest, expandRepackRequest) { executedJobs.push_back(std::move(retrieveJob)); } //Now, report the retrieve jobs to be completed - castor::tape::tapeserver::daemon::RecallReportPacker rrp(retrieveMount.get(),lc); + castor::tape::tapeserver::daemon::RecallReportPacker rrp(retrieveMount.get(),lc); - rrp.startThreads(); - for(auto it = executedJobs.begin(); it != executedJobs.end(); ++it) - { - rrp.reportCompletedJob(std::move(*it)); - } - rrp.setDiskDone(); - rrp.setTapeDone(); + rrp.startThreads(); + for(auto it = executedJobs.begin(); it != executedJobs.end(); ++it) + { + rrp.reportCompletedJob(std::move(*it)); + } + rrp.setDiskDone(); + rrp.setTapeDone(); - rrp.reportDriveStatus(cta::common::dataStructures::DriveStatus::Unmounting); + rrp.reportDriveStatus(cta::common::dataStructures::DriveStatus::Unmounting); - rrp.reportEndOfSession(); - rrp.waitThread(); + rrp.reportEndOfSession(); + rrp.waitThread(); - ASSERT_EQ(rrp.allThreadsDone(),true); - } + ASSERT_EQ(rrp.allThreadsDone(),true); + } uint64_t archiveFileId = 1; for(uint64_t i = 1; i<= nbTapesForTest ;++i) @@ -1676,6 +1676,230 @@ TEST_P(SchedulerTest, expandRepackRequest) { } } +TEST_P(SchedulerTest, expandRepackRequestFailedRetrieve) { + using namespace cta; + using namespace cta::objectstore; + + auto &catalogue = getCatalogue(); + auto 
&scheduler = getScheduler(); + auto &schedulerDB = getSchedulerDB(); + cta::objectstore::Backend& backend = schedulerDB.getBackend(); + setupDefaultCatalogue(); + + +#ifdef STDOUT_LOGGING + log::StdoutLogger dl("dummy", "unitTest"); +#else + log::DummyLogger dl("", ""); +#endif + log::LogContext lc(dl); + + //Create an agent to represent this test process + cta::objectstore::AgentReference agentReference("expandRepackRequestTest", dl); + cta::objectstore::Agent agent(agentReference.getAgentAddress(), backend); + agent.initialize(); + agent.setTimeout_us(0); + agent.insertAndRegisterSelf(lc); + + const uint64_t capacityInBytes = (uint64_t)10 * 1000 * 1000 * 1000 * 1000; + const bool disabledValue = false; + const bool fullValue = false; + const std::string comment = "Create tape"; + cta::common::dataStructures::SecurityIdentity admin; + admin.username = "admin_user_name"; + admin.host = "admin_host"; + const std::string diskFileUser = "public_disk_user"; + const std::string diskFileGroup = "public_disk_group"; + const std::string diskFileRecoveryBlob = "opaque_disk_file_recovery_contents"; + + //Create a logical library in the catalogue + catalogue.createLogicalLibrary(admin, s_libraryName, "Create logical library"); + + std::ostringstream ossVid; + ossVid << s_vid << "_" << 1; + std::string vid = ossVid.str(); + catalogue.createTape(s_adminOnAdminHost,vid, s_mediaType, s_vendor, s_libraryName, s_tapePoolName, capacityInBytes, + disabledValue, fullValue, comment); + + //Create a storage class in the catalogue + common::dataStructures::StorageClass storageClass; + storageClass.diskInstance = s_diskInstance; + storageClass.name = s_storageClassName; + storageClass.nbCopies = 2; + storageClass.comment = "Create storage class"; + + const std::string checksumType = "checksum_type"; + const std::string checksumValue = "checksum_value"; + const std::string tapeDrive = "tape_drive"; + const uint64_t nbArchiveFilesPerTape = 10; + const uint64_t archiveFileSize = 2 * 1000 * 
1000 * 1000; + const uint64_t compressedFileSize = archiveFileSize; + + //Simulate the writing of 10 files per tape in the catalogue + std::set<catalogue::TapeItemWrittenPointer> tapeFilesWrittenCopy1; + { + uint64_t archiveFileId = 1; + std::string currentVid = vid; + for(uint64_t j = 1; j <= nbArchiveFilesPerTape; ++j) { + std::ostringstream diskFileId; + diskFileId << (12345677 + archiveFileId); + std::ostringstream diskFilePath; + diskFilePath << "/public_dir/public_file_"<<1<<"_"<< j; + auto fileWrittenUP=cta::make_unique<cta::catalogue::TapeFileWritten>(); + auto & fileWritten = *fileWrittenUP; + fileWritten.archiveFileId = archiveFileId++; + fileWritten.diskInstance = storageClass.diskInstance; + fileWritten.diskFileId = diskFileId.str(); + fileWritten.diskFilePath = diskFilePath.str(); + fileWritten.diskFileUser = diskFileUser; + fileWritten.diskFileGroup = diskFileGroup; + fileWritten.diskFileRecoveryBlob = diskFileRecoveryBlob; + fileWritten.size = archiveFileSize; + fileWritten.checksumType = checksumType; + fileWritten.checksumValue = checksumValue; + fileWritten.storageClassName = s_storageClassName; + fileWritten.vid = currentVid; + fileWritten.fSeq = j; + fileWritten.blockId = j * 100; + fileWritten.compressedSize = compressedFileSize; + fileWritten.copyNb = 1; + fileWritten.tapeDrive = tapeDrive; + tapeFilesWrittenCopy1.emplace(fileWrittenUP.release()); + } + //update the DB tape + catalogue.filesWrittenToTape(tapeFilesWrittenCopy1); + tapeFilesWrittenCopy1.clear(); + } + //Test the expandRepackRequest method + scheduler.waitSchedulerDbSubthreadsComplete(); + + { + scheduler.queueRepack(admin,vid,"root://repackData/buffer",common::dataStructures::RepackInfo::Type::MoveOnly,lc); + scheduler.waitSchedulerDbSubthreadsComplete(); + //scheduler.waitSchedulerDbSubthreadsComplete(); + + log::TimingList tl; + utils::Timer t; + + //The promoteRepackRequestsToToExpand will only promote 2 RepackRequests to ToExpand status at a time. 
+ scheduler.promoteRepackRequestsToToExpand(lc); + scheduler.waitSchedulerDbSubthreadsComplete(); + + auto repackRequestToExpand = scheduler.getNextRepackRequestToExpand(); + //Only one repack request is queued in this test, so getNextRepackRequestToExpand is called once; + //its result is passed to expandRepackRequest without a null check. + + scheduler.expandRepackRequest(repackRequestToExpand,tl,t,lc); + scheduler.waitSchedulerDbSubthreadsComplete(); + } + + { + std::unique_ptr<cta::TapeMount> mount; + mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + ASSERT_NE(nullptr, mount.get()); + ASSERT_EQ(cta::common::dataStructures::MountType::Retrieve, mount.get()->getMountType()); + std::unique_ptr<cta::RetrieveMount> retrieveMount; + retrieveMount.reset(dynamic_cast<cta::RetrieveMount*>(mount.release())); + ASSERT_NE(nullptr, retrieveMount.get()); + std::unique_ptr<cta::RetrieveJob> retrieveJob; + + std::list<std::unique_ptr<cta::RetrieveJob>> executedJobs; + //For each tape we will see if the retrieve jobs are not null + for(uint64_t j = 1; j<=nbArchiveFilesPerTape; ++j) + { + auto jobBatch = retrieveMount->getNextJobBatch(1,archiveFileSize,lc); + retrieveJob.reset(jobBatch.front().release()); + ASSERT_NE(nullptr, retrieveJob.get()); + executedJobs.push_back(std::move(retrieveJob)); + } + //Now, report the retrieve jobs to be completed + castor::tape::tapeserver::daemon::RecallReportPacker rrp(retrieveMount.get(),lc); + + rrp.startThreads(); + + //Report all jobs as succeeded except the first one + auto it = executedJobs.begin(); + it++; + while(it != executedJobs.end()){ + rrp.reportCompletedJob(std::move(*it)); + it++; + } + std::unique_ptr<cta::RetrieveJob> failedJobUniqPtr = std::move(*(executedJobs.begin())); + rrp.reportFailedJob(std::move(failedJobUniqPtr),cta::exception::Exception("FailedJob")); + + rrp.setDiskDone(); + rrp.setTapeDone(); + + 
rrp.reportDriveStatus(cta::common::dataStructures::DriveStatus::Unmounting); + + rrp.reportEndOfSession(); + rrp.waitThread(); + + ASSERT_EQ(rrp.allThreadsDone(),true); + + scheduler.waitSchedulerDbSubthreadsComplete(); + + { + for(int i = 0; i < 5; ++i){ + std::unique_ptr<cta::TapeMount> mount; + mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + ASSERT_NE(nullptr, mount.get()); + ASSERT_EQ(cta::common::dataStructures::MountType::Retrieve, mount.get()->getMountType()); + std::unique_ptr<cta::RetrieveMount> retrieveMount; + retrieveMount.reset(dynamic_cast<cta::RetrieveMount*>(mount.release())); + ASSERT_NE(nullptr, retrieveMount.get()); + std::unique_ptr<cta::RetrieveJob> retrieveJob; + + std::list<std::unique_ptr<cta::RetrieveJob>> executedJobs; + //For each tape we will see if the retrieve jobs are not null + auto jobBatch = retrieveMount->getNextJobBatch(1,archiveFileSize,lc); + retrieveJob.reset(jobBatch.front().release()); + ASSERT_NE(nullptr, retrieveJob.get()); + + castor::tape::tapeserver::daemon::RecallReportPacker rrp(retrieveMount.get(),lc); + + rrp.startThreads(); + + rrp.reportFailedJob(std::move(retrieveJob),cta::exception::Exception("FailedJob")); + + rrp.setDiskDone(); + rrp.setTapeDone(); + + rrp.reportDriveStatus(cta::common::dataStructures::DriveStatus::Unmounting); + + rrp.reportEndOfSession(); + rrp.waitThread(); + ASSERT_EQ(rrp.allThreadsDone(),true); + } + + { + //Verify that the job is in the RetrieveQueueToReportToRepackForFailure + cta::objectstore::RootEntry re(backend); + cta::objectstore::ScopedExclusiveLock sel(re); + re.fetch(); + + //Get the retrieveQueueToReportToRepackForFailure + // The queue is named after the repack request: we need to query the repack index + objectstore::RepackIndex ri(re.getRepackIndexAddress(), schedulerDB.getBackend()); + ri.fetchNoLock(); + + std::string retrieveQueueToReportToRepackForFailureAddress = 
re.getRetrieveQueueAddress(ri.getRepackRequestAddress(vid),cta::objectstore::JobQueueType::JobsToReportToRepackForFailure); + cta::objectstore::RetrieveQueue rq(retrieveQueueToReportToRepackForFailureAddress,backend); + + //Fetch the queue so that we can get the retrieveRequests from it + cta::objectstore::ScopedExclusiveLock rql(rq); + rq.fetch(); + + ASSERT_EQ(rq.dumpJobs().size(),1); + for(auto& job: rq.dumpJobs()){ + ASSERT_EQ(1,job.copyNb); + ASSERT_EQ(archiveFileSize,job.size); + } + } + } + } +} + #undef TEST_MOCK_DB #ifdef TEST_MOCK_DB static cta::MockSchedulerDatabaseFactory mockDbFactory; @@ -1698,4 +1922,3 @@ INSTANTIATE_TEST_CASE_P(OStoreDBPlusMockSchedulerTestRados, SchedulerTest, ::testing::Values(SchedulerTestParam(OStoreDBFactoryRados))); #endif } // namespace unitTests -