Commit 19266cc0 authored by Eric Cano's avatar Eric Cano
Browse files

Expanded retrieve requests for repack

The retrieve requests now carry all the information necessary to create the archive jobs.
Retrive requests carry an isRepack boolean. The distinction between repack and non-repack behaviour
is pushed as low as possible in the api (in the object store) so that client code can be more
generic. For the same reason, the
The expand repack mode (re-creation of missing tape files) is not supported yet.
Changed the represenation of copy numbers to uint32 (as this is the smalled number supported but protocol buffers).
Added ArchiveRoute::StorageClassMap and ArchiveRoute::FullMap to store archive routing in memory.
Added repack buffer base URL support, and mechanism to compute temporary copy location in disk cache.
Fixed bug in ObjectOps<>::asyncInsert();
Inplemented the path generation for repack disk storage (<BaseURL>/<VID>/<FSeq (09%d)>).
parent 6fc2d98d
......@@ -59,7 +59,7 @@ bool ArchiveFile::operator!=(const ArchiveFile &rhs) const {
//------------------------------------------------------------------------------
// operator<<
//------------------------------------------------------------------------------
std::ostream &operator<<(std::ostream &os, ArchiveFile &obj) {
std::ostream &operator<<(std::ostream &os, const ArchiveFile &obj) {
os <<
"{"
"archiveFileID=" << obj.archiveFileID << ","
......@@ -69,8 +69,8 @@ std::ostream &operator<<(std::ostream &os, ArchiveFile &obj) {
"checksumType=" << obj.checksumType << ","
"checksumValue=" << obj.checksumValue << ","
"storageClass=" << obj.storageClass << ","
"diskFileInfo=" << obj.diskFileInfo << ","
"tapeFiles=" << obj.tapeFiles << ","
"diskFileInfo=" << obj.diskFileInfo << ","
"tapeFiles=" << obj.tapeFiles << ","
"creationTime=" << obj.creationTime << ","
"reconciliationTime=" << obj.reconciliationTime <<
"}";
......
......@@ -68,8 +68,8 @@ struct ArchiveFile {
* to be listed by the operator. For example, if the listing requested is
* for a single tape, the map will contain only one element.
*/
std::map<uint64_t,TapeFile> tapeFiles;
time_t creationTime;
std::map<uint32_t,TapeFile> tapeFiles;
time_t creationTime;
time_t reconciliationTime;
}; // struct ArchiveFile
......
......@@ -61,6 +61,9 @@ struct ArchiveRoute {
EntryLog creationLog;
EntryLog lastModificationLog;
std::string comment;
typedef std::map<uint32_t, ArchiveRoute> StorageClassMap;
typedef std::map<std::tuple<std::string/*disk instance*/, std::string /*storage class*/>, StorageClassMap> FullMap;
}; // struct ArchiveRoute
......
......@@ -30,6 +30,7 @@ namespace dataStructures {
struct RepackInfo {
std::string vid;
std::string repackBufferBaseURL;
enum class Type {
ExpandAndRepack,
ExpandOnly,
......
......@@ -43,7 +43,7 @@ struct RetrieveJob {
RetrieveRequest request;
uint64_t fileSize;
std::map<std::string,std::pair<uint64_t,TapeFile>> tapeCopies;
std::map<std::string,std::pair<uint32_t,TapeFile>> tapeCopies;
std::list<std::string> failurelogs;
}; // struct RetrieveJob
......
......@@ -27,7 +27,7 @@ namespace dataStructures {
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
RetrieveRequest::RetrieveRequest(): archiveFileID(0),isRepack(false) {}
RetrieveRequest::RetrieveRequest(): archiveFileID(0) {}
//------------------------------------------------------------------------------
// operator==
......@@ -37,8 +37,7 @@ bool RetrieveRequest::operator==(const RetrieveRequest &rhs) const {
&& archiveFileID==rhs.archiveFileID
&& dstURL==rhs.dstURL
&& diskFileInfo==rhs.diskFileInfo
&& creationLog==rhs.creationLog
&& isRepack == rhs.isRepack;
&& creationLog==rhs.creationLog;
}
//------------------------------------------------------------------------------
......@@ -56,8 +55,7 @@ std::ostream &operator<<(std::ostream &os, const RetrieveRequest &obj) {
<< " archiveFileID=" << obj.archiveFileID
<< " dstURL=" << obj.dstURL
<< " diskFileInfo=" << obj.diskFileInfo
<< " creationLog=" << obj.creationLog
<< " isRepack=" << obj.isRepack<<")";
<< " creationLog=" << obj.creationLog <<")";
return os;
}
......
......@@ -26,6 +26,7 @@
#include "common/dataStructures/DiskFileInfo.hpp"
#include "common/dataStructures/EntryLog.hpp"
#include "common/dataStructures/UserIdentity.hpp"
#include "common/dataStructures/ArchiveRoute.hpp"
namespace cta {
namespace common {
......@@ -48,8 +49,6 @@ struct RetrieveRequest {
std::string errorReportURL;
DiskFileInfo diskFileInfo;
EntryLog creationLog;
bool isRepack;
std::string tapePool;
}; // struct RetrieveRequest
std::ostream &operator<<(std::ostream &os, const RetrieveRequest &obj);
......
......@@ -22,7 +22,7 @@ namespace cta {
namespace common {
namespace dataStructures {
std::ostream &operator<<(std::ostream &os, const std::map<uint64_t,TapeFile> &map) {
std::ostream &operator<<(std::ostream &os, const std::map<uint32_t,TapeFile> &map) {
os << "(";
for(auto it = map.begin(); it != map.end(); it++) {
os << " key=" << it->first << " value=" << it->second << " ";
......@@ -54,7 +54,7 @@ std::ostream &operator<<(std::ostream &os, const std::map<uint64_t,std::pair<std
return os;
}
std::ostream &operator<<(std::ostream &os, const std::map<std::string,std::pair<uint64_t,TapeFile>> &map) {
std::ostream &operator<<(std::ostream &os, const std::map<std::string,std::pair<uint32_t,TapeFile>> &map) {
os << "(";
for(auto it = map.begin(); it != map.end(); it++) {
os << " key=" << it->first << " value.first=" << it->second.first << " value.second=" << it->second.second;
......
......@@ -27,11 +27,11 @@ namespace cta {
namespace common {
namespace dataStructures {
std::ostream &operator<<(std::ostream &os, const std::map<uint64_t,TapeFile> &map);
std::ostream &operator<<(std::ostream &os, const std::map<uint32_t,TapeFile> &map);
std::ostream &operator<<(std::ostream &os, const std::map<uint64_t,std::string> &map);
std::ostream &operator<<(std::ostream &os, const std::pair<std::string,std::string> &pair);
std::ostream &operator<<(std::ostream &os, const std::map<uint64_t,std::pair<std::string,std::string>> &map);
std::ostream &operator<<(std::ostream &os, const std::map<std::string,std::pair<uint64_t,TapeFile>> &map);
std::ostream &operator<<(std::ostream &os, const std::map<std::string,std::pair<uint32_t,TapeFile>> &map);
std::ostream &operator<<(std::ostream &os, const std::map<uint64_t,std::pair<std::string,std::string>> &map);
} // namespace dataStructures
......
......@@ -197,6 +197,8 @@ void Helpers::getLockedAndFetchedJobQueue<RetrieveQueue>(RetrieveQueue& retrieve
queueFetchTime = t.secs(utils::Timer::resetCounter);
log::ScopedParamContainer params(lc);
params.add("attemptNb", i+1)
.add("queueName", vid.value())
.add("queueType", toString(queueType))
.add("queueObject", retrieveQueue.getAddressIfSet())
.add("rootFetchNoLockTime", rootFetchNoLockTime)
.add("rootRelockExclusiveTime", rootRelockExclusiveTime)
......
......@@ -496,6 +496,7 @@ public:
AsyncInserter * asyncInsert() {
std::unique_ptr<AsyncInserter> ret;
ret.reset(new AsyncInserter(*this));
// Current simplification: the parsing of the header/payload is synchronous.
// This could be delegated to the backend.
// Check that we are not dealing with an existing object
......
......@@ -59,7 +59,7 @@ void RepackRequest::initialize() {
m_payload.set_retrievedbytes(0);
m_payload.set_archivedfiles(0);
m_payload.set_archivedbytes(0);
m_payload.set_failedtoretievefiles(0);
m_payload.set_failedtoretrievefiles(0);
m_payload.set_failedtoretrievebytes(0);
m_payload.set_failedtoarchivefiles(0);
m_payload.set_failedtoarchivebytes(0);
......@@ -98,6 +98,16 @@ void RepackRequest::setType(common::dataStructures::RepackInfo::Type repackType)
}
}
//------------------------------------------------------------------------------
// RepackRequest::setStatus()
//------------------------------------------------------------------------------
void RepackRequest::setStatus(common::dataStructures::RepackInfo::Status repackStatus) {
checkPayloadWritable();
// common::dataStructures::RepackInfo::Status and serializers::RepackRequestStatus are defined using the same values,
// hence the cast.
m_payload.set_status((serializers::RepackRequestStatus)repackStatus);
}
//------------------------------------------------------------------------------
// RepackRequest::getInfo()
//------------------------------------------------------------------------------
......@@ -107,6 +117,7 @@ common::dataStructures::RepackInfo RepackRequest::getInfo() {
RepackInfo ret;
ret.vid = m_payload.vid();
ret.status = (RepackInfo::Status) m_payload.status();
ret.repackBufferBaseURL = m_payload.buffer_url();
if (m_payload.repackmode()) {
if (m_payload.expandmode()) {
ret.type = RepackInfo::Type::ExpandAndRepack;
......@@ -121,6 +132,14 @@ common::dataStructures::RepackInfo RepackRequest::getInfo() {
return ret;
}
//------------------------------------------------------------------------------
// RepackRequest::setBufferURL()
//------------------------------------------------------------------------------
void RepackRequest::setBufferURL(const std::string& bufferURL) {
checkPayloadWritable();
m_payload.set_buffer_url(bufferURL);
}
//------------------------------------------------------------------------------
// RepackRequest::RepackSubRequestPointer::serialize()
//------------------------------------------------------------------------------
......@@ -148,7 +167,7 @@ void RepackRequest::RepackSubRequestPointer::deserialize(const serializers::Repa
//------------------------------------------------------------------------------
// RepackRequest::getOrPrepareSubrequestInfo()
//------------------------------------------------------------------------------
auto RepackRequest::getOrPrepareSubrequestInfo(std::set<uint32_t> fSeqs, AgentReference& agentRef)
auto RepackRequest::getOrPrepareSubrequestInfo(std::set<uint64_t> fSeqs, AgentReference& agentRef)
-> SubrequestInfo::set {
checkPayloadWritable();
RepackSubRequestPointer::Map pointerMap;
......@@ -165,7 +184,7 @@ auto RepackRequest::getOrPrepareSubrequestInfo(std::set<uint32_t> fSeqs, AgentRe
retInfo.fSeq = srp.fSeq;
retInfo.subrequestDeleted = srp.subrequestDeleted;
} catch (std::out_of_range &) {
retInfo.address = agentRef.nextId("repackSubRequest");
retInfo.address = agentRef.nextId("RepackSubRequest");
retInfo.fSeq = fs;
retInfo.subrequestDeleted = false;
auto & p = pointerMap[fs];
......@@ -179,7 +198,7 @@ auto RepackRequest::getOrPrepareSubrequestInfo(std::set<uint32_t> fSeqs, AgentRe
// Record changes, if any.
if (newElementCreated) {
m_payload.mutable_subrequests()->Clear();
for (auto & p: pointerMap) p.second.deserialize(*m_payload.mutable_subrequests()->Add());
for (auto & p: pointerMap) p.second.serialize(*m_payload.mutable_subrequests()->Add());
}
return ret;
}
......@@ -188,7 +207,7 @@ auto RepackRequest::getOrPrepareSubrequestInfo(std::set<uint32_t> fSeqs, AgentRe
// RepackRequest::setLastExpandedFSeq()
//------------------------------------------------------------------------------
void RepackRequest::setLastExpandedFSeq(uint64_t lastExpandedFSeq) {
checkWritable();
checkPayloadWritable();
m_payload.set_lastexpandedfseq(lastExpandedFSeq);
}
......@@ -243,7 +262,7 @@ void RepackRequest::reportRetriveFailures(SubrequestStatistics::List& retrieveFa
if (!p.failureAccounted) {
p.failureAccounted = true;
m_payload.set_failedtoretrievebytes(m_payload.failedtoretrievebytes() + rs.bytes);
m_payload.set_failedtoretievefiles(m_payload.failedtoretievefiles() + rs.files);
m_payload.set_failedtoretrievefiles(m_payload.failedtoretrievefiles() + rs.files);
didUpdate = true;
}
} catch (std::out_of_range &) {
......@@ -338,6 +357,30 @@ void RepackRequest::reportSubRequestsForDeletion(std::list<uint64_t>& fSeqs) {
}
}
//------------------------------------------------------------------------------
// RepackRequest::reportSubRequestsForDeletion()
//------------------------------------------------------------------------------
auto RepackRequest::getStats() -> std::map<StatsType, StatsValues> {
checkPayloadReadable();
std::map<StatsType, StatsValues> ret;
ret[StatsType::ArchiveTotal].files = m_payload.totalfilestoarchive();
ret[StatsType::ArchiveTotal].bytes = m_payload.totalbytestoarchive();
ret[StatsType::RetrieveTotal].files = m_payload.totalfilestoretrieve();
ret[StatsType::RetrieveTotal].bytes = m_payload.totalbytestoretrieve();
ret[StatsType::UserProvided].files = m_payload.userprovidedfiles();
ret[StatsType::UserProvided].bytes = m_payload.userprovidedbytes();
ret[StatsType::RetrieveFailure].files = m_payload.failedtoretrievefiles();
ret[StatsType::RetrieveFailure].bytes = m_payload.failedtoretrievebytes();
ret[StatsType::RetrieveSuccess].files = m_payload.retrievedfiles();
ret[StatsType::RetrieveSuccess].bytes = m_payload.retrievedbytes();
ret[StatsType::ArchiveFailure].files = m_payload.failedtoarchivefiles();
ret[StatsType::ArchiveFailure].bytes = m_payload.failedtoarchivebytes();
ret[StatsType::ArchiveSuccess].files = m_payload.archivedfiles();
ret[StatsType::ArchiveSuccess].bytes = m_payload.archivedbytes();
return ret;
}
//------------------------------------------------------------------------------
// RepackRequest::garbageCollect()
//------------------------------------------------------------------------------
......@@ -387,6 +430,7 @@ RepackRequest::AsyncOwnerAndStatusUpdater* RepackRequest::asyncUpdateOwnerAndSta
typedef common::dataStructures::RepackInfo RepackInfo;
retRef.m_repackInfo.status = (RepackInfo::Status) payload.status();
retRef.m_repackInfo.vid = payload.vid();
retRef.m_repackInfo.repackBufferBaseURL = payload.buffer_url();
if (payload.repackmode()) {
if (payload.expandmode()) {
retRef.m_repackInfo.type = RepackInfo::Type::ExpandAndRepack;
......
......@@ -41,6 +41,7 @@ public:
void setType(common::dataStructures::RepackInfo::Type repackType);
void setStatus(common::dataStructures::RepackInfo::Status repackStatus);
common::dataStructures::RepackInfo getInfo();
void setBufferURL(const std::string & bufferURL);
// Sub request management
struct SubrequestInfo {
......@@ -59,7 +60,7 @@ public:
* yet not update the object to reflect the last fSeq created.
* This function implicitly records the information it generates (commit up t the caller);
*/
SubrequestInfo::set getOrPrepareSubrequestInfo (std::set<uint32_t> fSeqs, AgentReference & agentRef);
SubrequestInfo::set getOrPrepareSubrequestInfo (std::set<uint64_t> fSeqs, AgentReference & agentRef);
private:
struct RepackSubRequestPointer {
std::string address;
......@@ -90,6 +91,20 @@ public:
void reportArchiveSuccesses (SubrequestStatistics::List & archiveSuccesses);
void reportArchiveFailures (SubrequestStatistics::List & archiveFailures);
void reportSubRequestsForDeletion (std::list<uint64_t>& fSeqs);
enum class StatsType: uint8_t {
UserProvided,
RetrieveSuccess,
RetrieveFailure,
RetrieveTotal,
ArchiveSuccess,
ArchiveFailure,
ArchiveTotal,
};
struct StatsValues {
uint64_t files = 0;
uint64_t bytes = 0;
};
std::map<StatsType, StatsValues> getStats();
void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc,
cta::catalogue::Catalogue & catalogue) override;
......
......@@ -55,6 +55,7 @@ struct ContainerTraits<RetrieveQueue,C>
common::dataStructures::RetrieveRequest rr;
std::string errorReportURL;
SchedulerDatabase::RetrieveJob::ReportType reportType;
RetrieveRequest::RepackInfo repackInfo;
};
struct PoppedElementsSummary;
struct PopCriteria {
......@@ -382,6 +383,7 @@ switchElementsOwnership(PoppedElementsBatch &poppedElementBatch, const Container
u.get()->wait();
e.archiveFile = u.get()->getArchiveFile();
e.rr = u.get()->getRetrieveRequest();
e.repackInfo = u.get()->getRepackInfo();
switch(u.get()->getJobStatus()) {
case serializers::RetrieveJobStatus::RJS_ToReportToUserForFailure:
e.reportType = SchedulerDatabase::RetrieveJob::ReportType::FailureReport;
......@@ -486,7 +488,8 @@ getPoppingElementsCandidates(Container &cont, PopCriteria &unfulfilledCriteria,
cjfq.size,
common::dataStructures::ArchiveFile(),
common::dataStructures::RetrieveRequest(),
"", SchedulerDatabase::RetrieveJob::ReportType::NoReportRequired
"", SchedulerDatabase::RetrieveJob::ReportType::NoReportRequired,
RetrieveRequest::RepackInfo()
});
ret.summary.files++;
}
......
......@@ -55,7 +55,8 @@ getPoppingElementsCandidates(Container &cont, PopCriteria &unfulfilledCriteria,
common::dataStructures::ArchiveFile(),
common::dataStructures::RetrieveRequest(),
"",
SchedulerDatabase::RetrieveJob::ReportType::NoReportRequired
SchedulerDatabase::RetrieveJob::ReportType::NoReportRequired,
RetrieveRequest::RepackInfo()
});
ret.summary.bytes += cjfq.size;
ret.summary.files++;
......
......@@ -58,7 +58,6 @@ void RetrieveRequest::initialize() {
m_payload.set_failurereportlog("");
m_payload.set_failurereporturl("");
m_payload.set_isrepack(false);
m_payload.set_tapepool("");
// This object is good to go (to storage)
m_payloadInterpreted = true;
}
......@@ -372,7 +371,6 @@ std::string RetrieveRequest::getLastActiveVid() {
return m_payload.archivefile().tapefiles(0).vid();
}
//------------------------------------------------------------------------------
// RetrieveRequest::setSchedulerRequest()
//------------------------------------------------------------------------------
......@@ -402,8 +400,6 @@ cta::common::dataStructures::RetrieveRequest RetrieveRequest::getSchedulerReques
objectstore::EntryLogSerDeser el(ret.creationLog);
el.deserialize(m_payload.schedulerrequest().entrylog());
ret.dstURL = m_payload.schedulerrequest().dsturl();
ret.isRepack = m_payload.isrepack();
ret.tapePool = m_payload.tapepool();
ret.errorReportURL = m_payload.schedulerrequest().retrieveerrorreporturl();
objectstore::DiskFileInfoSerDeser dfisd;
dfisd.deserialize(m_payload.schedulerrequest().diskfileinfo());
......@@ -516,6 +512,37 @@ bool RetrieveRequest::addJobFailure(uint32_t copyNumber, uint64_t mountId,
throw NoSuchJob ("In RetrieveRequest::addJobFailure(): could not find job");
}
//------------------------------------------------------------------------------
// RetrieveRequest::setRepackInfo()
//------------------------------------------------------------------------------
void RetrieveRequest::setRepackInfo(const RepackInfo& repackInfo) {
checkPayloadWritable();
m_payload.set_isrepack(repackInfo.isRepack);
if (repackInfo.isRepack) {
for (auto & ar: repackInfo.archiveRouteMap) {
auto * plar=m_payload.mutable_repack_info()->mutable_archive_routes()->Add();
plar->set_copynb(ar.first);
plar->set_tapepool(ar.second);
}
for (auto cntr: repackInfo.copyNbsToRearchive) {
m_payload.mutable_repack_info()->mutable_copy_nbs_to_rearchive()->Add(cntr);
}
m_payload.mutable_repack_info()->set_file_buffer_url(repackInfo.fileBufferURL);
m_payload.mutable_repack_info()->set_repack_request_address(repackInfo.repackRequestAddress);
}
}
//------------------------------------------------------------------------------
// RetrieveRequest::getRepackInfo()
//------------------------------------------------------------------------------
RetrieveRequest::RepackInfo RetrieveRequest::getRepackInfo() {
checkPayloadReadable();
RepackInfoSerDeser ret;
if (m_payload.isrepack())
ret.deserialize(m_payload.repack_info());
return ret;
}
//------------------------------------------------------------------------------
// RetrieveRequest::getRetryStatus()
//------------------------------------------------------------------------------
......@@ -744,8 +771,6 @@ auto RetrieveRequest::asyncUpdateJobOwner(uint32_t copyNumber, const std::string
dfi.deserialize(payload.schedulerrequest().diskfileinfo());
retRef.m_retrieveRequest.diskFileInfo = dfi;
retRef.m_retrieveRequest.dstURL = payload.schedulerrequest().dsturl();
retRef.m_retrieveRequest.isRepack = payload.isrepack();
retRef.m_retrieveRequest.tapePool = payload.tapepool();
retRef.m_retrieveRequest.errorReportURL = payload.schedulerrequest().retrieveerrorreporturl();
retRef.m_retrieveRequest.requester.name = payload.schedulerrequest().requester().name();
retRef.m_retrieveRequest.requester.group = payload.schedulerrequest().requester().group();
......@@ -753,6 +778,18 @@ auto RetrieveRequest::asyncUpdateJobOwner(uint32_t copyNumber, const std::string
af.deserialize(payload.archivefile());
retRef.m_archiveFile = af;
retRef.m_jobStatus = j.status();
if (payload.isrepack()) {
RetrieveRequest::RepackInfo & ri = retRef.m_repackInfo;
for (auto &ar: payload.repack_info().archive_routes()) {
ri.archiveRouteMap[ar.copynb()] = ar.tapepool();
}
for (auto cntr: payload.repack_info().copy_nbs_to_rearchive()) {
ri.copyNbsToRearchive.insert(cntr);
}
ri.fileBufferURL = payload.repack_info().file_buffer_url();
ri.isRepack = true;
ri.repackRequestAddress = payload.repack_info().repack_request_address();
}
// TODO serialization of payload maybe not necessary
oh.set_payload(payload.SerializePartialAsString());
return oh.SerializeAsString();
......@@ -779,6 +816,13 @@ const common::dataStructures::ArchiveFile& RetrieveRequest::AsyncJobOwnerUpdater
return m_archiveFile;
}
//------------------------------------------------------------------------------
// RetrieveRequest::AsyncJobOwnerUpdater::getRepackInfo()
//------------------------------------------------------------------------------
const RetrieveRequest::RepackInfo& RetrieveRequest::AsyncJobOwnerUpdater::getRepackInfo() {
return m_repackInfo;
}
//------------------------------------------------------------------------------
// RetrieveRequest::AsyncJobOwnerUpdater::getRetrieveRequest()
//------------------------------------------------------------------------------
......@@ -970,24 +1014,29 @@ RetrieveRequest::AsyncRetrieveToArchiveTransformer * RetrieveRequest::asyncTrans
//TODO : Should creation log just be initialized or should it be copied from the retrieveRequest ?
cta::objectstore::serializers::EntryLog *archiveRequestCL = archiveRequestPayload.mutable_creationlog();
archiveRequestCL->CopyFrom(retrieveRequestMP.creationlog());
//Add the jobs of the old RetrieveRequest to the new ArchiveRequest
for(auto retrieveJob: retrieveRequestPayload.jobs()){
//Create archive jobs for each copyNb ro rearchive
RetrieveRequest::RepackInfoSerDeser repackInfoSerDeser;
repackInfoSerDeser.deserialize(retrieveRequestPayload.repack_info());
// TODO: for the moment we just clone the retrieve request's policy.
auto maxRetriesWithinMount = retrieveRequestPayload.jobs(0).maxretrieswithinmount();
auto maxTotalRetries = retrieveRequestPayload.jobs(0).maxtotalretries();
auto maxReportRetries = retrieveRequestPayload.jobs(0).maxreportretries();
for(auto cntr: repackInfoSerDeser.copyNbsToRearchive) {
auto *archiveJob = archiveRequestPayload.add_jobs();
archiveJob->set_status(cta::objectstore::serializers::ArchiveJobStatus::AJS_ToTransferForUser);
archiveJob->set_copynb(retrieveJob.copynb());
archiveJob->set_copynb(cntr);
archiveJob->set_archivequeueaddress("");
archiveJob->set_totalreportretries(0);
archiveJob->set_lastmountwithfailure(0);
archiveJob->set_totalretries(0);
archiveJob->set_retrieswithinmount(0);
archiveJob->set_maxretrieswithinmount(retrieveJob.maxretrieswithinmount()); //TODO : should we put the same value as the retrieveJob ?
archiveJob->set_maxretrieswithinmount(maxRetriesWithinMount); //TODO : should we put the same value as the retrieveJob ?
archiveJob->set_totalreportretries(0);
archiveJob->set_maxtotalretries(retrieveJob.maxtotalretries()); //TODO : should we put the same value as the retrieveJob ?
archiveJob->set_maxreportretries(retrieveJob.maxreportretries()); //TODO : should we put the same value as the retrieveJob ?
archiveJob->set_tapepool(retrieveRequestPayload.tapepool());
archiveJob->set_maxtotalretries(maxTotalRetries); //TODO : should we put the same value as the retrieveJob ?
archiveJob->set_maxreportretries(maxReportRetries); //TODO : should we put the same value as the retrieveJob ?
archiveJob->set_tapepool(repackInfoSerDeser.archiveRouteMap[cntr]);
archiveJob->set_owner(processAgentAddress);
}
//Serialize the new ArchiveRequest so that it replaces the RetrieveRequest
oh.set_payload(archiveRequestPayload.SerializeAsString());
//Change the type of the RetrieveRequest to ArchiveRequest
......@@ -1044,24 +1093,4 @@ void RetrieveRequest::setJobStatus(uint32_t copyNumber, const serializers::Retri
throw exception::Exception("In RetrieveRequest::setJobStatus(): job not found.");
}
bool RetrieveRequest::isRepack(){
checkPayloadReadable();
return m_payload.isrepack();
}
void RetrieveRequest::setIsRepack(bool isRepack){
checkPayloadWritable();
m_payload.set_isrepack(isRepack);
}
std::string RetrieveRequest::getTapePool(){
checkPayloadReadable();
return m_payload.tapepool();
}
void RetrieveRequest::setTapePool(const std::string tapePool)
{
checkPayloadWritable();
m_payload.set_tapepool(tapePool);
}
}} // namespace cta::objectstore
......@@ -143,11 +143,38 @@ public:
//! The copy number to enqueue. It could be different from the updated one in mixed success/failure scenario.
serializers::RetrieveJobStatus nextStatus;
};
bool isRepack();
void setIsRepack(bool isRepack);
std::string getTapePool();
void setTapePool(const std::string tapePool);
struct RepackInfo {
bool isRepack = false;
std::map<uint32_t, std::string> archiveRouteMap;
std::set<uint32_t> copyNbsToRearchive;
std::string repackRequestAddress;
std::string fileBufferURL;
};
void setRepackInfo(const RepackInfo & repackInfo);
RepackInfo getRepackInfo();
struct RepackInfoSerDeser: public RepackInfo {
operator RepackInfo() { return RepackInfo(*this); }
void serialize(cta::objectstore::serializers::RetrieveRequestRepackInfo & rrri) {
if (!isRepack) throw exception::Exception("In RetrieveRequest::RepackInfoSerDeser::serialize(): isRepack is false.");
for (auto &route: archiveRouteMap) {
auto * ar = rrri.mutable_archive_routes()->Add();
ar->set_copynb(route.first);
ar->set_tapepool(route.second);
}
for (auto cntr: copyNbsToRearchive) rrri.mutable_copy_nbs_to_rearchive()->Add(cntr);
rrri.set_file_buffer_url(fileBufferURL);
rrri.set_repack_request_address(repackRequestAddress);
}
void deserialize(const cta::objectstore::serializers::RetrieveRequestRepackInfo & rrri) {
isRepack = true;
for(auto &route: rrri.archive_routes()) { archiveRouteMap[route.copynb()] = route.tapepool(); }
for(auto &cntr: rrri.copy_nbs_to_rearchive()) { copyNbsToRearchive.insert(cntr); }
fileBufferURL = rrri.file_buffer_url();
repackRequestAddress = rrri.repack_request_address();
}
};
private:
/*!
* Determine and set the new status of the job.
......@@ -185,11 +212,13 @@ public:
serializers::RetrieveJobStatus getJobStatus() { return m_jobStatus; }
const common::dataStructures::RetrieveRequest &getRetrieveRequest();
const common::dataStructures::ArchiveFile &getArchiveFile();
const RepackInfo &getRepackInfo();
private:
std::function<std::string(const std::string &)> m_updaterCallback;
std::unique_ptr<Backend::AsyncUpdater> m_backendUpdater;
common::dataStructures::RetrieveRequest m_retrieveRequest;
common::dataStructures::ArchiveFile m_archiveFile;
RepackInfo m_repackInfo;
serializers::RetrieveJobStatus m_jobStatus;
};
// An owner updater factory. The owner MUST be previousOwner for the update to be executed.
......
......@@ -427,7 +427,8 @@ std::string RootEntry::getRetrieveQueueAddress(const std::string& vid, JobQueueT
auto & rqp = serializers::findElement(retrieveQueuePointers(queueType), vid);
return rqp.address();
} catch (serializers::NotFound &) {
throw NoSuchRetrieveQueue("In RootEntry::getRetreveQueueAddress: retrieve queue not allocated");
throw NoSuchRetrieveQueue(std::string("In RootEntry::getRetreveQueueAddress: retrieve queue not allocated ")+