Skip to content
Snippets Groups Projects
Commit fafeac4d authored by Eric Cano's avatar Eric Cano
Browse files

#62: Created queued operations for agents.

The queue is housed in the AgentReference class. It currently covers the add to ownership
and remove from ownership operations. This queued system guarantees in-order execution
of operations.
parent cc45c5a6
Branches
Tags
No related merge requests found
......@@ -27,6 +27,7 @@
namespace cta { namespace objectstore {
class GenericObject;
class AgentReference;
/**
* Class containing agent information and managing the update of the
......@@ -38,6 +39,7 @@ class GenericObject;
*/
class Agent: public ObjectOps<serializers::Agent, serializers::Agent_t> {
friend class AgentReference;
public:
CTA_GENERATE_EXCEPTION_CLASS(AgentStillOwnsObjects);
Agent(GenericObject & go);
......@@ -102,10 +104,12 @@ public:
bool m_present;
};*/
//private:
void addToOwnership(std::string name);
void removeFromOwnership(std::string name);
//public:
std::list<std::string> getOwnershipList();
std::string dump();
......
......@@ -17,6 +17,7 @@
*/
#include "AgentReference.hpp"
#include "Agent.hpp"
#include "common/exception/Errnum.hpp"
#include <sstream>
......@@ -47,6 +48,9 @@ AgentReference::AgentReference(const std::string & clientType) {
<< std::setw(2) << localNow.tm_min << ":"
<< std::setw(2) << localNow.tm_sec;
m_agentAddress = aid.str();
// Initialize the serialization token for queued actions
m_nextQueueExecutionPromise.reset(new std::promise<void>);
m_nextQueueExecutionPromise->set_value();
}
std::string AgentReference::getAgentAddress() {
......@@ -59,4 +63,99 @@ std::string AgentReference::nextId(const std::string& childType) {
return id.str();
}
}}
\ No newline at end of file
void AgentReference::addToOwnership(const std::string& objectAddress, objectstore::Backend& backend) {
Action a{AgentOperation::Add, objectAddress, std::promise<void>()};
queueAndExecuteAction(a, backend);
}
void AgentReference::removeFromOnership(const std::string& objectAddress, objectstore::Backend& backend) {
Action a{AgentOperation::Remove, objectAddress, std::promise<void>()};
queueAndExecuteAction(a, backend);
}
void AgentReference::queueAndExecuteAction(Action& action, objectstore::Backend& backend) {
// First, we need to determine if a queue exists or not.
// If so, we just use it, and if not, we create and serve it.
std::unique_lock<std::mutex> ulGlobal(m_currentQueueMutex);
if (m_currentQueue) {
// There is already a queue
std::unique_lock<std::mutex> ulQueue(m_currentQueue->mutex);
m_currentQueue->queue.push_back(&action);
// If this is time to run, wake up the serving thread
if (m_currentQueue->queue.size() + 1 >= m_maxQueuedItems) {
m_currentQueue->promise.set_value();
m_currentQueue = nullptr;
}
// Release the locks and wait for action execution
ulQueue.unlock();
ulGlobal.unlock();
action.promise.get_future().get();
} else {
// There is not queue, so we need to create and serve it ourselves
ActionQueue q;
// Lock the queue
std::unique_lock<std::mutex> ulq(q.mutex);
// Get it referenced
m_currentQueue = &q;
// Get our execution promise and leave one behind
std::unique_ptr<std::promise<void>> promiseForThisQueue(m_nextQueueExecutionPromise.release());
// Leave a promise behind for the next queue
m_nextQueueExecutionPromise.reset(new std::promise<void>);
// Keep a pointer to it, so we will signal our own completion to our successor queue.
std::promise<void> * promiseForNextQueue = m_nextQueueExecutionPromise.get();
// We can now unlock the queue and the general lock: queuing is open.
ulq.unlock();
ulGlobal.unlock();
// We wait for time or size of queue
q.promise.get_future().wait_for(std::chrono::milliseconds(100));
// Make sure we are not listed anymore a the queue taking jobs (this would happen
// in case of timeout.
ulGlobal.lock();
if (m_currentQueue == &q)
m_currentQueue = nullptr;
ulGlobal.unlock();
// Make sure no leftover thread is still writing to the queue.
ulq.lock();
// Off we go! Add the actions to the queue
try {
objectstore::Agent ag(m_agentAddress, backend);
ag.fetch();
// First we apply our own modification
appyAction(action, ag);
// Then those of other threads
for (auto a: q.queue)
appyAction(*a, ag);
// and commit
ag.commit();
} catch (...) {
// Something wend wrong: , we release the next batch of changes
promiseForNextQueue->set_value();
// We now pass the exception to all threads
for (auto a: q.queue)
a->promise.set_exception(std::current_exception());
// And to our own caller
throw;
}
// Things went well. We pass the token to the next queue
promiseForNextQueue->set_value();
// and release the other threads
for (auto a: q.queue)
a->promise.set_value();
}
}
void AgentReference::appyAction(Action& action, objectstore::Agent& agent) {
switch (action.op) {
case AgentOperation::Add:
agent.addToOwnership(action.objectAddress);
break;
case AgentOperation::Remove:
agent.removeFromOwnership(action.objectAddress);
break;
default:
throw cta::exception::Exception("In AgentReference::appyAction(): unknown operation.");
}
}
}} // namespace cta::objectstore
\ No newline at end of file
......@@ -18,12 +18,16 @@
#pragma once
#include "common/threading/Mutex.hpp"
#include "objectstore/Backend.hpp"
#include <atomic>
#include <string>
#include <future>
#include <list>
namespace cta { namespace objectstore {
class Agent;
/**
* A class allowing the passing of the address of an Agent object, plus a thread safe
* object name generator, that will allow unique name generation by several threads.
......@@ -47,6 +51,22 @@ public:
*/
std::string nextId(const std::string & childType);
/**
* Adds an object address to the referenced agent. The additions and removals
* are queued in memory so that several threads can share the same access.
* The execution order is guaranteed.
* @param objectAddress
*/
void addToOwnership(const std::string &objectAddress, objectstore::Backend& backend);
/**
* Removes an object address from the referenced agent. The additions and removals
* are queued in memory so that several threads can share the same access.
* The execution order is guaranteed.
* @param objectAddress
*/
void removeFromOnership(const std::string &objectAddress, objectstore::Backend& backend);
/**
* Gets the address of the Agent object generated on construction.
* @return the agent object address.
......@@ -55,6 +75,61 @@ public:
private:
std::atomic<uint64_t> m_nextId;
std::string m_agentAddress;
/**
* An enumeration describing all the queueable operations
*/
enum class AgentOperation: char {
Add,
Remove
};
/**
* An operation with its parameter and promise
*/
struct Action {
AgentOperation op;
const std::string & objectAddress;
std::promise<void> promise;
};
/**
* The queue with the lock and flush control
*/
struct ActionQueue {
std::mutex mutex;
std::list<Action *> queue;
std::promise<void> promise;
};
/**
* Helper function applying the action to the already fetched agent.
* @param action
* @param agent
*/
void appyAction(Action &action, objectstore::Agent & agent);
/**
* The global function actually doing the job: creates a queue if needed, add
* the action to it and flushes them based on time and count. Uses an algorithm
* similar to queueing in ArchiveQueues and RetrieveQeueues.
* @param action the action
*/
void queueAndExecuteAction(Action& action, objectstore::Backend& backend);
std::mutex m_currentQueueMutex;
ActionQueue * m_currentQueue = nullptr;
/**
* This pointer holds a promise that will be picked up by the thread managing
* the a queue in memory (promise(n)). The same thread will leave a fresh promise
* (promise(n+1) in this pointer for the next thread to pick up. The thread will
* then wait for promise(n) to be fullfilled to flush to queue to the object store
* and will fullfill promise(n+1) after doing so.
* This will ensure that the queues will be flushed in order, one at a time.
* One at a time also minimize contention on the object store.
*/
std::unique_ptr<std::promise<void>> m_nextQueueExecutionPromise;
const size_t m_maxQueuedItems = 100;
};
}}
......@@ -25,7 +25,7 @@ namespace cta { namespace objectstore {
//------------------------------------------------------------------------------
// Constructor
//------------------------------------------------------------------------------
BackendPopulator::BackendPopulator(cta::objectstore::Backend & be): m_backend(be), m_agentReference("OStoreDBFactory") {
BackendPopulator::BackendPopulator(cta::objectstore::Backend & be): m_backend(be), m_agentReference("OStoreDBFactoryAgent") {
cta::objectstore::RootEntry re(m_backend);
cta::objectstore::ScopedExclusiveLock rel(re);
re.fetch();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment