Commit b1e9cc60 authored by Eric Cano's avatar Eric Cano
Browse files

Adapted the previous FIFO and created a unit test for it.

parent c230b917
......@@ -57,7 +57,7 @@ TEST_P(BackendAbstractTest, ParametersInterface) {
}
cta::objectstore::BackendVFS osVFS;
#define TEST_RADOS 0
#define TEST_RADOS 1
#if TEST_RADOS
cta::objectstore::BackendRados osRados("tapetest", "tapetest");
INSTANTIATE_TEST_CASE_P(BackendTest, BackendAbstractTest, ::testing::Values(&osVFS, &osRados));
......
......@@ -19,6 +19,7 @@ add_library (CTAObjectStore
BackendRados.cpp
ObjectOps.cpp
ProtocolBuffersAlgorithms.cpp
FIFO.cpp
exception/Backtrace.cpp
exception/Errnum.cpp
exception/Exception.cpp
......@@ -55,6 +56,7 @@ add_library (CTAObjectStore
set(ObjectStoreUnitTests
BackendTest.cpp
RootEntryTest.cpp
FIFOTest.cpp
)
add_executable(unitTests unitTests.cpp ${ObjectStoreUnitTests})
......
......@@ -7,131 +7,34 @@
namespace cta { namespace objectstore {
class FIFO: private ObjectOps<serializers::FIFO> {
class FIFO: public ObjectOps<serializers::FIFO> {
public:
FIFO(const std::string & name, Agent & agent):
ObjectOps<serializers::FIFO>(agent.objectStore(), name) {
serializers::FIFO fs;
getPayloadFromObjectStoreAutoLock(fs, agent.getFreeContext());
}
FIFO(const std::string & name, Backend & os):
ObjectOps<serializers::FIFO>(os, name) {}
void initialize();
private:
void lock(ContextHandle & ctx) {
lockExclusiveAndRead(m_currentState, ctx, __func__);
}
public:
class FIFOEmpty: public cta::exception::Exception {
public:
FIFOEmpty(const std::string & context): cta::exception::Exception(context) {}
};
friend class Transaction;
class Transaction {
public:
Transaction(FIFO & fifo, Agent & agent):
m_fifo(fifo), m_ctx(agent.getFreeContext()), m_writeDone(false) {
m_fifo.lock(m_ctx);
}
~Transaction() {
try {
if(!m_writeDone)
m_fifo.unlock(m_ctx);
} catch (std::exception&) {
} catch (...) {throw;}
}
std::string peek() {
if (m_writeDone)
throw cta::exception::Exception("In FIFO::Transaction::peek: write already occurred");
if (m_fifo.m_currentState.readpointer() >= (uint64_t)m_fifo.m_currentState.name_size()) {
throw FIFOEmpty("In FIFO::Transaction::peek: FIFO empty");
}
return m_fifo.m_currentState.name(m_fifo.m_currentState.readpointer());
}
void popAndUnlock() {
if (m_writeDone)
throw cta::exception::Exception("In FIFO::Transaction::popAndUnlock: write already occurred");
if (m_fifo.m_currentState.readpointer() >= (uint64_t)m_fifo.m_currentState.name_size()) {
throw FIFOEmpty("In FIFO::Transaction::popAndUnlock: FIFO empty");
}
m_fifo.m_currentState.set_readpointer(m_fifo.m_currentState.readpointer()+1);
if (m_fifo.m_currentState.readpointer() > 50) {
m_fifo.compactCurrentState();
}
m_fifo.write(m_fifo.m_currentState);
m_fifo.unlock(m_ctx);
m_writeDone = true;
}
private:
FIFO & m_fifo;
ContextHandle & m_ctx;
bool m_writeDone;
};
Transaction startTransaction(Agent & agent) {
return Transaction(*this, agent);
}
std::string peek();
void pop();
void push(std::string name, Agent & agent) {
serializers::FIFO fs;
ContextHandle & context = agent.getFreeContext();
lockExclusiveAndRead(fs, context, __func__);
fs.add_name(name);
write(fs);
unlock(context);
}
void push(std::string name);
std::string dump(Agent & agent) {
serializers::FIFO fs;
getPayloadFromObjectStoreAutoLock(fs, agent.getFreeContext());
std::stringstream ret;
ret<< "<<<< FIFO dump start" << std::endl
<< "Read pointer=" << fs.readpointer() << std::endl
<< "Array size=" << fs.name_size() << std::endl;
// for (int i=fs.readpointer(); i<fs.name_size(); i++) {
// ret << "name[phys=" << i << " ,log=" << i-fs.readpointer()
// << "]=" << fs.name(i) << std::endl;
// }
ret<< ">>>> FIFO dump end." << std::endl;
return ret.str();
}
std::string dump();
uint64_t size(Agent & agent) {
serializers::FIFO fs;
getPayloadFromObjectStoreAutoLock(fs, agent.getFreeContext());
uint64_t ret = fs.name_size() - fs.readpointer();
return ret;
}
uint64_t size();
std::list<std::string> getContent();
private:
serializers::FIFO m_currentState;
void compactCurrentState() {
uint64_t oldReadPointer = m_currentState.readpointer();
uint64_t oldSize = m_currentState.name_size();
// Copy the elements at position oldReadPointer + i to i (squash all)
// before the read pointer
for (int i = oldReadPointer; i<m_currentState.mutable_name()->size(); i++) {
*m_currentState.mutable_name(i-oldReadPointer) = m_currentState.name(i);
}
// Shorten the name array by oldReadPointer elements
for (uint64_t i = 0; i < oldReadPointer; i++) {
m_currentState.mutable_name()->RemoveLast();
}
// reset the read pointer
m_currentState.set_readpointer(0);
// Check the size is as expected
if ((uint64_t)m_currentState.name_size() != oldSize - oldReadPointer) {
std::stringstream err;
err << "In FIFO::compactCurrentState: wrong size after compaction: "
<< "oldSize=" << oldSize << " oldReadPointer=" << oldReadPointer
<< " newSize=" << m_currentState.name_size();
throw cta::exception::Exception(err.str());
}
}
void compact();
static const size_t c_compactionSize;
};
}}
\ No newline at end of file
......@@ -13,6 +13,7 @@ namespace cta { namespace objectstore {
MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(MigrationFIFO);
MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(RecallJob);
MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(Counter);
MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(FIFO);
#undef MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID
}}
\ No newline at end of file
......@@ -11,6 +11,7 @@ enum ObjectType {
MigrationFIFO_t = 5;
RecallJob_t = 6;
Counter_t = 7;
FIFO_t = 8;
}
// The base object header. This will allow introspection and automatic
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment