diff --git a/CMakeLists.txt b/CMakeLists.txt index 3fad3021a2be297b3b944bf6780bbaff4b7094d7..828d23733d5fbfd48d77a68a16baa466c7193f72 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -122,6 +122,12 @@ if (BEEGFS_HEADER AND USE_BEEGFS) list(APPEND DETSIM_EXTRAS "BEEGFS") endif() +set (ASAPO_SHARED_LIBS ON) +cmake_policy(SET CMP0057 NEW) +find_package (Asapo COMPONENTS Producer) +if (Asapo_FOUND) + message("Using ASAPO") +endif() # # add a target to generate API documentation with Doxygen # find_package(Doxygen) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 4fca9d949a5049b7199ea53133bf1809de8f521c..8a962d46017798729db319cd159bef6bf28e353b 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -68,6 +68,10 @@ if (RDKAFKA_FOUND) list(APPEND DETSIM_INCLUDES ${RDKAFKA_INCLUDE_DIRS}) list(APPEND DETSIM_OPTIONS ${RDKAFKA_CFLAGS_OTHER}) endif() +if (Asapo_FOUND) + list(APPEND DETSIM_SOURCES outputAsapo.cpp) + list(APPEND DETSIM_LIBS imported::asapo-producer) +endif() list(REMOVE_DUPLICATES DETSIM_SOURCES) #list(REMOVE_DUPLICATES DETSIM_INCLUDES) #list(REMOVE_DUPLICATES DETSIM_OPTIONS) diff --git a/src/outputAsapo.cpp b/src/outputAsapo.cpp new file mode 100644 index 0000000000000000000000000000000000000000..331245638a3ddd3e13adc6e2bc7af7109d02a0a7 --- /dev/null +++ b/src/outputAsapo.cpp @@ -0,0 +1,80 @@ +#include "outputAsapo.h" +#include "timed.h" +#include <Options.h> +#include <throwcall.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> +#include <unistd.h> + +namespace output { + decltype(asapoProd::factory) asapoProd::factory("asapo"); + + static options::single<std::string> beamTime('\0',"beamTime","beamtime id"); + static options::single<std::string> token('\0',"token","asapo token"); + static options::single<std::string> endpoint('\0',"endpoint","asapo endpoint"); + + asapo::Producer& getProducer() { + asapo::SourceCredentials credentials(asapo::SourceType::kRaw, + "auto", // instance io + "test_source", // pipeline step + beamTime, + "auto", // beamline + "test", // data source + token + ); + static std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create(endpoint, + 1, + asapo::RequestHandlerType::kTcp, + credentials, + 60000, + &err); + return *producer; + } + + asapoProd::asapoProd(unsigned /*threadNo*/): + openAnchor("open"), + writeAnchor("write"), + closeAnchor("close") { + dirFd = throwcall::badval(open(dataDir.c_str(),O_DIRECTORY), + -1, + "can't open data directory ", dataDir); + + } + asapoProd::~asapoProd() { + } + void asapoProd::dump(const std::string& id, + size_t size, + const void* data) { + int fd; + { + timed::instance i(openAnchor,id.size()); + fd = throwcall::badval(openat(dirFd, id.c_str(), O_CREAT | O_TRUNC | O_WRONLY,0200), + -1,"can't open file ", id); + } + { + timed::instance i(writeAnchor, size); + auto ptr = static_cast<const char*>(data); + for (auto stillToWrite = size; stillToWrite; ) { + auto nWritten = ::write(fd,ptr,stillToWrite); + if (nWritten < 0) { + throwcall::good0(nWritten,"can't write to file", id); + } + stillToWrite -= nWritten; + ptr += stillToWrite; + } + } + { + timed::instance i(closeAnchor, size); + throwcall::good0(close(fd), "can't close ", id); + } + } + + void asapoProd::write(const std::string& outputDir, + unsigned threadNo) { + openAnchor.write(outputDir, threadNo); + writeAnchor.write(outputDir, threadNo); + closeAnchor.write(outputDir, threadNo); + } + +}; //end namespace output diff --git a/src/outputAsapo.h b/src/outputAsapo.h new file mode 100644 index 0000000000000000000000000000000000000000..82b1b9abef37d2b7794f1a0f005e58e31c784301 --- /dev/null +++ b/src/outputAsapo.h @@ -0,0 +1,27 @@ +#ifndef __outputAsapo_h__ +#define __outputAsapo_h__ +#include "output.h" +#include "timed.h" +#include "asapo/asapo_producer.h" + +namespace output { + class asapoProd: public base { + static factoryTemplate<asapoProd> factory; + timed::anchor sendAnchor; + timed::anchor writeAnchor; + static asapo::Producer& getProducer(); + public: + asapoProd(unsigned threadNo); + virtual ~asapoProd() override; + virtual void dump(const std::string& id, + size_t size, + const void* data) override; + virtual void write(const std::string& outputDir, + unsigned threadNo) override; + }; + + +}; // end namespace output + + +#endif