Commit 4b25d29d authored by Eric Cano's avatar Eric Cano
Browse files

Created RepackRequest and RepackQueue

Added partial support for repack queueing in scheduler database,
scheduler and CLI.
parent 210a47d9
......@@ -41,8 +41,6 @@
#include "common/dataStructures/MountType.hpp"
#include "common/dataStructures/MountPolicy.hpp"
#include "common/dataStructures/ReadTestResult.hpp"
#include "common/dataStructures/RepackInfo.hpp"
#include "common/dataStructures/RepackType.hpp"
#include "common/dataStructures/RequesterGroupMountRule.hpp"
#include "common/dataStructures/RequesterMountRule.hpp"
#include "common/dataStructures/RetrieveFileQueueCriteria.hpp"
......
......@@ -288,6 +288,7 @@ const std::map<std::string, OptionUInt64::Key> uint64Options = {
* Map string options to Protocol Buffer enum values
*/
const std::map<std::string, OptionString::Key> strOptions = {
{ "--bufferurl", OptionString::BUFFERURL },
{ "--comment", OptionString::COMMENT },
{ "--diskid", OptionString::DISKID },
{ "--drive", OptionString::DRIVE },
......@@ -361,6 +362,7 @@ const std::map<AdminCmd::Cmd, CmdHelp> cmdHelp = {
const Option opt_all { Option::OPT_FLAG, "--all", "-a", "" };
const Option opt_archivefileid { Option::OPT_UINT, "--id", "-I", " <archive_file_id>" };
const Option opt_archivepriority { Option::OPT_UINT, "--archivepriority", "--ap", " <priority_value>" };
const Option opt_bufferurl { Option::OPT_STR, "--bufferurl", "-b", " <buffer URL>" };
const Option opt_capacity { Option::OPT_UINT, "--capacity", "-c", " <capacity_in_bytes>" };
const Option opt_checkchecksum { Option::OPT_FLAG, "--checkchecksum", "-c", "" };
const Option opt_comment { Option::OPT_STR, "--comment", "-m", " <\"comment\">" };
......@@ -470,7 +472,7 @@ const std::map<cmd_key_t, cmd_val_t> cmdOptions = {
{{ AdminCmd::CMD_MOUNTPOLICY, AdminCmd::SUBCMD_RM }, { opt_mountpolicy_alias }},
{{ AdminCmd::CMD_MOUNTPOLICY, AdminCmd::SUBCMD_LS }, { opt_header.optional() }},
{{ AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_ADD },
{ opt_vid.optional(), opt_vidfile.optional(), opt_justexpand.optional(), opt_justrepack.optional() }},
{ opt_vid.optional(), opt_vidfile.optional(), opt_bufferurl, opt_justexpand.optional(), opt_justrepack.optional() }},
{{ AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_RM }, { opt_vid }},
{{ AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_LS }, { opt_header.optional(), opt_vid.optional() }},
{{ AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_ERR }, { opt_vid }},
......
......@@ -21,11 +21,11 @@
std::string cta::common::dataStructures::toString(cta::common::dataStructures::RepackType type) {
switch(type) {
case cta::common::dataStructures::RepackType::expandandrepack:
return "expandandrepack";
return "expand and repack";
case cta::common::dataStructures::RepackType::justexpand:
return "justexpand";
return "expand only";
case cta::common::dataStructures::RepackType::justrepack:
return "justrepack";
return "repack only";
default:
return "UNKNOWN";
}
......
Repack lifecycle notes:
# Repack structure
- A VID + timestamp queue for insertion and asynchronous processing.
- A registry of the tape being or having been repacked.
- A new step in the garbage collector loop, to be renamed to background tasks
- Step zero: on insertion, the repack tape job is created and added to the "to blow up queue".
- Step one: pop from the queue and create the state entry (conditionally). Take ownership. State=starting.
- Step one bis: in case of garbage collection, we re-enqueue the request and switch the ownership
of the request back to the registry.
- Step two: blow up the VID into individual retrieve-with-hook structures. If the repack is a recovery repack, try
to third party copy the files from the EOSCTAXXX instance instead of retrieving. If the files can be TPCed from
the EOSCTA instance to the repack disk pool (probably also an EOS instance). Initialize VID's repack statistics.
- Step three: retrieve succeed, create an archive, and update statistics (bulk update the statistics, including
the targeted tape pools)
... or temporarily failed, just requeue, potentially a different copy from a different VID (reference the new VID in the request if needed).
... Or finally failed, just count the failures in statistics.
- Step four: after archive success, update catalogue and statistics. Clean up file.
- Step five: a repack entry with nothing pending can be deleted by the operator.
- A an index referencing all the repack requests (one per VID).
- Only one request per VID is allowed an any point in time.
- The repack request has a status and contains global counters for tape files on the tape, plus references to the individual requests to allow canceling.
- When requests require processing, they get queued. They remain referenced in the index at all times.
- Feeder queue: requests are queue there initially (status=Pending)
- To expand queue: requests are fed in this queue in a controlled way, to be picked up for expansion in file-level requests (status=ToExpand)
- The next status does not require queueing: (status=Starting). All file level requests are created, but none are reported as executed yet.
- (status=Running). At least one file request is reported as done (for retrieve or archive, failure or success). We can go straight from ToExpand
to Running if the file jobs get processed before the request creation completes.
- (status=Success): all jobs done.
- (status=Failed): at least a job not done, but all are complete.
- Aborting:
- This takes a macroscopic amount of time. We need to find the requests (archive, retrieve).
For this we need to sift through the queues (listed in the request). Then we have to cancel
and dequeue any request present. The requests that are selected by the tape servers will need
to be updated accordingly as events occur to them (requeue, success, failure).
\ No newline at end of file
- File level jobs are retrieve jobs converting into archive job(s) on completion. Job conversion is handled by the maintenance process.
Success and failure reporting is also handled by it like for the EOS reporting. We can get several archive jobs after a single retrieve in the
case of an expand type repack (expand meaning re-creation of missing tape files). This is done with a single archive request object as for
initial archives.
- Aborting: the file requests are referenced by the repack request, and can be found and removed through it. Any reporting on them will fail instantly.
Disk copies should be cleared if present.
- Disk destination: the disk destination is decided per repack request and is provided as a base URL which will be complemented by the file
location (VID/fseq(08%d)).
- During expansion, disk buffer is scanned for already existing files before creating the retrieve requests (or directly the archive ones). Files with correct size and checksum are kept and archive requests are created directly. Non-matching files get deleted.
......@@ -36,6 +36,8 @@ set (CTAProtoDependants objectstore/Agent.hpp
objectstore/DriveState.hpp
objectstore/GenericObject.hpp
objectstore/ObjectOps.hpp
objectstore/RepackIndex.hpp
objectstore/RepackRequest.hpp
objectstore/RetrieveRequest.hpp
objectstore/RootEntry.hpp
objectstore/SchedulerGlobalLock.hpp
......@@ -68,6 +70,8 @@ add_library (ctaobjectstore SHARED
RetrieveRequest.cpp
DriveRegister.cpp
DriveState.cpp
RepackIndex.cpp
RepackRequest.cpp
BackendVFS.cpp
BackendRados.cpp
BackendPopulator.cpp
......
......@@ -24,6 +24,7 @@
#include "RootEntry.hpp"
#include "DriveRegister.hpp"
#include "DriveState.hpp"
#include "RepackIndex.hpp"
#include "catalogue/Catalogue.hpp"
#include "common/exception/NonRetryableError.hpp"
#include <random>
......@@ -557,5 +558,29 @@ std::list<cta::common::dataStructures::DriveState> Helpers::getAllDriveStates(Ba
return ret;
}
//------------------------------------------------------------------------------
// Helpers::registerRepackRequestToIndex()
//------------------------------------------------------------------------------
void Helpers::registerRepackRequestToIndex(const std::string& vid, const std::string& requestAddress,
AgentReference & agentReference, Backend& backend, log::LogContext& lc) {
// Try to reference the object in the index (will fail if there is already a request with this VID.
RootEntry re(backend);
re.fetchNoLock();
std::string repackIndexAddress;
// First, try to get the address of of the repack index lockfree.
try {
re.getRepackIndexAddress();
} catch (RootEntry::NotAllocated &){
ScopedExclusiveLock rel(re);
re.fetch();
repackIndexAddress = re.addOrGetRepackIndexAndCommit(agentReference, lc);
}
RepackIndex ri(repackIndexAddress, backend);
ScopedExclusiveLock ril(ri);
ri.fetch();
ri.addRepackRequestAddress(vid, requestAddress);
}
}} // namespace cta::objectstore.
\ No newline at end of file
......@@ -122,6 +122,12 @@ public:
* Helper to fetch in parallel all the drive statuses.
*/
static std::list<cta::common::dataStructures::DriveState> getAllDriveStates(Backend & backend, log::LogContext & lc);
/**
* Helper to get the repack index. As this structure was developed late, we potentially have to create it on the fly.
*/
static void registerRepackRequestToIndex(const std::string & vid, const std::string & requestAddress,
AgentReference & agentReference, Backend & backend, log::LogContext & lc);
};
}} // namespace cta::objectstore
\ No newline at end of file
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "RepackIndex.hpp"
#include "GenericObject.hpp"
#include <google/protobuf/util/json_util.h>
namespace cta { namespace objectstore {
//------------------------------------------------------------------------------
// RepackIndex::DriveRegister()
//------------------------------------------------------------------------------
RepackIndex::RepackIndex(const std::string& address, Backend& os):
ObjectOps<serializers::RepackIndex, serializers::RepackIndex_t>(os, address) { }
//------------------------------------------------------------------------------
// RepackIndex::DriveRegister()
//------------------------------------------------------------------------------
RepackIndex::RepackIndex(GenericObject& go):
ObjectOps<serializers::RepackIndex, serializers::RepackIndex_t>(go.objectStore()) {
// Here we transplant the generic object into the new object
go.transplantHeader(*this);
// And interpret the header.
getPayloadFromHeader();
}
//------------------------------------------------------------------------------
// RepackIndex::dump()
//------------------------------------------------------------------------------
std::string RepackIndex::dump() {
checkPayloadReadable();
google::protobuf::util::JsonPrintOptions options;
options.add_whitespace = true;
options.always_print_primitive_fields = true;
std::string headerDump;
google::protobuf::util::MessageToJsonString(m_payload, &headerDump, options);
return headerDump;
}
//------------------------------------------------------------------------------
// RepackIndex::initialize()
//------------------------------------------------------------------------------
void RepackIndex::initialize() {
// Setup underlying object
ObjectOps<serializers::RepackIndex, serializers::RepackIndex_t>::initialize();
m_payloadInterpreted = true;
}
//------------------------------------------------------------------------------
// RepackIndex::garbageCollect()
//------------------------------------------------------------------------------
void RepackIndex::garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc,
cta::catalogue::Catalogue & catalogue) {
checkPayloadWritable();
// We should never have to garbage collect
log::ScopedParamContainer params(lc);
params.add("repackTapeRegister", getAddressIfSet())
.add("currentOwner", getOwner())
.add("backupOwner", getBackupOwner())
.add("presumedOwner", presumedOwner);
lc.log(log::ERR, "In RepackIndex::garbageCollect(): Repack Tape Register should not require garbage collection.");
throw exception::Exception("In RepackIndex::garbageCollect(): Repack Tape Register should not require garbage collection");
}
//------------------------------------------------------------------------------
// RepackIndex::getRepackTapeAddress()
//------------------------------------------------------------------------------
std::string RepackIndex::getRepackRequestAddress(const std::string& vid) {
checkPayloadReadable();
for (auto & rt: m_payload.repacktapepointers()) {
if (rt.vid() == vid)
return rt.address();
}
throw NoSuchVID("In RepackIndex::getRepackTapeAddress(): no such VID");
}
//------------------------------------------------------------------------------
// RepackIndex::getRepackTapeAddresses()
//------------------------------------------------------------------------------
std::list<RepackIndex::RepackRequestAddress> RepackIndex::getRepackRequestsAddresses() {
checkHeaderReadable();
std::list<RepackRequestAddress> ret;
for (auto &rt: m_payload.repacktapepointers()) {
ret.push_back(RepackRequestAddress());
ret.back().driveStateAddress = rt.address();
ret.back().vid = rt.vid();
}
return ret;
}
//------------------------------------------------------------------------------
// RepackIndex::isEmpty()
//------------------------------------------------------------------------------
bool RepackIndex::isEmpty() {
checkPayloadReadable();
return m_payload.repacktapepointers().empty();
}
//------------------------------------------------------------------------------
// DriveRegister::getDriveAddresse()
//------------------------------------------------------------------------------
void RepackIndex::removeRepackRequest(const std::string& vid) {
checkPayloadWritable();
bool found = false;
auto vidToRemove = m_payload.mutable_repacktapepointers()->begin();
while (vidToRemove != m_payload.mutable_repacktapepointers()->end()) {
if ( vidToRemove->vid() == vid) {
vidToRemove = m_payload.mutable_repacktapepointers()->erase(vidToRemove);
found = true;
} else {
vidToRemove++;
}
}
if (!found) {
std::stringstream err;
err << "In RepackIndex::removeRepackTape(): vid not found: " << vid;
throw cta::exception::Exception(err.str());
}
}
//------------------------------------------------------------------------------
// DriveRegister::setRepackTapeAddress()
//------------------------------------------------------------------------------
void RepackIndex::addRepackRequestAddress(const std::string& vid,
const std::string& repackTapeAddress) {
checkPayloadWritable();
for (int i=0; i< m_payload.mutable_repacktapepointers()->size(); i++) {
auto rt=m_payload.mutable_repacktapepointers(i);
if (rt->vid() == vid) {
throw VidAlreadyRegistered("In RepackIndex::addRepackRequestAddress(): VID already has a repack request.");
}
}
auto rt=m_payload.mutable_repacktapepointers()->Add();
rt->set_vid(vid);
rt->set_address(repackTapeAddress);
return;
}
}} // namespace cta::objectstore
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "ObjectOps.hpp"
#include "objectstore/cta.pb.h"
namespace cta { namespace objectstore {
class Backend;
class Agent;
class GenericObject;
class EntryLogSerDeser;
class RepackIndex: public ObjectOps<serializers::RepackIndex, serializers::RepackIndex_t> {
public:
RepackIndex(const std::string & address, Backend & os);
RepackIndex(GenericObject & go);
void initialize();
CTA_GENERATE_EXCEPTION_CLASS(NotEmpty);
void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc,
cta::catalogue::Catalogue & catalogue) override;
bool isEmpty();
/**
* A repack tape register entry (vid + object address)
*/
struct RepackRequestAddress {
std::string vid;
std::string driveStateAddress;
};
// Repack tapes management =========================================================
/**
* Returns all the repack tape states addresses stored in the drive registry.
* @return a list of all the drive states
*/
std::list<RepackRequestAddress> getRepackRequestsAddresses();
CTA_GENERATE_EXCEPTION_CLASS(NoSuchVID);
/**
* Returns the repack tape address for the given vid.
* @return the object address
*/
std::string getRepackRequestAddress(const std::string & vid);
/**
* Adds a drive status reference to the register. Throws an exception if a request is already recorded for this VID.
* @param driveName
* @param driveAddress
*/
void addRepackRequestAddress(const std::string & vid, const std::string &repackRequestAddress);
CTA_GENERATE_EXCEPTION_CLASS(VidAlreadyRegistered);
/**
* Removes entry from drive addresses.
* @param driveName
*/
void removeRepackRequest(const std::string & vid);
/**
* JSON dump of the drive
* @return
*/
std::string dump();
};
}}
\ No newline at end of file
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "RepackRequest.hpp"
namespace cta { namespace objectstore {
//------------------------------------------------------------------------------
// Constructor
//------------------------------------------------------------------------------
RepackRequest::RepackRequest(const std::string& address, Backend& os):
ObjectOps<serializers::RepackRequest, serializers::RepackRequest_t> (os, address) { }
//------------------------------------------------------------------------------
// RepackRequest::initialize()
//------------------------------------------------------------------------------
void RepackRequest::initialize() {
// Setup underlying object
ObjectOps<serializers::RepackRequest, serializers::RepackRequest_t>::initialize();
m_payload.set_status(serializers::RepackRequestStatus::RRS_Pending);
m_payload.set_expandmode(true);
m_payload.set_repackmode(true);
// This object is good to go (to storage)
m_payloadInterpreted = true;
}
//------------------------------------------------------------------------------
// RepackRequest::setVid()
//------------------------------------------------------------------------------
void RepackRequest::setVid(const std::string& vid) {
checkPayloadWritable();
if (vid.empty()) throw exception::Exception("In RepackRequest::setVid(): empty vid");
m_payload.set_vid(vid);
}
//------------------------------------------------------------------------------
// RepackRequest::setRepackType()
//------------------------------------------------------------------------------
void RepackRequest::setRepackType(common::dataStructures::RepackType repackType) {
checkPayloadWritable();
typedef common::dataStructures::RepackType RepackType;
switch (repackType) {
case RepackType::expandandrepack:
// Nothing to do, this is the default case.
break;
case RepackType::justexpand:
m_payload.set_repackmode(false);
break;
case RepackType::justrepack:
m_payload.set_expandmode(false);
break;
default:
throw exception::Exception("In RepackRequest::setRepackType(): unexpected type.");
}
}
//------------------------------------------------------------------------------
// RepackRequest::garbageCollect()
//------------------------------------------------------------------------------
void RepackRequest::garbageCollect(const std::string& presumedOwner, AgentReference& agentReference,
log::LogContext& lc, cta::catalogue::Catalogue& catalogue) {
throw exception::Exception("In RepackRequest::garbageCollect(): not implemented.");
}
}} // namespace cta::objectstore
\ No newline at end of file
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "ObjectOps.hpp"
#include "objectstore/cta.pb.h"
#include "common/dataStructures/RepackType.hpp"
namespace cta { namespace objectstore {
class Agent;
class GenericObject;
class RepackRequest: public ObjectOps<serializers::RepackRequest, serializers::RepackRequest_t> {
public:
RepackRequest(const std::string & address, Backend & os);
RepackRequest(Backend & os);
RepackRequest(GenericObject & go);
void initialize();
// Parameters interface
void setVid(const std::string & vid);
void setRepackType(common::dataStructures::RepackType repackType);
void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc,
cta::catalogue::Catalogue & catalogue) override;
};
}} // namespace cta::objectstore
\ No newline at end of file
......@@ -25,6 +25,7 @@
#include "DriveRegister.hpp"
#include "GenericObject.hpp"
#include "SchedulerGlobalLock.hpp"
#include "RepackIndex.hpp"
#include <cxxabi.h>
#include "ProtocolBuffersAlgorithms.hpp"
#include <google/protobuf/util/json_util.h>
......@@ -693,6 +694,74 @@ void RootEntry::removeSchedulerGlobalLockAndCommit(log::LogContext & lc) {
commit();
}
// =============================================================================
// ================ Repack index lock manipulation =============================
// =============================================================================
std::string RootEntry::getRepackIndexAddress() {
checkPayloadReadable();
if (m_payload.has_repackindexpointer() &&
m_payload.repackindexpointer().address().size()) {
return m_payload.repackindexpointer().address();
}
throw NotAllocated("In RootEntry::getRepackTapeRegistry: repack tape register not yet allocated");
}
std::string RootEntry::addOrGetRepackIndexAndCommit(AgentReference& agentRef, log::LogContext & lc) {
checkPayloadWritable();
// Check if the repack tape register exists
try {
return getRepackIndexAddress();
} catch (NotAllocated &) {
// TODO: this insertion method is much simpler than the ones used for other objects.
// It implies the only dangling pointer situation we can get is the one where
// the object does not exist.
// As the object never changes ownership, the garbage collection can be left
// empty. There should never be garbage collection for this object type.
//
// decide on the object's name.
std::string rtrAddress (agentRef.nextId("RepackTapeRegister"));
// Then prepare the repack tape register object
RepackIndex ri(rtrAddress, m_objectStore);
ri.initialize();
ri.setOwner(getAddressIfSet());
ri.setBackupOwner(getAddressIfSet());
// Reference the registry in the root entry
auto * rtrp = m_payload.mutable_repackindexpointer();
rtrp->set_address(rtrAddress);
commit();
// Create the repack tape register
ri.insert();
// done.
return rtrAddress;
}
}
void RootEntry::removeRepackIndexAndCommit(log::LogContext& lc) {
checkPayloadWritable();
// Get the address of the scheduler lock (nothing to do if there is none)
if (!m_payload.has_repackindexpointer() ||
!m_payload.repackindexpointer().address().size())
return;
std::string rtrAddress = m_payload.repackindexpointer().address();
RepackIndex ri(rtrAddress, ObjectOps<serializers::RootEntry, serializers::RootEntry_t>::m_objectStore);
ScopedExclusiveLock rtrl(ri);
ri.fetch();
// Check the drive register is empty
if (!ri.isEmpty()) {
throw DriveRegisterNotEmpty("In RootEntry::removeRepackTapeRegistryAndCommit: "
"trying to remove a non-empty repack tape register");
}
// we can delete the drive register
ri.remove();
log::ScopedParamContainer params(lc);
params.add("repackTapeRegister", ri.getAddressIfSet());