diff --git a/objectstore/Algorithms.hpp b/objectstore/Algorithms.hpp index 6968fd165b36ba00eebd131b2155e1c76d6ce0a5..0e72d4e9471c65e69f6d4819659e8221beaa31fe 100644 --- a/objectstore/Algorithms.hpp +++ b/objectstore/Algorithms.hpp @@ -97,11 +97,16 @@ public: for (const auto & e: elements) { if (!failedElementsSet.count(ContainerTraits<Q,C>::getElementAddress(e))) { transferedElements.emplace_back(ContainerTraits<Q,C>::getElementAddress(e)); - } + } } if (transferedElements.size()) m_agentReference.removeBatchFromOwnership(transferedElements, m_backend); failureEx.failedElements = failedOwnershipSwitchElements; params.add("errorCount", failedOwnershipSwitchElements.size()); + std::string failedElementsAddresses = ""; + for(auto & failedElement: failedElementsSet){ + failedElementsAddresses += failedElement + " "; + } + params.add("failedElementsAddresses",failedElementsAddresses); lc.log(log::WARNING, "In ContainerAlgorithms::referenceAndSwitchOwnership(): " "Encountered problems while requeuing a batch of elements"); throw failureEx; diff --git a/objectstore/ArchiveQueueAlgorithms.hpp b/objectstore/ArchiveQueueAlgorithms.hpp index 21b22c17260376b37793e7843bd481929dc61053..0e4bb285a7decc4c60c62cf4e2e838fe51c995a6 100644 --- a/objectstore/ArchiveQueueAlgorithms.hpp +++ b/objectstore/ArchiveQueueAlgorithms.hpp @@ -42,6 +42,9 @@ struct ContainerTraits<ArchiveQueue,C> cta::optional<cta::common::dataStructures::MountPolicy> mountPolicy; cta::optional<serializers::ArchiveJobStatus> newStatus; typedef std::list<InsertedElement> list; + bool operator==(InsertedElement & other){ + return archiveRequest->getAddressIfSet() == other.archiveRequest->getAddressIfSet() && copyNb == other.copyNb; + } }; typedef ArchiveRequest::JobDump ElementDescriptor; @@ -109,7 +112,7 @@ struct ContainerTraits<ArchiveQueue,C> }; struct OwnershipSwitchFailure: public cta::exception::Exception { - OwnershipSwitchFailure(const std::string & message): cta::exception::Exception(message) {}; + OwnershipSwitchFailure(const std::string & message): cta::exception::Exception(message) {}; typename OpFailure<InsertedElement>::list failedElements; }; diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index 51323a4784c660cf0dc3894c3370edfd891f81ea..ad1ff336a1749783dc3a00a39a2c1196b82b422f 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -4184,7 +4184,7 @@ void OStoreDB::ArchiveMount::setTapeSessionStats(const castor::tape::tapeserver: //------------------------------------------------------------------------------ void OStoreDB::ArchiveMount::setJobBatchTransferred(std::list<std::unique_ptr<cta::SchedulerDatabase::ArchiveJob>>& jobsBatch, log::LogContext & lc) { - std::set<cta::OStoreDB::ArchiveJob*> jobsToQueueForReportingToUser, jobsToQueueForReportingToRepack; + std::set<cta::OStoreDB::ArchiveJob*> jobsToQueueForReportingToUser, jobsToQueueForReportingToRepack, failedJobsToQueueForReportingForRepack; std::list<std::string> ajToUnown; utils::Timer t; log::TimingList timingList; @@ -4259,6 +4259,7 @@ void OStoreDB::ArchiveMount::setJobBatchTransferred(std::list<std::unique_ptr<ct lc.log(log::INFO, "In OStoreDB::ArchiveMount::setJobBatchTransferred(): will queue request for reporting to repack."); } for (auto &list: insertedElementsLists) { + retry: try { utils::Timer tLocal; aqtrtrCa.referenceAndSwitchOwnership(list.first, m_oStoreDB.m_agentReference->getAgentAddress(), list.second, lc); @@ -4272,6 +4273,15 @@ void OStoreDB::ArchiveMount::setJobBatchTransferred(std::list<std::unique_ptr<ct params.add("tapeVid", list.first) .add("exceptionMSG", ex.getMessageValue()); lc.log(log::WARNING, "In OStoreDB::ArchiveMount::setJobBatchTransferred(): failed to queue a batch of requests for reporting to repack, jobs do not exist in the objectstore."); + } catch (const AqtrtrCa::OwnershipSwitchFailure &ex) { + typedef objectstore::ContainerTraits<ArchiveQueue,ArchiveQueueToReportToRepackForSuccess>::OpFailure<AqtrtrCa::InsertedElement> OpFailure; + list.second.remove_if([&ex](const AqtrtrCa::InsertedElement &elt){ + //Remove the elements that are NOT in the failed elements list + return std::find_if(ex.failedElements.begin(),ex.failedElements.end(),[&elt](const OpFailure &insertedElement){ + return elt.archiveRequest->getAddressIfSet() == insertedElement.element->archiveRequest->getAddressIfSet() && elt.copyNb == insertedElement.element->copyNb; + }) == ex.failedElements.end(); + }); + goto retry; } catch (cta::exception::Exception & ex) { log::ScopedParamContainer params(lc); params.add("tapeVid", list.first)