Commit 1433a457 authored by Jan Kotanski's avatar Jan Kotanski
Browse files

New upstream version 20.09.0

parent c64afcac
Metadata-Version: 1.0
Name: asapo_producer
Version: 20.09.0
Summary: UNKNOWN
Home-page: UNKNOWN
Author: UNKNOWN
Author-email: UNKNOWN
License: UNKNOWN
Description: UNKNOWN
Platform: UNKNOWN
This diff is collapsed.
#ifndef ASAPO_ASAPO_WRAPPERS_H
#define ASAPO_ASAPO_WRAPPERS_H
#include <memory>
#include <functional>
namespace asapo {
inline std::string GetErrorString(asapo::Error* err) {
if (*err) {
return (*err)->Explain();
}
return "";
}
using RequestCallbackCython = void (*)(void*, void*, RequestCallbackPayload payload, Error err);
using RequestCallbackCythonMemory = void (*)(void*, void*, void*, RequestCallbackPayload payload, Error err);
RequestCallback unwrap_callback(RequestCallbackCython callback, void* c_self, void* py_func) {
if (py_func == NULL) {
return nullptr;
}
RequestCallback wrapper = [ = ](RequestCallbackPayload payload, Error err) -> void {
callback(c_self, py_func, std::move(payload), std::move(err));
};
return wrapper;
}
RequestCallback unwrap_callback_with_memory(RequestCallbackCythonMemory callback, void* c_self, void* py_func,
void* nd_array) {
RequestCallback wrapper = [ = ](RequestCallbackPayload payload, Error err) -> void {
callback(c_self, py_func, nd_array, std::move(payload), std::move(err));
};
return wrapper;
}
}
#endif //ASAPO_ASAPO_WRAPPERS_H
/** @defgroup producer The Producer Group
* This is the producer group
* @{
*/
/** @} */ // end of producer
/**
* @file asapo_producer.h
* @ingroup producer
*/
#ifndef ASAPO_ASAPO_PRODUCER_H
#define ASAPO_ASAPO_PRODUCER_H
#include "common/io_error.h"
#include "common/version.h"
#include "producer/producer.h"
#include "producer/producer_error.h"
#endif //ASAPO_ASAPO_PRODUCER_H
#ifndef ASAPO_FILE_INFO_H
#define ASAPO_FILE_INFO_H
#include <cinttypes>
#include <chrono>
#include <memory>
#include <vector>
#include <string>
#include "error.h"
namespace asapo {
std::string IsoDateFromEpochNanosecs(uint64_t time_from_epoch_nanosec);
uint64_t NanosecsEpochFromISODate(std::string date_time);
uint64_t EpochNanosecsFromNow();
class FileInfo {
public:
std::string name;
std::chrono::system_clock::time_point modify_date;
uint64_t size{0};
uint64_t id{0};
std::string source;
std::string metadata;
uint64_t buf_id{0};
std::string Json() const;
bool SetFromJson(const std::string& json_string);
std::string FullName(const std::string& base_path) const;
};
struct StreamInfo {
uint64_t last_id{0};
std::string Json() const;
bool SetFromJson(const std::string& json_string);
};
inline bool operator==(const FileInfo& lhs, const FileInfo& rhs) {
return (lhs.name == rhs.name &&
lhs.id == rhs.id &&
lhs.modify_date == rhs.modify_date &&
lhs.size == rhs.size);
}
using FileData = std::unique_ptr<uint8_t[]>;
using FileInfos = std::vector<FileInfo>;
using IdList = std::vector<uint64_t>;
struct DataSet {
uint64_t id;
FileInfos content;
bool SetFromJson(const std::string& json_string);
};
using SubDirList = std::vector<std::string>;
enum class SourceType {
kProcessed,
kRaw
};
Error GetSourceTypeFromString(std::string stype,SourceType *type);
std::string GetStringFromSourceType(SourceType type);
struct SourceCredentials {
SourceCredentials(SourceType type, std::string beamtime, std::string beamline, std::string stream, std::string token):
beamtime_id{std::move(beamtime)},
beamline{std::move(beamline)},
stream{std::move(stream)},
user_token{std::move(token)},
type{type}{};
SourceCredentials() {};
static const std::string kDefaultStream;
static const std::string kDefaultBeamline;
static const std::string kDefaultBeamtimeId;
std::string beamtime_id;
std::string beamline;
std::string stream;
std::string user_token;
SourceType type = SourceType::kProcessed;
std::string GetString() {
return (type==SourceType::kRaw?std::string("raw"):std::string("processed")) + "%"+ beamtime_id + "%" + beamline + "%" + stream + "%" + user_token;
};
};
enum IngestModeFlags : uint64_t {
kTransferData = 1 << 0,
kTransferMetaDataOnly = 1 << 1,
kStoreInFilesystem = 1 << 2,
};
const uint64_t kDefaultIngestMode = kTransferData | kStoreInFilesystem;
const std::string kDefaultSubstream = "default";
}
#endif //ASAPO_FILE_INFO_H
#ifndef ASAPO_ERROR_H
#define ASAPO_ERROR_H
#include <string>
#include <memory>
#include <utility>
#include <ostream>
namespace asapo {
enum class ErrorType {
kUnknownError = 0,
kAsapoError,
kHttpError,
kIOError,
kDBError,
kReceiverError,
kProducerError,
kConsumerError,
kMemoryAllocationError,
kEndOfFile,
kFabricError,
};
class ErrorInterface;
class ErrorTemplateInterface;
class CustomErrorData;
// nullptr == noError
// Example check:
// void TestError(Error* err) {
// if(*err) {
// [...] //An error occurred
// }
// }
using Error = std::unique_ptr<ErrorInterface>;
class ErrorInterface {
public:
virtual std::string Explain() const noexcept = 0;
virtual void Append(const std::string& value) noexcept = 0;
virtual ErrorType GetErrorType() const noexcept = 0;
virtual const CustomErrorData* GetCustomData() = 0;
virtual void SetCustomData(std::unique_ptr<CustomErrorData> data) = 0;
virtual ~ErrorInterface() = default; // needed for unique_ptr to delete itself
};
class ErrorTemplateInterface {
public:
virtual ErrorType GetErrorType() const noexcept = 0;
virtual Error Generate() const noexcept = 0;
virtual Error Generate(const std::string& suffix) const noexcept = 0;
virtual std::string Text() const noexcept = 0;
virtual inline bool operator == (const Error& rhs) const {
return rhs != nullptr &&
GetErrorType() == rhs->GetErrorType();
}
virtual inline bool operator != (const Error& rhs) const {
return !(operator==(rhs));
}
};
static inline bool operator == (const Error& lhs, const ErrorTemplateInterface& rhs) {
return rhs.operator == (lhs);
}
static inline bool operator != (const Error& lhs, const ErrorTemplateInterface& rhs) {
return rhs.operator != (lhs);
}
static inline std::ostream& operator<<(std::ostream& os, const Error& err) {
if(err) {
os << err->Explain();
} else {
static std::string no_error = "No error";
os << no_error;
}
return os;
}
class CustomErrorData {
public:
virtual ~CustomErrorData() = default;
};
class SimpleError: public ErrorInterface {
private:
std::string error_;
std::unique_ptr<CustomErrorData> custom_data_;
ErrorType error_type_ = ErrorType::kAsapoError;
public:
explicit SimpleError(std::string error): error_{std::move(error)} {
}
SimpleError(std::string error, ErrorType error_type ): error_{std::move(error)}, error_type_{error_type} {
}
const CustomErrorData* GetCustomData() override {
if (custom_data_) {
return custom_data_.get();
} else {
return nullptr;
}
};
void SetCustomData(std::unique_ptr<CustomErrorData> data) override {
custom_data_ = std::move(data);
}
void Append(const std::string& value) noexcept override {
error_ += ": " + value;
}
std::string Explain() const noexcept override {
return error_;
}
ErrorType GetErrorType() const noexcept override {
return error_type_;
}
};
/*
* IMPORTANT:
* Never use the same ErrorType for two different errors,
* otherwise the == operator might not work as expected!
*/
class SimpleErrorTemplate : public ErrorTemplateInterface {
protected:
std::string error_;
ErrorType error_type_ = ErrorType::kAsapoError;
public:
explicit SimpleErrorTemplate(std::string error): error_{std::move(error)} {
}
virtual std::string Text() const noexcept override {
return error_;
}
SimpleErrorTemplate(std::string error, ErrorType error_type ): error_{std::move(error)}, error_type_{error_type} {
}
inline ErrorType GetErrorType() const noexcept override {
return error_type_;
}
inline Error Generate() const noexcept override {
return Error(new SimpleError{error_, error_type_});
}
inline Error Generate(const std::string& suffix) const noexcept override {
return Error(new SimpleError{error_ + " :" + suffix, error_type_});
}
};
static inline std::ostream& operator<<(std::ostream& os, const SimpleErrorTemplate& err) {
return os << err.Text();
}
inline Error TextError(const std::string& error) {
return Error{new SimpleError{error}};
}
inline Error TextErrorWithType(const std::string& error, ErrorType error_type) {
return Error{new SimpleError{error, error_type}};
}
namespace ErrorTemplates {
auto const kMemoryAllocationError = SimpleErrorTemplate {
"kMemoryAllocationError", ErrorType::kMemoryAllocationError
};
auto const kEndOfFile = SimpleErrorTemplate {
"End of file", ErrorType::kEndOfFile
};
}
template <typename ServiceErrorType, ErrorType MainErrorType>
class ServiceError : public SimpleError {
private:
ServiceErrorType error_type_;
public:
ServiceError(const std::string& error, ServiceErrorType error_type) : SimpleError(error, MainErrorType) {
error_type_ = error_type;
}
ServiceErrorType GetServiceErrorType() const noexcept {
return error_type_;
}
};
template <typename ServiceErrorType, ErrorType MainErrorType>
class ServiceErrorTemplate : public SimpleErrorTemplate {
protected:
ServiceErrorType error_type_;
public:
ServiceErrorTemplate(const std::string& error, ServiceErrorType error_type) : SimpleErrorTemplate(error,
MainErrorType) {
error_type_ = error_type;
}
inline ServiceErrorType GetServiceErrorType() const noexcept {
return error_type_;
}
inline Error Generate() const noexcept override {
auto err = new ServiceError<ServiceErrorType, MainErrorType>(error_, error_type_);
return Error(err);
}
inline Error Generate(const std::string& suffix) const noexcept override {
return Error(new ServiceError<ServiceErrorType, MainErrorType>(error_ + ": " + suffix, error_type_));
}
inline bool operator==(const Error& rhs) const override {
return SimpleErrorTemplate::operator==(rhs)
&& GetServiceErrorType() == ((ServiceError<ServiceErrorType, MainErrorType>*) rhs.get())->GetServiceErrorType();
}
};
}
#endif //ASAPO_ERROR_H
#ifndef ASAPO_SYSTEM__IO_ERROR_H
#define ASAPO_SYSTEM__IO_ERROR_H
#include "common/error.h"
namespace asapo {
enum class IOErrorType {
kUnknownIOError,
kBadFileNumber,
kResourceTemporarilyUnavailable,
kFileNotFound,
kReadError,
kPermissionDenied,
kUnsupportedAddressFamily,
kInvalidAddressFormat,
kAddressAlreadyInUse,
kConnectionRefused,
kConnectionResetByPeer,
kTimeout,
kFileAlreadyExists,
kNoSpaceLeft,
kSocketOperationOnNonSocket,
kInvalidMemoryAddress,
kUnableToResolveHostname,
kSocketOperationUnknownAtLevel,
kSocketOperationValueOutOfBound,
kAddressNotValid,
kBrokenPipe
};
using IOError = ServiceError<IOErrorType, ErrorType::kIOError>;
using IOErrorTemplate = ServiceErrorTemplate<IOErrorType, ErrorType::kIOError>;
namespace IOErrorTemplates {
auto const kUnknownIOError = IOErrorTemplate {
"Unknown Error", IOErrorType::kUnknownIOError
};
auto const kFileNotFound = IOErrorTemplate {
"No such file or directory", IOErrorType::kFileNotFound
};
auto const kReadError = IOErrorTemplate {
"Read error", IOErrorType::kReadError
};
auto const kBadFileNumber = IOErrorTemplate {
"Bad file number", IOErrorType::kBadFileNumber
};
auto const kResourceTemporarilyUnavailable = IOErrorTemplate {
"Resource temporarily unavailable", IOErrorType::kResourceTemporarilyUnavailable
};
auto const kPermissionDenied = IOErrorTemplate {
"Permission denied", IOErrorType::kPermissionDenied
};
auto const kUnsupportedAddressFamily = IOErrorTemplate {
"Unsupported address family", IOErrorType::kUnsupportedAddressFamily
};
auto const kInvalidAddressFormat = IOErrorTemplate {
"Invalid address format", IOErrorType::kInvalidAddressFormat
};
auto const kAddressAlreadyInUse = IOErrorTemplate {
"Address already in use", IOErrorType::kAddressAlreadyInUse
};
auto const kConnectionRefused = IOErrorTemplate {
"Connection refused", IOErrorType::kConnectionRefused
};
auto const kConnectionResetByPeer = IOErrorTemplate {
"kConnectionResetByPeer", IOErrorType::kConnectionResetByPeer
};
auto const kTimeout = IOErrorTemplate {
"timeout", IOErrorType::kTimeout
};
auto const kFileAlreadyExists = IOErrorTemplate {
"kFileAlreadyExists", IOErrorType::kFileAlreadyExists
};
auto const kNoSpaceLeft = IOErrorTemplate {
"kNoSpaceLeft", IOErrorType::kNoSpaceLeft
};
auto const kSocketOperationOnNonSocket = IOErrorTemplate {
"kSocketOperationOnNonSocket", IOErrorType::kSocketOperationOnNonSocket
};
auto const kInvalidMemoryAddress = IOErrorTemplate {
"kInvalidMemoryAddress", IOErrorType::kInvalidMemoryAddress
};
auto const kUnableToResolveHostname = IOErrorTemplate {
"kUnableToResolveHostname", IOErrorType::kUnableToResolveHostname
};
auto const kSocketOperationUnknownAtLevel = IOErrorTemplate {
"kSocketOperationUnknownAtLevel", IOErrorType::kSocketOperationUnknownAtLevel
};
auto const kSocketOperationValueOutOfBound = IOErrorTemplate {
"kSocketOperationValueOutOfBound", IOErrorType::kSocketOperationValueOutOfBound
};
auto const kAddressNotValid = IOErrorTemplate {
"Address not valid", IOErrorType::kAddressNotValid
};
auto const kBrokenPipe = IOErrorTemplate {
"Broken pipe/connection", IOErrorType::kBrokenPipe
};
}
}
#endif //ASAPO_SYSTEM__IO_ERROR_H
#ifndef ASAPO_COMMON__NETWORKING_H
#define ASAPO_COMMON__NETWORKING_H
#include <cstdint>
#include <algorithm>
#include <string>
#include <cstring>
#include "data_structs.h"
namespace asapo {
typedef uint64_t NetworkRequestId;
enum class NetworkConnectionType : uint32_t {
kUndefined,
kAsapoTcp, // ASAPOs TCP (Multiple connections for parallel data transfers)
kFabric, // Fabric connection (Primarily used for InfiniBand verbs)
};
enum Opcode : uint8_t {
kOpcodeUnknownOp = 1,
kOpcodeTransferData,
kOpcodeTransferSubsetData,
kOpcodeStreamInfo,
kOpcodeGetBufferData,
kOpcodeAuthorize,
kOpcodeTransferMetaData,
kOpcodeCount,
};
enum NetworkErrorCode : uint16_t {
kNetErrorNoError,
kNetErrorReauthorize,
kNetErrorWarning,
kNetErrorWrongRequest,
kNetErrorNoData,
kNetAuthorizationError,
kNetErrorInternalServerError = 65535,
};
//TODO need to use an serialization framework to ensure struct consistency on different computers
const std::size_t kMaxMessageSize = 1024;
const std::size_t kNCustomParams = 3;
using CustomRequestData = uint64_t[kNCustomParams];
const std::size_t kPosIngestMode = 0;
const std::size_t kPosDataSetId = 1;
const std::size_t kPosDataSetSize = 2;
struct GenericRequestHeader {
GenericRequestHeader(const GenericRequestHeader& header) {
op_code = header.op_code, data_id = header.data_id, data_size = header.data_size, meta_size = header.meta_size,
memcpy(custom_data, header.custom_data, kNCustomParams * sizeof(uint64_t)),
memcpy(message, header.message, kMaxMessageSize);
strncpy(substream, header.substream, kMaxMessageSize);
}
/* Keep in mind that the message here is just strncpy'ed, you can change the message later */
GenericRequestHeader(Opcode i_op_code = kOpcodeUnknownOp, uint64_t i_data_id = 0,
uint64_t i_data_size = 0, uint64_t i_meta_size = 0, const std::string& i_message = "",
const std::string& i_substream = ""):
op_code{i_op_code}, data_id{i_data_id}, data_size{i_data_size}, meta_size{i_meta_size} {
strncpy(message, i_message.c_str(), kMaxMessageSize);
strncpy(substream, i_substream.c_str(), kMaxMessageSize);
}
Opcode op_code;
uint64_t data_id;
uint64_t data_size;
uint64_t meta_size;
CustomRequestData custom_data;
char message[kMaxMessageSize]; /* Can also be a binary message (e.g. MemoryRegionDetails) */
char substream[kMaxMessageSize]; /* Must be a string (strcpy is used) */
std::string Json() {
std::string s = "{\"id\":" + std::to_string(data_id) + ","
"\"buffer\":\"" + std::string(message) + "\"" + ","
"\"substream\":\"" + std::string(substream) + "\""
+ "}";
return s;
};
};
struct GenericNetworkResponse {
Opcode op_code;
NetworkErrorCode error_code;
char message[kMaxMessageSize];
};