Skip to content
Snippets Groups Projects
Commit c737cc34 authored by Juergen Hannappel's avatar Juergen Hannappel
Browse files

non-working asapo interface

parent 12072f59
No related branches found
No related tags found
1 merge request!4Python driver and data analysis part
......@@ -122,6 +122,12 @@ if (BEEGFS_HEADER AND USE_BEEGFS)
list(APPEND DETSIM_EXTRAS "BEEGFS")
endif()
set (ASAPO_SHARED_LIBS ON)
cmake_policy(SET CMP0057 NEW)
find_package (Asapo COMPONENTS Producer)
if (Asapo_FOUND)
message("Using ASAPO")
endif()
# # add a target to generate API documentation with Doxygen
# find_package(Doxygen)
......
......@@ -68,6 +68,10 @@ if (RDKAFKA_FOUND)
list(APPEND DETSIM_INCLUDES ${RDKAFKA_INCLUDE_DIRS})
list(APPEND DETSIM_OPTIONS ${RDKAFKA_CFLAGS_OTHER})
endif()
if (Asapo_FOUND)
list(APPEND DETSIM_SOURCES outputAsapo.cpp)
list(APPEND DETSIM_LIBS imported::asapo-producer)
endif()
list(REMOVE_DUPLICATES DETSIM_SOURCES)
#list(REMOVE_DUPLICATES DETSIM_INCLUDES)
#list(REMOVE_DUPLICATES DETSIM_OPTIONS)
......
#include "outputAsapo.h"
#include "timed.h"
#include <Options.h>
#include <throwcall.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
namespace output {
decltype(asapoProd::factory) asapoProd::factory("asapo");
static options::single<std::string> beamTime('\0',"beamTime","beamtime id");
static options::single<std::string> token('\0',"token","asapo token");
static options::single<std::string> endpoint('\0',"endpoint","asapo endpoint");
asapo::Producer& getProducer() {
asapo::SourceCredentials credentials(asapo::SourceType::kRaw,
"auto", // instance io
"test_source", // pipeline step
beamTime,
"auto", // beamline
"test", // data source
token
);
static std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create(endpoint,
1,
asapo::RequestHandlerType::kTcp,
credentials,
60000,
&err);
return *producer;
}
asapoProd::asapoProd(unsigned /*threadNo*/):
openAnchor("open"),
writeAnchor("write"),
closeAnchor("close") {
dirFd = throwcall::badval(open(dataDir.c_str(),O_DIRECTORY),
-1,
"can't open data directory ", dataDir);
}
asapoProd::~asapoProd() {
}
void asapoProd::dump(const std::string& id,
size_t size,
const void* data) {
int fd;
{
timed::instance i(openAnchor,id.size());
fd = throwcall::badval(openat(dirFd, id.c_str(), O_CREAT | O_TRUNC | O_WRONLY,0200),
-1,"can't open file ", id);
}
{
timed::instance i(writeAnchor, size);
auto ptr = static_cast<const char*>(data);
for (auto stillToWrite = size; stillToWrite; ) {
auto nWritten = ::write(fd,ptr,stillToWrite);
if (nWritten < 0) {
throwcall::good0(nWritten,"can't write to file", id);
}
stillToWrite -= nWritten;
ptr += stillToWrite;
}
}
{
timed::instance i(closeAnchor, size);
throwcall::good0(close(fd), "can't close ", id);
}
}
void asapoProd::write(const std::string& outputDir,
unsigned threadNo) {
openAnchor.write(outputDir, threadNo);
writeAnchor.write(outputDir, threadNo);
closeAnchor.write(outputDir, threadNo);
}
}; //end namespace output
#ifndef __outputAsapo_h__
#define __outputAsapo_h__
#include "output.h"
#include "timed.h"
#include "asapo/asapo_producer.h"
namespace output {
class asapoProd: public base {
static factoryTemplate<asapoProd> factory;
timed::anchor sendAnchor;
timed::anchor writeAnchor;
static asapo::Producer& getProducer();
public:
asapoProd(unsigned threadNo);
virtual ~asapoProd() override;
virtual void dump(const std::string& id,
size_t size,
const void* data) override;
virtual void write(const std::string& outputDir,
unsigned threadNo) override;
};
}; // end namespace output
#endif
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment