Skip to content
Snippets Groups Projects
Commit 36c40c01 authored by Eric Cano's avatar Eric Cano
Browse files

Simplified the context system and the locking for VFS version of object store.

parent d6da0d80
No related branches found
No related tags found
No related merge requests found
......@@ -72,21 +72,26 @@ public:
// Pop a job from the FIFO
FIFO::Transaction tr = fifo.startTransaction(m_agent);
std::string rjName = tr.peek();
m_agent.addToOwnership(tr.peek(), serializers::RecallJob_t);
m_agent.addToOwnership(rjName, serializers::RecallJob_t);
tr.popAndUnlock();
RecallJob rj(rjName, m_agent);
// Sleep on it for a while
usleep(100 * 1000);
// Log the deletion
std::cout << "RecallJob " << rj.source(m_agent) << " => "
<< rj.destination(m_agent) << " is done" << std::endl;
<< rj.destination(m_agent) << " is done by recaller "
<< m_number << " (tid=" << syscall(SYS_gettid) << ")"
<< std::endl;
rj.remove();
m_agent.removeFromOwnership(rjName, serializers::RecallJob_t);
} catch (FIFO::FIFOEmpty &) {
cta::utils::Timer timeout;
bool somethingMore = false;
while (timeout.secs() < 1.0) {
try {
if (fifo.size(m_agent) > 0) {
continue;
somethingMore = true;
break;
} else {
usleep (100 * 1000);
}
......@@ -94,6 +99,7 @@ public:
throw;
} catch (...) {}
}
if (somethingMore) continue;
std::cout << name.str() << " complete: FIFO empty" << std::endl;
break;
}
......@@ -141,21 +147,30 @@ public:
i++;
}
}
// On a second pass, check that we are acquaint with all processes
// On a second pass, check that we are acquainted with all processes
for (std::list<std::string>::iterator i=agentNames.begin();
i != agentNames.end(); i++) {
if(watchdogs.find(*i) == watchdogs.end()) {
watchdogs[*i] = new AgentWatchdog(*i, m_agent);
try {
watchdogs[*i] = new AgentWatchdog(*i, m_agent);
} catch ( cta::exception::Exception & ) {}
}
}
// And now check the heartbeats of the agents
for (std::map<std::string, AgentWatchdog *>::iterator i=watchdogs.begin();
i != watchdogs.end();) {
if (!i->second->checkAlive(m_agent)) {
collectGarbage(i->first);
try {
if (!i->second->checkAlive(m_agent)) {
collectGarbage(i->first);
delete i->second;
// The post-increment returns the iterator to erase,
// but sets i to the next value, which will remain valid.
watchdogs.erase(i++);
} else {
i++;
}
} catch ( cta::exception::Exception & ) {
delete i->second;
// The post-increment returns the iterator to erase,
// but sets i to the next value, which will remain valid.
watchdogs.erase(i++);
}
}
......@@ -169,7 +184,18 @@ private:
// intended and owned objects, validate that they still exist, are owned by
// the dead agent, and re-post them to the container where they should be.
try {
std::cout << "In garbage collector, found a dead agent: " << agentName << std::endl;
// If the agent entry does not exist anymore, we're done.
// print the recall FIFO
std::cout << "FIFO before garbage collection:" << std::endl;
try {
RootEntry re(m_agent);
JobPool jp(re.getJobPool(m_agent), m_agent);
FIFO rf(jp.getRecallFIFO(m_agent), m_agent);
std::cout << rf.dump(m_agent) << std::endl;
} catch (abi::__forced_unwind&) {
throw;
} catch (...) {}
AgentVisitor ag(agentName, m_agent);
std::list<AgentVisitor::intentEntry> intendedObjects = ag.getIntentLog(m_agent);
for (std::list<AgentVisitor::intentEntry>::iterator i=intendedObjects.begin();
......@@ -243,6 +269,16 @@ private:
break;
}
}
// print the recall FIFO
std::cout << "FIFO before garbage collection:" << std::endl;
try {
RootEntry re(m_agent);
JobPool jp(re.getJobPool(m_agent), m_agent);
FIFO rf(jp.getRecallFIFO(m_agent), m_agent);
std::cout << rf.dump(m_agent) << std::endl;
} catch (abi::__forced_unwind&) {
throw;
} catch (...) {}
} catch (abi::__forced_unwind&) {
throw;
} catch (...) {}
......
......@@ -66,6 +66,12 @@ std::string cta::objectstore::Agent::name() {
return selfName();
}
void cta::objectstore::Agent::flushContexts() {
for(size_t i=0; i< c_handleCount; i++) {
m_contexts[i].release();
}
}
cta::objectstore::Agent::~Agent() {
for (size_t i=0; i < c_handleCount; i++) {
m_contexts[i].release();
......@@ -92,7 +98,7 @@ std::string cta::objectstore::Agent::nextId(const std::string & childType) {
return id.str();
}
cta::objectstore::ContextHandleImplementation<myOS> &
cta::objectstore::ContextHandle &
cta::objectstore::Agent::getFreeContext() {
for (size_t i=0; i < c_handleCount; i++) {
if (!m_contexts[i].isSet())
......@@ -167,8 +173,8 @@ void cta::objectstore::Agent::removeFromOwnership(std::string name, serializers:
if (name == as.mutable_ownershipintent(i)->name() &&
objectType == as.mutable_ownershipintent(i)->type()) {
found = true;
as.mutable_creationintent()->SwapElements(i, as.ownershipintent_size()-1);
as.mutable_creationintent()->RemoveLast();
as.mutable_ownershipintent()->SwapElements(i, as.ownershipintent_size()-1);
as.mutable_ownershipintent()->RemoveLast();
break;
}
}
......
......@@ -2,7 +2,6 @@
#include "ObjectStoreChoice.hpp"
#include "ObjectOps.hpp"
#include "ContextHandle.hpp"
#include "objectstore/cta.pb.h"
#include "utils/Timer.hpp"
......@@ -47,11 +46,13 @@ public:
std::string name();
void flushContexts();
~Agent();
std::string nextId(const std::string & childType);
ContextHandleImplementation<myOS> & getFreeContext();
ContextHandle & getFreeContext();
void addToIntend (std::string container, std::string name, serializers::ObjectType objectType);
......@@ -98,7 +99,7 @@ private:
bool m_observerVersion;
uint64_t m_nextId;
static const size_t c_handleCount = 100;
ContextHandleImplementation<myOS> m_contexts[c_handleCount];
ContextHandle m_contexts[c_handleCount];
};
}}
\ No newline at end of file
......@@ -150,7 +150,7 @@ public:
bool checkAlive(Agent & agent) {
uint64_t newHeartBeatCount = m_agentVisitor.getHeartbeatCount(agent);
if (newHeartBeatCount == m_hearbeatCounter && m_timer.secs() > 0.1)
if (newHeartBeatCount == m_hearbeatCounter && m_timer.secs() > 120)
return false;
m_hearbeatCounter = newHeartBeatCount;
return true;
......
#pragma once
#include "ObjectStores.hpp"
namespace cta { namespace objectstore {
template <class C>
class ContextHandleImplementation: public ContextHandle {};
template <>
class ContextHandleImplementation <ObjectStoreVFS>: public ContextHandle {
public:
ContextHandleImplementation(): m_set(false), m_fd(-1) {}
virtual void set(int fd) { m_set = true; m_fd = fd; __sync_synchronize(); }
virtual int get(int) { return m_fd; }
virtual void reset() { m_set = false; m_fd = -1; __sync_synchronize(); }
virtual bool isSet() { return m_set; }
virtual void release() { __sync_synchronize(); if (m_set && -1 != m_fd) ::close(m_fd); reset(); }
virtual ~ContextHandleImplementation() {}
private:
bool m_set;
volatile int m_fd;
};
template <>
class ContextHandleImplementation <ObjectStoreRados>: public ContextHandle {
public:
ContextHandleImplementation(): m_set(false) {}
virtual void set(int) { m_set = true; }
virtual int get(int) { return 0; }
virtual void reset() { m_set = false; }
virtual bool isSet() { return m_set; }
virtual void release() { reset(); }
virtual ~ContextHandleImplementation() {}
private:
bool m_set;
};
}}
\ No newline at end of file
......@@ -103,7 +103,8 @@ public:
uint64_t size(Agent & agent) {
serializers::FIFO fs;
updateFromObjectStore(fs, agent.getFreeContext());
return fs.name_size();
uint64_t ret = fs.name_size() - fs.readpointer();
return ret;
}
private:
......
......@@ -17,12 +17,18 @@ namespace cta { namespace objectstore {
class ContextHandle {
public:
virtual void set(int) = 0;
virtual int get(int) = 0;
virtual void reset() = 0;
virtual bool isSet() = 0;
virtual void release() = 0;
virtual ~ContextHandle() {};
ContextHandle(): m_set(false), m_hasFd(false), m_fd(-1) {}
void set(int fd) { m_set = true; m_fd = fd; m_hasFd = true; __sync_synchronize(); }
void set() { m_set = true; }
int get(int) { return m_fd; }
void reset() { m_set = false; m_fd = -1; m_hasFd = false; __sync_synchronize(); }
bool isSet() { return m_set; }
void release() { if (!m_hasFd) return; __sync_synchronize(); if (m_set && -1 != m_fd) ::close(m_fd); reset(); }
~ContextHandle() { release(); }
private:
bool m_set;
bool m_hasFd;
volatile int m_fd;
};
class ObjectStore {
......@@ -117,12 +123,12 @@ public:
virtual void remove(std::string name) {
std::string path = m_root+"/" + name;
cta::exception::Errnum::throwOnNonZero(unlink(name.c_str()));
cta::exception::Errnum::throwOnNonZero(unlink(path.c_str()));
}
void lockHelper(std::string name, ContextHandle & context, int type) {
std::string path = m_root + "/" + name + ".lock";
context.set(::open(path.c_str(), O_CREAT, S_IRWXU));
std::string path = m_root + "/" + name;
context.set(::open(path.c_str(), O_RDONLY, S_IRWXU));
cta::exception::Errnum::throwOnMinusOne(context.get(0),
"In ObjectStoreVFS::lockHelper, failed to open the lock file.");
cta::exception::Errnum::throwOnMinusOne(::flock(context.get(0), LOCK_EX),
......@@ -245,6 +251,7 @@ public:
cta::exception::Errnum::throwOnReturnedErrno(-rc,
std::string("In ObjectStoreRados::lockExclusive, failed to librados::IoCtx::lock_exclusive: ")+
name + "/" + "lock" + "/" + client.str() + "//");
context.set();
std::cout << "LockedExclusive: " << name << "/" << "lock" << "/" << client.str() << "//@" << where << std::endl;
}
......@@ -267,6 +274,7 @@ public:
cta::exception::Errnum::throwOnReturnedErrno(-rc,
std::string("In ObjectStoreRados::lockShared, failed to librados::IoCtx::lock_shared: ")+
name + "/" + "lock" + "/" + client.str() + "//");
context.set();
std::cout << "LockedShared: " << name << "/" << "lock" << "/" << client.str() << "//@" << where << std::endl;
}
......@@ -282,6 +290,7 @@ public:
-m_radosCtx.unlock(name, "lock", client.str()),
std::string("In ObjectStoreRados::lockExclusive, failed to lock_exclusive ")+
name);
context.reset();
std::cout << "Unlocked: " << name << "/" << "lock" << "/" << client.str() << "//@" << where << std::endl;
}
......
......@@ -6,7 +6,6 @@
#include "ObjectStoreChoice.hpp"
#include "RootEntry.hpp"
#include "Action.hpp"
#include "ContextHandle.hpp"
#include "ObjectStructureDumper.hpp"
#include "JobPool.hpp"
#include "AgentRegister.hpp"
......@@ -76,7 +75,7 @@ int main(void){
// Create our own agent representation
cta::objectstore::Agent self(os, "masterProcess");
self.create();
cta::objectstore::ContextHandleImplementation<myOS> ctx;
cta::objectstore::ContextHandle ctx;
// Dump the structure
cta::objectstore::ObjectStrucutreDumper osd;
std::cout << osd.dump(self) << std::endl;
......@@ -158,7 +157,11 @@ int main(void){
// recallJobs.push_back(new jobExecutor(*recallActions[i]));
// recallJobs.back()->start(dc);
// }
// wait for completion of all processes
// wait for FIFO to be empty
while (recallFIFO.size(self)) {
usleep (100 * 1000);
}
while (recallJobs.size()) {
// Wait for completion the job
recallJobs.back()->wait();
......@@ -173,7 +176,9 @@ int main(void){
}
injectorProcess.wait();
gcProcess.kill();
gcProcess.wait();
gcAgent.flushContexts();
// And see the state or affairs
std::cout << osd.dump(self) << std::endl;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment