From 05048e54ac6bbd1f4646b6e50e32907929031db0 Mon Sep 17 00:00:00 2001
From: Michael Davis <michael.davis@cern.ch>
Date: Fri, 16 Nov 2018 17:14:49 +0100
Subject: [PATCH] [os-generic-queues] Implements OStoreDB::setRetrieveJobBatchReported()

---
 scheduler/OStoreDB/OStoreDB.cpp | 96 +++++++--------------------------
 1 file changed, 19 insertions(+), 77 deletions(-)

diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp
index 33c82666f1..4873c5ea74 100644
--- a/scheduler/OStoreDB/OStoreDB.cpp
+++ b/scheduler/OStoreDB/OStoreDB.cpp
@@ -786,23 +786,12 @@ void OStoreDB::setRetrieveJobBatchReported(std::list<cta::SchedulerDatabase::Ret
     RetrieveJob * job;
   };
 
-  for (auto &j : jobsBatch) {
-    OStoreDB::RetrieveJob *rj = dynamic_cast<OStoreDB::RetrieveJob*>(j);
-    auto &job = *rj;
-std::cerr << "Archive ID: " << job.archiveFile.archiveFileID << std::endl;
-std::cerr << "Report Type: " << static_cast<int>(job.reportType) << std::endl;
-std::cerr << "Error Report URL: " << job.retrieveRequest.errorReportURL << std::endl;
-std::cerr << "VID: " << job.archiveFile.tapeFiles.begin()->second.vid << std::endl;
-  }
-
   // Sort jobs to be updated
   std::map<std::string, std::list<FailedJobToQueue>> failedQueues;
   for(auto &j : jobsBatch) {
     switch(j->reportType) {
       case SchedulerDatabase::RetrieveJob::ReportType::FailureReport: {
-        // Here we assume the simple case of one tape copy.
-        // In the case of multiple tape copies, do we report all vids? Or how to tell which vid to report?
-        auto vid = j->archiveFile.tapeFiles.begin()->second.vid;
+        auto &vid = j->archiveFile.tapeFiles.at(j->selectedCopyNb).vid;
         failedQueues[vid].push_back(FailedJobToQueue{ castFromSchedDBJob(j) });
         break;
       }
@@ -815,80 +804,33 @@ std::cerr << "VID: " << job.archiveFile.tapeFiles.begin()->second.vid << std::en
     }
   }
 
-  throw std::runtime_error("OStoreDB::setRetrieveJobBatchReported() not implemented.");
-#if 0
-  // We can have a mixture of failed and successful jobs, so we will sort them before batch queue/discarding them.
-  // First, sort the jobs. Done jobs get deleted (no need to sort further) and failed jobs go to their per-VID queues/containers.
-  // Status gets updated on the fly on the latter case.
-  std::list<ArchiveJob *> completeJobsToDelete;
-  struct FailedJobToQueue {
-    ArchiveJob * job;
-  };
-  // Sort jobs to be updated.
-  std::map<std::string, std::list<FailedJobToQueue>> failedQueues;
-  for (auto &j: jobsBatch) {
-    switch (j->reportType) {
-    case SchedulerDatabase::ArchiveJob::ReportType::CompletionReport:
-      completeJobsToDelete.push_back(castFromSchedDBJob(j));
-      break;
-    case SchedulerDatabase::ArchiveJob::ReportType::FailureReport:
-      failedQueues[j->tapeFile.vid].push_back(FailedJobToQueue());
-      failedQueues[j->tapeFile.vid].back().job = castFromSchedDBJob(j);
-      break;
-    default:
-      {
-        log::ScopedParamContainer params(lc);
-        params.add("fileId", j->archiveFile.archiveFileID)
-              .add("objectAddress", castFromSchedDBJob(j)->m_archiveRequest.getAddressIfSet());
-        lc.log(log::ERR, "In OStoreDB::setArchiveJobBatchReported(): unexpected job status. Leaving the job as-is.");
+  // Put the failed jobs in the failed queue
+  for(auto &queue : failedQueues) {
+    typedef objectstore::ContainerAlgorithms<RetrieveQueue,RetrieveQueueFailed> CaRQF;
+    CaRQF caRQF(m_objectStore, *m_agentReference);
+    CaRQF::InsertedElement::list insertedElements;
+    for(auto &j : queue.second) {
+      auto tf_it = j.job->archiveFile.tapeFiles.begin();
+      while(tf_it != j.job->archiveFile.tapeFiles.end()) {
+        if(queue.first == tf_it->second.vid) break;
+        ++tf_it;
+      }
-      }
-    }
-  }
-  if (completeJobsToDelete.size()) {
-    // Launch deletion.
-    for (auto &j: completeJobsToDelete) {
-      j->asyncDeleteRequest();
-    }
-    timingList.insertAndReset("deleteLaunchTime", t);
-    for (auto &j: completeJobsToDelete) {
-      try {
-        j->waitAsyncDelete();
-        log::ScopedParamContainer params(lc);
-        params.add("fileId", j->archiveFile.archiveFileID)
-              .add("objectAddress", j->m_archiveRequest.getAddressIfSet());
-        lc.log(log::INFO, "In OStoreDB::setArchiveJobBatchReported(): deleted ArchiveRequest after completion and reporting.");
-      } catch (cta::exception::Exception & ex) {
-        log::ScopedParamContainer params(lc);
-        params.add("fileId", j->archiveFile.archiveFileID)
-              .add("objectAddress", j->m_archiveRequest.getAddressIfSet())
-              .add("exceptionMSG", ex.getMessageValue());
-        lc.log(log::ERR, "In OStoreDB::setArchiveJobBatchReported(): failed to delete ArchiveRequest after completion and reporting.");
-      }
-    }
-    timingList.insertAndReset("deletionCompletionTime", t);
-  }
-  for (auto & queue: failedQueues) {
-    // Put the jobs in the failed queue
-    typedef objectstore::ContainerAlgorithms<ArchiveQueue,ArchiveQueueFailed> CaAQF;
-    CaAQF caAQF(m_objectStore, *m_agentReference);
-    // TODOTODO: also switch status in one step.
-    CaAQF::InsertedElement::list insertedElements;
-    for (auto &j: queue.second) {
-      insertedElements.emplace_back(CaAQF::InsertedElement{&j.job->m_archiveRequest, j.job->tapeFile.copyNb, j.job->archiveFile,
-          cta::nullopt, serializers::ArchiveJobStatus::AJS_Failed});
+      if(tf_it == j.job->archiveFile.tapeFiles.end()) throw cta::exception::Exception(
+        "In OStoreDB::setRetrieveJobBatchReported(): tape copy not found"
+      );
+      insertedElements.emplace_back(CaRQF::InsertedElement{
+        &j.job->m_retrieveRequest, tf_it->second.copyNb, tf_it->second.fSeq, tf_it->second.compressedSize,
+        common::dataStructures::MountPolicy(), serializers::RetrieveJobStatus::RJS_Failed
+      });
     }
     try {
-      caAQF.referenceAndSwitchOwnership(queue.first, QueueType::FailedJobs, m_agentReference->getAgentAddress(),
-          insertedElements, lc);
-    } catch (exception::Exception & ex) {
+      caRQF.referenceAndSwitchOwnership(queue.first, QueueType::FailedJobs, m_agentReference->getAgentAddress(), insertedElements, lc);
+    } catch(exception::Exception &ex) {
       log::ScopedParamContainer params(lc);
-
     }
     log::TimingList tl;
     tl.insertAndReset("queueAndSwitchStateTime", t);
     timingList += tl;
   }
-#endif
 }
 //------------------------------------------------------------------------------
--
GitLab