diff --git a/objectstore/BackendTest.cpp b/objectstore/BackendTest.cpp index 7f8b1062fdc6e4321b54853b844ace45952419a6..0a73e3983c4429ae7b912cae41a059c997000fad 100644 --- a/objectstore/BackendTest.cpp +++ b/objectstore/BackendTest.cpp @@ -57,7 +57,7 @@ TEST_P(BackendAbstractTest, ParametersInterface) { } cta::objectstore::BackendVFS osVFS; -#define TEST_RADOS 0 +#define TEST_RADOS 1 #if TEST_RADOS cta::objectstore::BackendRados osRados("tapetest", "tapetest"); INSTANTIATE_TEST_CASE_P(BackendTest, BackendAbstractTest, ::testing::Values(&osVFS, &osRados)); diff --git a/objectstore/CMakeLists.txt b/objectstore/CMakeLists.txt index b1b334d31e13ff23ba69681eea6e115685e78e05..f12ac0f9611c6319e7d7078da1878c46fab9e33f 100644 --- a/objectstore/CMakeLists.txt +++ b/objectstore/CMakeLists.txt @@ -19,6 +19,7 @@ add_library (CTAObjectStore BackendRados.cpp ObjectOps.cpp ProtocolBuffersAlgorithms.cpp + FIFO.cpp exception/Backtrace.cpp exception/Errnum.cpp exception/Exception.cpp @@ -55,6 +56,7 @@ add_library (CTAObjectStore set(ObjectStoreUnitTests BackendTest.cpp RootEntryTest.cpp + FIFOTest.cpp ) add_executable(unitTests unitTests.cpp ${ObjectStoreUnitTests}) diff --git a/objectstore/FIFO.hpp b/objectstore/FIFO.hpp index b3a22e1220146c9ab26b0bb6259a89d9c58da58a..61797dbebff63995684b7e166ad84219f40ce7a7 100644 --- a/objectstore/FIFO.hpp +++ b/objectstore/FIFO.hpp @@ -7,131 +7,34 @@ namespace cta { namespace objectstore { -class FIFO: private ObjectOps<serializers::FIFO> { +class FIFO: public ObjectOps<serializers::FIFO> { public: - FIFO(const std::string & name, Agent & agent): - ObjectOps<serializers::FIFO>(agent.objectStore(), name) { - serializers::FIFO fs; - getPayloadFromObjectStoreAutoLock(fs, agent.getFreeContext()); - } + FIFO(const std::string & name, Backend & os): + ObjectOps<serializers::FIFO>(os, name) {} + + void initialize(); -private: - void lock(ContextHandle & ctx) { - lockExclusiveAndRead(m_currentState, ctx, __func__); - } - -public: class FIFOEmpty: public cta::exception::Exception { public: FIFOEmpty(const std::string & context): cta::exception::Exception(context) {} }; - friend class Transaction; - class Transaction { - public: - Transaction(FIFO & fifo, Agent & agent): - m_fifo(fifo), m_ctx(agent.getFreeContext()), m_writeDone(false) { - m_fifo.lock(m_ctx); - } - - ~Transaction() { - try { - if(!m_writeDone) - m_fifo.unlock(m_ctx); - } catch (std::exception&) { - } catch (...) {throw;} - } - - std::string peek() { - if (m_writeDone) - throw cta::exception::Exception("In FIFO::Transaction::peek: write already occurred"); - if (m_fifo.m_currentState.readpointer() >= (uint64_t)m_fifo.m_currentState.name_size()) { - throw FIFOEmpty("In FIFO::Transaction::peek: FIFO empty"); - } - return m_fifo.m_currentState.name(m_fifo.m_currentState.readpointer()); - } - - void popAndUnlock() { - if (m_writeDone) - throw cta::exception::Exception("In FIFO::Transaction::popAndUnlock: write already occurred"); - if (m_fifo.m_currentState.readpointer() >= (uint64_t)m_fifo.m_currentState.name_size()) { - throw FIFOEmpty("In FIFO::Transaction::popAndUnlock: FIFO empty"); - } - m_fifo.m_currentState.set_readpointer(m_fifo.m_currentState.readpointer()+1); - if (m_fifo.m_currentState.readpointer() > 50) { - m_fifo.compactCurrentState(); - } - m_fifo.write(m_fifo.m_currentState); - m_fifo.unlock(m_ctx); - m_writeDone = true; - } - private: - FIFO & m_fifo; - ContextHandle & m_ctx; - bool m_writeDone; - }; - - Transaction startTransaction(Agent & agent) { - return Transaction(*this, agent); - } + std::string peek(); + + void pop(); - void push(std::string name, Agent & agent) { - serializers::FIFO fs; - ContextHandle & context = agent.getFreeContext(); - lockExclusiveAndRead(fs, context, __func__); - fs.add_name(name); - write(fs); - unlock(context); - } + void push(std::string name); - std::string dump(Agent & agent) { - serializers::FIFO fs; - getPayloadFromObjectStoreAutoLock(fs, agent.getFreeContext()); - std::stringstream ret; - ret<< "<<<< FIFO dump start" << std::endl - << "Read pointer=" << fs.readpointer() << std::endl - << "Array size=" << fs.name_size() << std::endl; -// for (int i=fs.readpointer(); i<fs.name_size(); i++) { -// ret << "name[phys=" << i << " ,log=" << i-fs.readpointer() -// << "]=" << fs.name(i) << std::endl; -// } - ret<< ">>>> FIFO dump end." << std::endl; - return ret.str(); - } + std::string dump(); - uint64_t size(Agent & agent) { - serializers::FIFO fs; - getPayloadFromObjectStoreAutoLock(fs, agent.getFreeContext()); - uint64_t ret = fs.name_size() - fs.readpointer(); - return ret; - } + uint64_t size(); + std::list<std::string> getContent(); + private: - serializers::FIFO m_currentState; - void compactCurrentState() { - uint64_t oldReadPointer = m_currentState.readpointer(); - uint64_t oldSize = m_currentState.name_size(); - // Copy the elements at position oldReadPointer + i to i (squash all) - // before the read pointer - for (int i = oldReadPointer; i<m_currentState.mutable_name()->size(); i++) { - *m_currentState.mutable_name(i-oldReadPointer) = m_currentState.name(i); - } - // Shorten the name array by oldReadPointer elements - for (uint64_t i = 0; i < oldReadPointer; i++) { - m_currentState.mutable_name()->RemoveLast(); - } - // reset the read pointer - m_currentState.set_readpointer(0); - // Check the size is as expected - if ((uint64_t)m_currentState.name_size() != oldSize - oldReadPointer) { - std::stringstream err; - err << "In FIFO::compactCurrentState: wrong size after compaction: " - << "oldSize=" << oldSize << " oldReadPointer=" << oldReadPointer - << " newSize=" << m_currentState.name_size(); - throw cta::exception::Exception(err.str()); - } - } + void compact(); + static const size_t c_compactionSize; }; }} \ No newline at end of file diff --git a/objectstore/ObjectOps.cpp b/objectstore/ObjectOps.cpp index 3b7c03068411a602ac118c45686313413f69135d..87c64d9aa0a2f0b3e4ec9a4357d8d83ba8ea4e41 100644 --- a/objectstore/ObjectOps.cpp +++ b/objectstore/ObjectOps.cpp @@ -13,6 +13,7 @@ namespace cta { namespace objectstore { MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(MigrationFIFO); MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(RecallJob); MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(Counter); + MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(FIFO); #undef MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID }} \ No newline at end of file diff --git a/objectstore/cta.proto b/objectstore/cta.proto index 917e98cd385959ed3f70f7f6e13f3127dbe9b7d7..559b632ff2e36d18352a63cd943959d47792ae6b 100644 --- a/objectstore/cta.proto +++ b/objectstore/cta.proto @@ -11,6 +11,7 @@ enum ObjectType { MigrationFIFO_t = 5; RecallJob_t = 6; Counter_t = 7; + FIFO_t = 8; } // The base object header. This will allow introspection and automatic