Skip to content
Snippets Groups Projects
Commit b0b0b828 authored by Eric Cano's avatar Eric Cano
Browse files

Completed the garbage collection unit test.

parent bb0b10af
No related branches found
No related tags found
No related merge requests found
...@@ -16,6 +16,14 @@ cta::objectstore::Agent::Agent(Backend & os): ...@@ -16,6 +16,14 @@ cta::objectstore::Agent::Agent(Backend & os):
cta::objectstore::Agent::Agent(const std::string & name, Backend & os): cta::objectstore::Agent::Agent(const std::string & name, Backend & os):
ObjectOps<serializers::Agent>(os, name), m_nextId(0) {} ObjectOps<serializers::Agent>(os, name), m_nextId(0) {}
void cta::objectstore::Agent::initialize() {
ObjectOps<serializers::Agent>::initialize();
m_payload.set_heartbeat(0);
m_payload.set_timeout_us(60*1000*1000);
m_payload.set_description("");
m_payloadInterpreted = true;
}
void cta::objectstore::Agent::generateName(const std::string & typeName) { void cta::objectstore::Agent::generateName(const std::string & typeName) {
std::stringstream aid; std::stringstream aid;
// Get time // Get time
......
...@@ -23,6 +23,8 @@ public: ...@@ -23,6 +23,8 @@ public:
Agent(const std::string & name, Backend & os); Agent(const std::string & name, Backend & os);
void initialize();
void generateName(const std::string & typeName); void generateName(const std::string & typeName);
std::string nextId(const std::string & childType); std::string nextId(const std::string & childType);
...@@ -116,6 +118,7 @@ public: ...@@ -116,6 +118,7 @@ public:
// If not, it is a dangling pointer. Pop and continue // If not, it is a dangling pointer. Pop and continue
if (!object.exists()) { if (!object.exists()) {
container.pop(); container.pop();
container.commit();
continue; continue;
} }
// Try to lock the object. Exception will be let through // Try to lock the object. Exception will be let through
...@@ -126,6 +129,7 @@ public: ...@@ -126,6 +129,7 @@ public:
if (container.getNameIfSet() != object.getOwner()) { if (container.getNameIfSet() != object.getOwner()) {
objLock.release(); objLock.release();
container.pop(); container.pop();
container.commit();
continue; continue;
} }
// If we get here, then we can proceed with the ownership transfer // If we get here, then we can proceed with the ownership transfer
...@@ -141,7 +145,9 @@ public: ...@@ -141,7 +145,9 @@ public:
object.setBackupOwner(container.getNameIfSet()); object.setBackupOwner(container.getNameIfSet());
// Commit the object // Commit the object
object.commit(); object.commit();
// Job done. // And remove the now dangling pointer from the container
container.pop();
container.commit();
return; return;
} }
} }
......
...@@ -57,7 +57,7 @@ TEST_P(BackendAbstractTest, ParametersInterface) { ...@@ -57,7 +57,7 @@ TEST_P(BackendAbstractTest, ParametersInterface) {
} }
cta::objectstore::BackendVFS osVFS; cta::objectstore::BackendVFS osVFS;
#define TEST_RADOS 1 #define TEST_RADOS 0
#if TEST_RADOS #if TEST_RADOS
cta::objectstore::BackendRados osRados("tapetest", "tapetest"); cta::objectstore::BackendRados osRados("tapetest", "tapetest");
INSTANTIATE_TEST_CASE_P(BackendTest, BackendAbstractTest, ::testing::Values(&osVFS, &osRados)); INSTANTIATE_TEST_CASE_P(BackendTest, BackendAbstractTest, ::testing::Values(&osVFS, &osRados));
......
...@@ -60,7 +60,8 @@ void cta::objectstore::FIFO::push(std::string name) { ...@@ -60,7 +60,8 @@ void cta::objectstore::FIFO::push(std::string name) {
void cta::objectstore::FIFO::pushIfNotPresent(std::string name) { void cta::objectstore::FIFO::pushIfNotPresent(std::string name) {
checkPayloadWritable(); checkPayloadWritable();
try { try {
serializers::findString(m_payload.mutable_name(), name); serializers::findStringFrom(m_payload.mutable_name(), m_payload.readpointer(),
name);
} catch (serializers::NotFound &) { } catch (serializers::NotFound &) {
m_payload.add_name(name); m_payload.add_name(name);
} }
......
...@@ -9,6 +9,7 @@ namespace cta { namespace objectstore { ...@@ -9,6 +9,7 @@ namespace cta { namespace objectstore {
class FIFO: public ObjectOps<serializers::FIFO> { class FIFO: public ObjectOps<serializers::FIFO> {
public: public:
FIFO(Backend & os): ObjectOps<serializers::FIFO>(os) {}
FIFO(const std::string & name, Backend & os): FIFO(const std::string & name, Backend & os):
ObjectOps<serializers::FIFO>(os, name) {} ObjectOps<serializers::FIFO>(os, name) {}
......
...@@ -16,6 +16,10 @@ GarbageCollector::GarbageCollector(Backend & os, Agent & agent): ...@@ -16,6 +16,10 @@ GarbageCollector::GarbageCollector(Backend & os, Agent & agent):
ScopedSharedLock arLock(m_agentRegister); ScopedSharedLock arLock(m_agentRegister);
m_agentRegister.fetch(); m_agentRegister.fetch();
} }
void GarbageCollector::setTimeout(double timeout) {
m_timeout = timeout;
}
void GarbageCollector::runOnePass() { void GarbageCollector::runOnePass() {
// Bump our own heart beat // Bump our own heart beat
...@@ -60,8 +64,8 @@ void GarbageCollector::aquireTargets() { ...@@ -60,8 +64,8 @@ void GarbageCollector::aquireTargets() {
std::list<std::string>::const_iterator c = candidatesList.begin(); std::list<std::string>::const_iterator c = candidatesList.begin();
// We can now take ownership of new agents, up to our max... // We can now take ownership of new agents, up to our max...
// and we don't monitor ourselves! // and we don't monitor ourselves!
while (m_watchedAgents.size() < c_maxWatchedAgentsPerGC for (;m_watchedAgents.size() < c_maxWatchedAgentsPerGC
&& c!=candidatesList.end()) { && c!=candidatesList.end(); c++) {
// We don't monitor ourselves // We don't monitor ourselves
if (*c != m_ourAgent.getNameIfSet()) { if (*c != m_ourAgent.getNameIfSet()) {
// So we have a candidate we might want to monitor // So we have a candidate we might want to monitor
...@@ -97,9 +101,16 @@ void GarbageCollector::aquireTargets() { ...@@ -97,9 +101,16 @@ void GarbageCollector::aquireTargets() {
// (we hold an exclusive lock all along) // (we hold an exclusive lock all along)
m_agentRegister.trackAgent(ag.getNameIfSet()); m_agentRegister.trackAgent(ag.getNameIfSet());
m_agentRegister.commit(); m_agentRegister.commit();
// Agent is now officially ours, let's track it // Agent is officially our, we can remove it from the untracked agent's
m_watchedAgents[ag.getNameIfSet()] = // list
new AgentWatchdog(ag.getNameIfSet(), m_objectStore); m_agentRegister.trackAgent(ag.getNameIfSet());
// Agent is now officially ours, let's track it. We have the release the
// lock to the agent before constructing the watchdog, which builds
// its own agent objects (and need to lock the object store representation)
std::string agentName = ag.getNameIfSet();
agLock.release();
m_watchedAgents[agentName] =
new AgentWatchdog(agentName, m_objectStore);
m_watchedAgents[ag.getNameIfSet()]->setTimeout(m_timeout); m_watchedAgents[ag.getNameIfSet()]->setTimeout(m_timeout);
} }
} }
...@@ -157,8 +168,9 @@ void GarbageCollector::checkHeartbeats() { ...@@ -157,8 +168,9 @@ void GarbageCollector::checkHeartbeats() {
} }
ScopedSharedLock gContLock(gContainter); ScopedSharedLock gContLock(gContainter);
gContainter.fetch(); gContainter.fetch();
serializers::ObjectType containerType = gContainter.type();
gContLock.release(); gContLock.release();
switch(gContainter.type()) { switch(containerType) {
case serializers::FIFO_t: { case serializers::FIFO_t: {
FIFO fifo(go.getBackupOwner(), m_objectStore); FIFO fifo(go.getBackupOwner(), m_objectStore);
ScopedExclusiveLock ffLock(fifo); ScopedExclusiveLock ffLock(fifo);
...@@ -170,19 +182,15 @@ void GarbageCollector::checkHeartbeats() { ...@@ -170,19 +182,15 @@ void GarbageCollector::checkHeartbeats() {
go.commit(); go.commit();
ffLock.release(); ffLock.release();
goLock.release(); goLock.release();
break;
} }
default: { default: {
throw cta::exception::Exception("In GarbageCollector::cleanupDeadAgent: unexpected container type!"); throw cta::exception::Exception("In GarbageCollector::cleanupDeadAgent: unexpected container type!");
} }
} }
// We now processed all the owned objects. We can delete the agent's entry
agent.remove();
// And remove the (dangling) pointers to it
ScopedExclusiveLock arLock(m_agentRegister);
m_agentRegister.fetch();
m_agentRegister.removeAgent(name);
m_agentRegister.commit();
} }
// We now processed all the owned objects. We can delete the agent's entry
agent.deleteAndUnregisterSelf();
} }
......
...@@ -21,21 +21,19 @@ TEST(GarbageCollector, BasicFuctionnality) { ...@@ -21,21 +21,19 @@ TEST(GarbageCollector, BasicFuctionnality) {
cta::objectstore::Agent agA(be), agB(be); cta::objectstore::Agent agA(be), agB(be);
agA.initialize(); agA.initialize();
agA.generateName("unitTestAgentA"); agA.generateName("unitTestAgentA");
agA.insert();
agA.insertAndRegisterSelf(); agA.insertAndRegisterSelf();
agB.initialize(); agB.initialize();
agB.generateName("unitTestAgentB"); agB.generateName("unitTestAgentB");
agB.insert();
agB.insertAndRegisterSelf(); agB.insertAndRegisterSelf();
// Create target FIFO // Create target FIFO
std::string fifoName = agent.nextId("FIFO"); std::string fifoName = agent.nextId("FIFO");
std::list<std::string> expectedData; std::list<std::string> expectedData;
{ // Try to create the FIFO entry
// Try to create the FIFO entry cta::objectstore::FIFO ff(fifoName,be);
cta::objectstore::FIFO ff(fifoName,be); ff.initialize();
ff.initialize(); ff.insert();
ff.insert(); // And lock it for later
} cta::objectstore::ScopedExclusiveLock ffLock;
{ {
for (int i=0; i<100; i++) { for (int i=0; i<100; i++) {
// We create FIFOs here, but any object can do. // We create FIFOs here, but any object can do.
...@@ -45,7 +43,7 @@ TEST(GarbageCollector, BasicFuctionnality) { ...@@ -45,7 +43,7 @@ TEST(GarbageCollector, BasicFuctionnality) {
cta::objectstore::FIFO centralFifo(fifoName, be); cta::objectstore::FIFO centralFifo(fifoName, be);
cta::objectstore::ScopedExclusiveLock lock(centralFifo); cta::objectstore::ScopedExclusiveLock lock(centralFifo);
centralFifo.fetch(); centralFifo.fetch();
expectedData.push_back(agent.nextId("TestData")); expectedData.push_back(newFIFO.getNameIfSet());
centralFifo.push(expectedData.back()); centralFifo.push(expectedData.back());
centralFifo.commit(); centralFifo.commit();
lock.release(); lock.release();
...@@ -56,9 +54,30 @@ TEST(GarbageCollector, BasicFuctionnality) { ...@@ -56,9 +54,30 @@ TEST(GarbageCollector, BasicFuctionnality) {
newFIFO.insert(); newFIFO.insert();
} }
} }
ffLock.lock(ff);
ff.fetch();
ASSERT_EQ(100, ff.size());
ffLock.release();
for (int i=0; i<10; i++) { for (int i=0; i<10; i++) {
cta::objectstore::ScopedExclusiveLock objALock, objBLock;
cta::objectstore::FIFO objA(be), objB(be);
agA.popFromContainer(ff, objA, objALock);
agB.popFromContainer(ff, objB, objBLock);
} }
// TODO: take ownership of FIFO contents in agA and agB, and then garbage collect. ffLock.lock(ff);
// The FIFO should get all its objects back. ff.fetch();
ASSERT_EQ(80, ff.size());
ffLock.release();
// Create the garbage colletor and run it twice.
cta::objectstore::Agent gcAgent(be);
gcAgent.initialize();
gcAgent.generateName("unitTestGarbageCollector");
gcAgent.insertAndRegisterSelf();
cta::objectstore::GarbageCollector gc(be, gcAgent);
gc.setTimeout(0);
gc.runOnePass();
gc.runOnePass();
ffLock.lock(ff);
ff.fetch();
ASSERT_EQ(100, ff.size());
} }
\ No newline at end of file
...@@ -8,7 +8,7 @@ namespace cta { namespace objectstore { ...@@ -8,7 +8,7 @@ namespace cta { namespace objectstore {
class GenericObject: public ObjectOps<serializers::GenericObject> { class GenericObject: public ObjectOps<serializers::GenericObject> {
public: public:
GenericObject(const std::string & name, Backend & os): GenericObject(const std::string & name, Backend & os):
ObjectOps<serializers::GenericObject>(os) {}; ObjectOps<serializers::GenericObject>(os, name) {};
class ForbiddenOperation: public cta::exception::Exception { class ForbiddenOperation: public cta::exception::Exception {
public: public:
......
...@@ -15,4 +15,7 @@ class NotFound: public cta::exception::Exception { ...@@ -15,4 +15,7 @@ class NotFound: public cta::exception::Exception {
size_t findString(::google::protobuf::RepeatedPtrField< ::std::string>* field, size_t findString(::google::protobuf::RepeatedPtrField< ::std::string>* field,
const std::string & value); const std::string & value);
size_t findStringFrom(::google::protobuf::RepeatedPtrField< ::std::string>* field,
size_t fromIndex, const std::string & value);
}}} }}}
\ No newline at end of file
...@@ -25,4 +25,15 @@ size_t cta::objectstore::serializers::findString( ...@@ -25,4 +25,15 @@ size_t cta::objectstore::serializers::findString(
} }
} }
throw NotFound("In cta::objectstore::serializers::findString: string not found"); throw NotFound("In cta::objectstore::serializers::findString: string not found");
}
size_t cta::objectstore::serializers::findStringFrom(
::google::protobuf::RepeatedPtrField< ::std::string>* field, size_t fromIndex,
const std::string& value) {
for (size_t i=fromIndex; i<(size_t)field->size(); i++) {
if (value == field->Get(i)) {
return i;
}
}
throw NotFound("In cta::objectstore::serializers::findString: string not found");
} }
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment