Commit 3b1243fb authored by Cedric CAFFY's avatar Cedric CAFFY
Browse files

Added support for RepackRequest's garbage collection

parent 166e3c25
......@@ -17,6 +17,7 @@
*/
#include "common/dataStructures/RepackInfo.hpp"
#include "common/exception/Exception.hpp"
namespace cta {
namespace common {
......@@ -54,6 +55,22 @@ std::string toString(RepackInfo::Status status) {
}
}
cta::objectstore::RepackQueueType RepackInfo::getQueueType(){
switch(status){
case RepackInfo::Status::Pending:
return cta::objectstore::RepackQueueType::Pending;
case RepackInfo::Status::ToExpand:
return cta::objectstore::RepackQueueType::ToExpand;
case RepackInfo::Status::Running:
case RepackInfo::Status::Starting:
if(!isExpandFinished){
return cta::objectstore::RepackQueueType::ToExpand;
}
default:
throw cta::exception::Exception("The status "+toString(status)+" have no corresponding queue.");
}
}
} // namespace dataStructures
} // namespace common
} // namespace cta
......@@ -19,6 +19,7 @@
#pragma once
#include <string>
#include "objectstore/RepackQueueType.hpp"
namespace cta {
namespace common {
......@@ -57,6 +58,7 @@ struct RepackInfo {
uint64_t failedBytesToRetrieve;
uint64_t lastExpandedFseq;
uint64_t userProvidedFiles;
bool isExpandFinished;
// std::string tag;
// uint64_t totalFiles;
// uint64_t totalSize;
......@@ -68,7 +70,7 @@ struct RepackInfo {
// std::string repackStatus;
// std::map<uint64_t,std::string> errors;
// EntryLog creationLog;
cta::objectstore::RepackQueueType getQueueType();
}; // struct RepackInfo
std::string toString(RepackInfo::Type type);
......
......@@ -22,6 +22,8 @@
#include "common/exception/Exception.hpp"
#include "common/dataStructures/ArchiveFile.hpp"
#include "common/log/DummyLogger.hpp"
#include "common/log/StdoutLogger.hpp"
#include "common/log/StringLogger.hpp"
#include "tests/TestsCompileTimeSwitches.hpp"
#ifdef STDOUT_LOGGING
#include "common/log/StdoutLogger.hpp"
......@@ -672,4 +674,418 @@ TEST(ObjectStore, GarbageCollectorRetrieveRequest) {
// TODO: this unit test still leaks tape pools and requests
}
TEST(ObjectStore, GarbageCollectorRepackRequestPending) {
// We will need a log object
#ifdef STDOUT_LOGGING
cta::log::StdoutLogger dl("dummy", "unitTest");
#else
cta::log::DummyLogger dl("dummy", "unitTest");
#endif
cta::log::LogContext lc(dl);
// We need a dummy catalogue
cta::catalogue::DummyCatalogue catalogue;
// Here we check that can successfully call RetrieveRequests's garbage collector
cta::objectstore::BackendVFS be;
// Create the root entry
cta::objectstore::RootEntry re(be);
re.initialize();
re.insert();
// Create the agent register
cta::objectstore::EntryLogSerDeser el("user0",
"unittesthost", time(NULL));
cta::objectstore::ScopedExclusiveLock rel(re);
// Create the agent for objects creation
cta::objectstore::AgentReference agentRef("unitTestCreateEnv", dl);
// Finish root creation.
re.addOrGetAgentRegisterPointerAndCommit(agentRef, el, lc);
rel.release();
// continue agent creation.
cta::objectstore::Agent agent(agentRef.getAgentAddress(), be);
agent.initialize();
agent.setTimeout_us(0);
agent.insertAndRegisterSelf(lc);
{
// Create an agent to be garbage collected
cta::objectstore::AgentReference agentReferenceRepackRequest("AgentReferenceRepackRequest", dl);
cta::objectstore::Agent agentRepackRequest(agentReferenceRepackRequest.getAgentAddress(), be);
agentRepackRequest.initialize();
agentRepackRequest.setTimeout_us(0);
agentRepackRequest.insertAndRegisterSelf(lc);
//Create a RepackQueue and insert a RepackRequest with status "Pending" in it
cta::objectstore::RootEntry re(be);
cta::objectstore::ScopedExclusiveLock rel(re);
re.fetch();
//Create the RepackRequest
std::string repackRequestAddr = agentReferenceRepackRequest.nextId("RepackRequest");
agentReferenceRepackRequest.addToOwnership(repackRequestAddr, be);
cta::objectstore::RepackRequest repackRequest(repackRequestAddr,be);
repackRequest.initialize();
repackRequest.setStatus(cta::common::dataStructures::RepackInfo::Status::Pending);
repackRequest.setVid("VIDTest");
repackRequest.setBufferURL("test/buffer/url");
repackRequest.setOwner(agentReferenceRepackRequest.getAgentAddress());
repackRequest.insert();
}
{
//Now we garbage collect the RepackRequest
// Create the garbage collector and run it once.
cta::objectstore::AgentReference gcAgentRef("unitTestGarbageCollector", dl);
cta::objectstore::Agent gcAgent(gcAgentRef.getAgentAddress(), be);
gcAgent.initialize();
gcAgent.setTimeout_us(0);
gcAgent.insertAndRegisterSelf(lc);
{
cta::objectstore::GarbageCollector gc(be, gcAgentRef, catalogue);
gc.runOnePass(lc);
}
}
//The repack request should have been requeued in the RepackQueuePending
{
cta::objectstore::RootEntry re(be);
cta::objectstore::ScopedExclusiveLock rel(re);
re.fetch();
std::string repackQueueAddr = re.addOrGetRepackQueueAndCommit(agentRef,cta::objectstore::RepackQueueType::Pending);
cta::objectstore::RepackQueue rq(repackQueueAddr,be);
cta::objectstore::ScopedExclusiveLock rql(rq);
rq.fetch();
ASSERT_EQ(1,rq.getRequestsSummary().requests);
}
}
TEST(ObjectStore, GarbageCollectorRepackRequestToExpand) {
// We will need a log object
#ifdef STDOUT_LOGGING
cta::log::StdoutLogger dl("dummy", "unitTest");
#else
cta::log::DummyLogger dl("dummy", "unitTest");
#endif
cta::log::LogContext lc(dl);
// We need a dummy catalogue
cta::catalogue::DummyCatalogue catalogue;
// Here we check that can successfully call RetrieveRequests's garbage collector
cta::objectstore::BackendVFS be;
// Create the root entry
cta::objectstore::RootEntry re(be);
re.initialize();
re.insert();
// Create the agent register
cta::objectstore::EntryLogSerDeser el("user0",
"unittesthost", time(NULL));
cta::objectstore::ScopedExclusiveLock rel(re);
// Create the agent for objects creation
cta::objectstore::AgentReference agentRef("unitTestCreateEnv", dl);
// Finish root creation.
re.addOrGetAgentRegisterPointerAndCommit(agentRef, el, lc);
rel.release();
// continue agent creation.
cta::objectstore::Agent agent(agentRef.getAgentAddress(), be);
agent.initialize();
agent.setTimeout_us(0);
agent.insertAndRegisterSelf(lc);
{
// Create an agent to be garbage collected
cta::objectstore::AgentReference agentReferenceRepackRequest("AgentReferenceRepackRequest", dl);
cta::objectstore::Agent agentRepackRequest(agentReferenceRepackRequest.getAgentAddress(), be);
agentRepackRequest.initialize();
agentRepackRequest.setTimeout_us(0);
agentRepackRequest.insertAndRegisterSelf(lc);
//Create a RepackQueue and insert a RepackRequest with status "ToExpand" in it
cta::objectstore::RootEntry re(be);
cta::objectstore::ScopedExclusiveLock rel(re);
re.fetch();
//Create the RepackRequest
std::string repackRequestAddr = agentReferenceRepackRequest.nextId("RepackRequest");
agentReferenceRepackRequest.addToOwnership(repackRequestAddr, be);
cta::objectstore::RepackRequest repackRequest(repackRequestAddr,be);
repackRequest.initialize();
repackRequest.setStatus(cta::common::dataStructures::RepackInfo::Status::ToExpand);
repackRequest.setVid("VID2Test");
repackRequest.setBufferURL("test/buffer/url");
repackRequest.setOwner(agentReferenceRepackRequest.getAgentAddress());
repackRequest.insert();
}
{
// Now we garbage collect the RepackRequest
// Create the garbage collector and run it once.
cta::objectstore::AgentReference gcAgentRef("unitTestGarbageCollector", dl);
cta::objectstore::Agent gcAgent(gcAgentRef.getAgentAddress(), be);
gcAgent.initialize();
gcAgent.setTimeout_us(0);
gcAgent.insertAndRegisterSelf(lc);
{
cta::objectstore::GarbageCollector gc(be, gcAgentRef, catalogue);
gc.runOnePass(lc);
}
}
{
cta::objectstore::RootEntry re(be);
cta::objectstore::ScopedExclusiveLock rel(re);
re.fetch();
std::string repackQueueAddr = re.addOrGetRepackQueueAndCommit(agentRef,cta::objectstore::RepackQueueType::ToExpand);
cta::objectstore::RepackQueue rq(repackQueueAddr,be);
cta::objectstore::ScopedExclusiveLock rql(rq);
rq.fetch();
ASSERT_EQ(1,rq.getRequestsSummary().requests);
}
}
TEST(ObjectStore, GarbageCollectorRepackRequestRunningExpandNotFinished) {
// We will need a log object
#ifdef STDOUT_LOGGING
cta::log::StdoutLogger dl("dummy", "unitTest");
#else
cta::log::DummyLogger dl("dummy", "unitTest");
#endif
cta::log::LogContext lc(dl);
// We need a dummy catalogue
cta::catalogue::DummyCatalogue catalogue;
// Here we check that can successfully call RetrieveRequests's garbage collector
cta::objectstore::BackendVFS be;
// Create the root entry
cta::objectstore::RootEntry re(be);
re.initialize();
re.insert();
// Create the agent register
cta::objectstore::EntryLogSerDeser el("user0",
"unittesthost", time(NULL));
cta::objectstore::ScopedExclusiveLock rel(re);
// Create the agent for objects creation
cta::objectstore::AgentReference agentRef("unitTestCreateEnv", dl);
// Finish root creation.
re.addOrGetAgentRegisterPointerAndCommit(agentRef, el, lc);
rel.release();
// continue agent creation.
cta::objectstore::Agent agent(agentRef.getAgentAddress(), be);
agent.initialize();
agent.setTimeout_us(0);
agent.insertAndRegisterSelf(lc);
{
// Create an agent to be garbage collected
cta::objectstore::AgentReference agentReferenceRepackRequest("AgentReferenceRepackRequest", dl);
cta::objectstore::Agent agentRepackRequest(agentReferenceRepackRequest.getAgentAddress(), be);
agentRepackRequest.initialize();
agentRepackRequest.setTimeout_us(0);
agentRepackRequest.insertAndRegisterSelf(lc);
//Create a RepackQueue and insert a RepackRequest with status "ToExpand" in it
cta::objectstore::RootEntry re(be);
cta::objectstore::ScopedExclusiveLock rel(re);
re.fetch();
//Create the RepackRequest
std::string repackRequestAddr = agentReferenceRepackRequest.nextId("RepackRequest");
agentReferenceRepackRequest.addToOwnership(repackRequestAddr, be);
cta::objectstore::RepackRequest repackRequest(repackRequestAddr,be);
repackRequest.initialize();
repackRequest.setStatus(cta::common::dataStructures::RepackInfo::Status::Running);
repackRequest.setVid("VIDTest");
repackRequest.setBufferURL("test/buffer/url");
repackRequest.setOwner(agentReferenceRepackRequest.getAgentAddress());
repackRequest.setExpandFinished(false);
repackRequest.insert();
}
{
// Now we garbage collect the RepackRequest
// Create the garbage collector and run it once.
cta::objectstore::AgentReference gcAgentRef("unitTestGarbageCollector", dl);
cta::objectstore::Agent gcAgent(gcAgentRef.getAgentAddress(), be);
gcAgent.initialize();
gcAgent.setTimeout_us(0);
gcAgent.insertAndRegisterSelf(lc);
{
cta::objectstore::GarbageCollector gc(be, gcAgentRef, catalogue);
gc.runOnePass(lc);
}
}
{
//The request should be requeued in the ToExpand as it has not finished to expand
cta::objectstore::RootEntry re(be);
cta::objectstore::ScopedExclusiveLock rel(re);
re.fetch();
std::string repackQueueAddr = re.addOrGetRepackQueueAndCommit(agentRef,cta::objectstore::RepackQueueType::ToExpand);
cta::objectstore::RepackQueue rq(repackQueueAddr,be);
cta::objectstore::ScopedExclusiveLock rql(rq);
rq.fetch();
ASSERT_EQ(1,rq.getRequestsSummary().requests);
}
}
TEST(ObjectStore, GarbageCollectorRepackRequestRunningExpandFinished) {
// We will need a log object
#ifdef STDOUT_LOGGING
cta::log::StdoutLogger dl("dummy", "unitTest");
#else
cta::log::DummyLogger dl("dummy", "unitTest");
#endif
cta::log::LogContext lc(dl);
// We need a dummy catalogue
cta::catalogue::DummyCatalogue catalogue;
// Here we check that can successfully call RetrieveRequests's garbage collector
cta::objectstore::BackendVFS be;
// Create the root entry
cta::objectstore::RootEntry re(be);
re.initialize();
re.insert();
// Create the agent register
cta::objectstore::EntryLogSerDeser el("user0",
"unittesthost", time(NULL));
cta::objectstore::ScopedExclusiveLock rel(re);
// Create the agent for objects creation
cta::objectstore::AgentReference agentRef("unitTestCreateEnv", dl);
// Finish root creation.
re.addOrGetAgentRegisterPointerAndCommit(agentRef, el, lc);
rel.release();
// continue agent creation.
cta::objectstore::Agent agent(agentRef.getAgentAddress(), be);
agent.initialize();
agent.setTimeout_us(0);
agent.insertAndRegisterSelf(lc);
{
// Create an agent to be garbage collected
cta::objectstore::AgentReference agentReferenceRepackRequest("AgentReferenceRepackRequest", dl);
cta::objectstore::Agent agentRepackRequest(agentReferenceRepackRequest.getAgentAddress(), be);
agentRepackRequest.initialize();
agentRepackRequest.setTimeout_us(0);
agentRepackRequest.insertAndRegisterSelf(lc);
//Create a RepackQueue and insert a RepackRequest with status "ToExpand" in it
cta::objectstore::RootEntry re(be);
cta::objectstore::ScopedExclusiveLock rel(re);
re.fetch();
//Create the RepackRequest
std::string repackRequestAddr = agentReferenceRepackRequest.nextId("RepackRequest");
agentReferenceRepackRequest.addToOwnership(repackRequestAddr, be);
cta::objectstore::RepackRequest repackRequest(repackRequestAddr,be);
repackRequest.initialize();
repackRequest.setStatus(cta::common::dataStructures::RepackInfo::Status::Running);
repackRequest.setVid("VIDTest");
repackRequest.setBufferURL("test/buffer/url");
repackRequest.setOwner(agentReferenceRepackRequest.getAgentAddress());
repackRequest.setExpandFinished(true);
repackRequest.insert();
}
cta::log::StringLogger strLogger("dummy", "dummy", cta::log::DEBUG);
cta::log::LogContext lc2(strLogger);
{
// Now we garbage collect the RepackRequest
// Create the garbage collector and run it once.
cta::objectstore::AgentReference gcAgentRef("unitTestGarbageCollector", strLogger);
cta::objectstore::Agent gcAgent(gcAgentRef.getAgentAddress(), be);
gcAgent.initialize();
gcAgent.setTimeout_us(0);
gcAgent.insertAndRegisterSelf(lc2);
{
cta::objectstore::GarbageCollector gc(be, gcAgentRef, catalogue);
gc.runOnePass(lc2);
}
}
{
//The request should not be requeued in the ToExpand as it has finished to expand
cta::objectstore::RootEntry re(be);
cta::objectstore::ScopedExclusiveLock rel(re);
re.fetch();
std::string repackQueueAddr = re.addOrGetRepackQueueAndCommit(agentRef,cta::objectstore::RepackQueueType::ToExpand);
cta::objectstore::RepackQueue rq(repackQueueAddr,be);
cta::objectstore::ScopedExclusiveLock rql(rq);
rq.fetch();
ASSERT_EQ(0,rq.getRequestsSummary().requests);
}
{
//The request should not be requeued in the ToExpand as it has not finished to expand
cta::objectstore::RootEntry re(be);
cta::objectstore::ScopedExclusiveLock rel(re);
re.fetch();
std::string repackQueueAddr = re.addOrGetRepackQueueAndCommit(agentRef,cta::objectstore::RepackQueueType::Pending);
cta::objectstore::RepackQueue rq(repackQueueAddr,be);
cta::objectstore::ScopedExclusiveLock rql(rq);
rq.fetch();
ASSERT_EQ(0,rq.getRequestsSummary().requests);
}
//Check the logs contains the failed to requeue message
std::string logToCheck = strLogger.getLog();
logToCheck += "";
ASSERT_NE(std::string::npos,logToCheck.find("MSG=\"In RepackRequest::garbageCollect(): failed to requeue the RepackRequest (leaving it as it is) : The status Running have no corresponding queue.\""));
}
TEST(ObjectStore, GarbageCollectorRepackRequestStarting) {
// We will need a log object
#ifdef STDOUT_LOGGING
cta::log::StdoutLogger dl("dummy", "unitTest");
#else
cta::log::DummyLogger dl("dummy", "unitTest");
#endif
cta::log::LogContext lc(dl);
// We need a dummy catalogue
cta::catalogue::DummyCatalogue catalogue;
// Here we check that can successfully call RetrieveRequests's garbage collector
cta::objectstore::BackendVFS be;
// Create the root entry
cta::objectstore::RootEntry re(be);
re.initialize();
re.insert();
// Create the agent register
cta::objectstore::EntryLogSerDeser el("user0",
"unittesthost", time(NULL));
cta::objectstore::ScopedExclusiveLock rel(re);
// Create the agent for objects creation
cta::objectstore::AgentReference agentRef("unitTestCreateEnv", dl);
// Finish root creation.
re.addOrGetAgentRegisterPointerAndCommit(agentRef, el, lc);
rel.release();
// continue agent creation.
cta::objectstore::Agent agent(agentRef.getAgentAddress(), be);
agent.initialize();
agent.setTimeout_us(0);
agent.insertAndRegisterSelf(lc);
{
// Create an agent to be garbage collected
cta::objectstore::AgentReference agentReferenceRepackRequest("AgentReferenceRepackRequest", dl);
cta::objectstore::Agent agentRepackRequest(agentReferenceRepackRequest.getAgentAddress(), be);
agentRepackRequest.initialize();
agentRepackRequest.setTimeout_us(0);
agentRepackRequest.insertAndRegisterSelf(lc);
//Create a RepackQueue and insert a RepackRequest with status "ToExpand" in it
cta::objectstore::RootEntry re(be);
cta::objectstore::ScopedExclusiveLock rel(re);
re.fetch();
//Create the RepackRequest
std::string repackRequestAddr = agentReferenceRepackRequest.nextId("RepackRequest");
agentReferenceRepackRequest.addToOwnership(repackRequestAddr, be);
cta::objectstore::RepackRequest repackRequest(repackRequestAddr,be);
repackRequest.initialize();
repackRequest.setStatus(cta::common::dataStructures::RepackInfo::Status::Starting);
repackRequest.setVid("VIDTest");
repackRequest.setBufferURL("test/buffer/url");
repackRequest.setOwner(agentReferenceRepackRequest.getAgentAddress());
repackRequest.setExpandFinished(true);
repackRequest.insert();
}
cta::log::StringLogger strLogger("dummy", "dummy", cta::log::DEBUG);
cta::log::LogContext lc2(strLogger);
{
// Now we garbage collect the RepackRequest
// Create the garbage collector and run it once.
cta::objectstore::AgentReference gcAgentRef("unitTestGarbageCollector", strLogger);
cta::objectstore::Agent gcAgent(gcAgentRef.getAgentAddress(), be);
gcAgent.initialize();
gcAgent.setTimeout_us(0);
gcAgent.insertAndRegisterSelf(lc2);
{
cta::objectstore::GarbageCollector gc(be, gcAgentRef, catalogue);
gc.runOnePass(lc2);
}
}
//Check the logs contains the failed to requeue message
std::string logToCheck = strLogger.getLog();
logToCheck += "";
ASSERT_NE(std::string::npos,logToCheck.find("MSG=\"In RepackRequest::garbageCollect(): failed to requeue the RepackRequest (leaving it as it is) : The status Starting have no corresponding queue.\""));
}
}
......@@ -140,6 +140,9 @@ void GenericObject::garbageCollectDispatcher(ScopedExclusiveLock& lock,
case serializers::RetrieveQueue_t:
garbageCollectWithType<RetrieveQueue>(this, lock, presumedOwner, agentReference, lc, catalogue);
break;
case serializers::RepackRequest_t:
garbageCollectWithType<RepackRequest>(this, lock, presumedOwner, agentReference, lc, catalogue);
break;
default: {
std::stringstream err;
err << "In GenericObject::garbageCollect, unsupported type: "
......
......@@ -19,7 +19,10 @@
#include "RepackRequest.hpp"
#include "GenericObject.hpp"
#include "AgentReference.hpp"
#include "RepackQueueAlgorithms.hpp"
#include "Algorithms.hpp"
#include <google/protobuf/util/json_util.h>
#include <iostream>
namespace cta { namespace objectstore {
......@@ -71,6 +74,7 @@ void RepackRequest::initialize() {
m_payload.set_failedtoarchivefiles(0);
m_payload.set_failedtoarchivebytes(0);
m_payload.set_lastexpandedfseq(0);
m_payload.set_is_expand_finished(false);
// This object is good to go (to storage)
m_payloadInterpreted = true;
}
......@@ -135,6 +139,7 @@ common::dataStructures::RepackInfo RepackRequest::getInfo() {
ret.failedBytesToRetrieve = m_payload.failedtoretrievebytes();
ret.lastExpandedFseq = m_payload.lastexpandedfseq();
ret.userProvidedFiles = m_payload.userprovidedfiles();
ret.isExpandFinished = m_payload.is_expand_finished();
if (m_payload.move_mode()) {
if (m_payload.add_copies_mode()) {
ret.type = RepackInfo::Type::MoveAndAddCopies;
......@@ -149,6 +154,22 @@ common::dataStructures::RepackInfo RepackRequest::getInfo() {
return ret;
}
//------------------------------------------------------------------------------
// RepackRequest::setExpandFinished()
//------------------------------------------------------------------------------
void RepackRequest::setExpandFinished(const bool expandFinished){
checkPayloadWritable();
m_payload.set_is_expand_finished(expandFinished);
}
//------------------------------------------------------------------------------
// RepackRequest::getExpandFinished()
//------------------------------------------------------------------------------
bool RepackRequest::isExpandFinished(){
checkPayloadReadable();
return m_payload.is_expand_finished();
}
//------------------------------------------------------------------------------
// RepackRequest::setBufferURL()
//------------------------------------------------------------------------------
......@@ -441,7 +462,35 @@ auto RepackRequest::getStats() -> std::map<StatsType, StatsValues> {
//------------------------------------------------------------------------------
void RepackRequest::garbageCollect(const std::string& presumedOwner, AgentReference& agentReference,
log::LogContext& lc, cta::catalogue::Catalogue& catalogue) {
throw exception::Exception("In RepackRequest::garbageCollect(): not implemented.");
//Let's requeue the RepackRequest if its status is ToExpand or Pending
agentReference.addToOwnership(this->getAddressIfSet(), m_objectStore);
cta::utils::Timer t;
RepackQueue rq(m_objectStore);
ScopedExclusiveLock rql;
try{
Helpers::getLockedAndFetchedRepackQueue(rq, rql, agentReference, this->getInfo().getQueueType(), lc);
} catch(const cta::exception::Exception &e){
lc.log(log::INFO,"In RepackRequest::garbageCollect(): failed to requeue the RepackRequest (leaving it as it is) : "+e.getMessage().str());
return;
}
double queueLockFetchTime = t.secs(utils::Timer::resetCounter);
auto jobsSummary = rq.getRequestsSummary();
uint64_t requestsBefore = jobsSummary.requests;
std::list<std::string> requestsToAdd;
requestsToAdd.push_back(this->getAddressIfSet());
try{
rq.addRequestsAndCommit(requestsToAdd,lc);
jobsSummary = rq.getRequestsSummary();
uint64_t requestsAfter = jobsSummary.requests;
log::ScopedParamContainer params(lc);
params.add("queueLockFetchTime",queueLockFetchTime)
.add("queueAddress",rq.getAddressIfSet())
.add("requestsBefore",requestsBefore)
.add("requestsAfter",requestsAfter);
lc.log(log::INFO,"In RepackRequest::garbageCollect() succesfully requeued the RepackRequest.");
} catch(const cta::exception::Exception &e){
lc.log(log::INFO,"In RepackRequest::garbageCollect() failed to requeue the RepackRequest. Leaving it as it is.");
}
}
//------------------------------------------------------------------------------
......
......@@ -42,7 +42,8 @@ public:
void setStatus(common::dataStructures::RepackInfo::Status repackStatus);
common::dataStructures::RepackInfo getInfo();
void setBufferURL(const std::string & bufferURL);
void setExpandFinished(const bool expandFinished);
bool isExpandFinished();
// Sub request management
struct SubrequestInfo {
std::string address;
......
......@@ -541,6 +541,10 @@ message RepackRequest {
required uint64 failedtoarchivefiles = 11540;
required uint64 failedtoarchivebytes = 11550;
required uint64 lastexpandedfseq = 11560;
</