Commit 7f9086eb authored by Eric Cano's avatar Eric Cano
Browse files

Implemented OStoreDB::getRapckInfo()

Merged RepackInfo and RepackType.
Simplifed RepackInfo to what is currently implemented.
Implmented repack related dumps.
parent 5d0b3e01
......@@ -51,7 +51,6 @@ set (COMMON_LIB_SRC_FILES
dataStructures/QueueAndMountSummary.cpp
dataStructures/ReadTestResult.cpp
dataStructures/RepackInfo.cpp
dataStructures/RepackType.cpp
dataStructures/RequesterGroupMountRule.cpp
dataStructures/RequesterMountRule.cpp
dataStructures/RetrieveJob.cpp
......
......@@ -17,66 +17,22 @@
*/
#include "common/dataStructures/RepackInfo.hpp"
#include "common/dataStructures/utils.hpp"
#include "common/exception/Exception.hpp"
namespace cta {
namespace common {
namespace dataStructures {
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
RepackInfo::RepackInfo():
totalFiles(0),
totalSize(0),
filesToRetrieve(0),
filesToArchive(0),
filesFailed(0),
filesArchived(0) {}
//------------------------------------------------------------------------------
// operator==
//------------------------------------------------------------------------------
bool RepackInfo::operator==(const RepackInfo &rhs) const {
return vid==rhs.vid
&& tag==rhs.tag
&& totalFiles==rhs.totalFiles
&& totalSize==rhs.totalSize
&& filesToRetrieve==rhs.filesToRetrieve
&& filesToArchive==rhs.filesToArchive
&& filesFailed==rhs.filesFailed
&& filesArchived==rhs.filesArchived
&& repackType==rhs.repackType
&& repackStatus==rhs.repackStatus
&& errors==rhs.errors
&& creationLog==rhs.creationLog;
}
//------------------------------------------------------------------------------
// operator!=
//------------------------------------------------------------------------------
bool RepackInfo::operator!=(const RepackInfo &rhs) const {
return !operator==(rhs);
}
//------------------------------------------------------------------------------
// operator<<
//------------------------------------------------------------------------------
std::ostream &operator<<(std::ostream &os, const RepackInfo &obj) {
os << "(vid=" << obj.vid
<< " tag=" << obj.tag
<< " totalFiles=" << obj.totalFiles
<< " totalSize=" << obj.totalSize
<< " filesToRetrieve=" << obj.filesToRetrieve
<< " filesToArchive=" << obj.filesToArchive
<< " filesFailed=" << obj.filesFailed
<< " filesArchived=" << obj.filesArchived
<< " repackType=" << obj.repackType
<< " repackStatus=" << obj.repackStatus
<< " errors=" << obj.errors
<< " creationLog=" << obj.creationLog << ")";
return os;
std::string toString(RepackInfo::Type type) {
switch(type) {
case RepackInfo::Type::ExpandAndRepack:
return "expand and repack";
case RepackInfo::Type::ExpandOnly:
return "expand only";
case RepackInfo::Type::RepackOnly:
return "repack only";
default:
return "UNKNOWN";
}
}
} // namespace dataStructures
......
......@@ -18,14 +18,8 @@
#pragma once
#include <list>
#include <map>
#include <stdint.h>
#include <string>
#include "common/dataStructures/EntryLog.hpp"
#include "common/dataStructures/RepackType.hpp"
namespace cta {
namespace common {
namespace dataStructures {
......@@ -35,28 +29,39 @@ namespace dataStructures {
*/
struct RepackInfo {
RepackInfo();
bool operator==(const RepackInfo &rhs) const;
bool operator!=(const RepackInfo &rhs) const;
std::string vid;
std::string tag;
uint64_t totalFiles;
uint64_t totalSize;
uint64_t filesToRetrieve;
uint64_t filesToArchive;
uint64_t filesFailed;
uint64_t filesArchived;
RepackType repackType;
std::string repackStatus;
std::map<uint64_t,std::string> errors;
EntryLog creationLog;
enum class Type {
ExpandAndRepack,
ExpandOnly,
RepackOnly
} type;
enum class Status {
// Those values are matching the cta.proto values
Pending = 1,
ToExpand = 2,
Starting = 3,
Running = 4,
Aborting = 5,
Aborted = 6,
Complete = 7,
Failed = 8
} status;
// std::string tag;
// uint64_t totalFiles;
// uint64_t totalSize;
// uint64_t filesToRetrieve;
// uint64_t filesToArchive;
// uint64_t filesFailed;
// uint64_t filesArchived;
// RepackType repackType;
// std::string repackStatus;
// std::map<uint64_t,std::string> errors;
// EntryLog creationLog;
}; // struct RepackInfo
std::ostream &operator<<(std::ostream &os, const RepackInfo &obj);
std::string toString(RepackInfo::Type type);
std::string toString(RepackInfo::Status status);
} // namespace dataStructures
} // namespace common
......
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "common/dataStructures/RepackType.hpp"
std::string cta::common::dataStructures::toString(cta::common::dataStructures::RepackType type) {
switch(type) {
case cta::common::dataStructures::RepackType::expandandrepack:
return "expand and repack";
case cta::common::dataStructures::RepackType::justexpand:
return "expand only";
case cta::common::dataStructures::RepackType::justrepack:
return "repack only";
default:
return "UNKNOWN";
}
}
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <string>
namespace cta {
namespace common {
namespace dataStructures {
enum RepackType {
expandandrepack,
justexpand,
justrepack
};
std::string toString(RepackType type);
} // namespace dataStructures
} // namespace common
} // namespace cta
......@@ -29,6 +29,9 @@
#include "RetrieveQueue.hpp"
#include "RetrieveQueueShard.hpp"
#include "DriveRegister.hpp"
#include "RepackIndex.hpp"
#include "RepackQueue.hpp"
#include "RepackRequest.hpp"
#include <stdexcept>
#include <google/protobuf/util/json_util.h>
......@@ -202,6 +205,15 @@ std::string GenericObject::dump() {
case serializers::SchedulerGlobalLock_t:
bodyDump = dumpWithType<SchedulerGlobalLock>(this);
break;
case serializers::RepackIndex_t:
bodyDump = dumpWithType<RepackIndex>(this);
break;
case serializers::RepackRequest_t:
bodyDump = dumpWithType<RepackRequest>(this);
break;
case serializers::RepackQueue_t:
bodyDump = dumpWithType<RepackQueue>(this);
break;
default:
std::stringstream err;
err << "Unsupported type: " << m_header.type();
......
......@@ -663,7 +663,7 @@ void Helpers::registerRepackRequestToIndex(const std::string& vid, const std::st
std::string repackIndexAddress;
// First, try to get the address of of the repack index lockfree.
try {
re.getRepackIndexAddress();
repackIndexAddress = re.getRepackIndexAddress();
} catch (RootEntry::NotAllocated &){
ScopedExclusiveLock rel(re);
re.fetch();
......
......@@ -23,13 +23,19 @@
namespace cta { namespace objectstore {
//------------------------------------------------------------------------------
// RepackIndex::DriveRegister()
// RepackIndex::RepackIndex()
//------------------------------------------------------------------------------
RepackIndex::RepackIndex(const std::string& address, Backend& os):
ObjectOps<serializers::RepackIndex, serializers::RepackIndex_t>(os, address) { }
//------------------------------------------------------------------------------
// RepackIndex::DriveRegister()
// RepackIndex::RepackIndex()
//------------------------------------------------------------------------------
RepackIndex::RepackIndex(Backend& os):
ObjectOps<serializers::RepackIndex, serializers::RepackIndex_t>(os) { }
//------------------------------------------------------------------------------
// RepackIndex::RepackIndex()
//------------------------------------------------------------------------------
RepackIndex::RepackIndex(GenericObject& go):
ObjectOps<serializers::RepackIndex, serializers::RepackIndex_t>(go.objectStore()) {
......@@ -98,7 +104,7 @@ std::list<RepackIndex::RepackRequestAddress> RepackIndex::getRepackRequestsAddre
std::list<RepackRequestAddress> ret;
for (auto &rt: m_payload.repackrequestpointers()) {
ret.push_back(RepackRequestAddress());
ret.back().driveStateAddress = rt.address();
ret.back().repackRequestAddress = rt.address();
ret.back().vid = rt.vid();
}
return ret;
......@@ -113,7 +119,7 @@ bool RepackIndex::isEmpty() {
}
//------------------------------------------------------------------------------
// DriveRegister::getDriveAddresse()
// RepackIndex::getDriveAddresse()
//------------------------------------------------------------------------------
void RepackIndex::removeRepackRequest(const std::string& vid) {
checkPayloadWritable();
......
......@@ -30,6 +30,7 @@ class EntryLogSerDeser;
class RepackIndex: public ObjectOps<serializers::RepackIndex, serializers::RepackIndex_t> {
public:
RepackIndex(Backend & os);
RepackIndex(const std::string & address, Backend & os);
RepackIndex(GenericObject & go);
void initialize();
......@@ -43,7 +44,7 @@ public:
*/
struct RepackRequestAddress {
std::string vid;
std::string driveStateAddress;
std::string repackRequestAddress;
};
// Repack tapes management =========================================================
......@@ -76,7 +77,7 @@ public:
void removeRepackRequest(const std::string & vid);
/**
* JSON dump of the drive
* JSON dump of the index
* @return
*/
std::string dump();
......
......@@ -17,21 +17,34 @@
*/
#include "RepackQueue.hpp"
#include "GenericObject.hpp"
#include <google/protobuf/util/json_util.h>
namespace cta { namespace objectstore {
//------------------------------------------------------------------------------
// Constructor
// RepackQueue::RepackQueue()
//------------------------------------------------------------------------------
RepackQueue::RepackQueue(const std::string& address, Backend& os):
ObjectOps<serializers::RepackQueue, serializers::RepackQueue_t>(os, address) {}
//------------------------------------------------------------------------------
// Constructor
// RepackQueue::RepackQueue()
//------------------------------------------------------------------------------
RepackQueue::RepackQueue(Backend& os):
ObjectOps<serializers::RepackQueue, serializers::RepackQueue_t>(os) { }
//------------------------------------------------------------------------------
// RepackQueue::RepackQueue()
//------------------------------------------------------------------------------
RepackQueue::RepackQueue(GenericObject& go):
ObjectOps<serializers::RepackQueue, serializers::RepackQueue_t>(go.objectStore()) {
// Here we transplant the generic object into the new object
go.transplantHeader(*this);
// And interpret the header.
getPayloadFromHeader();
}
//------------------------------------------------------------------------------
// RepackQueue::initialize()
//------------------------------------------------------------------------------
......@@ -116,6 +129,19 @@ bool RepackQueue::isEmpty() {
return m_payload.repackrequestpointers().size();
}
//------------------------------------------------------------------------------
// RepackQueue::dump()
//------------------------------------------------------------------------------
std::string RepackQueue::dump() {
checkPayloadReadable();
google::protobuf::util::JsonPrintOptions options;
options.add_whitespace = true;
options.always_print_primitive_fields = true;
std::string headerDump;
google::protobuf::util::MessageToJsonString(m_payload, &headerDump, options);
return headerDump;
}
}} // namespace cta::objectstore.
......@@ -55,6 +55,7 @@ public:
void garbageCollect(const std::string& presumedOwner, AgentReference& agentReference, log::LogContext& lc, cta::catalogue::Catalogue& catalogue) override;
std::string dump();
};
class RepackQueuePending: public RepackQueue {
......
......@@ -17,6 +17,8 @@
*/
#include "RepackRequest.hpp"
#include "GenericObject.hpp"
#include <google/protobuf/util/json_util.h>
namespace cta { namespace objectstore {
......@@ -26,6 +28,17 @@ namespace cta { namespace objectstore {
RepackRequest::RepackRequest(const std::string& address, Backend& os):
ObjectOps<serializers::RepackRequest, serializers::RepackRequest_t> (os, address) { }
//------------------------------------------------------------------------------
// RepackRequest::RepackRequest()
//------------------------------------------------------------------------------
RepackRequest::RepackRequest(GenericObject& go):
ObjectOps<serializers::RepackRequest, serializers::RepackRequest_t>(go.objectStore()) {
// Here we transplant the generic object into the new object
go.transplantHeader(*this);
// And interpret the header.
getPayloadFromHeader();
}
//------------------------------------------------------------------------------
// RepackRequest::initialize()
//------------------------------------------------------------------------------
......@@ -51,17 +64,17 @@ void RepackRequest::setVid(const std::string& vid) {
//------------------------------------------------------------------------------
// RepackRequest::setRepackType()
//------------------------------------------------------------------------------
void RepackRequest::setRepackType(common::dataStructures::RepackType repackType) {
void RepackRequest::setType(common::dataStructures::RepackInfo::Type repackType) {
checkPayloadWritable();
typedef common::dataStructures::RepackType RepackType;
typedef common::dataStructures::RepackInfo::Type RepackType;
switch (repackType) {
case RepackType::expandandrepack:
case RepackType::ExpandAndRepack:
// Nothing to do, this is the default case.
break;
case RepackType::justexpand:
case RepackType::ExpandOnly:
m_payload.set_repackmode(false);
break;
case RepackType::justrepack:
case RepackType::RepackOnly:
m_payload.set_expandmode(false);
break;
default:
......@@ -69,6 +82,29 @@ void RepackRequest::setRepackType(common::dataStructures::RepackType repackType)
}
}
//------------------------------------------------------------------------------
// RepackRequest::getInfo()
//------------------------------------------------------------------------------
common::dataStructures::RepackInfo RepackRequest::getInfo() {
checkPayloadReadable();
typedef common::dataStructures::RepackInfo RepackInfo;
RepackInfo ret;
ret.vid = m_payload.vid();
ret.status = (RepackInfo::Status) m_payload.status();
if (m_payload.repackmode()) {
if (m_payload.expandmode()) {
ret.type = RepackInfo::Type::ExpandAndRepack;
} else {
ret.type = RepackInfo::Type::RepackOnly;
}
} else if (m_payload.expandmode()) {
ret.type = RepackInfo::Type::ExpandOnly;
} else {
throw exception::Exception("In RepackRequest::getInfo(): unexpcted mode: neither expand nor repack.");
}
return ret;
}
//------------------------------------------------------------------------------
// RepackRequest::garbageCollect()
//------------------------------------------------------------------------------
......@@ -131,7 +167,17 @@ void RepackRequest::AsyncOwnerUpdater::wait() {
m_timingReport.insertAndReset("commitUnlockTime", m_timer);
}
//------------------------------------------------------------------------------
// RepackRequest::dump()
//------------------------------------------------------------------------------
std::string RepackRequest::dump() {
checkPayloadReadable();
google::protobuf::util::JsonPrintOptions options;
options.add_whitespace = true;
options.always_print_primitive_fields = true;
std::string headerDump;
google::protobuf::util::MessageToJsonString(m_payload, &headerDump, options);
return headerDump;
}
}} // namespace cta::objectstore
\ No newline at end of file
......@@ -20,7 +20,7 @@
#include "ObjectOps.hpp"
#include "objectstore/cta.pb.h"
#include "common/dataStructures/RepackType.hpp"
#include "common/dataStructures/RepackInfo.hpp"
#include "common/log/TimingList.hpp"
#include "common/Timer.hpp"
......@@ -38,10 +38,14 @@ public:
// Parameters interface
void setVid(const std::string & vid);
void setRepackType(common::dataStructures::RepackType repackType);
void setType(common::dataStructures::RepackInfo::Type repackType);
void setStatus(common::dataStructures::RepackInfo::Status repackStatus);
common::dataStructures::RepackInfo getInfo();
void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc,
cta::catalogue::Catalogue & catalogue) override;
std::string dump();
// An asynchronous request ownership updating class.
class AsyncOwnerUpdater {
......
......@@ -448,6 +448,7 @@ message RetrieveQueue {
// ------------- Repack data strcutures ----------------------------------------
enum RepackRequestStatus {
// those value are matching the dataStructures/RepackInfo.hpp: RepackInfo::Status values.
RRS_Pending = 1;
RRS_ToExpand = 2;
RRS_Starting = 3;
......
......@@ -1030,7 +1030,7 @@ OStoreDB::RetrieveQueueItor_t OStoreDB::getRetrieveJobItor(const std::string &vi
// OStoreDB::queueRepack()
//------------------------------------------------------------------------------
void OStoreDB::queueRepack(const std::string& vid, const std::string& bufferURL,
common::dataStructures::RepackType repackType, log::LogContext & lc) {
common::dataStructures::RepackInfo::Type repackType, log::LogContext & lc) {
// Prepare the repack request object in memory.
assertAgentAddressSet();
cta::utils::Timer t;
......@@ -1039,7 +1039,7 @@ void OStoreDB::queueRepack(const std::string& vid, const std::string& bufferURL,
// We need to own the request until it is queued in the the pending queue.
rr->setOwner(m_agentReference->getAgentAddress());
rr->setVid(vid);
rr->setRepackType(repackType);
rr->setType(repackType);
// Try to reference the object in the index (will fail if there is already a request with this VID.
try {
Helpers::registerRepackRequestToIndex(vid, rr->getAddressIfSet(), *m_agentReference, m_objectStore, lc);
......@@ -1065,8 +1065,26 @@ void OStoreDB::queueRepack(const std::string& vid, const std::string& bufferURL,
// OStoreDB::queueRepack()
//------------------------------------------------------------------------------
std::list<common::dataStructures::RepackInfo> OStoreDB::getRepackInfo() {
// TODO
throw exception::Exception("No implemented");
RootEntry re(m_objectStore);
re.fetchNoLock();
RepackIndex ri(m_objectStore);
std::list<common::dataStructures::RepackInfo> ret;
// First, try to get the address of of the repack index lockfree.
try {
ri.setAddress(re.getRepackIndexAddress());
} catch (RootEntry::NotAllocated &) {
return ret;
}
ri.fetchNoLock();
auto rrAddresses = ri.getRepackRequestsAddresses();
for (auto & rra: rrAddresses) {
try {
RepackRequest rr(rra.repackRequestAddress, m_objectStore);
rr.fetchNoLock();
ret.push_back(rr.getInfo());
} catch (cta::exception::Exception &) {}
}
return ret;
}
//------------------------------------------------------------------------------
......
......@@ -275,8 +275,8 @@ public:
RetrieveQueueItor_t getRetrieveJobItor(const std::string &vid) const;
/* === Repack requests handling =========================================== */
void queueRepack(const std::string& vid, const std::string& bufferURL, common::dataStructures::RepackType repackType,
log::LogContext &logContext) override;
void queueRepack(const std::string& vid, const std::string& bufferURL,
common::dataStructures::RepackInfo::Type repackType, log::LogContext &logContext) override;
std::list<common::dataStructures::RepackInfo> getRepackInfo() override;
......
......@@ -152,7 +152,7 @@ public:
}
void queueRepack(const std::string& vid, const std::string& bufferURL, common::dataStructures::RepackType repackType, log::LogContext& lc) override {
void queueRepack(const std::string& vid, const std::string& bufferURL, common::dataStructures::RepackInfo::Type repackType, log::LogContext& lc) override {
m_OStoreDB.queueRepack(vid, bufferURL, repackType, lc);
}
......
......@@ -294,7 +294,7 @@ void Scheduler::queueLabel(const common::dataStructures::SecurityIdentity &cliId
// repack
//------------------------------------------------------------------------------
void Scheduler::queueRepack(const common::dataStructures::SecurityIdentity &cliIdentity, const std::string &vid,
const std::string & bufferURL, const common::dataStructures::RepackType repackType, log::LogContext & lc) {
const std::string & bufferURL, const common::dataStructures::RepackInfo::Type repackType, log::LogContext & lc) {
// Check request sanity
if (vid.empty()) throw exception::UserError("Empty VID name.");
if (bufferURL.empty()) throw exception::UserError("Empty buffer URL.");
......