From 212357a277055b872154441d3008791e34dc9048 Mon Sep 17 00:00:00 2001 From: Cedric CAFFY <cedric.caffy@cern.ch> Date: Tue, 22 Oct 2019 17:45:18 +0200 Subject: [PATCH] Added informations about migrated files from a repacked tape --- common/dataStructures/RepackInfo.hpp | 9 ++++ objectstore/ArchiveRequest.cpp | 26 +++++++--- objectstore/ArchiveRequest.hpp | 12 ++++- objectstore/RepackRequest.cpp | 47 +++++++++++++++++++ objectstore/RepackRequest.hpp | 3 ++ objectstore/cta.proto | 13 +++++ scheduler/OStoreDB/OStoreDB.cpp | 7 ++- scheduler/SchedulerTest.cpp | 3 ++ .../daemon/MigrationTaskInjector.cpp | 2 +- xroot_plugins/XrdCtaRepackLs.hpp | 7 +++ xrootd-ssi-protobuf-interface | 2 +- 11 files changed, 119 insertions(+), 12 deletions(-) diff --git a/common/dataStructures/RepackInfo.hpp b/common/dataStructures/RepackInfo.hpp index 7f9a9dc7b5..76bf6fe50f 100644 --- a/common/dataStructures/RepackInfo.hpp +++ b/common/dataStructures/RepackInfo.hpp @@ -19,6 +19,7 @@ #pragma once #include <string> +#include <list> #include "objectstore/RepackQueueType.hpp" namespace cta { @@ -30,6 +31,13 @@ namespace dataStructures { */ struct RepackInfo { + struct RepackDestinationInfo { + std::string vid; + uint64_t files = 0; + uint64_t bytes = 0; + typedef std::list<RepackDestinationInfo> List; + }; + std::string vid; std::string repackBufferBaseURL; enum class Type { @@ -62,6 +70,7 @@ struct RepackInfo { uint64_t archivedFiles; bool isExpandFinished; bool forceDisabledTape; + RepackDestinationInfo::List destinationInfos; // std::string tag; // uint64_t totalFiles; // uint64_t totalSize; diff --git a/objectstore/ArchiveRequest.cpp b/objectstore/ArchiveRequest.cpp index 6165c588b1..d18b58e86f 100644 --- a/objectstore/ArchiveRequest.cpp +++ b/objectstore/ArchiveRequest.cpp @@ -584,6 +584,9 @@ ArchiveRequest::AsyncJobOwnerUpdater* ArchiveRequest::asyncUpdateJobOwner(uint32 retRef.m_repackInfo.fileBufferURL = payload.repack_info().file_buffer_url(); retRef.m_repackInfo.isRepack = payload.isrepack(); retRef.m_repackInfo.repackRequestAddress = payload.repack_info().repack_request_address(); + for(auto jobDestination: payload.repack_info().jobs_destination()){ + retRef.m_repackInfo.jobsDestination[jobDestination.copy_nb()] = jobDestination.destination_vid(); + } if (j->failurelogs_size()) { retRef.m_latestError = j->failurelogs(j->failurelogs_size()-1); } @@ -663,12 +666,12 @@ ArchiveRequest::RepackInfo ArchiveRequest::AsyncJobOwnerUpdater::getRepackInfo() //------------------------------------------------------------------------------ // ArchiveRequest::asyncUpdateTransferSuccessful() //------------------------------------------------------------------------------ -ArchiveRequest::AsyncTransferSuccessfulUpdater * ArchiveRequest::asyncUpdateTransferSuccessful(const uint32_t copyNumber ) { +ArchiveRequest::AsyncTransferSuccessfulUpdater * ArchiveRequest::asyncUpdateTransferSuccessful(const std::string destinationVid, const uint32_t copyNumber ) { std::unique_ptr<AsyncTransferSuccessfulUpdater> ret(new AsyncTransferSuccessfulUpdater); // The unique pointer will be std::moved so we need to work with its content (bare pointer or here ref to content). auto retPtr = ret.get(); ret->m_updaterCallback= - [this, copyNumber, retPtr](const std::string &in)->std::string { + [this, destinationVid, copyNumber, retPtr](const std::string &in)->std::string { // We have a locked and fetched object, so we just need to work on its representation. serializers::ObjectHeader oh; oh.ParseFromString(in); @@ -680,11 +683,6 @@ ArchiveRequest::AsyncTransferSuccessfulUpdater * ArchiveRequest::asyncUpdateTran serializers::ArchiveRequest payload; payload.ParseFromString(oh.payload()); retPtr->m_repackInfo.isRepack = payload.isrepack(); - if (payload.isrepack()) { // Default repack info is fine for the no repack case. - ArchiveRequest::RepackInfoSerDeser serDeser; - serDeser.deserialize(payload.repack_info()); - retPtr->m_repackInfo = serDeser; - } if (!payload.isrepack()) { // Non-repack case. We only do one report per request. auto * jl = payload.mutable_jobs(); bool otherJobsToTransfer = false; @@ -713,6 +711,12 @@ ArchiveRequest::AsyncTransferSuccessfulUpdater * ArchiveRequest::asyncUpdateTran } else { // Repack case, the report policy is different (report all jobs). So we just change the job's status. for (auto j: *payload.mutable_jobs()) { if (j.copynb() == copyNumber) { + ArchiveRequest::RepackInfoSerDeser serDeser; + serDeser.deserialize(payload.repack_info()); + //Store the job destination vid in the repack info + serDeser.jobsDestination[copyNumber] = destinationVid; + serDeser.serialize(*(payload.mutable_repack_info())); + retPtr->m_repackInfo = serDeser; j.set_status(serializers::ArchiveJobStatus::AJS_ToReportToRepackForSuccess); oh.set_payload(payload.SerializeAsString()); return oh.SerializeAsString(); @@ -915,6 +919,9 @@ ArchiveRequest::RepackInfo ArchiveRequest::getRepackInfo(){ ret.fileBufferURL = repackInfo.file_buffer_url(); ret.isRepack = true; ret.repackRequestAddress = repackInfo.repack_request_address(); + for(auto jobDestination: repackInfo.jobs_destination()){ + ret.jobsDestination[jobDestination.copy_nb()] = jobDestination.destination_vid(); + } return ret; } @@ -924,6 +931,11 @@ void ArchiveRequest::setRepackInfo(const RepackInfo& repackInfo){ repackInfoToWrite->set_repack_request_address(repackInfo.repackRequestAddress); repackInfoToWrite->set_file_buffer_url(repackInfo.fileBufferURL); repackInfoToWrite->set_fseq(repackInfo.fSeq); + for(const auto kv: repackInfo.jobsDestination){ + auto jobDestination = repackInfoToWrite->mutable_jobs_destination()->Add(); + jobDestination->set_copy_nb(kv.first); + jobDestination->set_destination_vid(kv.second); + } } //------------------------------------------------------------------------------ diff --git a/objectstore/ArchiveRequest.hpp b/objectstore/ArchiveRequest.hpp index f14918b270..2f8cc6bd59 100644 --- a/objectstore/ArchiveRequest.hpp +++ b/objectstore/ArchiveRequest.hpp @@ -109,6 +109,8 @@ public: uint64_t fSeq = 0; std::string repackRequestAddress; std::string fileBufferURL; + //Map successful archive jobs copyNb to the destination vid where the archived file is located + std::map<uint32_t,std::string> jobsDestination; }; void setRepackInfo(const RepackInfo & repackInfo); RepackInfo getRepackInfo(); @@ -165,6 +167,11 @@ public: arri.set_repack_request_address(repackRequestAddress); arri.set_fseq(fSeq); arri.set_file_buffer_url(fileBufferURL); + for(const auto &kv: jobsDestination){ + auto jobDestination = arri.mutable_jobs_destination()->Add(); + jobDestination->set_copy_nb(kv.first); + jobDestination->set_destination_vid(kv.second); + } } void deserialize(const cta::objectstore::serializers::ArchiveRequestRepackInfo & arri) { @@ -172,6 +179,9 @@ public: fileBufferURL = arri.file_buffer_url(); repackRequestAddress = arri.repack_request_address(); fSeq = arri.fseq(); + for(auto jobDestination: arri.jobs_destination()){ + jobsDestination[jobDestination.copy_nb()] = jobDestination.destination_vid(); + } } }; @@ -186,7 +196,7 @@ public: std::function<std::string(const std::string &)> m_updaterCallback; std::unique_ptr<Backend::AsyncUpdater> m_backendUpdater; }; - AsyncTransferSuccessfulUpdater * asyncUpdateTransferSuccessful(uint32_t copyNumber); + AsyncTransferSuccessfulUpdater * asyncUpdateTransferSuccessful(const std::string destinationVid, const uint32_t copyNumber); // An asynchronous request deleter class after report of success. class AsyncRequestDeleter { diff --git a/objectstore/RepackRequest.cpp b/objectstore/RepackRequest.cpp index 2659d30f51..a9384410d8 100644 --- a/objectstore/RepackRequest.cpp +++ b/objectstore/RepackRequest.cpp @@ -148,6 +148,13 @@ common::dataStructures::RepackInfo RepackRequest::getInfo() { ret.userProvidedFiles = m_payload.userprovidedfiles(); ret.isExpandFinished = m_payload.is_expand_finished(); ret.forceDisabledTape = m_payload.force_disabled_tape(); + for(auto & rdi: m_payload.destination_infos()){ + RepackInfo::RepackDestinationInfo rdiToInsert; + rdiToInsert.vid = rdi.vid(); + rdiToInsert.files = rdi.files(); + rdiToInsert.bytes = rdi.bytes(); + ret.destinationInfos.emplace_back(rdiToInsert); + } if (m_payload.move_mode()) { if (m_payload.add_copies_mode()) { ret.type = RepackInfo::Type::MoveAndAddCopies; @@ -200,6 +207,7 @@ void RepackRequest::deleteAllSubrequests() { checkPayloadWritable(); std::list<std::unique_ptr<Backend::AsyncDeleter>> deleters; if(!m_payload.is_complete()){ + m_payload.mutable_destination_infos()->Clear(); try{ for(auto itor = m_payload.mutable_subrequests()->begin(); itor != m_payload.mutable_subrequests()->end(); ++itor){ //Avoid the race condition that can happen during expansion of the RepackRequest @@ -219,6 +227,42 @@ void RepackRequest::setIsComplete(const bool isComplete){ m_payload.set_is_complete(isComplete); } +void RepackRequest::updateRepackDestinationInfos(const cta::common::dataStructures::ArchiveFile & archiveFile, const std::string & destinationVid){ + checkPayloadWritable(); + + bool rdiFound = false; + cta::objectstore::serializers::RepackDestinationInfo * info = nullptr; + for (auto rdiIter = m_payload.mutable_destination_infos()->begin(); !rdiFound && rdiIter != m_payload.mutable_destination_infos()->end(); ++rdiIter) { + //find the infos for the vid of the archived file + if(rdiIter->vid() == destinationVid){ + info = &(*rdiIter); + rdiFound = true; + } + } + if(!rdiFound){ + //info has to be created, create it and set its vid + //by default the files and the bytes = 0 (see cta.proto) + info = m_payload.mutable_destination_infos()->Add(); + info->set_vid(destinationVid); + } + info->set_files(info->files() + 1); + info->set_bytes(info->bytes() + archiveFile.fileSize); +} + +std::list<common::dataStructures::RepackInfo::RepackDestinationInfo> RepackRequest::getRepackDestinationInfos(){ + checkPayloadReadable(); + + std::list<common::dataStructures::RepackInfo::RepackDestinationInfo> ret; + for(auto rdiIter: m_payload.destination_infos()){ + common::dataStructures::RepackInfo::RepackDestinationInfo rdi; + rdi.vid = rdiIter.vid(); + rdi.files = rdiIter.files(); + rdi.bytes = rdiIter.bytes(); + ret.emplace_back(rdi); + } + return ret; +} + void RepackRequest::setForceDisabledTape(const bool disabledTape){ checkPayloadWritable(); m_payload.set_force_disabled_tape(disabledTape); @@ -480,6 +524,9 @@ serializers::RepackRequestStatus RepackRequest::reportArchiveSuccesses(Subreques auto & p = pointerMap.at(as.fSeq); if (!p.archiveCopyNbsAccounted.count(as.copyNb)) { p.archiveCopyNbsAccounted.insert(as.copyNb); + cta::objectstore::ArchiveRequest ar(p.address,m_objectStore); + ar.fetchNoLock(); + updateRepackDestinationInfos(ar.getArchiveFile(),as.destinationVid); m_payload.set_archivedbytes(m_payload.archivedbytes() + as.bytes); m_payload.set_archivedfiles(m_payload.archivedfiles() + as.files); p.subrequestDeleted = as.subrequestDeleted; diff --git a/objectstore/RepackRequest.hpp b/objectstore/RepackRequest.hpp index 985b703da1..ba96eddd18 100644 --- a/objectstore/RepackRequest.hpp +++ b/objectstore/RepackRequest.hpp @@ -52,6 +52,8 @@ public: common::dataStructures::MountPolicy getMountPolicy(); void deleteAllSubrequests(); void setIsComplete(const bool complete); + void updateRepackDestinationInfos(const common::dataStructures::ArchiveFile & archiveFile, const std::string & destinationVid); + std::list<common::dataStructures::RepackInfo::RepackDestinationInfo> getRepackDestinationInfos(); /** * Set the flag disabledTape to allow the mounting of a * disabled tape for file retrieval @@ -115,6 +117,7 @@ public: uint32_t copyNb = 0; bool subrequestDeleted = false; bool hasUserProvidedFile = false; + std::string destinationVid; typedef std::list<SubrequestStatistics> List; bool operator< (const SubrequestStatistics & o) const { return fSeq < o.fSeq; } }; diff --git a/objectstore/cta.proto b/objectstore/cta.proto index 18130877b1..40021ed9de 100644 --- a/objectstore/cta.proto +++ b/objectstore/cta.proto @@ -335,10 +335,16 @@ message ArchiveJob { repeated string reportfailurelogs = 4413; } +message JobDestinationVid { + optional string destination_vid = 4414; + optional uint32 copy_nb = 4415; +} + message ArchiveRequestRepackInfo { required string repack_request_address = 4450; required string file_buffer_url = 4453; required uint64 fseq = 4455; + repeated JobDestinationVid jobs_destination = 4456; } message ArchiveRequest { @@ -573,6 +579,12 @@ message RepackSubRequestPointer { required bool subrequest_deleted = 10540; } +message RepackDestinationInfo { + required string vid = 20000; + required uint64 files = 20001 [default = 0]; + required uint64 bytes = 20002 [default = 0]; +} + message RepackRequest { required string vid = 11000; required string buffer_url = 11005; @@ -606,6 +618,7 @@ message RepackRequest { required bool force_disabled_tape = 11564; required bool is_complete = 11565; repeated RepackSubRequestPointer subrequests = 11570; + repeated RepackDestinationInfo destination_infos = 11571; } message RepackRequestIndexPointer { diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index 69531ba03f..0c357e9d9f 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -4520,9 +4520,11 @@ void OStoreDB::ArchiveJob::bumpUpTapeFileCount(uint64_t newFileCount) { void OStoreDB::ArchiveJob::asyncSucceedTransfer() { log::LogContext lc(m_oStoreDB.m_logger); log::ScopedParamContainer params(lc); - params.add("requestObject", m_archiveRequest.getAddressIfSet()); + params.add("requestObject", m_archiveRequest.getAddressIfSet()) + .add("destinationVid",tapeFile.vid) + .add("copyNb",tapeFile.copyNb); lc.log(log::DEBUG, "Will start async update archiveRequest for transfer success"); - m_succesfulTransferUpdater.reset(m_archiveRequest.asyncUpdateTransferSuccessful(tapeFile.copyNb)); + m_succesfulTransferUpdater.reset(m_archiveRequest.asyncUpdateTransferSuccessful(tapeFile.vid, tapeFile.copyNb)); } //------------------------------------------------------------------------------ @@ -4626,6 +4628,7 @@ objectstore::RepackRequest::SubrequestStatistics::List OStoreDB::RepackArchiveRe ssl.back().files = 1; ssl.back().fSeq = sri.repackInfo.fSeq; ssl.back().copyNb = sri.archivedCopyNb; + ssl.back().destinationVid = sri.repackInfo.jobsDestination[sri.archivedCopyNb]; for(auto &j: sri.archiveJobsStatusMap){ if(j.first != sri.archivedCopyNb){ if((j.second != objectstore::serializers::ArchiveJobStatus::AJS_Complete) && (j.second != objectstore::serializers::ArchiveJobStatus::AJS_Failed)){ diff --git a/scheduler/SchedulerTest.cpp b/scheduler/SchedulerTest.cpp index ec1c78dc7b..e8bf0cde41 100644 --- a/scheduler/SchedulerTest.cpp +++ b/scheduler/SchedulerTest.cpp @@ -2186,6 +2186,9 @@ TEST_P(SchedulerTest, expandRepackRequestArchiveSuccess) { ri.fetchNoLock(); cta::objectstore::RepackRequest rr(ri.getRepackRequestAddress(vid),backend); rr.fetchNoLock(); + ASSERT_EQ(vidDestination,rr.getRepackDestinationInfos().front().vid); + ASSERT_EQ(nbArchiveFilesPerTape,rr.getRepackDestinationInfos().front().files); + ASSERT_EQ(nbArchiveFilesPerTape * archiveFileSize,rr.getRepackDestinationInfos().front().bytes); ASSERT_EQ(common::dataStructures::RepackInfo::Status::Complete,rr.getInfo().status); } } diff --git a/tapeserver/castor/tape/tapeserver/daemon/MigrationTaskInjector.cpp b/tapeserver/castor/tape/tapeserver/daemon/MigrationTaskInjector.cpp index ab809a1bdf..ed0f55da7b 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/MigrationTaskInjector.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/MigrationTaskInjector.cpp @@ -112,7 +112,7 @@ namespace daemon { bool MigrationTaskInjector::synchronousInjection() { std::list<std::unique_ptr<cta::ArchiveJob> > jobs; try { - //We multiply the number of popped files / bytes by 2 to avoid multiple mounts on Repack + //First popping of files, we multiply the number of popped files / bytes by 2 to avoid multiple mounts on Repack //(it is applied to ArchiveForUser and ArchiveForRepack batches) jobs = m_archiveMount.getNextJobBatch(2 * m_maxFiles, 2 * m_maxBytes,m_lc); } catch (cta::exception::Exception & ex) { diff --git a/xroot_plugins/XrdCtaRepackLs.hpp b/xroot_plugins/XrdCtaRepackLs.hpp index 010fdd29b2..fd62564c2d 100644 --- a/xroot_plugins/XrdCtaRepackLs.hpp +++ b/xroot_plugins/XrdCtaRepackLs.hpp @@ -80,6 +80,13 @@ namespace cta { namespace xrd { repackRequestItem->set_status(toString(repackRequest.status)); //Last expanded fSeq is in reality the next FSeq to Expand. So last one is next - 1 repackRequestItem->set_last_expanded_fseq(repackRequest.lastExpandedFseq != 0 ? repackRequest.lastExpandedFseq - 1 : 0); + repackRequestItem->mutable_destination_infos()->Clear(); + for(auto destinationInfo: repackRequest.destinationInfos){ + auto * destinationInfoToInsert = repackRequestItem->mutable_destination_infos()->Add(); + destinationInfoToInsert->set_vid(destinationInfo.vid); + destinationInfoToInsert->set_files(destinationInfo.files); + destinationInfoToInsert->set_bytes(destinationInfo.bytes); + } is_buffer_full = streambuf->Push(record); } diff --git a/xrootd-ssi-protobuf-interface b/xrootd-ssi-protobuf-interface index 38ca9bca9a..290e03c8e5 160000 --- a/xrootd-ssi-protobuf-interface +++ b/xrootd-ssi-protobuf-interface @@ -1 +1 @@ -Subproject commit 38ca9bca9acc5cc31a08cbd1f1ecc45e78bf3401 +Subproject commit 290e03c8e59b9681e1c06c4b945bf1c0eb4ce405 -- GitLab