Skip to content
Snippets Groups Projects
Commit 2d12a0ed authored by Juergen Hannappel's avatar Juergen Hannappel Committed by Juergen Hannappel
Browse files

now with simple consumer example

parent 0939603a
Branches
Tags
No related merge requests found
#define __CONSUMER_C_INTERFACE_IMPLEMENTATION__
#include <consumer.h>
typedef asapo::Consumer* asapoConsumer;
typedef asapo::SourceCredentials* asapoSourceCredentials;
typedef asapo::ErrorInterface* asapoError;
#include "consumer_c.h"
#include <algorithm>
extern c {
asapoConsumer asapoCreateConsumer(const char* server_name,
const char* source_path,
_Bool has_filesysytem,
asapoSourceCredentials source,
asapoErrorBuffer* error) {
auto c = asapo::ConsumerFactory::CreateConsumer(server_name,
source_path,
has_filesysytem,
source_path,
&(error->error));
static void timePointToTimeSpec(std::chrono::system_clock::time_point tp,
struct timespec& stamp) {
stamp.tv_sec = std::chrono::duration_cast<std::chrono::seconds>(tp.time_since_epoch()).count();
stamp.tv_nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(tp.time_since_epoch()).count() % 1000000000;
}
void asapoErrorExplain(const asapoError error, char* buf, size_t maxSize) {
auto explanation = error->Explain().substr(0, maxSize-1);
std::copy(explanation.begin(), explanation.end(), buf);
buf[explanation.size()] = '\0';
}
enum asapoErrorType asapoErrorGetType(const asapoError error) {
return static_cast<asapoErrorType>(error->GetErrorType);
}
void asapoClearError(asapoError* error) {
delete *error;
error = nullptr;
}
asapoConsumer asapoCreateConsumer(const char* server_name,
const char* source_path,
_Bool has_filesysytem,
asapoSourceCredentials source,
asapoError* error) {
asapo::Error err;
auto c = asapo::ConsumerFactory::CreateConsumer(server_name,
source_path,
has_filesysytem,
source_path,
&err);
if (err) {
error = err.release();
}
return c.release():
}
void asapoDeleteConsumer(asapoConsumer* consumer) {
delete *consumer;
consumer = nullptr;
}
asapoSourceCredentials asapoCreateSourceCredentials(const char* type,
const char* beamtime,
const char* beamline,
const char* data_source,
const char* token) {
asapo::SourceType t;
auto error = asapo::GetSourceTypeFromString(type,&t);
return new asapo::SourceCredentials(t, beamtime, beamline,
data_source, token);
}
void asapoDeleteSourceCredentials(asapoSourceCredentials* cred) {
delete *cred;
cred = nullptr;
}
const char* asapoMessageMetaGetName(const asapoMessageMeta md) {
return md->name.c_str();
}
void asapoMessageMetaGetTimestamp(const asapoMessageMeta md,
struct timespec* stamp) {
timePointToTimeSpec(md->timestamp, stamp);
}
uint64_t asapoMessageMetaGetSize(const asapoMessageMeta md) {
return md->size;
}
uint64_t asapoMessageMetaGetId(const asapoMessageMeta md) {
return md->id;
}
const char* asapoMessageMetaGetSource(const asapoMessageMeta md) {
return md->source.c_str();
}
const char* asapoMessageMetaGetMetaData(const asapoMessageMeta md) {
return md->metadata.c_str();
}
uint64_t asapoMessageMetaGetBuf_id(const asapoMessageMeta md) {
return md->buf_id;
}
uint64_t asapoMessageMetaGetDataset_Substream(const asapoMessageMeta md) {
return md->dataset_substream;
}
return c.release():
}
#include <string.h>
#include "consumer_c.h"
void exit_if_error(const char *error_string, const asapoError err) {
if (err) {
char buf[1024];
asapoErrorExplain(err, buf, sizeof(buf));
printf("%s %s\n", error_string, buf);
exit(EXIT_FAILURE);
}
}
int main(int argc, char* argv[]) {
asapoError err;
const char *endpoint = "asapo-services2:8400";
const char *beamtime = "asapo_test";
const char *token = "KmUDdacgBzaOD3NIJvN1NmKGqWKtx0DK-NyPjdpeWkc=";
asapoSourceCredentials cred = asapoCreateSourceCredentials("processed",
beamtime,
"", "", token);
asapoConsumer consumer = asapoCreateConsumer(endpoint,
"", true,
asapo::SourceCredentials{beamtime,
"", "", token},
&err);
asapoDeleteSourceCredentials(&cred);
exit_if_error("Cannot create consumer", err);
consumer->SetTimeout((uint64_t) 1000);
asapoGroupId group_id = asapoConsumerGenerateNewGroupId(consumer, &err);
exit_if_error("Cannot create group id", err);
asapoMessageMeta fi = asapoCreateMessageMeta();
asapoMessageData data;
err = asappConsumerGetLast(consumer,&fi, group_id, &data);
exit_if_error("Cannot get next record", err);
printf("id: %llu\n", asapoMessageMetaGetId(fi);
printf("file name: %s\n", asapoMessageMetaGetName(fi);
std::cout << "file content: " << reinterpret_cast<char const*>(data.get()) << std::endl;
asapoDeleteMessageMeta(&fi);
asapoDeleteConsumer(&consumer);
return EXIT_SUCCESS;
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment