diff --git a/catalogue/Catalogue.hpp b/catalogue/Catalogue.hpp index e3f21acb9aa1bf75e3bc1a4333fe6ec7907a0529..f07e1d46941985a9fff75731afb43903ba45e271 100644 --- a/catalogue/Catalogue.hpp +++ b/catalogue/Catalogue.hpp @@ -41,8 +41,6 @@ #include "common/dataStructures/MountType.hpp" #include "common/dataStructures/MountPolicy.hpp" #include "common/dataStructures/ReadTestResult.hpp" -#include "common/dataStructures/RepackInfo.hpp" -#include "common/dataStructures/RepackType.hpp" #include "common/dataStructures/RequesterGroupMountRule.hpp" #include "common/dataStructures/RequesterMountRule.hpp" #include "common/dataStructures/RetrieveFileQueueCriteria.hpp" diff --git a/cmdline/CtaAdminCmdParse.hpp b/cmdline/CtaAdminCmdParse.hpp index a20bc7e15d0b0f253f88ed9c0922cca9568795ca..bcd7682dff946d44f7c837442ae9d032ecac2562 100644 --- a/cmdline/CtaAdminCmdParse.hpp +++ b/cmdline/CtaAdminCmdParse.hpp @@ -288,6 +288,7 @@ const std::map<std::string, OptionUInt64::Key> uint64Options = { * Map string options to Protocol Buffer enum values */ const std::map<std::string, OptionString::Key> strOptions = { + { "--bufferurl", OptionString::BUFFERURL }, { "--comment", OptionString::COMMENT }, { "--diskid", OptionString::DISKID }, { "--drive", OptionString::DRIVE }, @@ -361,6 +362,7 @@ const std::map<AdminCmd::Cmd, CmdHelp> cmdHelp = { const Option opt_all { Option::OPT_FLAG, "--all", "-a", "" }; const Option opt_archivefileid { Option::OPT_UINT, "--id", "-I", " <archive_file_id>" }; const Option opt_archivepriority { Option::OPT_UINT, "--archivepriority", "--ap", " <priority_value>" }; +const Option opt_bufferurl { Option::OPT_STR, "--bufferurl", "-b", " <buffer URL>" }; const Option opt_capacity { Option::OPT_UINT, "--capacity", "-c", " <capacity_in_bytes>" }; const Option opt_checkchecksum { Option::OPT_FLAG, "--checkchecksum", "-c", "" }; const Option opt_comment { Option::OPT_STR, "--comment", "-m", " <\"comment\">" }; @@ -470,7 +472,7 @@ const std::map<cmd_key_t, cmd_val_t> cmdOptions = { {{ AdminCmd::CMD_MOUNTPOLICY, AdminCmd::SUBCMD_RM }, { opt_mountpolicy_alias }}, {{ AdminCmd::CMD_MOUNTPOLICY, AdminCmd::SUBCMD_LS }, { opt_header.optional() }}, {{ AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_ADD }, - { opt_vid.optional(), opt_vidfile.optional(), opt_justexpand.optional(), opt_justrepack.optional() }}, + { opt_vid.optional(), opt_vidfile.optional(), opt_bufferurl, opt_justexpand.optional(), opt_justrepack.optional() }}, {{ AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_RM }, { opt_vid }}, {{ AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_LS }, { opt_header.optional(), opt_vid.optional() }}, {{ AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_ERR }, { opt_vid }}, diff --git a/common/dataStructures/RepackType.cpp b/common/dataStructures/RepackType.cpp index 4ea7225f58fa57881f988e3cf32f9a4fffa43b1a..1d1384020ad91f2c1eb23e5517e2fcfc096ab6a1 100644 --- a/common/dataStructures/RepackType.cpp +++ b/common/dataStructures/RepackType.cpp @@ -21,11 +21,11 @@ std::string cta::common::dataStructures::toString(cta::common::dataStructures::RepackType type) { switch(type) { case cta::common::dataStructures::RepackType::expandandrepack: - return "expandandrepack"; + return "expand and repack"; case cta::common::dataStructures::RepackType::justexpand: - return "justexpand"; + return "expand only"; case cta::common::dataStructures::RepackType::justrepack: - return "justrepack"; + return "repack only"; default: return "UNKNOWN"; } diff --git a/objectstore/CMakeLists.txt b/objectstore/CMakeLists.txt index ca01814a65a5f402e636b96648ed30c51c610f64..23225ef3da44b459c5e07d951e0d2ad5cf6a8cc2 100644 --- a/objectstore/CMakeLists.txt +++ b/objectstore/CMakeLists.txt @@ -36,6 +36,8 @@ set (CTAProtoDependants objectstore/Agent.hpp objectstore/DriveState.hpp objectstore/GenericObject.hpp objectstore/ObjectOps.hpp + objectstore/RepackIndex.hpp + objectstore/RepackRequest.hpp objectstore/RetrieveRequest.hpp objectstore/RootEntry.hpp objectstore/SchedulerGlobalLock.hpp @@ -68,6 +70,8 @@ add_library (ctaobjectstore SHARED RetrieveRequest.cpp DriveRegister.cpp DriveState.cpp + RepackIndex.cpp + RepackRequest.cpp BackendVFS.cpp BackendRados.cpp BackendPopulator.cpp diff --git a/objectstore/Helpers.cpp b/objectstore/Helpers.cpp index 15dbfb187bda0b1170b835b1a34bfc624f7c7e88..f7893cb6e6c8ef3e29f2eb2d0c6e891e18df7dfe 100644 --- a/objectstore/Helpers.cpp +++ b/objectstore/Helpers.cpp @@ -24,6 +24,7 @@ #include "RootEntry.hpp" #include "DriveRegister.hpp" #include "DriveState.hpp" +#include "RepackIndex.hpp" #include "catalogue/Catalogue.hpp" #include "common/exception/NonRetryableError.hpp" #include <random> @@ -557,5 +558,29 @@ std::list<cta::common::dataStructures::DriveState> Helpers::getAllDriveStates(Ba return ret; } +//------------------------------------------------------------------------------ +// Helpers::registerRepackRequestToIndex() +//------------------------------------------------------------------------------ +void Helpers::registerRepackRequestToIndex(const std::string& vid, const std::string& requestAddress, + AgentReference & agentReference, Backend& backend, log::LogContext& lc) { + // Try to reference the object in the index (will fail if there is already a request with this VID. + RootEntry re(backend); + re.fetchNoLock(); + std::string repackIndexAddress; + // First, try to get the address of of the repack index lockfree. + try { + re.getRepackIndexAddress(); + } catch (RootEntry::NotAllocated &){ + ScopedExclusiveLock rel(re); + re.fetch(); + repackIndexAddress = re.addOrGetRepackIndexAndCommit(agentReference, lc); + } + RepackIndex ri(repackIndexAddress, backend); + ScopedExclusiveLock ril(ri); + ri.fetch(); + ri.addRepackRequestAddress(vid, requestAddress); +} + + }} // namespace cta::objectstore. \ No newline at end of file diff --git a/objectstore/Helpers.hpp b/objectstore/Helpers.hpp index 45dfba55a1906590e2bd5ee30cbea0369d645ed9..469e5f055fffd8b9447bb42a90638a1d6bdf0b4f 100644 --- a/objectstore/Helpers.hpp +++ b/objectstore/Helpers.hpp @@ -122,6 +122,12 @@ public: * Helper to fetch in parallel all the drive statuses. */ static std::list<cta::common::dataStructures::DriveState> getAllDriveStates(Backend & backend, log::LogContext & lc); + + /** + * Helper to get the repack index. As this structure was developed late, we potentially have to create it on the fly. + */ + static void registerRepackRequestToIndex(const std::string & vid, const std::string & requestAddress, + AgentReference & agentReference, Backend & backend, log::LogContext & lc); }; }} // namespace cta::objectstore \ No newline at end of file diff --git a/objectstore/RepackIndex.cpp b/objectstore/RepackIndex.cpp new file mode 100644 index 0000000000000000000000000000000000000000..5336ff3ca8b72132184bedf6fea4652b04527b6c --- /dev/null +++ b/objectstore/RepackIndex.cpp @@ -0,0 +1,155 @@ +/* + * The CERN Tape Archive (CTA) project + * Copyright (C) 2015 CERN + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "RepackIndex.hpp" +#include "GenericObject.hpp" +#include <google/protobuf/util/json_util.h> + +namespace cta { namespace objectstore { + +//------------------------------------------------------------------------------ +// RepackIndex::DriveRegister() +//------------------------------------------------------------------------------ +RepackIndex::RepackIndex(const std::string& address, Backend& os): + ObjectOps<serializers::RepackIndex, serializers::RepackIndex_t>(os, address) { } + +//------------------------------------------------------------------------------ +// RepackIndex::DriveRegister() +//------------------------------------------------------------------------------ +RepackIndex::RepackIndex(GenericObject& go): + ObjectOps<serializers::RepackIndex, serializers::RepackIndex_t>(go.objectStore()) { + // Here we transplant the generic object into the new object + go.transplantHeader(*this); + // And interpret the header. + getPayloadFromHeader(); +} + +//------------------------------------------------------------------------------ +// RepackIndex::dump() +//------------------------------------------------------------------------------ +std::string RepackIndex::dump() { + checkPayloadReadable(); + google::protobuf::util::JsonPrintOptions options; + options.add_whitespace = true; + options.always_print_primitive_fields = true; + std::string headerDump; + google::protobuf::util::MessageToJsonString(m_payload, &headerDump, options); + return headerDump; +} + +//------------------------------------------------------------------------------ +// RepackIndex::initialize() +//------------------------------------------------------------------------------ +void RepackIndex::initialize() { + // Setup underlying object + ObjectOps<serializers::RepackIndex, serializers::RepackIndex_t>::initialize(); + m_payloadInterpreted = true; +} + +//------------------------------------------------------------------------------ +// RepackIndex::garbageCollect() +//------------------------------------------------------------------------------ +void RepackIndex::garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc, + cta::catalogue::Catalogue & catalogue) { + checkPayloadWritable(); + // We should never have to garbage collect + log::ScopedParamContainer params(lc); + params.add("repackTapeRegister", getAddressIfSet()) + .add("currentOwner", getOwner()) + .add("backupOwner", getBackupOwner()) + .add("presumedOwner", presumedOwner); + lc.log(log::ERR, "In RepackIndex::garbageCollect(): Repack Tape Register should not require garbage collection."); + throw exception::Exception("In RepackIndex::garbageCollect(): Repack Tape Register should not require garbage collection"); +} + +//------------------------------------------------------------------------------ +// RepackIndex::getRepackTapeAddress() +//------------------------------------------------------------------------------ +std::string RepackIndex::getRepackRequestAddress(const std::string& vid) { + checkPayloadReadable(); + for (auto & rt: m_payload.repacktapepointers()) { + if (rt.vid() == vid) + return rt.address(); + } + throw NoSuchVID("In RepackIndex::getRepackTapeAddress(): no such VID"); +} + + +//------------------------------------------------------------------------------ +// RepackIndex::getRepackTapeAddresses() +//------------------------------------------------------------------------------ +std::list<RepackIndex::RepackRequestAddress> RepackIndex::getRepackRequestsAddresses() { + checkHeaderReadable(); + std::list<RepackRequestAddress> ret; + for (auto &rt: m_payload.repacktapepointers()) { + ret.push_back(RepackRequestAddress()); + ret.back().driveStateAddress = rt.address(); + ret.back().vid = rt.vid(); + } + return ret; +} + +//------------------------------------------------------------------------------ +// RepackIndex::isEmpty() +//------------------------------------------------------------------------------ +bool RepackIndex::isEmpty() { + checkPayloadReadable(); + return m_payload.repacktapepointers().empty(); +} + +//------------------------------------------------------------------------------ +// DriveRegister::getDriveAddresse() +//------------------------------------------------------------------------------ +void RepackIndex::removeRepackRequest(const std::string& vid) { + checkPayloadWritable(); + bool found = false; + auto vidToRemove = m_payload.mutable_repacktapepointers()->begin(); + while (vidToRemove != m_payload.mutable_repacktapepointers()->end()) { + if ( vidToRemove->vid() == vid) { + vidToRemove = m_payload.mutable_repacktapepointers()->erase(vidToRemove); + found = true; + } else { + vidToRemove++; + } + } + if (!found) { + std::stringstream err; + err << "In RepackIndex::removeRepackTape(): vid not found: " << vid; + throw cta::exception::Exception(err.str()); + } +} + +//------------------------------------------------------------------------------ +// DriveRegister::setRepackTapeAddress() +//------------------------------------------------------------------------------ +void RepackIndex::addRepackRequestAddress(const std::string& vid, + const std::string& repackTapeAddress) { + checkPayloadWritable(); + for (int i=0; i< m_payload.mutable_repacktapepointers()->size(); i++) { + auto rt=m_payload.mutable_repacktapepointers(i); + if (rt->vid() == vid) { + throw VidAlreadyRegistered("In RepackIndex::addRepackRequestAddress(): VID already has a repack request."); + } + } + auto rt=m_payload.mutable_repacktapepointers()->Add(); + rt->set_vid(vid); + rt->set_address(repackTapeAddress); + return; +} + +}} // namespace cta::objectstore diff --git a/objectstore/RepackIndex.hpp b/objectstore/RepackIndex.hpp new file mode 100644 index 0000000000000000000000000000000000000000..25fb3106bcda7919459d00e25426db5045b22597 --- /dev/null +++ b/objectstore/RepackIndex.hpp @@ -0,0 +1,85 @@ +/* + * The CERN Tape Archive (CTA) project + * Copyright (C) 2015 CERN + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include "ObjectOps.hpp" +#include "objectstore/cta.pb.h" + +namespace cta { namespace objectstore { + +class Backend; +class Agent; +class GenericObject; +class EntryLogSerDeser; + +class RepackIndex: public ObjectOps<serializers::RepackIndex, serializers::RepackIndex_t> { +public: + RepackIndex(const std::string & address, Backend & os); + RepackIndex(GenericObject & go); + void initialize(); + CTA_GENERATE_EXCEPTION_CLASS(NotEmpty); + void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc, + cta::catalogue::Catalogue & catalogue) override; + bool isEmpty(); + + /** + * A repack tape register entry (vid + object address) + */ + struct RepackRequestAddress { + std::string vid; + std::string driveStateAddress; + }; + + // Repack tapes management ========================================================= + /** + * Returns all the repack tape states addresses stored in the drive registry. + * @return a list of all the drive states + */ + std::list<RepackRequestAddress> getRepackRequestsAddresses(); + + CTA_GENERATE_EXCEPTION_CLASS(NoSuchVID); + /** + * Returns the repack tape address for the given vid. + * @return the object address + */ + std::string getRepackRequestAddress(const std::string & vid); + + /** + * Adds a drive status reference to the register. Throws an exception if a request is already recorded for this VID. + * @param driveName + * @param driveAddress + */ + void addRepackRequestAddress(const std::string & vid, const std::string &repackRequestAddress); + + CTA_GENERATE_EXCEPTION_CLASS(VidAlreadyRegistered); + + /** + * Removes entry from drive addresses. + * @param driveName + */ + void removeRepackRequest(const std::string & vid); + + /** + * JSON dump of the drive + * @return + */ + std::string dump(); +}; + +}} \ No newline at end of file diff --git a/objectstore/RepackRequest.cpp b/objectstore/RepackRequest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..4a4dc702ca2a8c1d36697a71740d3adc57f4a540 --- /dev/null +++ b/objectstore/RepackRequest.cpp @@ -0,0 +1,82 @@ +/* + * The CERN Tape Archive (CTA) project + * Copyright (C) 2015 CERN + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "RepackRequest.hpp" + + +namespace cta { namespace objectstore { + +//------------------------------------------------------------------------------ +// Constructor +//------------------------------------------------------------------------------ +RepackRequest::RepackRequest(const std::string& address, Backend& os): + ObjectOps<serializers::RepackRequest, serializers::RepackRequest_t> (os, address) { } + +//------------------------------------------------------------------------------ +// RepackRequest::initialize() +//------------------------------------------------------------------------------ +void RepackRequest::initialize() { + // Setup underlying object + ObjectOps<serializers::RepackRequest, serializers::RepackRequest_t>::initialize(); + m_payload.set_status(serializers::RepackRequestStatus::RRS_Pending); + m_payload.set_expandmode(true); + m_payload.set_repackmode(true); + // This object is good to go (to storage) + m_payloadInterpreted = true; +} + +//------------------------------------------------------------------------------ +// RepackRequest::setVid() +//------------------------------------------------------------------------------ +void RepackRequest::setVid(const std::string& vid) { + checkPayloadWritable(); + if (vid.empty()) throw exception::Exception("In RepackRequest::setVid(): empty vid"); + m_payload.set_vid(vid); +} + +//------------------------------------------------------------------------------ +// RepackRequest::setRepackType() +//------------------------------------------------------------------------------ +void RepackRequest::setRepackType(common::dataStructures::RepackType repackType) { + checkPayloadWritable(); + typedef common::dataStructures::RepackType RepackType; + switch (repackType) { + case RepackType::expandandrepack: + // Nothing to do, this is the default case. + break; + case RepackType::justexpand: + m_payload.set_repackmode(false); + break; + case RepackType::justrepack: + m_payload.set_expandmode(false); + break; + default: + throw exception::Exception("In RepackRequest::setRepackType(): unexpected type."); + } +} + +//------------------------------------------------------------------------------ +// RepackRequest::garbageCollect() +//------------------------------------------------------------------------------ +void RepackRequest::garbageCollect(const std::string& presumedOwner, AgentReference& agentReference, + log::LogContext& lc, cta::catalogue::Catalogue& catalogue) { + throw exception::Exception("In RepackRequest::garbageCollect(): not implemented."); +} + + +}} // namespace cta::objectstore \ No newline at end of file diff --git a/objectstore/RepackRequest.hpp b/objectstore/RepackRequest.hpp new file mode 100644 index 0000000000000000000000000000000000000000..c68205b8a0f1b2381a3a6f1d7e7bcfdba28834e1 --- /dev/null +++ b/objectstore/RepackRequest.hpp @@ -0,0 +1,45 @@ +/* + * The CERN Tape Archive (CTA) project + * Copyright (C) 2015 CERN + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include "ObjectOps.hpp" +#include "objectstore/cta.pb.h" +#include "common/dataStructures/RepackType.hpp" + +namespace cta { namespace objectstore { + +class Agent; +class GenericObject; + +class RepackRequest: public ObjectOps<serializers::RepackRequest, serializers::RepackRequest_t> { +public: + RepackRequest(const std::string & address, Backend & os); + RepackRequest(Backend & os); + RepackRequest(GenericObject & go); + void initialize(); + + // Parameters interface + void setVid(const std::string & vid); + void setRepackType(common::dataStructures::RepackType repackType); + + void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc, + cta::catalogue::Catalogue & catalogue) override; +}; + +}} // namespace cta::objectstore \ No newline at end of file diff --git a/objectstore/RootEntry.cpp b/objectstore/RootEntry.cpp index 11ecc4d5d62417bac00f60bc5c523baf07fd9c70..80711b415fda51f7b324bb9fb88d0a5662a94c84 100644 --- a/objectstore/RootEntry.cpp +++ b/objectstore/RootEntry.cpp @@ -25,6 +25,7 @@ #include "DriveRegister.hpp" #include "GenericObject.hpp" #include "SchedulerGlobalLock.hpp" +#include "RepackIndex.hpp" #include <cxxabi.h> #include "ProtocolBuffersAlgorithms.hpp" #include <google/protobuf/util/json_util.h> @@ -693,6 +694,74 @@ void RootEntry::removeSchedulerGlobalLockAndCommit(log::LogContext & lc) { commit(); } +// ============================================================================= +// ================ Repack index lock manipulation ============================= +// ============================================================================= + +std::string RootEntry::getRepackIndexAddress() { + checkPayloadReadable(); + if (m_payload.has_repackindexpointer() && + m_payload.repackindexpointer().address().size()) { + return m_payload.repackindexpointer().address(); + } + throw NotAllocated("In RootEntry::getRepackTapeRegistry: repack tape register not yet allocated"); +} + +std::string RootEntry::addOrGetRepackIndexAndCommit(AgentReference& agentRef, log::LogContext & lc) { + checkPayloadWritable(); + // Check if the repack tape register exists + try { + return getRepackIndexAddress(); + } catch (NotAllocated &) { + // TODO: this insertion method is much simpler than the ones used for other objects. + // It implies the only dangling pointer situation we can get is the one where + // the object does not exist. + // As the object never changes ownership, the garbage collection can be left + // empty. There should never be garbage collection for this object type. + // + // decide on the object's name. + std::string rtrAddress (agentRef.nextId("RepackTapeRegister")); + // Then prepare the repack tape register object + RepackIndex ri(rtrAddress, m_objectStore); + ri.initialize(); + ri.setOwner(getAddressIfSet()); + ri.setBackupOwner(getAddressIfSet()); + // Reference the registry in the root entry + auto * rtrp = m_payload.mutable_repackindexpointer(); + rtrp->set_address(rtrAddress); + commit(); + // Create the repack tape register + ri.insert(); + // done. + return rtrAddress; + } +} + +void RootEntry::removeRepackIndexAndCommit(log::LogContext& lc) { + checkPayloadWritable(); + // Get the address of the scheduler lock (nothing to do if there is none) + if (!m_payload.has_repackindexpointer() || + !m_payload.repackindexpointer().address().size()) + return; + std::string rtrAddress = m_payload.repackindexpointer().address(); + RepackIndex ri(rtrAddress, ObjectOps<serializers::RootEntry, serializers::RootEntry_t>::m_objectStore); + ScopedExclusiveLock rtrl(ri); + ri.fetch(); + // Check the drive register is empty + if (!ri.isEmpty()) { + throw DriveRegisterNotEmpty("In RootEntry::removeRepackTapeRegistryAndCommit: " + "trying to remove a non-empty repack tape register"); + } + // we can delete the drive register + ri.remove(); + log::ScopedParamContainer params(lc); + params.add("repackTapeRegister", ri.getAddressIfSet()); + lc.log(log::INFO, "In RootEntry::removeRepackTapeRegistryAndCommit(): removed repack tape register object."); + // And update the root entry + m_payload.mutable_schedulerlockpointer()->set_address(""); + // We commit for safety and symmetry with the add operation + commit(); +} // Dump the root entry std::string RootEntry::dump () { diff --git a/objectstore/RootEntry.hpp b/objectstore/RootEntry.hpp index 82e2d0ac238d4d6a1c7106a1e6552c9b43f276a1..2a1cc7a195ba3ae1181fc0d84c77b9341b5d23a3 100644 --- a/objectstore/RootEntry.hpp +++ b/objectstore/RootEntry.hpp @@ -125,12 +125,17 @@ public: std::string addOrGetAgentRegisterPointerAndCommit(AgentReference & agentRef, const EntryLogSerDeser & log, log::LogContext & lc); void removeAgentRegisterAndCommit(log::LogContext & lc); - - // Agent register manipulations ============================================== + + // Scheduler global lock manipulations ======================================= std::string getSchedulerGlobalLock(); std::string addOrGetSchedulerGlobalLockAndCommit(AgentReference & agentRef, const EntryLogSerDeser & log); void removeSchedulerGlobalLockAndCommit(log::LogContext & lc); + // Repack index manipulations ================================================ + std::string getRepackIndexAddress(); + std::string addOrGetRepackIndexAndCommit(AgentReference & agentRef, log::LogContext & lc); + void removeRepackIndexAndCommit(log::LogContext & lc); + private: void addIntendedAgentRegistry(const std::string & address, log::LogContext & lc); diff --git a/objectstore/cta.proto b/objectstore/cta.proto index 54fb99e3c9bcc25e6b774e206799ea9c97e8a8fd..c77006ba82f637b3939efb8a53b097fcd16b2ca7 100644 --- a/objectstore/cta.proto +++ b/objectstore/cta.proto @@ -31,6 +31,9 @@ enum ObjectType { ArchiveQueueShard_t = 90; RetrieveQueue_t = 10; RetrieveQueueShard_t = 100; + RepackRequest_t = 11; + RepackIndex_t = 12; + RepackQueue_t = 13; GenericObject_t = 1000; } @@ -79,7 +82,7 @@ message AgentRegisterPointer { required EntryLog log = 101; } -message RepackTapeRegisterPointer { +message RepackIndexPointer { required string address = 105; required EntryLog log = 106; } @@ -114,7 +117,7 @@ message RootEntry { repeated ArchiveQueuePointer archivejobstoreportqueuepointers = 1068; optional DriveRegisterPointer driveregisterpointer = 1070; optional AgentRegisterPointer agentregisterpointer = 1080; - optional RepackTapeRegisterPointer repacktaperegisterpointer = 1085; + optional RepackIndexPointer repackindexpointer = 1085; optional string agentregisterintent = 1090; optional SchedulerGlobalLockPointer schedulerlockpointer = 1100; } @@ -438,52 +441,29 @@ message RetrieveQueue { // ------------- Repack data strcutures ---------------------------------------- -enum RepackTapeStatus { - RTS_Starting = 1; - RTS_Ongoing = 2; - RTS_Finished = 3; - RTS_Failed = 4; - RTS_Aborting = 5; - RTS_Aborted = 6; - RTS_Submitted = 7; +enum RepackRequestStatus { + RRS_Pending = 1; + RRS_ToExpand = 2; + RRS_Starting = 3; + RRS_Running = 4; + RRS_Aborting = 5; + RRS_Aborted = 6; + RRS_Complete = 7; + RRS_Failed = 8; } -message RepackBypassElement { - required uint64 fseq = 11500; - required string url = 11510; +message RepackRequest { + required string vid = 11000; + required RepackRequestStatus status = 11010; + required bool expandmode = 11400; + required bool repackmode = 11410; } -message RepackTapeState { - required string vid = 11000; - required RepackTapeStatus status = 11010; - required uint64 creationtime = 11020; - required uint64 starttime = 11030; - required uint64 completiontime = 11040; - required uint64 failuretime = 11050; - required uint64 abortrequesttime = 11060; - required uint64 abortcompletiontime = 11070; - required uint64 submissiontime = 11080; - required uint64 filescount = 11090; - required uint64 bytescount = 11100; - required uint64 filesretrieved = 11110; - required uint64 bytesretrieved = 11120; - required uint64 filessuccessful = 11130; - required uint64 bytessuvvessful = 11140; - required uint64 filesfailed = 11150; - required uint64 bytesfailed = 11160; - required uint64 activeretrieves = 11170; - required uint64 activearchives = 11180; - repeated string sourcevids = 11190; - repeated string targettapepools = 11200; - repeated RepackBypassElement bypasselements = 11300; -} - -message RepackTapePointer { +message RepackRequestPointer { required string vid = 12000; required string address = 12010; } -message RepackTapeRegister { - repeated RepackTapePointer repacktapepointers = 12100; - repeated string tapestoblowup = 12110; +message RepackIndex { + repeated RepackRequestPointer repacktapepointers = 12100; } diff --git a/scheduler/OStoreDB/MemQueues.hpp b/scheduler/OStoreDB/MemQueues.hpp index be267ea56d925bd1142934e42e83c8f83649bd6a..9165ce11d3affa41e3cb75e5fa59a431e74f0761 100644 --- a/scheduler/OStoreDB/MemQueues.hpp +++ b/scheduler/OStoreDB/MemQueues.hpp @@ -321,7 +321,7 @@ std::shared_ptr<SharedQueueLock<Queue, Request>> MemQueue<Request, Queue>::share jta.push_back({maqr->m_job, maqr->m_request}); addedJobs++; } - // Actually ass the jobs. + // Actually add the jobs. specializedAddJobsToQueueAndCommit(jta, queue, *oStoreDB.m_agentReference, logContext); double queueProcessAndCommitTime = timer.secs(utils::Timer::resetCounter); // Update the cache stats in memory as we hold the queue. diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index 8fe67bfacd1c7e8e62321162616021f13b42ff73..21e5f140b1b08618a6cf3eb051c93ab7cb9977be 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -24,6 +24,8 @@ #include "objectstore/DriveState.hpp" //#include "objectstore/ArchiveRequest.hpp" //#include "objectstore/RetrieveRequest.hpp" +#include "objectstore/RepackRequest.hpp" +#include "objectstore/RepackIndex.hpp" #include "objectstore/Helpers.hpp" #include "common/exception/Exception.hpp" #include "common/utils/utils.hpp" @@ -1021,6 +1023,38 @@ OStoreDB::RetrieveQueueItor_t OStoreDB::getRetrieveJobItor(const std::string &vi return RetrieveQueueItor_t(m_objectStore, vid); } +//------------------------------------------------------------------------------ +// OStoreDB::queueRepack() +//------------------------------------------------------------------------------ +void OStoreDB::queueRepack(const std::string& vid, const std::string& bufferURL, + common::dataStructures::RepackType repackType, log::LogContext & lc) { + // Prepare the repack request object in memory. + assertAgentAddressSet(); + cta::utils::Timer t; + cta::objectstore::RepackRequest rr(m_agentReference->nextId("RepackTapeRequest"), m_objectStore); + rr.initialize(); + // We need to own the request until it is queued in the the pending queue. + rr.setOwner(m_agentReference->getAgentAddress()); + rr.setVid(vid); + rr.setRepackType(repackType); + // Try to reference the object in the index (will fail if there is already a request with this VID. + RootEntry re(m_objectStore); + re.fetchNoLock(); + RepackIndex ri(re.addOrGetRepackIndexAndCommit(*m_agentReference, lc), m_objectStore); + try { + Helpers::registerRepackRequestToIndex(vid, rr.getAddressIfSet(), *m_agentReference, m_objectStore, lc); + } catch (objectstore::RepackIndex::VidAlreadyRegistered &) { + throw exception::UserError("A repack request already exists for this VID."); + } + // We're good to go to create the object. We need to own it. + m_agentReference->addToOwnership(rr.getAddressIfSet(), m_objectStore); + rr.insert(); + // If latency needs to the improved, the next steps could be deferred like they are for archive and retrieve requests. + // TODO typedef objectstore::ContainerAlgorithms<RepackPendingQueue> RPQA; + + +} + //------------------------------------------------------------------------------ // OStoreDB::getDriveStates() diff --git a/scheduler/OStoreDB/OStoreDB.hpp b/scheduler/OStoreDB/OStoreDB.hpp index ca6ae49affdf9e4fcc0c3956d19b5811e72385bb..c0de2094d6e6b66e5ed534b015b56288e7573e4b 100644 --- a/scheduler/OStoreDB/OStoreDB.hpp +++ b/scheduler/OStoreDB/OStoreDB.hpp @@ -274,6 +274,10 @@ public: typedef QueueItor<objectstore::RootEntry::RetrieveQueueDump, objectstore::RetrieveQueue> RetrieveQueueItor_t; RetrieveQueueItor_t getRetrieveJobItor(const std::string &vid) const; + /* === Repack requests handling =========================================== */ + void queueRepack(const std::string& vid, const std::string& bufferURL, common::dataStructures::RepackType repackType, + log::LogContext &logContext) override; + /* === Drive state handling ============================================== */ /** * Get states of all drives. diff --git a/scheduler/OStoreDB/OStoreDBFactory.hpp b/scheduler/OStoreDB/OStoreDBFactory.hpp index c0d5a8d4258cf4d1122355a5423fe78ae9279c77..16d1e53dea1caa78a9467059037009928ad2f3d7 100644 --- a/scheduler/OStoreDB/OStoreDBFactory.hpp +++ b/scheduler/OStoreDB/OStoreDBFactory.hpp @@ -151,6 +151,11 @@ public: return m_OStoreDB.queueRetrieve(rqst, criteria, logContext); } + + void queueRepack(const std::string& vid, const std::string& bufferURL, common::dataStructures::RepackType repackType, log::LogContext& lc) override { + m_OStoreDB.queueRepack(vid, bufferURL, repackType, lc); + } + std::list<cta::common::dataStructures::DriveState> getDriveStates(log::LogContext & lc) const override { return m_OStoreDB.getDriveStates(lc); } diff --git a/scheduler/Scheduler.cpp b/scheduler/Scheduler.cpp index d7b01ae23eff026766056acd90d112af30fea3ce..d3daeba4e34e9aba683dae866048a51fcb343ba6 100644 --- a/scheduler/Scheduler.cpp +++ b/scheduler/Scheduler.cpp @@ -230,7 +230,8 @@ void Scheduler::queueRetrieve( //------------------------------------------------------------------------------ // deleteArchive //------------------------------------------------------------------------------ -void Scheduler::deleteArchive(const std::string &instanceName, const common::dataStructures::DeleteArchiveRequest &request, log::LogContext & lc) { +void Scheduler::deleteArchive(const std::string &instanceName, const common::dataStructures::DeleteArchiveRequest &request, + log::LogContext & lc) { // We have different possible scenarios here. The file can be safe in the catalogue, // fully queued, or partially queued. // First, make sure the file is not queued anymore. @@ -292,8 +293,20 @@ void Scheduler::queueLabel(const common::dataStructures::SecurityIdentity &cliId //------------------------------------------------------------------------------ // repack //------------------------------------------------------------------------------ -void Scheduler::queueRepack(const common::dataStructures::SecurityIdentity &cliIdentity, const std::string &vid, const common::dataStructures::RepackType) { - throw exception::Exception(std::string("Not implemented: ") + __PRETTY_FUNCTION__); +void Scheduler::queueRepack(const common::dataStructures::SecurityIdentity &cliIdentity, const std::string &vid, + const std::string & bufferURL, const common::dataStructures::RepackType repackType, log::LogContext & lc) { + // Check request sanity + if (vid.empty()) throw exception::UserError("Empty VID name."); + if (bufferURL.empty()) throw exception::UserError("Empty buffer URL."); + utils::Timer t; + m_db.queueRepack(vid, bufferURL, repackType, lc); + log::TimingList tl; + tl.insertAndReset("schedulerDbTime", t); + log::ScopedParamContainer params(lc); + params.add("VID", vid) + .add("repackType", toString(repackType)); + tl.addToLog(params); + lc.log(log::INFO, "In Scheduler::queueRepack(): success."); } //------------------------------------------------------------------------------ diff --git a/scheduler/Scheduler.hpp b/scheduler/Scheduler.hpp index 7229fdb98dc81d51e2b6596482658d9fc9c86510..4b18c7a65c58cf5054a2fb01011f01b32cf2f7a5 100644 --- a/scheduler/Scheduler.hpp +++ b/scheduler/Scheduler.hpp @@ -190,8 +190,8 @@ public: void queueLabel(const cta::common::dataStructures::SecurityIdentity &cliIdentity, const std::string &vid, const bool force, const bool lbp); - void queueRepack(const cta::common::dataStructures::SecurityIdentity &cliIdentity, const std::string &vid, - const cta::common::dataStructures::RepackType); + void queueRepack(const common::dataStructures::SecurityIdentity &cliIdentity, const std::string &vid, + const std::string & bufferURL, const common::dataStructures::RepackType repackType, log::LogContext & lc); void cancelRepack(const cta::common::dataStructures::SecurityIdentity &cliIdentity, const std::string &vid); std::list<cta::common::dataStructures::RepackInfo> getRepacks( const cta::common::dataStructures::SecurityIdentity &cliIdentity); diff --git a/scheduler/SchedulerDatabase.hpp b/scheduler/SchedulerDatabase.hpp index 5509d514fbed6caffb814afd7bb86a43e2cef8ae..a1145991df7d3aee885342d26001b739356eb7f2 100644 --- a/scheduler/SchedulerDatabase.hpp +++ b/scheduler/SchedulerDatabase.hpp @@ -30,6 +30,7 @@ #include "common/dataStructures/MountPolicy.hpp" #include "common/dataStructures/RetrieveJob.hpp" #include "common/dataStructures/RetrieveRequest.hpp" +#include "common/dataStructures/RepackType.hpp" #include "common/dataStructures/SecurityIdentity.hpp" #include "common/remoteFS/RemotePathAndStatus.hpp" #include "common/log/LogContext.hpp" @@ -351,6 +352,12 @@ public: virtual ~RetrieveJob() {} }; + /*============ Repack management: user side ================================*/ + virtual void queueRepack(const std::string & vid, const std::string & bufferURL, + common::dataStructures::RepackType repackType, log::LogContext & lc) = 0; + + /*============ Repack management: tape server side =========================*/ + /*============ Label management: user side =================================*/ // TODO diff --git a/xroot_plugins/XrdSsiCtaRequestMessage.cpp b/xroot_plugins/XrdSsiCtaRequestMessage.cpp index e51542855a2b71e45b8232b630bf1d2898bd5414..262812a953bfa0ecd29bea7c9b84d3c9b58afa3a 100644 --- a/xroot_plugins/XrdSsiCtaRequestMessage.cpp +++ b/xroot_plugins/XrdSsiCtaRequestMessage.cpp @@ -1343,6 +1343,7 @@ void RequestMessage::processRepack_Add(const cta::admin::AdminCmd &admincmd, cta // VIDs can be provided as a single option or as a list std::vector<std::string> vid_list; + std::string bufferURL; auto vidl = getOptional(OptionStrList::VID); if(vidl) vid_list = vidl.value(); @@ -1352,6 +1353,12 @@ void RequestMessage::processRepack_Add(const cta::admin::AdminCmd &admincmd, cta if(vid_list.empty()) { throw cta::exception::UserError("Must specify at least one vid, using --vid or --vidfile options"); } + + auto buff = getOptional(OptionString::BUFFERURL); + if (buff) + bufferURL = buff.value(); + else + throw cta::exception::UserError("Must specify the buffer URL using --bufferurl option."); // Expand, repack, or both ? cta::common::dataStructures::RepackType type; @@ -1368,7 +1375,7 @@ void RequestMessage::processRepack_Add(const cta::admin::AdminCmd &admincmd, cta // Process each item in the list for(auto it = vid_list.begin(); it != vid_list.end(); ++it) { - m_scheduler.queueRepack(m_cliIdentity, *it, type); + m_scheduler.queueRepack(m_cliIdentity, *it, bufferURL, type, m_lc); } response.set_type(cta::xrd::Response::RSP_SUCCESS); diff --git a/xrootd-ssi-protobuf-interface b/xrootd-ssi-protobuf-interface index b92937743ef5f2eaf1e30336519c0e7ee055e6b2..c5b7f951d7755ed26c774b2b73c5b25eb0eade9d 160000 --- a/xrootd-ssi-protobuf-interface +++ b/xrootd-ssi-protobuf-interface @@ -1 +1 @@ -Subproject commit b92937743ef5f2eaf1e30336519c0e7ee055e6b2 +Subproject commit c5b7f951d7755ed26c774b2b73c5b25eb0eade9d