Commit 37b0d9c5 authored by Jan Kotanski's avatar Jan Kotanski
Browse files

New upstream version 20.09.1

parent f68bdf6f
Metadata-Version: 1.0
Name: asapo_producer
Name: asapo_consumer
Version: 20.09.1
Summary: UNKNOWN
Home-page: UNKNOWN
......
This diff is collapsed.
#ifndef ASAPO_ASAPO_WRAPPERS_H
#define ASAPO_ASAPO_WRAPPERS_H
#include <memory>
#include <functional>
namespace asapo {
inline std::string GetErrorString(asapo::Error* err) {
......@@ -13,27 +10,6 @@ inline std::string GetErrorString(asapo::Error* err) {
return "";
}
using RequestCallbackCython = void (*)(void*, void*, RequestCallbackPayload payload, Error err);
using RequestCallbackCythonMemory = void (*)(void*, void*, void*, RequestCallbackPayload payload, Error err);
RequestCallback unwrap_callback(RequestCallbackCython callback, void* c_self, void* py_func) {
if (py_func == NULL) {
return nullptr;
}
RequestCallback wrapper = [ = ](RequestCallbackPayload payload, Error err) -> void {
callback(c_self, py_func, std::move(payload), std::move(err));
};
return wrapper;
}
RequestCallback unwrap_callback_with_memory(RequestCallbackCythonMemory callback, void* c_self, void* py_func,
void* nd_array) {
RequestCallback wrapper = [ = ](RequestCallbackPayload payload, Error err) -> void {
callback(c_self, py_func, nd_array, std::move(payload), std::move(err));
};
return wrapper;
}
}
......
/** @defgroup consumer The Consumer Group
* This is the consumer group
* @{
*/
#ifndef ASAPO_ASAPO_CONSUMER_H
#define ASAPO_ASAPO_CONSUMER_H
#include "consumer/data_broker.h"
#include "consumer/consumer_error.h"
#include "common/version.h"
#include <ostream>
#endif //ASAPO_ASAPO_CONSUMER_H
/** @} */ // end of consumer
/** @defgroup producer The Producer Group
* This is the producer group
* @{
*/
/** @} */ // end of producer
/**
* @file asapo_producer.h
* @ingroup producer
*/
#ifndef ASAPO_ASAPO_PRODUCER_H
#define ASAPO_ASAPO_PRODUCER_H
#include "common/io_error.h"
#include "common/version.h"
#include "producer/producer.h"
#include "producer/producer_error.h"
#endif //ASAPO_ASAPO_PRODUCER_H
#ifndef ASAPO_CONSUMER_ERROR_H
#define ASAPO_CONSUMER_ERROR_H
#include "common/error.h"
#include "common/io_error.h"
namespace asapo {
enum class ConsumerErrorType {
kNoData,
kEndOfStream,
kStreamFinished,
kUnavailableService,
kInterruptedTransaction,
kLocalIOError,
kWrongInput
};
using ConsumerErrorTemplate = ServiceErrorTemplate<ConsumerErrorType, ErrorType::kConsumerError>;
class ConsumerErrorData : public CustomErrorData {
public:
uint64_t id;
uint64_t id_max;
std::string next_substream;
};
namespace ConsumerErrorTemplates {
auto const kLocalIOError = ConsumerErrorTemplate {
"local i/o error", ConsumerErrorType::kLocalIOError
};
auto const kStreamFinished = ConsumerErrorTemplate {
"stream finished", ConsumerErrorType::kStreamFinished
};
auto const kEndOfStream = ConsumerErrorTemplate {
"no data - end of stream", ConsumerErrorType::kEndOfStream
};
auto const kNoData = ConsumerErrorTemplate {
"no data", ConsumerErrorType::kNoData
};
auto const kWrongInput = ConsumerErrorTemplate {
"wrong input", ConsumerErrorType::kWrongInput
};
auto const kInterruptedTransaction = ConsumerErrorTemplate {
"error from broker server", ConsumerErrorType::kInterruptedTransaction
};
auto const kUnavailableService = ConsumerErrorTemplate {
"service unavailable", ConsumerErrorType::kUnavailableService
};
}
}
#endif //ASAPO_CONSUMER_ERROR_H
#ifndef ASAPO_DATASOURCE_H
#define ASAPO_DATASOURCE_H
#include <memory>
#include <string>
#include <vector>
#include <chrono>
#include "common/data_structs.h"
#include "common/error.h"
#include "common/networking.h"
namespace asapo {
class DataBroker {
public:
//! Reset counter for the specific group.
/*!
\param group_id - group id to use.
\return nullptr of command was successful, otherwise error.
*/
virtual Error ResetLastReadMarker(std::string group_id) = 0;
virtual Error ResetLastReadMarker(std::string group_id, std::string substream) = 0;
virtual Error SetLastReadMarker(uint64_t value, std::string group_id) = 0;
virtual Error SetLastReadMarker(uint64_t value, std::string group_id, std::string substream) = 0;
//! Acknowledge data tuple for specific group id and substream.
/*!
\param group_id - group id to use.
\param id - data tuple id
\param substream (optional) - substream
\return nullptr of command was successful, otherwise error.
*/
virtual Error Acknowledge(std::string group_id, uint64_t id, std::string substream = kDefaultSubstream) = 0;
//! Negative acknowledge data tuple for specific group id and substream.
/*!
\param group_id - group id to use.
\param id - data tuple id
\param delay_sec - data tuple will be redelivered after delay, 0 to redeliver immediately
\param substream (optional) - substream
\return nullptr of command was successful, otherwise error.
*/
virtual Error NegativeAcknowledge(std::string group_id, uint64_t id, uint64_t delay_sec,
std::string substream = kDefaultSubstream) = 0;
//! Get unacknowledged tuple for specific group id and substream.
/*!
\param group_id - group id to use.
\param substream (optional) - substream
\param from_id - return tuples with ids greater or equal to from (use 0 disable limit)
\param to_id - return tuples with ids less or equal to to (use 0 to disable limit)
\param in (optional) - substream
\param err - set to nullptr of operation succeed, error otherwise.
\return vector of ids, might be empty
*/
virtual IdList GetUnacknowledgedTupleIds(std::string group_id, std::string substream, uint64_t from_id, uint64_t to_id,
Error* error) = 0;
virtual IdList GetUnacknowledgedTupleIds(std::string group_id, uint64_t from_id, uint64_t to_id, Error* error) = 0;
//! Set timeout for broker operations. Default - no timeout
virtual void SetTimeout(uint64_t timeout_ms) = 0;
//! Will disable RDMA.
//! If RDMA is disabled, not available or the first connection fails to build up, it will automatically fall back to TCP.
//! This will only have an effect if no previous connection attempted was made on this DataBroker.
virtual void ForceNoRdma() = 0;
//! Returns the current network connection type
/*!
* \return current network connection type. If no connection was made, the result is NetworkConnectionType::kUndefined
*/
virtual NetworkConnectionType CurrentConnectionType() const = 0;
//! Get list of substreams, set from to "" to get all substreams
virtual StreamInfos GetSubstreamList(std::string from, Error* err) = 0;
//! Get current number of datasets
/*!
\param err - return nullptr of operation succeed, error otherwise.
\return number of datasets.
*/
virtual uint64_t GetCurrentSize(Error* err) = 0;
virtual uint64_t GetCurrentSize(std::string substream, Error* err) = 0;
//! Generate new GroupID.
/*!
\param err - return nullptr of operation succeed, error otherwise.
\return group ID.
*/
virtual std::string GenerateNewGroupId(Error* err) = 0;
//! Get Beamtime metadata.
/*!
\param err - return nullptr of operation succeed, error otherwise.
\return beamtime metadata.
*/
virtual std::string GetBeamtimeMeta(Error* err) = 0;
//! Receive next available image.
/*!
\param info - where to store image metadata. Can be set to nullptr only image data is needed.
\param group_id - group id to use.
\param data - where to store image data. Can be set to nullptr only image metadata is needed.
\return Error if both pointers are nullptr or data cannot be read, nullptr otherwise.
*/
virtual Error GetNext(FileInfo* info, std::string group_id, FileData* data) = 0;
virtual Error GetNext(FileInfo* info, std::string group_id, std::string substream, FileData* data) = 0;
//! Retrieves image using fileinfo.
/*!
\param info - image metadata to use, can be updated after operation
\param data - where to store image data. Can be set to nullptr only image metadata is needed.
\return Error if data is nullptr or data cannot be read, nullptr otherwise.
*/
virtual Error RetrieveData(FileInfo* info, FileData* data) = 0;
//! Receive next available completed dataset.
/*!
\param err - will be set to error data cannot be read, nullptr otherwise.
\param group_id - group id to use.
\return DataSet - information about the dataset
*/
virtual DataSet GetNextDataset(std::string group_id, Error* err) = 0;
virtual DataSet GetNextDataset(std::string group_id, std::string substream, Error* err) = 0;
//! Receive last available completed dataset.
/*!
\param err - will be set to error data cannot be read, nullptr otherwise.
\param group_id - group id to use.
\return DataSet - information about the dataset
*/
virtual DataSet GetLastDataset(std::string group_id, Error* err) = 0;
virtual DataSet GetLastDataset(std::string group_id, std::string substream, Error* err) = 0;
//! Receive dataset by id.
/*!
\param id - dataset id
\param err - will be set to error data cannot be read or dataset is incomplete, nullptr otherwise.
\param group_id - group id to use.
\return DataSet - information about the dataset
*/
virtual DataSet GetDatasetById(uint64_t id, std::string group_id, Error* err) = 0;
virtual DataSet GetDatasetById(uint64_t id, std::string group_id, std::string substream, Error* err) = 0;
//! Receive single image by id.
/*!
\param id - image id
\param info - where to store image metadata. Can be set to nullptr only image data is needed.
\param data - where to store image data. Can be set to nullptr only image metadata is needed.
\return Error if both pointers are nullptr or data cannot be read, nullptr otherwise.
*/
virtual Error GetById(uint64_t id, FileInfo* info, std::string group_id, FileData* data) = 0;
virtual Error GetById(uint64_t id, FileInfo* info, std::string group_id, std::string substream, FileData* data) = 0;
//! Receive id of last acknowledged data tuple
/*!
\param group_id - group id to use.
\param substream (optional) - substream
\param err - will be set in case of error, nullptr otherwise.
\return id of the last acknowledged image, 0 if error
*/
virtual uint64_t GetLastAcknowledgedTulpeId(std::string group_id, std::string substream, Error* error) = 0;
virtual uint64_t GetLastAcknowledgedTulpeId(std::string group_id, Error* error) = 0;
//! Receive last available image.
/*!
\param info - where to store image metadata. Can be set to nullptr only image data is needed.
\param group_id - group id to use.
\param data - where to store image data. Can be set to nullptr only image metadata is needed.
\return Error if both pointers are nullptr or data cannot be read, nullptr otherwise.
*/
virtual Error GetLast(FileInfo* info, std::string group_id, FileData* data) = 0;
virtual Error GetLast(FileInfo* info, std::string group_id, std::string substream, FileData* data) = 0;
//! Get all images matching the query.
/*!
\param sql_query - query string in SQL format. Limit subset is supported
\param err - will be set in case of error, nullptr otherwise
\return vector of image metadata matchiing to specified query. Empty if nothing found or error
*/
virtual FileInfos QueryImages(std::string query, Error* err) = 0;
virtual FileInfos QueryImages(std::string query, std::string substream, Error* err) = 0;
//! Configure resending nonacknowledged data
/*!
\param resend - where to resend
\param delay_sec - how many seconds to wait before resending
\param resend_attempts - how many resend attempts to make
*/
virtual void SetResendNacs(bool resend, uint64_t delay_sec, uint64_t resend_attempts) = 0;
virtual ~DataBroker() = default; // needed for unique_ptr to delete itself
};
/*! A class to create a data broker instance. The class's only function Create is used for this */
class DataBrokerFactory {
public:
static std::unique_ptr<DataBroker> CreateServerBroker(std::string server_name, std::string source_path,
bool has_filesystem, SourceCredentials source, Error* error) noexcept;
};
}
#endif //ASAPO_DATASOURCE_H
#ifndef ASAPO_LOGGER_H
#define ASAPO_LOGGER_H
#include <memory>
#include <string>
#include "common/error.h"
#include <ostream>
namespace asapo {
enum class LogLevel {
None,
Error,
Info,
Debug,
Warning
};
class LogMessageWithFields {
public:
LogMessageWithFields(std::string key, uint64_t val);
LogMessageWithFields(std::string key, double val, int precision);
LogMessageWithFields(std::string key, std::string val);
LogMessageWithFields& Append(std::string key, uint64_t val);
LogMessageWithFields& Append(std::string key, double val, int precision);
LogMessageWithFields& Append(std::string key, std::string val);
std::string LogString() const;
private:
std::string log_string_;
};
class AbstractLogger {
public:
virtual void SetLogLevel(LogLevel level) = 0;
virtual void Info(const std::string& text) const = 0;
virtual void Error(const std::string& text) const = 0;
virtual void Debug(const std::string& text) const = 0;
virtual void Warning(const std::string& text) const = 0;
virtual void Info(const LogMessageWithFields& msg) const = 0;
virtual void Error(const LogMessageWithFields& msg) const = 0;
virtual void Debug(const LogMessageWithFields& msg) const = 0;
virtual void Warning(const LogMessageWithFields& msg) const = 0;
virtual void EnableLocalLog(bool enable) = 0;
virtual void EnableRemoteLog(bool enable) = 0;
virtual ~AbstractLogger() = default;
};
using Logger = std::unique_ptr<AbstractLogger>;
Logger CreateDefaultLoggerBin(const std::string& name);
Logger CreateDefaultLoggerApi(const std::string& name, const std::string& endpoint_uri);
LogLevel StringToLogLevel(const std::string& name, Error* err);
}
#endif //ASAPO_LOGGER_H
#ifndef ASAPO_PRODUCER_COMMON_H
#define ASAPO_PRODUCER_COMMON_H
#include <cstdint>
#include <functional>
#include "common/networking.h"
#include "common/error.h"
namespace asapo {
const uint8_t kMaxProcessingThreads = 32;
struct RequestCallbackPayload {
GenericRequestHeader original_header;
std::string response;
};
using RequestCallback = std::function<void(RequestCallbackPayload, Error)>;
enum class RequestHandlerType {
kTcp,
kFilesystem
};
struct EventHeader {
EventHeader() {};
EventHeader(uint64_t file_id_i, uint64_t file_size_i, std::string file_name_i,
std::string user_metadata_i = "",
uint64_t subset_id_i = 0,
uint64_t subset_size_i = 0 ):
file_id{file_id_i}, file_size{file_size_i},
file_name{std::move(file_name_i)},
user_metadata{std::move(user_metadata_i)},
subset_id{subset_id_i},
subset_size{subset_size_i} {};
uint64_t file_id = 0;
uint64_t file_size = 0;
std::string file_name;
std::string user_metadata;
uint64_t subset_id = 0;
uint64_t subset_size = 0;
};
}
#endif //ASAPO_PRODUCER_COMMON_H
#ifndef ASAPO_PRODUCER__PRODUCER_H
#define ASAPO_PRODUCER__PRODUCER_H
#include <memory>
#include <string>
#include "logger/logger.h"
#include "producer/common.h"
#include "common/data_structs.h"
namespace asapo {
/** @ingroup producer */
class Producer {
public:
//! Creates a new producer
/*!
* @return A unique_ptr to a new producer instance
*/
static std::unique_ptr<Producer> Create(const std::string& endpoint, uint8_t n_processing_threads,
asapo::RequestHandlerType type, SourceCredentials source_cred,
uint64_t timeout_sec,
Error* err);
virtual ~Producer() = default;
//! Get substream information from receiver
/*!
\param substream (optional) - substream
\param timeout_sec - operation timeout in seconds
\return StreamInfo - a structure with substream information
*/
virtual StreamInfo GetStreamInfo(std::string substream, uint64_t timeout_sec, Error* err) const = 0;
virtual StreamInfo GetStreamInfo(uint64_t timeout_sec, Error* err) const = 0;
//! Get substream that has the newest ingested data
/*!
\param timeout_ms - operation timeout in seconds
\return StreamInfo - a structure with substream information
*/
virtual StreamInfo GetLastSubstream(uint64_t timeout_sec, Error* err) const = 0;
//! Sends data to the receiver
/*!
\param event_header - A stucture with the meta information (file name, size, a string with user metadata (JSON format)).
\param data - A pointer to the data to send
\return Error - Will be nullptr on success
*/
virtual Error SendData(const EventHeader& event_header, FileData data, uint64_t ingest_mode,
RequestCallback callback) = 0;
//! Sends data to the receiver - same as SendData - memory should not be freed until send is finished
//! used e.g. for Python bindings
virtual Error SendData__(const EventHeader& event_header, void* data, uint64_t ingest_mode,
RequestCallback callback) = 0;
//! Sends data to the receiver
/*!
\param event_header - A stucture with the meta information (file name, size, a string with user metadata (JSON format)).
\param data - A pointer to the data to send
\return Error - Will be nullptr on success
*/
virtual Error SendData(const EventHeader& event_header, std::string substream, FileData data, uint64_t ingest_mode,
RequestCallback callback) = 0;
//! Sends data to the receiver - same as SendData - memory should not be freed until send is finished
//! used e.g. for Python bindings
virtual Error SendData__(const EventHeader& event_header, std::string substream, void* data, uint64_t ingest_mode,
RequestCallback callback) = 0;
//! Stop processing threads
//! used e.g. for Python bindings
virtual void StopThreads__() = 0;
//! Sends files to the default substream
/*!
\param event_header - A stucture with the meta information (file name, size is ignored).
\param full_path - A full path of the file to send
\return Error - Will be nullptr on success
*/
virtual Error SendFile(const EventHeader& event_header, std::string full_path, uint64_t ingest_mode,
RequestCallback callback) = 0;
//! Sends files to the substream
/*!
\param event_header - A stucture with the meta information (file name, size is ignored).
\param full_path - A full path of the file to send
\return Error - Will be nullptr on success
*/
virtual Error SendFile(const EventHeader& event_header, std::string substream, std::string full_path,
uint64_t ingest_mode,
RequestCallback callback) = 0;
//! Marks substream finished
/*!
\param substream - Name of the substream to makr finished
\param last_id - ID of the last image in substream
\param next_substream - Name of the next substream (empty if not set)
\return Error - Will be nullptr on success
*/
virtual Error SendSubstreamFinishedFlag(std::string substream, uint64_t last_id, std::string next_substream,
RequestCallback callback) = 0;
//! Sends metadata for the current beamtime to the receiver
/*!
\param metadata - a JSON string with metadata
\param callback - callback function
\return Error - will be nullptr on success
*/
virtual Error SendMetaData(const std::string& metadata, RequestCallback callback) = 0;
//! Set internal log level
virtual void SetLogLevel(LogLevel level) = 0;
//! Enables/Disables logs output to stdout
virtual void EnableLocalLog(bool enable) = 0;
//! Enables/Disables sending logs to the central server
virtual void EnableRemoteLog(bool enable) = 0;
//! Set beamtime id which producer will use to send data
virtual Error SetCredentials(SourceCredentials source_cred) = 0;
//! Set get current size of the requests queue
virtual uint64_t GetRequestsQueueSize() = 0;
//! Wait until all current requests are processed or timeout
virtual Error WaitRequestsFinished(uint64_t timeout_ms) = 0;
};
}
#endif //ASAPO_PRODUCER__PRODUCER_H
#ifndef ASAPO_PRODUCER_ERROR_H
#define ASAPO_PRODUCER_ERROR_H