Commit 6de4cebd authored by Cedric CAFFY's avatar Cedric CAFFY
Browse files

Merge branch 'repack-dev' of https://gitlab.cern.ch:8443/cta/CTA into repack-dev

parents 6a6374e7 fbdd9574
......@@ -18,6 +18,7 @@
#include "RepackRequest.hpp"
#include "GenericObject.hpp"
#include "AgentReference.hpp"
#include <google/protobuf/util/json_util.h>
namespace cta { namespace objectstore {
......@@ -120,6 +121,223 @@ common::dataStructures::RepackInfo RepackRequest::getInfo() {
return ret;
}
//------------------------------------------------------------------------------
// RepackRequest::RepackSubRequestPointer::serialize()
//------------------------------------------------------------------------------
/**
 * Copies every field of this in-memory sub request pointer (address, fSeq and
 * the four bookkeeping flags) into the protobuf message passed by the caller.
 * @param rsrp the protobuf message to fill in.
 */
void RepackRequest::RepackSubRequestPointer::serialize(serializers::RepackSubRequestPointer& rsrp) {
  // Identification of the sub request.
  rsrp.set_fseq(this->fSeq);
  rsrp.set_address(this->address);
  // Bookkeeping flags.
  rsrp.set_subrequestdeleted(this->subrequestDeleted);
  rsrp.set_failureaccounted(this->failureAccounted);
  rsrp.set_archiveaccounted(this->archiveAccounted);
  rsrp.set_retrieveaccounted(this->retrieveAccounted);
}
//------------------------------------------------------------------------------
// RepackRequest::RepackSubRequestPointer::deserialize()
//------------------------------------------------------------------------------
/**
 * Overwrites every field of this in-memory sub request pointer (address, fSeq
 * and the four bookkeeping flags) with the values read from the protobuf message.
 * @param rsrp the protobuf message to read from.
 */
void RepackRequest::RepackSubRequestPointer::deserialize(const serializers::RepackSubRequestPointer& rsrp) {
  // Identification of the sub request.
  this->fSeq = rsrp.fseq();
  this->address = rsrp.address();
  // Bookkeeping flags.
  this->subrequestDeleted = rsrp.subrequestdeleted();
  this->failureAccounted = rsrp.failureaccounted();
  this->archiveAccounted = rsrp.archiveaccounted();
  this->retrieveAccounted = rsrp.retrieveaccounted();
}
//------------------------------------------------------------------------------
// RepackRequest::getOrPrepareSubrequestInfo()
//------------------------------------------------------------------------------
/**
 * Returns, for each requested fSeq, the sub request information recorded in the
 * payload. For fSeqs without a recorded entry, a new address is generated with
 * agentRef.nextId("repackSubRequest") and a new entry (all flags false) is
 * recorded in the in-memory payload. No commit is performed here.
 * @param fSeqs the fSeqs for which sub request information is wanted.
 * @param agentRef agent reference used to generate addresses for new entries.
 * @return one SubrequestInfo per requested fSeq.
 */
auto RepackRequest::getOrPrepareSubrequestInfo(std::set<uint32_t> fSeqs, AgentReference& agentRef)
-> SubrequestInfo::set {
  checkPayloadWritable();
  RepackSubRequestPointer::Map pointerMap;
  // Read the map
  for (auto &rsrp: m_payload.subrequests()) pointerMap[rsrp.fseq()].deserialize(rsrp);
  SubrequestInfo::set ret;
  bool newElementCreated = false;
  // Prepare to return existing or created address.
  for (auto &fs: fSeqs) {
    SubrequestInfo retInfo;
    auto known = pointerMap.find(fs);
    if (known != pointerMap.end()) {
      // Already recorded: report what the payload holds.
      const auto & srp = known->second;
      retInfo.address = srp.address;
      retInfo.fSeq = srp.fSeq;
      retInfo.subrequestDeleted = srp.subrequestDeleted;
    } else {
      // Not recorded yet: generate an address and remember it.
      retInfo.address = agentRef.nextId("repackSubRequest");
      retInfo.fSeq = fs;
      retInfo.subrequestDeleted = false;
      auto & p = pointerMap[fs];
      p.address = retInfo.address;
      p.fSeq = fs;
      p.archiveAccounted = p.retrieveAccounted = p.failureAccounted = p.subrequestDeleted = false;
      newElementCreated = true;
    }
    ret.emplace(retInfo);
  }
  // Record changes, if any.
  if (newElementCreated) {
    m_payload.mutable_subrequests()->Clear();
    // Write the map back: each pointer is serialized INTO the freshly added element.
    // (Calling deserialize() here would instead reset the pointer from the empty element.)
    for (auto & p: pointerMap) p.second.serialize(*m_payload.mutable_subrequests()->Add());
  }
  return ret;
}
//------------------------------------------------------------------------------
// RepackRequest::setLastExpandedFSeq()
//------------------------------------------------------------------------------
/**
 * Stores the given value in the payload's lastexpandedfseq field.
 * @param lastExpandedFSeq the value to record.
 */
void RepackRequest::setLastExpandedFSeq(uint64_t lastExpandedFSeq) {
// NOTE(review): the other payload mutators in this file call checkPayloadWritable();
// confirm that checkWritable() is the intended check here.
checkWritable();
m_payload.set_lastexpandedfseq(lastExpandedFSeq);
}
//------------------------------------------------------------------------------
// RepackRequest::getLastExpandedFSeq()
//------------------------------------------------------------------------------
/**
 * Reads the payload's lastexpandedfseq field.
 * @return the value currently stored in the payload.
 */
uint64_t RepackRequest::getLastExpandedFSeq() {
checkPayloadReadable();
return m_payload.lastexpandedfseq();
}
//------------------------------------------------------------------------------
// RepackRequest::reportRetriveSuccesses()
//------------------------------------------------------------------------------
/**
 * Accounts retrieve successes in the payload. For each report whose fSeq has not
 * yet been marked retrieveAccounted, the retrieved bytes/files counters are
 * increased and the flag is set, so a repeated report is not counted twice.
 * @param retrieveSuccesses the reports to account.
 * @throws exception::Exception if a report refers to an fSeq with no recorded
 * sub request. Counters already increased for earlier reports of the list stay
 * modified in the in-memory payload in that case.
 */
void RepackRequest::reportRetriveSuccesses(SubrequestStatistics::List& retrieveSuccesses) {
  checkPayloadWritable();
  RepackSubRequestPointer::Map pointerMap;
  // Read the map
  for (auto &rsrp: m_payload.subrequests()) pointerMap[rsrp.fseq()].deserialize(rsrp);
  bool didUpdate = false;
  for (auto & rs: retrieveSuccesses) {
    auto known = pointerMap.find(rs.fSeq);
    if (known == pointerMap.end())
      throw exception::Exception("In RepackRequest::reportRetriveSuccesses(): got a report for unknown fSeq");
    auto & p = known->second;
    if (!p.retrieveAccounted) {
      p.retrieveAccounted = true;
      m_payload.set_retrievedbytes(m_payload.retrievedbytes() + rs.bytes);
      m_payload.set_retrievedfiles(m_payload.retrievedfiles() + rs.files);
      didUpdate = true;
    }
  }
  if (didUpdate) {
    m_payload.mutable_subrequests()->Clear();
    // Write the map back: each pointer is serialized INTO the freshly added element.
    for (auto & p: pointerMap) p.second.serialize(*m_payload.mutable_subrequests()->Add());
  }
}
//------------------------------------------------------------------------------
// RepackRequest::reportRetriveFailures()
//------------------------------------------------------------------------------
/**
 * Accounts retrieve failures in the payload. For each report whose fSeq has not
 * yet been marked failureAccounted, the failed-to-retrieve bytes/files counters
 * are increased and the flag is set, so a repeated report is not counted twice.
 * @param retrieveFailures the reports to account.
 * @throws exception::Exception if a report refers to an fSeq with no recorded
 * sub request. Counters already increased for earlier reports of the list stay
 * modified in the in-memory payload in that case.
 */
void RepackRequest::reportRetriveFailures(SubrequestStatistics::List& retrieveFailures) {
  checkPayloadWritable();
  RepackSubRequestPointer::Map pointerMap;
  // Read the map
  for (auto &rsrp: m_payload.subrequests()) pointerMap[rsrp.fseq()].deserialize(rsrp);
  bool didUpdate = false;
  for (auto & rs: retrieveFailures) {
    auto known = pointerMap.find(rs.fSeq);
    if (known == pointerMap.end())
      throw exception::Exception("In RepackRequest::reportRetriveFailures(): got a report for unknown fSeq");
    auto & p = known->second;
    if (!p.failureAccounted) {
      p.failureAccounted = true;
      m_payload.set_failedtoretrievebytes(m_payload.failedtoretrievebytes() + rs.bytes);
      m_payload.set_failedtoretievefiles(m_payload.failedtoretievefiles() + rs.files);
      didUpdate = true;
    }
  }
  if (didUpdate) {
    m_payload.mutable_subrequests()->Clear();
    // Write the map back: each pointer is serialized INTO the freshly added element.
    for (auto & p: pointerMap) p.second.serialize(*m_payload.mutable_subrequests()->Add());
  }
}
//------------------------------------------------------------------------------
// RepackRequest::reportArchiveSuccesses()
//------------------------------------------------------------------------------
/**
 * Accounts archive successes in the payload. For each report whose fSeq has not
 * yet been marked archiveAccounted, the archived bytes/files counters are
 * increased and the flag is set, so a repeated report is not counted twice.
 * @param archiveSuccesses the reports to account.
 * @throws exception::Exception if a report refers to an fSeq with no recorded
 * sub request. Counters already increased for earlier reports of the list stay
 * modified in the in-memory payload in that case.
 */
void RepackRequest::reportArchiveSuccesses(SubrequestStatistics::List& archiveSuccesses) {
  checkPayloadWritable();
  RepackSubRequestPointer::Map pointerMap;
  // Read the map
  for (auto &rsrp: m_payload.subrequests()) pointerMap[rsrp.fseq()].deserialize(rsrp);
  bool didUpdate = false;
  for (auto & report: archiveSuccesses) {
    auto known = pointerMap.find(report.fSeq);
    if (known == pointerMap.end())
      throw exception::Exception("In RepackRequest::reportArchiveSuccesses(): got a report for unknown fSeq");
    auto & p = known->second;
    if (!p.archiveAccounted) {
      p.archiveAccounted = true;
      m_payload.set_archivedbytes(m_payload.archivedbytes() + report.bytes);
      m_payload.set_archivedfiles(m_payload.archivedfiles() + report.files);
      didUpdate = true;
    }
  }
  if (didUpdate) {
    m_payload.mutable_subrequests()->Clear();
    // Write the map back: each pointer is serialized INTO the freshly added element.
    for (auto & p: pointerMap) p.second.serialize(*m_payload.mutable_subrequests()->Add());
  }
}
//------------------------------------------------------------------------------
// RepackRequest::reportArchiveFailures()
//------------------------------------------------------------------------------
/**
 * Accounts archive failures in the payload. For each report whose fSeq has not
 * yet been marked failureAccounted, the failed-to-archive bytes/files counters
 * are increased and the flag is set, so a repeated report is not counted twice.
 * NOTE(review): the failureAccounted flag is the same one used by
 * reportRetriveFailures(), so an fSeq already counted as a retrieve failure is
 * not counted again here. Presumably intended; confirm.
 * @param archiveFailures the reports to account.
 * @throws exception::Exception if a report refers to an fSeq with no recorded
 * sub request. Counters already increased for earlier reports of the list stay
 * modified in the in-memory payload in that case.
 */
void RepackRequest::reportArchiveFailures(SubrequestStatistics::List& archiveFailures) {
  checkPayloadWritable();
  RepackSubRequestPointer::Map pointerMap;
  // Read the map
  for (auto &rsrp: m_payload.subrequests()) pointerMap[rsrp.fseq()].deserialize(rsrp);
  bool didUpdate = false;
  for (auto & rs: archiveFailures) {
    auto known = pointerMap.find(rs.fSeq);
    if (known == pointerMap.end())
      throw exception::Exception("In RepackRequest::reportArchiveFailures(): got a report for unknown fSeq");
    auto & p = known->second;
    if (!p.failureAccounted) {
      p.failureAccounted = true;
      m_payload.set_failedtoarchivebytes(m_payload.failedtoarchivebytes() + rs.bytes);
      m_payload.set_failedtoarchivefiles(m_payload.failedtoarchivefiles() + rs.files);
      didUpdate = true;
    }
  }
  if (didUpdate) {
    m_payload.mutable_subrequests()->Clear();
    // Write the map back: each pointer is serialized INTO the freshly added element.
    for (auto & p: pointerMap) p.second.serialize(*m_payload.mutable_subrequests()->Add());
  }
}
//------------------------------------------------------------------------------
// RepackRequest::reportSubRequestsForDeletion()
//------------------------------------------------------------------------------
/**
 * Sets the subrequestDeleted flag of the recorded sub request pointer of each
 * given fSeq. Entries already flagged are left untouched, and the payload is
 * only rewritten when at least one flag changed.
 * @param fSeqs the fSeqs whose sub requests are to be flagged as deleted.
 * @throws exception::Exception if an fSeq has no recorded sub request.
 */
void RepackRequest::reportSubRequestsForDeletion(std::list<uint64_t>& fSeqs) {
  checkPayloadWritable();
  RepackSubRequestPointer::Map pointerMap;
  // Read the map
  for (auto &rsrp: m_payload.subrequests()) pointerMap[rsrp.fseq()].deserialize(rsrp);
  bool didUpdate = false;
  for (auto & fs: fSeqs) {
    auto known = pointerMap.find(fs);
    if (known == pointerMap.end())
      throw exception::Exception("In RepackRequest::reportSubRequestsForDeletion(): got a report for unknown fSeq");
    auto & p = known->second;
    if (!p.subrequestDeleted) {
      p.subrequestDeleted = true;
      didUpdate = true;
    }
  }
  if (didUpdate) {
    m_payload.mutable_subrequests()->Clear();
    // Write the map back: each pointer is serialized INTO the freshly added element.
    for (auto & p: pointerMap) p.second.serialize(*m_payload.mutable_subrequests()->Add());
  }
}
//------------------------------------------------------------------------------
// RepackRequest::garbageCollect()
//------------------------------------------------------------------------------
......
......@@ -42,6 +42,55 @@ public:
void setStatus(common::dataStructures::RepackInfo::Status repackStatus);
common::dataStructures::RepackInfo getInfo();
// Sub request management
struct SubrequestInfo {
  /// Container type used to return several entries; ordered by operator< below.
  using set = std::set<SubrequestInfo>;
  /// Address of the sub request.
  std::string address;
  /// fSeq the sub request refers to.
  uint64_t fSeq;
  /// Set to true before the sub request is deleted. Lets the creation process tell
  /// a sub request that still has to be created (false) from one that has already
  /// completed and been removed (true).
  bool subrequestDeleted;
  /// Entries are ordered (and considered equivalent) by fSeq only.
  bool operator< (const SubrequestInfo & o) const { return o.fSeq > fSeq; }
};
/**
* Provide a list of addresses for a set of fSeqs. For expansion of repack requests.
* The addresses could be provided from the repack request if previously recorded, or
* generated if not. The repack request should then be committed (not done here) before the
* sub requests are actually created. Sub requests could also be already present, and this
* would not be an error case (the previous process doing the expansion managed to create them,
* yet did not update the object to reflect the last fSeq created).
* This function implicitly records the information it generates (the commit is up to the caller).
* @param fSeqs the fSeqs for which sub request information is wanted.
* @param agentRef agent reference used to generate the addresses of new entries.
* @return one SubrequestInfo per requested fSeq.
*/
SubrequestInfo::set getOrPrepareSubrequestInfo (std::set<uint32_t> fSeqs, AgentReference & agentRef);
private:
struct RepackSubRequestPointer {
  /// Lookup table of pointers, keyed by fSeq where it is used in this class.
  using Map = std::map<uint64_t, RepackSubRequestPointer>;
  /// Address of the sub request.
  std::string address;
  /// fSeq the sub request refers to.
  uint64_t fSeq;
  /// True once a retrieve success has been counted for this sub request.
  bool retrieveAccounted;
  /// True once an archive success has been counted for this sub request.
  bool archiveAccounted;
  /// True once a failure has been counted for this sub request.
  bool failureAccounted;
  /// True once the sub request has been flagged for deletion.
  bool subrequestDeleted;
  /// Copy this structure into the protobuf message.
  void serialize (serializers::RepackSubRequestPointer & rsrp);
  /// Fill this structure from the protobuf message.
  void deserialize (const serializers::RepackSubRequestPointer & rsrp);
};
public:
/// Set the fSeq of the last fully created sub request.
void setLastExpandedFSeq(uint64_t lastExpandedFSeq);
/// Get the value recorded with setLastExpandedFSeq().
uint64_t getLastExpandedFSeq();
/// One report entry for the report*() methods: the fSeq concerned and the
/// number of files and bytes to add to the matching counters.
struct SubrequestStatistics {
  /// fSeq of the sub request being reported. Zero-initialized so a default-constructed
  /// entry never carries an indeterminate value.
  uint64_t fSeq = 0;
  /// Number of files to account (one by default).
  uint64_t files = 1;
  /// Number of bytes to account. Zero-initialized so a forgotten assignment
  /// cannot add garbage to the counters.
  uint64_t bytes = 0;
  typedef std::list<SubrequestStatistics> List;
  /// Entries are ordered by fSeq only.
  bool operator< (const SubrequestStatistics & o) const { return fSeq < o.fSeq; }
};
/// Account retrieve successes (bytes and files); each fSeq is counted at most once.
void reportRetriveSuccesses (SubrequestStatistics::List & retrieveSuccesses);
/// Account retrieve failures (bytes and files); each fSeq is counted at most once.
void reportRetriveFailures (SubrequestStatistics::List & retrieveFailures);
/// Account archive successes (bytes and files); each fSeq is counted at most once.
void reportArchiveSuccesses (SubrequestStatistics::List & archiveSuccesses);
/// Account archive failures (bytes and files); each fSeq is counted at most once.
void reportArchiveFailures (SubrequestStatistics::List & archiveFailures);
/// Flag the sub requests of the given fSeqs as deleted.
void reportSubRequestsForDeletion (std::list<uint64_t>& fSeqs);
/// Garbage collection of this object (overrides the base class declaration).
void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc,
cta::catalogue::Catalogue & catalogue) override;
......
......@@ -460,27 +460,25 @@ enum RepackRequestStatus {
RRS_Failed = 8;
}
enum RepackSubRequestStatus {
// In order to handle all failure modes and accompanying races, we have to record whether
// a sub request is expected to be present or not. The process deleting the sub request will
// first mark it as delete with this status and then actually delete it. In case of crash in this
// process, the switch of this status will be re-done and the sub request deleted on retry.
// On the opposite side, this information will allow the creation process to know if a reference to the
// sub request should be interpreted as an unfulfilled creation intent (RSRS_Present) and create the
// missing sub request or the completion of the request (which can happen anytime after sub request
// creation). The two delete flavors allow reconstruction of the sub request counters if need
RSRS_Present = 1;
RSRS_DeletedAfterSuccess = 2;
RSRS_DeletedAfterFailure = 3;
}
// In order to properly handle retries in case of failure during reporting, we hold
// a series of booleans tracking the status of the statistics recording and deletion of the
// sub requests. "Subrequest deleted" records whether a sub request is expected to be present or not.
// The process deleting the sub request will
// first mark it as deleted with this flag and then actually delete it. In case of a crash in this
// process, the switch of this flag will be re-done and the sub request deleted on retry.
// On the opposite side, this information will allow the creation process to know if a reference to the
// sub request should be interpreted as an unfulfilled creation intent (deleted=false), in which case it
// creates the missing sub request, or as the completion of the request (which can happen anytime after
// sub request creation).
// Likewise, the "accounted" booleans will prevent double counting in case a report (for success or failure)
// needs to be retried after a process failure.
message RepackSubRequestPointer {
required string address = 10500;
required uint64 fseq = 10510;
required uint64 archivefileid = 10520;
required bool retrieveaccounted = 10530;
required uint32 archivecopynb = 10540;
required RepackSubRequestStatus status = 10550;
required bool archiveaccounted = 10534;
required bool failureaccounted = 10537;
required bool subrequestdeleted = 10540;
}
message RepackRequest {
......@@ -505,6 +503,7 @@ message RepackRequest {
required uint64 failedtoarchivefiles = 11540;
required uint64 failedtoarchivebytes = 11550;
required uint64 lastexpandedfseq = 11560;
repeated RepackSubRequestPointer subrequests = 11570;
}
message RepackRequestIndexPointer {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment