diff --git a/CMakeLists.txt b/CMakeLists.txt index 77518e44b44e550a38725ec56b897f437e207e98..18a9f861fb643ccb86f9ade6106fdd498ce58831 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -129,6 +129,12 @@ if (BEEGFS_HEADER AND USE_BEEGFS) list(APPEND DETSIM_EXTRAS "BEEGFS") endif() +set (ASAPO_SHARED_LIBS ON) +cmake_policy(SET CMP0057 NEW) +find_package (Asapo COMPONENTS Producer) +if (Asapo_FOUND) + message("Using ASAPO") +endif() # # add a target to generate API documentation with Doxygen # find_package(Doxygen) diff --git a/src/detsim.cpp b/src/detsim.cpp index 986d1ee5172f3637dd20023704b66de7c4976688..47122ace3c7a5dae50570da74db4ef6377ccce32 100644 --- a/src/detsim.cpp +++ b/src/detsim.cpp @@ -79,7 +79,7 @@ void worker(const std::string& dataHandlerName, } } timed::instance i(dumpAnchor,data.size); - outputHandler->dump(id,data.size,data.buf); + outputHandler->dump(eventNo, id, data.size, data.buf); bytesWritten+=data.size; lastSize = data.size; } diff --git a/src/output.h b/src/output.h index 4b74573b5622c1eac1b5a9f0ec34fcada6432749..8166da7d3d2e34a3d82c6b5c616e1208c434bf0f 100644 --- a/src/output.h +++ b/src/output.h @@ -13,9 +13,10 @@ namespace output { #define CLASS_NAME "output" #include "reflectionMagic.h" virtual ~base() = default; - virtual void dump(const std::string& id, - size_t size, - const void* data) = 0; + virtual void dump(unsigned eventNo, + const std::string& id, + size_t size, + const void* data) = 0; virtual void write(const std::string& outputDir, unsigned threadNo) = 0; }; diff --git a/src/outputAsapo.cpp b/src/outputAsapo.cpp new file mode 100644 index 0000000000000000000000000000000000000000..4a310c3a90b5e64c6d32ca4dc48ff27156ee6337 --- /dev/null +++ b/src/outputAsapo.cpp @@ -0,0 +1,59 @@ +#include "outputAsapo.h" +#include "timed.h" +#include <Options.h> +#include <throwcall.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> +#include <unistd.h> + +namespace output { + decltype(asapoProd::factory) asapoProd::factory("asapo"); + + static options::single<std::string> beamTime('\0',"beamTime","beamtime id"); + static options::single<std::string> token('\0',"token","asapo token"); + static options::single<std::string> endpoint('\0',"endpoint","asapo endpoint"); + static options::single<uint64_t> ingest_mode('\0',"ingest_mode","asapo inget mode",0); + + asapo::Producer& asapoProd::getProducer() { + asapo::SourceCredentials credentials(asapo::SourceType::kRaw, + "auto", // instance io + "test_source", // pipeline step + beamTime, + "auto", // beamline + "test", // data source + token + ); + asapo::Error err; + static std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create(endpoint, + 1, + asapo::RequestHandlerType::kTcp, + credentials, + 60000, + &err); + return *producer; + } + + asapoProd::asapoProd(unsigned threadNo): + sendAnchor("send"), + writeAnchor("write") { + stream = std::to_string(threadNo); + + } + asapoProd::~asapoProd() { + } + void asapoProd::dump(unsigned eventNo,const std::string& /*id*/, + size_t size, + const void* data) { + timed::instance i(sendAnchor, size); + asapo::MessageHeader header(eventNo, size, ""); + getProducer().Send__(header, const_cast<void*>(data), ingest_mode, stream, nullptr); + } + + void asapoProd::write(const std::string& outputDir, + unsigned threadNo) { + sendAnchor.write(outputDir, threadNo); + writeAnchor.write(outputDir, threadNo); + } + +}; //end namespace output diff --git a/src/outputAsapo.h b/src/outputAsapo.h new file mode 100644 index 0000000000000000000000000000000000000000..7d322f388a4c84d0ce4c2d8811f4a65a5b519578 --- /dev/null +++ b/src/outputAsapo.h @@ -0,0 +1,29 @@ +#ifndef __outputAsapo_h__ +#define __outputAsapo_h__ +#include "output.h" +#include "timed.h" +#include "asapo/asapo_producer.h" + +namespace output { + class asapoProd: public base { + static factoryTemplate<asapoProd> factory; + timed::anchor sendAnchor; + timed::anchor writeAnchor; + static asapo::Producer& getProducer(); + std::string stream; + public: + asapoProd(unsigned threadNo); + virtual ~asapoProd() override; + virtual void dump(unsigned eventNo, + const std::string& id, + size_t size, + const void* data) override; + virtual void write(const std::string& outputDir, + unsigned threadNo) override; + }; + + +}; // end namespace output + + +#endif diff --git a/src/outputDaosFs.cpp b/src/outputDaosFs.cpp index fc1533d5af81bddf2104a6816b3a6421fb4a5860..04cbdcc9ebab547613e7e7c6382451354806374c 100644 --- a/src/outputDaosFs.cpp +++ b/src/outputDaosFs.cpp @@ -15,7 +15,8 @@ namespace output { public: daosFs(unsigned threadNo); virtual ~daosFs() override; - virtual void dump(const std::string& id, + virtual void dump(unsigned eventNo, + const std::string& id, size_t size, const void* data) override; virtual void write(const std::string& outputDir, @@ -38,7 +39,8 @@ namespace output { "can't umount daos fs in containr ", containerName, " in pool ", poolName, " on sys ", sysName); } - void daosFs::dump(const std::string& id, + void daosFs::dump(unsigned /*eventNo*/, + const std::string& id, size_t size, const void* data) { dfs_obj_t* obj; diff --git a/src/outputDaosKv.cpp b/src/outputDaosKv.cpp index f591a348827fd1ed86b748af9342295a4def1af6..47e8a07b797d7abd5926a27cea0e9e60709d77c7 100644 --- a/src/outputDaosKv.cpp +++ b/src/outputDaosKv.cpp @@ -13,7 +13,8 @@ namespace output { public: daosKv(unsigned threadNo); virtual ~daosKv() override; - virtual void dump(const std::string& id, + virtual void dump(unsigned eventNo, + const std::string& id, size_t size, const void* data) override; virtual void write(const std::string& outputDir, @@ -36,7 +37,8 @@ namespace output { "can't close kb object in ", containerName, " in pool ", poolName, " on sys ", sysName); } - void daosKv::dump(const std::string& id, + void daosKv::dump(unsigned /*eventNo*/, + const std::string& id, size_t size, const void* data) { timed::instance i(writeAnchor, size); diff --git a/src/outputIgnore.cpp b/src/outputIgnore.cpp index 41e1f4f455d5c4f196db032a4d674b62f5ec561f..456114ea95473a6102c29072807251108e9aad4c 100644 --- a/src/outputIgnore.cpp +++ b/src/outputIgnore.cpp @@ -5,7 +5,8 @@ namespace output { public: ignore(unsigned threadNo); virtual ~ignore() override; - virtual void dump(const std::string& id, + virtual void dump(unsigned eventNo, + const std::string& id, size_t size, const void* data) override; virtual void write(const std::string& outputDir, @@ -19,9 +20,10 @@ namespace output { } ignore::~ignore() { } -void ignore::dump(const std::string& /*id*/, - size_t /*size*/, - const void* /*data*/) { + void ignore::dump(unsigned /*eventNo*/, + const std::string& /*id*/, + size_t /*size*/, + const void* /*data*/) { } void ignore::write(const std::string& /*outputDir*/, diff --git a/src/outputPosix.cpp b/src/outputPosix.cpp index 2c4fdb0b52a90044c673ed3cf9444c25c410bb85..34b1023f82a5262feca036b049049109ccd3edaa 100644 --- a/src/outputPosix.cpp +++ b/src/outputPosix.cpp @@ -23,7 +23,8 @@ namespace output { } posix::~posix() { } - void posix::dump(const std::string& id, + void posix::dump(unsigned /*eventNo*/, + const std::string& id, size_t size, const void* data) { int fd; diff --git a/src/outputPosix.h b/src/outputPosix.h index 9e72df18138b6a0f866a340a9c9800868b7364b4..54cb53d5af4c877c4cb296a63cff64d18ec5adaf 100644 --- a/src/outputPosix.h +++ b/src/outputPosix.h @@ -13,7 +13,8 @@ namespace output { public: posix(unsigned threadNo); virtual ~posix() override; - virtual void dump(const std::string& id, + virtual void dump(unsigned eventNo, + const std::string& id, size_t size, const void* data) override; virtual void write(const std::string& outputDir,