Commit 0acb3264 authored by Eric Cano's avatar Eric Cano
Browse files

Created separate command line binaries for each action and corrected various bugs.

parent 29943385
......@@ -115,7 +115,7 @@ public:
m_agent.create();
RootEntry re(m_agent);
AgentRegister ar(re.getAgentRegister(m_agent), m_agent);
std::cout << name.str() << " starting" << std::endl;
std::cout << m_agent.name() << " starting" << std::endl;
utils::Timer noAgentTimer;
std::map<std::string, AgentWatchdog *> watchdogs;
while (true) {
......@@ -133,6 +133,7 @@ public:
for (std::map<std::string, AgentWatchdog *>::iterator i=watchdogs.begin();
i != watchdogs.end();) {
if (std::find(agentNames.begin(), agentNames.end(), i->first) == agentNames.end()) {
std::cout << "Stopping to watch completed agent: " << i->first << std::endl;
delete i->second;
// The post-increment returns the iterator to erase,
// but sets i to the next value, which will remain valid.
......@@ -146,8 +147,13 @@ public:
i != agentNames.end(); i++) {
if(watchdogs.find(*i) == watchdogs.end()) {
try {
std::cout << "Trying to check for agent " << *i << "... ";
watchdogs[*i] = new AgentWatchdog(*i, m_agent);
} catch ( cta::exception::Exception & ) {}
std::cout << "OK" << std::endl;
} catch ( cta::exception::Exception & ) {
std::cout << "The agent structure is gone. Removing it from register." << std::endl;
ar.removeElement(*i, m_agent);
}
}
}
// And now check the heartbeats of the agents
......@@ -157,6 +163,10 @@ public:
if (!i->second->checkAlive(m_agent)) {
collectGarbage(i->first);
delete i->second;
// Delete the agent record and de-reference it
std::cout << "Deleting agent " << i->first << " and removing it from register" << std::endl;
try { m_agent.objectStore().remove(i->first); } catch (std::exception &) {}
try { ar.removeElement(i->first, m_agent); } catch (std::exception &) {}
// The post-increment returns the iterator to erase,
// but sets i to the next value, which will remain valid.
watchdogs.erase(i++);
......@@ -168,6 +178,7 @@ public:
watchdogs.erase(i++);
}
}
usleep(100*1000);
}
}
......@@ -181,7 +192,7 @@ private:
std::cout << "In garbage collector, found a dead agent: " << agentName << std::endl;
// If the agent entry does not exist anymore, we're done.
// print the recall FIFO
std::cout << "FIFO before garbage collection:" << std::endl;
std::cout << "Recall FIFO before garbage collection:" << std::endl;
try {
RootEntry re(m_agent);
JobPool jp(re.getJobPool(m_agent), m_agent);
......@@ -190,79 +201,22 @@ private:
} catch (std::exception&) {
}
AgentVisitor ag(agentName, m_agent);
// Iterate on intended objects
std::list<AgentVisitor::intentEntry> intendedObjects = ag.getIntentLog(m_agent);
for (std::list<AgentVisitor::intentEntry>::iterator i=intendedObjects.begin();
i != intendedObjects.end(); i++) {
switch (i->objectType) {
case serializers::JobPool_t:
{
// We need to check whether the job pool is plugged into the
// root entry and just discard it if it is not.
RootEntry re(m_agent);
std::string jopPoolName;
try {
jopPoolName = re.getJobPool(m_agent);
} catch (std::exception&) {
// If we cannot find a reference to any job pool, then
// this one is not referenced
m_agent.objectStore().remove(i->name);
break;
}
if (jopPoolName != i->name)
m_agent.objectStore().remove(i->name);
break;
}
case serializers::RecallFIFO_t:
// We need to check that this recall FIFO is plugged in the job pool
// structure
{
RootEntry re(m_agent);
std::string recallFIFOName;
try {
JobPool jp(re.getJobPool(m_agent), m_agent);
recallFIFOName = jp.getRecallFIFO(m_agent);
} catch (std::exception&) {
// If we cannot find a reference to any recall FIFO, then
// this one is not referenced
m_agent.objectStore().remove(i->name);
break;
}
if (recallFIFOName != i->name)
m_agent.objectStore().remove(i->name);
break;
}
case serializers::RecallJob_t:
{
// The recall job here can only think of on FIFO it could be in. This
// will be more varied in the future.
RecallJob rj(i->name, m_agent);
FIFO RecallFIFO(rj.owner(m_agent), m_agent);
// We repost the job unconditionally in the FIFO. In case of an attempted
// pop, the actual ownership (should be the FIFO) will be checked by
// the consumer. If the owner is not right, FIFO entry will simply be
// discarded.
RecallFIFO.push(i->name, m_agent);
break;
}
case serializers::RootEntry_t:
std::cout << "Unexpected entry type in garbage collection: RootEntry" << std::endl;
break;
case serializers::AgentRegister_t:
std::cout << "Unexpected entry type in garbage collection: AgentRegister" << std::endl;
break;
case serializers::Agent_t:
std::cout << "Unexpected entry type in garbage collection: Agent" << std::endl;
break;
case serializers::MigrationFIFO_t:
std::cout << "Unexpected entry type in garbage collection: MigrationFIFO" << std::endl;
break;
case serializers::Counter_t:
std::cout << "Unexpected entry type in garbage collection: Counter" << std::endl;
break;
}
collectIntendedObject(*i);
}
// Iterate on owned objects
std::list<AgentVisitor::ownershipEntry> ownedObjects = ag.getOwnershipLog(m_agent);
for (std::list<AgentVisitor::ownershipEntry>::iterator i=ownedObjects.begin();
i != ownedObjects.end(); i++) {
collectOwnedObject(*i);
std::cout << "Considering owned object " << i->name
<< " (type:" << i->objectType << ")" << std::endl;
}
// print the recall FIFO
std::cout << "FIFO before garbage collection:" << std::endl;
std::cout << "Recall FIFO after garbage collection:" << std::endl;
try {
RootEntry re(m_agent);
JobPool jp(re.getJobPool(m_agent), m_agent);
......@@ -273,6 +227,135 @@ private:
} catch (std::exception&) {
}
}
void collectIntendedObject(AgentVisitor::intentEntry & i) {
std::cout << "Considering intended object " << i.name << " (container:" << i.container
<< " type:" << i.objectType << ")" << std::endl;
switch (i.objectType) {
case serializers::JobPool_t:
{
// We need to check whether the job pool is plugged into the
// root entry and just discard it if it is not.
RootEntry re(m_agent);
std::string jopPoolName;
try {
jopPoolName = re.getJobPool(m_agent);
} catch (std::exception&) {
// If we cannot find a reference to any job pool, then
// this one is not referenced
std::cout << "Could not get the job pool name from the root entry."
<< " Deleting this job pool." << std::endl;
m_agent.objectStore().remove(i.name);
break;
}
if (jopPoolName != i.name) {
std::cout << "This job pool is not the one referenced in the root entry. (root's="
<< jopPoolName << ")"
<< " Deleting this job pool." << std::endl;
m_agent.objectStore().remove(i.name);
}
break;
}
case serializers::RecallFIFO_t:
// We need to check that this recall FIFO is plugged in the job pool
// structure
{
RootEntry re(m_agent);
std::string recallFIFOName;
try {
JobPool jp(re.getJobPool(m_agent), m_agent);
recallFIFOName = jp.getRecallFIFO(m_agent);
} catch (std::exception&) {
std::cout << "Could not get the recall FIFO name from the job pool."
<< " Deleting this recallFIFO." << std::endl;
// If we cannot find a reference to any recall FIFO, then
// this one is not referenced
m_agent.objectStore().remove(i.name);
break;
}
if (recallFIFOName != i.name) {
std::cout << "This recallFIFO is not the one referenced in the job pool entry. (jp's="
<< recallFIFOName << ")"
<< " Deleting this recall FIFO." << std::endl;
m_agent.objectStore().remove(i.name);
}
break;
}
case serializers::RecallJob_t:
{
// The recall job here can only think of on FIFO it could be in. This
// will be more varied in the future.
RecallJob rj(i.name, m_agent);
std::string owner = rj.owner(m_agent);
std::cout << "Will post the recall job back to its owner FIFO." << std::endl;
FIFO RecallFIFO(owner, m_agent);
// We repost the job unconditionally in the FIFO. In case of an attempted
// pop, the actual ownership (should be the FIFO) will be checked by
// the consumer. If the owner is not right, FIFO entry will simply be
// discarded.
RecallFIFO.push(i.name, m_agent);
break;
}
case serializers::RootEntry_t:
std::cout << "Unexpected entry type in garbage collection: RootEntry" << std::endl;
break;
case serializers::AgentRegister_t:
std::cout << "Unexpected entry type in garbage collection: AgentRegister" << std::endl;
break;
case serializers::Agent_t:
std::cout << "Unexpected entry type in garbage collection: Agent" << std::endl;
break;
case serializers::MigrationFIFO_t:
std::cout << "Unexpected entry type in garbage collection: MigrationFIFO" << std::endl;
break;
case serializers::Counter_t:
std::cout << "Unexpected entry type in garbage collection: Counter" << std::endl;
break;
}
}
void collectOwnedObject(AgentVisitor::ownershipEntry & i) {
std::cout << "Considering owned object " << i.name
<< " (type:" << i.objectType << ")" << std::endl;
switch (i.objectType) {
case serializers::JobPool_t:
std::cout << "Unexpected entry type in garbage collection: JobPool" << std::endl;
break;
case serializers::RecallFIFO_t:
std::cout << "Unexpected entry type in garbage collection: RecallFIFO" << std::endl;
break;
case serializers::RecallJob_t:
{
// The recall job here can only think of on FIFO it could be in. This
// will be more varied in the future.
RecallJob rj(i.name, m_agent);
std::string owner = rj.owner(m_agent);
std::cout << "Will post the recall job back to its owner FIFO: " << owner << std::endl;
FIFO RecallFIFO(owner, m_agent);
// We repost the job unconditionally in the FIFO. In case of an attempted
// pop, the actual ownership (should be the FIFO) will be checked by
// the consumer. If the owner is not right, FIFO entry will simply be
// discarded.
RecallFIFO.push(i.name, m_agent);
break;
}
case serializers::RootEntry_t:
std::cout << "Unexpected entry type in garbage collection: RootEntry" << std::endl;
break;
case serializers::AgentRegister_t:
std::cout << "Unexpected entry type in garbage collection: AgentRegister" << std::endl;
break;
case serializers::Agent_t:
std::cout << "Unexpected entry type in garbage collection: Agent" << std::endl;
break;
case serializers::MigrationFIFO_t:
std::cout << "Unexpected entry type in garbage collection: MigrationFIFO" << std::endl;
break;
case serializers::Counter_t:
std::cout << "Unexpected entry type in garbage collection: Counter" << std::endl;
break;
}
}
};
}}
......
......@@ -219,7 +219,7 @@ void cta::objectstore::Agent::heartbeat(Agent& agent) {
ContextHandle & context = agent.getFreeContext();
serializers::Agent as;
lockExclusiveAndRead(as, context, __func__);
as.set_heartbeatcount(as.has_heartbeatcount()+1);
as.set_heartbeatcount(as.heartbeatcount()+1);
write(as);
unlock(context);
}
......
......@@ -2,6 +2,7 @@
#include "ObjectOps.hpp"
#include "objectstore/cta.pb.h"
#include "Agent.hpp"
#include <string>
namespace cta { namespace objectstore {
......@@ -101,9 +102,9 @@ public:
serializers::Agent as;
updateFromObjectStore(as, agent.getFreeContext());
std::list<ownershipEntry> ret;
for (int i=0; i<as.creationintent_size(); i++) {
ret.push_back(ownershipEntry(as.creationintent(i).name(),
as.creationintent(i).type()));
for (int i=0; i<as.ownershipintent_size(); i++) {
ret.push_back(ownershipEntry(as.ownershipintent(i).name(),
as.ownershipintent(i).type()));
}
return ret;
}
......@@ -124,6 +125,7 @@ public:
std::stringstream ret;
ret<< "<<<< Agent " << selfName() << " dump start" << std::endl
<< "name=" << as.name() << std::endl
<< "hearbeat=" << as.heartbeatcount() << std::endl
<< "Ownership intent size=" << as.ownershipintent_size() << std::endl;
for (int i=0; i<as.ownershipintent_size(); i++) {
ret << "ownershipIntent[" << i << "]: name=" << as.ownershipintent(i).name()
......@@ -150,7 +152,7 @@ public:
bool checkAlive(Agent & agent) {
uint64_t newHeartBeatCount = m_agentVisitor.getHeartbeatCount(agent);
if (newHeartBeatCount == m_hearbeatCounter && m_timer.secs() > 120)
if (newHeartBeatCount == m_hearbeatCounter && m_timer.secs() > 5)
return false;
m_hearbeatCounter = newHeartBeatCount;
return true;
......
......@@ -10,7 +10,7 @@ set (CTAProtoFiles
PROTOBUF_GENERATE_CPP(CTAProtoSources CTAProtoHeaders ${CTAProtoFiles})
add_executable (tapeResourceMangerTest tapeResourceManagerTest.cpp
add_library (CTAObjectStore
${CTAProtoSources}
RootEntry.cpp
Agent.cpp
......@@ -24,6 +24,26 @@ add_executable (tapeResourceMangerTest tapeResourceManagerTest.cpp
threading/Threading.cpp
utils/Timer.cpp)
target_link_libraries(tapeResourceMangerTest
protobuf
rados)
add_executable(tapeResourceManagerTest tapeResourceManagerTest.cpp)
target_link_libraries(tapeResourceManagerTest
protobuf rados CTAObjectStore)
add_executable(dumpStructure dumpStructure.cpp)
target_link_libraries(dumpStructure
protobuf rados CTAObjectStore)
add_executable(jobPoster jobPoster.cpp)
target_link_libraries(jobPoster
protobuf rados CTAObjectStore)
add_executable(recaller recaller.cpp)
target_link_libraries(recaller
protobuf rados CTAObjectStore)
add_executable(garbageCollector garbageCollector.cpp)
target_link_libraries(garbageCollector
protobuf rados CTAObjectStore)
add_executable(createEnvironment createEnvironment.cpp)
target_link_libraries(createEnvironment
protobuf rados CTAObjectStore)
\ No newline at end of file
......@@ -88,7 +88,7 @@ public:
updateFromObjectStore(res, agent.getFreeContext());
// If the registry is defined, return it, job done.
if (res.recallcounter().size())
return res.recall();
return res.recallcounter();
throw NotAllocatedEx("In RootEntry::getRecallCounter: recallCounter not yet allocated");
}
......
......@@ -73,6 +73,11 @@ public:
throw cta::exception::Errnum("Failed to create temporary directory");
}
}
void noDeleteOnExit() {
m_deleteOnExit = false;
}
ObjectStoreVFS(std::string path, std::string user, std::string pool):
m_root(path), m_deleteOnExit(false) {}
virtual ~ObjectStoreVFS() {
......
......@@ -5,6 +5,7 @@
#include "Agent.hpp"
#include "JobPool.hpp"
#include "Counter.hpp"
#include "AgentVisitor.hpp"
#include <iostream>
namespace cta { namespace objectstore {
......
#include "ObjectStoreChoice.hpp"
#include "Action.hpp"
#include "exception/Exception.hpp"
#include <iostream>
#include <stdint.h>
int main(int argc, char** argv) {
try {
#if USE_RADOS
myOS os("", "tapetest", "tapetest");
#else
myOS os;
os.noDeleteOnExit();
#endif
std::cout << "os.path=" << os.path() << " os.user=" << os.user()
<< "os.pool=" << os.pool() << std::endl;
cta::objectstore::RootEntry::init(os);
cta::objectstore::Agent agent(os, "CommandLineCreateEnvironment");
agent.create();
// Get hold of the root entry
cta::objectstore::RootEntry re(agent);
// Create the job pool
std::cout << "=============== About to add job pool" << std::endl;
cta::objectstore::JobPool jobPool(re.allocateOrGetJobPool(agent), agent);
// Create the recall FIFO
std::cout << "=============== About to add recall FIFO" << std::endl;
cta::objectstore::FIFO recallFIFO(jobPool.allocateOrGetRecallFIFO(agent), agent);
// Create the counter
std::cout << "=============== About to add recall counter" << std::endl;
cta::objectstore::Counter recallCounter(jobPool.allocateOrGetRecallCounter(agent), agent);
} catch (std::exception & e) {
std::cout << "Got exception: " << e.what();
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}
#include "ObjectStructureDumper.hpp"
#include "ObjectStoreChoice.hpp"
#include <exception>
#include <iostream>
int main(int argc, char** argv) {
if (argc != 4) {
std::cout << "Expected 3 parameters: <path for VFS storage> <username for Ceph> <pool for Ceph>" << std::endl;
return EXIT_FAILURE;
}
try {
myOS os(argv[1], argv[2], argv[3]);
cta::objectstore::Agent agent(os, "CommandLineDumper");
agent.create();
cta::objectstore::ObjectStructureDumper osd;
std::cout << osd.dump(agent);
} catch (std::exception & e) {
std::cout << "Got exception: " << e.what();
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}
\ No newline at end of file
#include "ObjectStoreChoice.hpp"
#include "Action.hpp"
#include "exception/Exception.hpp"
#include <iostream>
#include <stdint.h>
int main(int argc, char** argv) {
if (argc != 4) {
std::cout << "Expected 3 parameters: <path for VFS storage> <username for Ceph> <pool for Ceph>" << std::endl;
return EXIT_FAILURE;
}
try {
myOS os(argv[1], argv[2], argv[3]);
cta::objectstore::Agent agent(os);
cta::objectstore::GarbageCollector garbageCollector(agent);
garbageCollector.execute();
} catch (std::exception & e) {
std::cout << "Got exception: " << e.what();
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}
\ No newline at end of file
#include "ObjectStoreChoice.hpp"
#include "Action.hpp"
#include "exception/Exception.hpp"
#include <iostream>
#include <stdint.h>
int main(int argc, char** argv) {
if (argc != 5) {
std::cout << "Expected 4 parameters: <path for VFS storage> <username for Ceph> <pool for Ceph>" << std::endl;
return EXIT_FAILURE;
}
try {
myOS os(argv[1], argv[2], argv[3]);
uint64_t count;
if (! (std::istringstream(argv[4]) >> count)) {
throw cta::exception::Exception("Cound not convert 4th argument into a number");
}
cta::objectstore::Agent agent(os);
cta::objectstore::JobPoster poster(agent, 0, count);
poster.execute();
} catch (std::exception & e) {
std::cout << "Got exception: " << e.what();
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}
#include "ObjectStoreChoice.hpp"
#include "Action.hpp"
#include "exception/Exception.hpp"
#include "Agent.hpp"
#include <iostream>
#include <stdint.h>
int main(int argc, char** argv) {
if (argc != 5) {
std::cout << "Expected 4 parameters: <path for VFS storage> <username for Ceph> <pool for Ceph>" << std::endl;
return EXIT_FAILURE;
}
try {
myOS os(argv[1], argv[2], argv[3]);
uint64_t count;
if (! (std::istringstream(argv[4]) >> count)) {
throw cta::exception::Exception("Cound not convert 4th argument into a number");
}
cta::objectstore::Agent agent(os);
cta::objectstore::Recaller recaller(agent, count);
recaller.execute();
} catch (std::exception & e) {
std::cout << "Got exception: " << e.what();
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment