Skip to content
Snippets Groups Projects
Commit b82b9eba authored by Eric Cano's avatar Eric Cano
Browse files

#62: Made the AgentReference timeout setable.

The timeout is now set to zero in unit tests.
parent 121587b8
Branches
Tags
No related merge requests found
......@@ -63,6 +63,10 @@ std::string AgentReference::nextId(const std::string& childType) {
return id.str();
}
void AgentReference::setQueueFlushTimeout(std::chrono::duration<uint64_t, std::milli> timeout) {
m_queueFlushTimeout = timeout;
}
void AgentReference::addToOwnership(const std::string& objectAddress, objectstore::Backend& backend) {
Action a{AgentOperation::Add, objectAddress, std::promise<void>()};
queueAndExecuteAction(a, backend);
......@@ -107,7 +111,7 @@ void AgentReference::queueAndExecuteAction(Action& action, objectstore::Backend&
ulq.unlock();
ulGlobal.unlock();
// We wait for time or size of queue
q.promise.get_future().wait_for(std::chrono::milliseconds(100));
q.promise.get_future().wait_for(m_queueFlushTimeout);
// Make sure we are not listed anymore a the queue taking jobs (this would happen
// in case of timeout.
ulGlobal.lock();
......
......@@ -51,6 +51,12 @@ public:
*/
std::string nextId(const std::string & childType);
/**
* Modifies the timeout for queue flushes. Useful for unit tests.
* @param timeout the new timeout
*/
void setQueueFlushTimeout(std::chrono::duration<uint64_t, std::milli> timeout);
/**
* Adds an object address to the referenced agent. The additions and removals
* are queued in memory so that several threads can share the same access.
......@@ -130,6 +136,7 @@ private:
*/
std::unique_ptr<std::promise<void>> m_nextQueueExecutionPromise;
const size_t m_maxQueuedItems = 100;
std::chrono::duration<uint64_t, std::milli> m_queueFlushTimeout = std::chrono::milliseconds(100);
};
}}
......@@ -97,6 +97,7 @@ TEST(ObjectStore, GarbageCollectorRegister) {
rel.release();
// Create an agent and add and agent register to it as an owned object
cta::objectstore::AgentReference agrA("unitTestAgentA");
agrA.setQueueFlushTimeout(std::chrono::milliseconds(0));
cta::objectstore::Agent agA(agrA.getAgentAddress(), be);
agA.initialize();
agA.setTimeout_us(0);
......@@ -114,6 +115,7 @@ TEST(ObjectStore, GarbageCollectorRegister) {
}
// Create the garbage colletor and run it twice.
cta::objectstore::AgentReference gcAgentRef("unitTestGarbageCollector");
gcAgentRef.setQueueFlushTimeout(std::chrono::milliseconds(0));
cta::objectstore::Agent gcAgent(gcAgentRef.getAgentAddress(), be);
gcAgent.initialize();
gcAgent.setTimeout_us(0);
......@@ -138,6 +140,7 @@ TEST(ObjectStore, GarbageCollectorArchiveQueue) {
// Here we check that can successfully call agentRegister's garbage collector
cta::objectstore::BackendVFS be;
cta::objectstore::AgentReference agentRef("unitTestGarbageCollector");
agentRef.setQueueFlushTimeout(std::chrono::milliseconds(0));
cta::objectstore::Agent agent(agentRef.getAgentAddress(), be);
// Create the root entry
cta::objectstore::RootEntry re(be);
......@@ -151,6 +154,7 @@ TEST(ObjectStore, GarbageCollectorArchiveQueue) {
rel.release();
// Create an agent and add and agent register to it as an owned object
cta::objectstore::AgentReference agrA("unitTestAgentA");
agrA.setQueueFlushTimeout(std::chrono::milliseconds(0));
cta::objectstore::Agent agA(agrA.getAgentAddress(), be);
agA.initialize();
agA.setTimeout_us(0);
......@@ -168,6 +172,7 @@ TEST(ObjectStore, GarbageCollectorArchiveQueue) {
}
// Create the garbage colletor and run it twice.
cta::objectstore::AgentReference gcAgentRef("unitTestGarbageCollector");
gcAgentRef.setQueueFlushTimeout(std::chrono::milliseconds(0));
cta::objectstore::Agent gcAgent(gcAgentRef.getAgentAddress(), be);
gcAgent.initialize();
gcAgent.setTimeout_us(0);
......@@ -192,6 +197,7 @@ TEST(ObjectStore, GarbageCollectorDriveRegister) {
// Here we check that can successfully call agentRegister's garbage collector
cta::objectstore::BackendVFS be;
cta::objectstore::AgentReference agentRef("unitTestGarbageCollector");
agentRef.setQueueFlushTimeout(std::chrono::milliseconds(0));
cta::objectstore::Agent agent(agentRef.getAgentAddress(), be);
// Create the root entry
cta::objectstore::RootEntry re(be);
......@@ -205,6 +211,7 @@ TEST(ObjectStore, GarbageCollectorDriveRegister) {
rel.release();
// Create an agent and add the drive register to it as an owned object
cta::objectstore::AgentReference agrA("unitTestAgentA");
agrA.setQueueFlushTimeout(std::chrono::milliseconds(0));
cta::objectstore::Agent agA(agrA.getAgentAddress(), be);
agA.initialize();
agA.setTimeout_us(0);
......@@ -222,6 +229,7 @@ TEST(ObjectStore, GarbageCollectorDriveRegister) {
}
// Create the garbage colletor and run it twice.
cta::objectstore::AgentReference gcAgentRef("unitTestGarbageCollector");
gcAgentRef.setQueueFlushTimeout(std::chrono::milliseconds(0));
cta::objectstore::Agent gcAgent(gcAgentRef.getAgentAddress(), be);
gcAgent.initialize();
gcAgent.setTimeout_us(0);
......@@ -246,6 +254,7 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) {
// Here we check that can successfully call ArchiveRequests's garbage collector
cta::objectstore::BackendVFS be;
cta::objectstore::AgentReference agentRef("unitTestGarbageCollector");
agentRef.setQueueFlushTimeout(std::chrono::milliseconds(0));
cta::objectstore::Agent agent(agentRef.getAgentAddress(), be);
// Create the root entry
cta::objectstore::RootEntry re(be);
......@@ -259,6 +268,7 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) {
rel.release();
// Create an agent
cta::objectstore::AgentReference agrA("unitTestAgentA");
agrA.setQueueFlushTimeout(std::chrono::milliseconds(0));
cta::objectstore::Agent agA(agrA.getAgentAddress(), be);
agA.initialize();
agA.setTimeout_us(0);
......@@ -371,6 +381,7 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) {
}
// Create the garbage collector and run it twice.
cta::objectstore::AgentReference gcAgentRef("unitTestGarbageCollector");
gcAgentRef.setQueueFlushTimeout(std::chrono::milliseconds(0));
cta::objectstore::Agent gcAgent(gcAgentRef.getAgentAddress(), be);
gcAgent.initialize();
gcAgent.setTimeout_us(0);
......
......@@ -73,7 +73,8 @@ TEST (ObjectStore, RootEntryArchiveQueues) {
cta::objectstore::BackendVFS be;
cta::objectstore::EntryLogSerDeser el("user0",
"unittesthost", time(NULL));
cta::objectstore::AgentReference agr("UnitTests");
cta::objectstore::AgentReference agr("UnitTests");
agr.setQueueFlushTimeout(std::chrono::milliseconds(0));
cta::objectstore::Agent ag(agr.getAgentAddress(), be);
ag.initialize();
{
......@@ -140,6 +141,7 @@ TEST (ObjectStore, RootEntryDriveRegister) {
cta::objectstore::EntryLogSerDeser el("user0",
"unittesthost", time(NULL));
cta::objectstore::AgentReference agr("UnitTests");
agr.setQueueFlushTimeout(std::chrono::milliseconds(0));
cta::objectstore::Agent ag(agr.getAgentAddress(), be);
ag.initialize();
{
......@@ -241,7 +243,8 @@ TEST (ObjectStore, RootEntrySchedulerGlobalLock) {
}
cta::objectstore::EntryLogSerDeser el("user0",
"unittesthost", time(NULL));
cta::objectstore::AgentReference agr("UnitTests");
cta::objectstore::AgentReference agr("UnitTests");
agr.setQueueFlushTimeout(std::chrono::milliseconds(0));
cta::objectstore::Agent ag(agr.getAgentAddress(), be);
ag.initialize();
{
......
......@@ -144,6 +144,7 @@ OStoreDBWrapper<cta::objectstore::BackendVFS>::OStoreDBWrapper(
m_backend(new cta::objectstore::BackendVFS()),
m_OStoreDB(*m_backend), m_agentReference("OStoreDBFactory") {
// We need to populate the root entry before using.
m_agentReference.setQueueFlushTimeout(std::chrono::milliseconds(0));
objectstore::RootEntry re(*m_backend);
re.initialize();
re.insert();
......@@ -167,6 +168,7 @@ OStoreDBWrapper<cta::objectstore::BackendRados>::OStoreDBWrapper(
const std::string &context, const std::string &URL) :
m_backend(cta::objectstore::BackendFactory::createBackend(URL).release()),
m_OStoreDB(*m_backend), m_agentReference("OStoreDBFactory") {
m_agentReference.setQueueFlushTimeout(std::chrono::milliseconds(0));
// We need to first clean up possible left overs in the pool
auto l = m_backend->list();
for (auto o=l.begin(); o!=l.end(); o++) {
......
......@@ -140,7 +140,6 @@ TEST_F(castor_tape_tapeserver_daemon_RecallReportPackerTest, RecallReportPackerB
cta::log::StringLogger log("castor_tape_tapeserver_RecallReportPackerBadBadEnd",cta::log::DEBUG);
cta::log::LogContext lc(log);
std::unique_ptr<cta::SchedulerDatabase> mdb(new cta::OStoreDBWrapper<cta::objectstore::BackendVFS>("UnitTest"));
castor::tape::tapeserver::daemon::RecallReportPacker rrp(&retrieveMount,lc);
rrp.startThreads();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment