Skip to content
Snippets Groups Projects
Commit 69063cb3 authored by Juergen Hannappel's avatar Juergen Hannappel
Browse files

compiling version

parent c737cc34
No related branches found
No related tags found
1 merge request!4Python driver and data analysis part
...@@ -77,7 +77,7 @@ void worker(const std::string& dataHandlerName, ...@@ -77,7 +77,7 @@ void worker(const std::string& dataHandlerName,
} }
} }
timed::instance i(dumpAnchor,data.size); timed::instance i(dumpAnchor,data.size);
outputHandler->dump(id,data.size,data.buf); outputHandler->dump(eventNo, id, data.size, data.buf);
bytesWritten+=data.size; bytesWritten+=data.size;
lastSize = data.size; lastSize = data.size;
} }
......
...@@ -13,9 +13,10 @@ namespace output { ...@@ -13,9 +13,10 @@ namespace output {
#define CLASS_NAME "output" #define CLASS_NAME "output"
#include "reflectionMagic.h" #include "reflectionMagic.h"
virtual ~base() = default; virtual ~base() = default;
virtual void dump(const std::string& id, virtual void dump(unsigned eventNo,
size_t size, const std::string& id,
const void* data) = 0; size_t size,
const void* data) = 0;
virtual void write(const std::string& outputDir, virtual void write(const std::string& outputDir,
unsigned threadNo) = 0; unsigned threadNo) = 0;
}; };
......
...@@ -13,8 +13,9 @@ namespace output { ...@@ -13,8 +13,9 @@ namespace output {
static options::single<std::string> beamTime('\0',"beamTime","beamtime id"); static options::single<std::string> beamTime('\0',"beamTime","beamtime id");
static options::single<std::string> token('\0',"token","asapo token"); static options::single<std::string> token('\0',"token","asapo token");
static options::single<std::string> endpoint('\0',"endpoint","asapo endpoint"); static options::single<std::string> endpoint('\0',"endpoint","asapo endpoint");
static options::single<uint64_t> ingest_mode('\0',"ingest_mode","asapo inget mode",0);
asapo::Producer& getProducer() { asapo::Producer& asapoProd::getProducer() {
asapo::SourceCredentials credentials(asapo::SourceType::kRaw, asapo::SourceCredentials credentials(asapo::SourceType::kRaw,
"auto", // instance io "auto", // instance io
"test_source", // pipeline step "test_source", // pipeline step
...@@ -23,6 +24,7 @@ namespace output { ...@@ -23,6 +24,7 @@ namespace output {
"test", // data source "test", // data source
token token
); );
asapo::Error err;
static std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create(endpoint, static std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create(endpoint,
1, 1,
asapo::RequestHandlerType::kTcp, asapo::RequestHandlerType::kTcp,
...@@ -32,49 +34,26 @@ namespace output { ...@@ -32,49 +34,26 @@ namespace output {
return *producer; return *producer;
} }
asapoProd::asapoProd(unsigned /*threadNo*/): asapoProd::asapoProd(unsigned threadNo):
openAnchor("open"), sendAnchor("send"),
writeAnchor("write"), writeAnchor("write") {
closeAnchor("close") { stream = std::to_string(threadNo);
dirFd = throwcall::badval(open(dataDir.c_str(),O_DIRECTORY),
-1,
"can't open data directory ", dataDir);
} }
asapoProd::~asapoProd() { asapoProd::~asapoProd() {
} }
void asapoProd::dump(const std::string& id, void asapoProd::dump(unsigned eventNo,const std::string& /*id*/,
size_t size, size_t size,
const void* data) { const void* data) {
int fd; timed::instance i(sendAnchor, size);
{ asapo::MessageHeader header(eventNo, size, "");
timed::instance i(openAnchor,id.size()); getProducer().Send__(header, const_cast<void*>(data), ingest_mode, stream, nullptr);
fd = throwcall::badval(openat(dirFd, id.c_str(), O_CREAT | O_TRUNC | O_WRONLY,0200),
-1,"can't open file ", id);
}
{
timed::instance i(writeAnchor, size);
auto ptr = static_cast<const char*>(data);
for (auto stillToWrite = size; stillToWrite; ) {
auto nWritten = ::write(fd,ptr,stillToWrite);
if (nWritten < 0) {
throwcall::good0(nWritten,"can't write to file", id);
}
stillToWrite -= nWritten;
ptr += stillToWrite;
}
}
{
timed::instance i(closeAnchor, size);
throwcall::good0(close(fd), "can't close ", id);
}
} }
void asapoProd::write(const std::string& outputDir, void asapoProd::write(const std::string& outputDir,
unsigned threadNo) { unsigned threadNo) {
openAnchor.write(outputDir, threadNo); sendAnchor.write(outputDir, threadNo);
writeAnchor.write(outputDir, threadNo); writeAnchor.write(outputDir, threadNo);
closeAnchor.write(outputDir, threadNo);
} }
}; //end namespace output }; //end namespace output
...@@ -10,10 +10,12 @@ namespace output { ...@@ -10,10 +10,12 @@ namespace output {
timed::anchor sendAnchor; timed::anchor sendAnchor;
timed::anchor writeAnchor; timed::anchor writeAnchor;
static asapo::Producer& getProducer(); static asapo::Producer& getProducer();
std::string stream;
public: public:
asapoProd(unsigned threadNo); asapoProd(unsigned threadNo);
virtual ~asapoProd() override; virtual ~asapoProd() override;
virtual void dump(const std::string& id, virtual void dump(unsigned eventNo,
const std::string& id,
size_t size, size_t size,
const void* data) override; const void* data) override;
virtual void write(const std::string& outputDir, virtual void write(const std::string& outputDir,
......
...@@ -15,7 +15,8 @@ namespace output { ...@@ -15,7 +15,8 @@ namespace output {
public: public:
daosFs(unsigned threadNo); daosFs(unsigned threadNo);
virtual ~daosFs() override; virtual ~daosFs() override;
virtual void dump(const std::string& id, virtual void dump(unsigned eventNo,
const std::string& id,
size_t size, size_t size,
const void* data) override; const void* data) override;
virtual void write(const std::string& outputDir, virtual void write(const std::string& outputDir,
...@@ -38,7 +39,8 @@ namespace output { ...@@ -38,7 +39,8 @@ namespace output {
"can't umount daos fs in containr ", containerName, "can't umount daos fs in containr ", containerName,
" in pool ", poolName, " on sys ", sysName); " in pool ", poolName, " on sys ", sysName);
} }
void daosFs::dump(const std::string& id, void daosFs::dump(unsigned /*eventNo*/,
const std::string& id,
size_t size, size_t size,
const void* data) { const void* data) {
dfs_obj_t* obj; dfs_obj_t* obj;
......
...@@ -13,7 +13,8 @@ namespace output { ...@@ -13,7 +13,8 @@ namespace output {
public: public:
daosKv(unsigned threadNo); daosKv(unsigned threadNo);
virtual ~daosKv() override; virtual ~daosKv() override;
virtual void dump(const std::string& id, virtual void dump(unsigned eventNo,
const std::string& id,
size_t size, size_t size,
const void* data) override; const void* data) override;
virtual void write(const std::string& outputDir, virtual void write(const std::string& outputDir,
...@@ -36,7 +37,8 @@ namespace output { ...@@ -36,7 +37,8 @@ namespace output {
"can't close kb object in ", containerName, "can't close kb object in ", containerName,
" in pool ", poolName, " on sys ", sysName); " in pool ", poolName, " on sys ", sysName);
} }
void daosKv::dump(const std::string& id, void daosKv::dump(unsigned /*eventNo*/,
const std::string& id,
size_t size, size_t size,
const void* data) { const void* data) {
timed::instance i(writeAnchor, size); timed::instance i(writeAnchor, size);
......
...@@ -5,7 +5,8 @@ namespace output { ...@@ -5,7 +5,8 @@ namespace output {
public: public:
ignore(unsigned threadNo); ignore(unsigned threadNo);
virtual ~ignore() override; virtual ~ignore() override;
virtual void dump(const std::string& id, virtual void dump(unsigned eventNo,
const std::string& id,
size_t size, size_t size,
const void* data) override; const void* data) override;
virtual void write(const std::string& outputDir, virtual void write(const std::string& outputDir,
...@@ -19,9 +20,10 @@ namespace output { ...@@ -19,9 +20,10 @@ namespace output {
} }
ignore::~ignore() { ignore::~ignore() {
} }
void ignore::dump(const std::string& /*id*/, void ignore::dump(unsigned /*eventNo*/,
size_t /*size*/, const std::string& /*id*/,
const void* /*data*/) { size_t /*size*/,
const void* /*data*/) {
} }
void ignore::write(const std::string& /*outputDir*/, void ignore::write(const std::string& /*outputDir*/,
......
...@@ -23,7 +23,8 @@ namespace output { ...@@ -23,7 +23,8 @@ namespace output {
} }
posix::~posix() { posix::~posix() {
} }
void posix::dump(const std::string& id, void posix::dump(unsigned /*eventNo*/,
const std::string& id,
size_t size, size_t size,
const void* data) { const void* data) {
int fd; int fd;
......
...@@ -13,7 +13,8 @@ namespace output { ...@@ -13,7 +13,8 @@ namespace output {
public: public:
posix(unsigned threadNo); posix(unsigned threadNo);
virtual ~posix() override; virtual ~posix() override;
virtual void dump(const std::string& id, virtual void dump(unsigned eventNo,
const std::string& id,
size_t size, size_t size,
const void* data) override; const void* data) override;
virtual void write(const std::string& outputDir, virtual void write(const std::string& outputDir,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment