diff --git a/src/detsim.cpp b/src/detsim.cpp index dd8e95f5414b9a42a06e01c30e71fa9194073918..24038fb8d7037461cd96395914ed96aab6fcc114 100644 --- a/src/detsim.cpp +++ b/src/detsim.cpp @@ -77,7 +77,7 @@ void worker(const std::string& dataHandlerName, } } timed::instance i(dumpAnchor,data.size); - outputHandler->dump(id,data.size,data.buf); + outputHandler->dump(eventNo, id, data.size, data.buf); bytesWritten+=data.size; lastSize = data.size; } diff --git a/src/output.h b/src/output.h index 4b74573b5622c1eac1b5a9f0ec34fcada6432749..8166da7d3d2e34a3d82c6b5c616e1208c434bf0f 100644 --- a/src/output.h +++ b/src/output.h @@ -13,9 +13,10 @@ namespace output { #define CLASS_NAME "output" #include "reflectionMagic.h" virtual ~base() = default; - virtual void dump(const std::string& id, - size_t size, - const void* data) = 0; + virtual void dump(unsigned eventNo, + const std::string& id, + size_t size, + const void* data) = 0; virtual void write(const std::string& outputDir, unsigned threadNo) = 0; }; diff --git a/src/outputAsapo.cpp b/src/outputAsapo.cpp index 331245638a3ddd3e13adc6e2bc7af7109d02a0a7..4a310c3a90b5e64c6d32ca4dc48ff27156ee6337 100644 --- a/src/outputAsapo.cpp +++ b/src/outputAsapo.cpp @@ -13,8 +13,9 @@ namespace output { static options::single<std::string> beamTime('\0',"beamTime","beamtime id"); static options::single<std::string> token('\0',"token","asapo token"); static options::single<std::string> endpoint('\0',"endpoint","asapo endpoint"); + static options::single<uint64_t> ingest_mode('\0',"ingest_mode","asapo inget mode",0); - asapo::Producer& getProducer() { + asapo::Producer& asapoProd::getProducer() { asapo::SourceCredentials credentials(asapo::SourceType::kRaw, "auto", // instance io "test_source", // pipeline step @@ -23,6 +24,7 @@ namespace output { "test", // data source token ); + asapo::Error err; static std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create(endpoint, 1, asapo::RequestHandlerType::kTcp, @@ -32,49 +34,26 @@ namespace output { return *producer; } - asapoProd::asapoProd(unsigned /*threadNo*/): - openAnchor("open"), - writeAnchor("write"), - closeAnchor("close") { - dirFd = throwcall::badval(open(dataDir.c_str(),O_DIRECTORY), - -1, - "can't open data directory ", dataDir); + asapoProd::asapoProd(unsigned threadNo): + sendAnchor("send"), + writeAnchor("write") { + stream = std::to_string(threadNo); } asapoProd::~asapoProd() { } - void asapoProd::dump(const std::string& id, - size_t size, - const void* data) { - int fd; - { - timed::instance i(openAnchor,id.size()); - fd = throwcall::badval(openat(dirFd, id.c_str(), O_CREAT | O_TRUNC | O_WRONLY,0200), - -1,"can't open file ", id); - } - { - timed::instance i(writeAnchor, size); - auto ptr = static_cast<const char*>(data); - for (auto stillToWrite = size; stillToWrite; ) { - auto nWritten = ::write(fd,ptr,stillToWrite); - if (nWritten < 0) { - throwcall::good0(nWritten,"can't write to file", id); - } - stillToWrite -= nWritten; - ptr += stillToWrite; - } - } - { - timed::instance i(closeAnchor, size); - throwcall::good0(close(fd), "can't close ", id); - } + void asapoProd::dump(unsigned eventNo,const std::string& /*id*/, + size_t size, + const void* data) { + timed::instance i(sendAnchor, size); + asapo::MessageHeader header(eventNo, size, ""); + getProducer().Send__(header, const_cast<void*>(data), ingest_mode, stream, nullptr); } void asapoProd::write(const std::string& outputDir, unsigned threadNo) { - openAnchor.write(outputDir, threadNo); + sendAnchor.write(outputDir, threadNo); writeAnchor.write(outputDir, threadNo); - closeAnchor.write(outputDir, threadNo); } }; //end namespace output diff --git a/src/outputAsapo.h b/src/outputAsapo.h index 82b1b9abef37d2b7794f1a0f005e58e31c784301..7d322f388a4c84d0ce4c2d8811f4a65a5b519578 100644 --- a/src/outputAsapo.h +++ b/src/outputAsapo.h @@ -10,10 +10,12 @@ namespace output { timed::anchor sendAnchor; timed::anchor writeAnchor; static asapo::Producer& getProducer(); + std::string stream; public: asapoProd(unsigned threadNo); virtual ~asapoProd() override; - virtual void dump(const std::string& id, + virtual void dump(unsigned eventNo, + const std::string& id, size_t size, const void* data) override; virtual void write(const std::string& outputDir, diff --git a/src/outputDaosFs.cpp b/src/outputDaosFs.cpp index fc1533d5af81bddf2104a6816b3a6421fb4a5860..04cbdcc9ebab547613e7e7c6382451354806374c 100644 --- a/src/outputDaosFs.cpp +++ b/src/outputDaosFs.cpp @@ -15,7 +15,8 @@ namespace output { public: daosFs(unsigned threadNo); virtual ~daosFs() override; - virtual void dump(const std::string& id, + virtual void dump(unsigned eventNo, + const std::string& id, size_t size, const void* data) override; virtual void write(const std::string& outputDir, @@ -38,7 +39,8 @@ namespace output { "can't umount daos fs in containr ", containerName, " in pool ", poolName, " on sys ", sysName); } - void daosFs::dump(const std::string& id, + void daosFs::dump(unsigned /*eventNo*/, + const std::string& id, size_t size, const void* data) { dfs_obj_t* obj; diff --git a/src/outputDaosKv.cpp b/src/outputDaosKv.cpp index f591a348827fd1ed86b748af9342295a4def1af6..47e8a07b797d7abd5926a27cea0e9e60709d77c7 100644 --- a/src/outputDaosKv.cpp +++ b/src/outputDaosKv.cpp @@ -13,7 +13,8 @@ namespace output { public: daosKv(unsigned threadNo); virtual ~daosKv() override; - virtual void dump(const std::string& id, + virtual void dump(unsigned eventNo, + const std::string& id, size_t size, const void* data) override; virtual void write(const std::string& outputDir, @@ -36,7 +37,8 @@ namespace output { "can't close kb object in ", containerName, " in pool ", poolName, " on sys ", sysName); } - void daosKv::dump(const std::string& id, + void daosKv::dump(unsigned /*eventNo*/, + const std::string& id, size_t size, const void* data) { timed::instance i(writeAnchor, size); diff --git a/src/outputIgnore.cpp b/src/outputIgnore.cpp index 41e1f4f455d5c4f196db032a4d674b62f5ec561f..456114ea95473a6102c29072807251108e9aad4c 100644 --- a/src/outputIgnore.cpp +++ b/src/outputIgnore.cpp @@ -5,7 +5,8 @@ namespace output { public: ignore(unsigned threadNo); virtual ~ignore() override; - virtual void dump(const std::string& id, + virtual void dump(unsigned eventNo, + const std::string& id, size_t size, const void* data) override; virtual void write(const std::string& outputDir, @@ -19,9 +20,10 @@ namespace output { } ignore::~ignore() { } -void ignore::dump(const std::string& /*id*/, - size_t /*size*/, - const void* /*data*/) { + void ignore::dump(unsigned /*eventNo*/, + const std::string& /*id*/, + size_t /*size*/, + const void* /*data*/) { } void ignore::write(const std::string& /*outputDir*/, diff --git a/src/outputPosix.cpp b/src/outputPosix.cpp index 2c4fdb0b52a90044c673ed3cf9444c25c410bb85..34b1023f82a5262feca036b049049109ccd3edaa 100644 --- a/src/outputPosix.cpp +++ b/src/outputPosix.cpp @@ -23,7 +23,8 @@ namespace output { } posix::~posix() { } - void posix::dump(const std::string& id, + void posix::dump(unsigned /*eventNo*/, + const std::string& id, size_t size, const void* data) { int fd; diff --git a/src/outputPosix.h b/src/outputPosix.h index 9e72df18138b6a0f866a340a9c9800868b7364b4..54cb53d5af4c877c4cb296a63cff64d18ec5adaf 100644 --- a/src/outputPosix.h +++ b/src/outputPosix.h @@ -13,7 +13,8 @@ namespace output { public: posix(unsigned threadNo); virtual ~posix() override; - virtual void dump(const std::string& id, + virtual void dump(unsigned eventNo, + const std::string& id, size_t size, const void* data) override; virtual void write(const std::string& outputDir,