Skip to content
Snippets Groups Projects
Commit baebafc2 authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

rename broker -> consumer in APIs

parent b2ecb719
Branches
Tags
No related merge requests found
Showing
with 462 additions and 426 deletions
......@@ -16,9 +16,10 @@ BREAKING CHANGES
* Consumer API - remove group_id argument from get_last/get_by_id/get_last_dataset/get_dataset_by_id functions
* Producer API - changed meaning of subsets (subset_id replaced with id_in_subset and this means now id of the image within a subset (e.g. module number for multi-module detector)), file_id is now a global id of a multi-set data (i.g. multi-image id)
#### renaming - Producer API
* stream -> data_source, stream -> stream
* stream -> data_source, substream -> stream
#### renaming - Consumer API
*
* stream -> data_source, substream -> stream
* broker -> consumer
BUG FIXES
* fix memory leak bug in Python consumer library (lead to problems when creating many consumer instances)
......
set(TARGET_NAME asapo-consumer)
set(SOURCE_FILES
src/data_broker.cpp
src/server_data_broker.cpp
src/tcp_client.cpp
src/consumer.cpp
src/consumer_impl.cpp
src/tcp_consumer_client.cpp
src/tcp_connection_pool.cpp
src/fabric_consumer_client.cpp)
......@@ -31,8 +31,8 @@ target_link_libraries(${TARGET_NAME} ${CURL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT}
################################
set(TEST_SOURCE_FILES
unittests/test_consumer_api.cpp
unittests/test_server_broker.cpp
unittests/test_tcp_client.cpp
unittests/test_consumer_impl.cpp
unittests/test_tcp_consumer_client.cpp
unittests/test_tcp_connection_pool.cpp
unittests/test_fabric_consumer_client.cpp
unittests/test_rds_error_mapper.cpp
......
/** @defgroup consumer The Consumer Group
* This is the consumer group
* @{
*/
#ifndef ASAPO_ASAPO_CONSUMER_H
#define ASAPO_ASAPO_CONSUMER_H
#include "asapo/consumer/data_broker.h"
#include "asapo/consumer/consumer.h"
#include "asapo/consumer/consumer_error.h"
#include "asapo/common/version.h"
#include <ostream>
#endif //ASAPO_ASAPO_CONSUMER_H
/** @} */ // end of consumer
#endif //ASAPO_ASAPO_CONSUMER_H
......@@ -12,7 +12,7 @@
namespace asapo {
class DataBroker {
class Consumer {
public:
//! Reset counter for the specific group.
/*!
......@@ -60,12 +60,12 @@ class DataBroker {
Error* error) = 0;
virtual IdList GetUnacknowledgedTupleIds(std::string group_id, uint64_t from_id, uint64_t to_id, Error* error) = 0;
//! Set timeout for broker operations. Default - no timeout
//! Set timeout for consumer operations. Default - no timeout
virtual void SetTimeout(uint64_t timeout_ms) = 0;
//! Will disable RDMA.
//! If RDMA is disabled, not available or the first connection fails to build up, it will automatically fall back to TCP.
//! This will only have an effect if no previous connection attempted was made on this DataBroker.
//! This will only have an effect if no previous connection attempted was made on this Consumer.
virtual void ForceNoRdma() = 0;
//! Returns the current network connection type
......@@ -199,14 +199,14 @@ class DataBroker {
//! Will try to interrupt current long runnung operations (mainly needed to exit waiting loop in C from Python)
virtual void InterruptCurrentOperation() = 0;
virtual ~DataBroker() = default; // needed for unique_ptr to delete itself
virtual ~Consumer() = default; // needed for unique_ptr to delete itself
};
/*! A class to create a data broker instance. The class's only function Create is used for this */
class DataBrokerFactory {
/*! A class to create consumer instance. The class's only function Create is used for this */
class ConsumerFactory {
public:
static std::unique_ptr<DataBroker> CreateServerBroker(std::string server_name, std::string source_path,
bool has_filesystem, SourceCredentials source, Error* error) noexcept;
static std::unique_ptr<Consumer> CreateConsumer(std::string server_name, std::string source_path,
bool has_filesystem, SourceCredentials source, Error* error) noexcept;
};
......
......@@ -64,7 +64,7 @@ auto const kWrongInput = ConsumerErrorTemplate {
};
auto const kInterruptedTransaction = ConsumerErrorTemplate {
"error from broker server", ConsumerErrorType::kInterruptedTransaction
"server error", ConsumerErrorType::kInterruptedTransaction
};
auto const kUnavailableService = ConsumerErrorTemplate {
......
#include "asapo/common/networking.h"
#include "asapo/consumer/data_broker.h"
#include "server_data_broker.h"
#include "asapo/consumer/consumer.h"
#include "consumer_impl.h"
#include "asapo/consumer/consumer_error.h"
namespace asapo {
template <typename Broker, typename ...Args>
std::unique_ptr<DataBroker> Create(const std::string& source_name,
Error* error,
Args&& ... args) noexcept {
template <typename C, typename ...Args>
std::unique_ptr<Consumer> Create(const std::string& source_name,
Error* error,
Args&& ... args) noexcept {
if (source_name.empty()) {
*error = ConsumerErrorTemplates::kWrongInput.Generate("Empty Data Source");
return nullptr;
}
std::unique_ptr<DataBroker> p = nullptr;
std::unique_ptr<Consumer> p = nullptr;
try {
p.reset(new Broker(source_name, std::forward<Args>(args)...));
p.reset(new C(source_name, std::forward<Args>(args)...));
error->reset(nullptr);
} catch (...) { // we do not test this part
error->reset(new SimpleError("Memory error"));
......@@ -26,10 +26,10 @@ std::unique_ptr<DataBroker> Create(const std::string& source_name,
}
std::unique_ptr<DataBroker> DataBrokerFactory::CreateServerBroker(std::string server_name, std::string source_path,
bool has_filesystem, SourceCredentials source, Error* error) noexcept {
return Create<ServerDataBroker>(std::move(server_name), error, std::move(source_path), has_filesystem,
std::move(source));
std::unique_ptr<Consumer> ConsumerFactory::CreateConsumer(std::string server_name, std::string source_path,
bool has_filesystem, SourceCredentials source, Error* error) noexcept {
return Create<ConsumerImpl>(std::move(server_name), error, std::move(source_path), has_filesystem,
std::move(source));
}
......
#ifndef ASAPO_SERVER_DATA_BROKER_H
#define ASAPO_SERVER_DATA_BROKER_H
#ifndef ASAPO_CONSUMER_IMPL_H
#define ASAPO_CONSUMER_IMPL_H
#include "asapo/common/networking.h"
#include <mutex>
#include <atomic>
#include "asapo/consumer/data_broker.h"
#include "asapo/consumer/consumer.h"
#include "asapo/io/io.h"
#include "asapo/http_client/http_client.h"
#include "net_client.h"
......@@ -51,10 +51,10 @@ Error ConsumerErrorFromNoDataResponse(const std::string& response);
Error ConsumerErrorFromPartialDataResponse(const std::string& response);
DataSet DecodeDatasetFromResponse(std::string response, Error* err);
class ServerDataBroker final : public asapo::DataBroker {
class ConsumerImpl final : public asapo::Consumer {
public:
explicit ServerDataBroker(std::string server_uri, std::string source_path, bool has_filesystem,
SourceCredentials source);
explicit ConsumerImpl(std::string server_uri, std::string source_path, bool has_filesystem,
SourceCredentials source);
Error Acknowledge(std::string group_id, uint64_t id, std::string stream = kDefaultStream) override;
Error NegativeAcknowledge(std::string group_id, uint64_t id, uint64_t delay_sec,
......@@ -174,4 +174,4 @@ class ServerDataBroker final : public asapo::DataBroker {
};
}
#endif //ASAPO_SERVER_DATA_BROKER_H
#endif //ASAPO_CONSUMER_IMPL_H
#include "tcp_client.h"
#include "tcp_consumer_client.h"
#include "asapo/io/io_factory.h"
#include "asapo/common/networking.h"
#include "rds_response_error.h"
namespace asapo {
TcpClient::TcpClient() : io__{GenerateDefaultIO()}, connection_pool__{new TcpConnectionPool()} {
TcpConsumerClient::TcpConsumerClient() : io__{GenerateDefaultIO()}, connection_pool__{new TcpConnectionPool()} {
}
Error TcpClient::SendGetDataRequest(SocketDescriptor sd, const FileInfo* info) const noexcept {
Error TcpConsumerClient::SendGetDataRequest(SocketDescriptor sd, const FileInfo* info) const noexcept {
Error err;
GenericRequestHeader request_header{kOpcodeGetBufferData, info->buf_id, info->size};
io__->Send(sd, &request_header, sizeof(request_header), &err);
......@@ -21,7 +21,7 @@ Error TcpClient::SendGetDataRequest(SocketDescriptor sd, const FileInfo* info) c
return err;
}
Error TcpClient::ReconnectAndResendGetDataRequest(SocketDescriptor* sd, const FileInfo* info) const noexcept {
Error TcpConsumerClient::ReconnectAndResendGetDataRequest(SocketDescriptor* sd, const FileInfo* info) const noexcept {
Error err;
*sd = connection_pool__->Reconnect(*sd, &err);
if (err) {
......@@ -31,7 +31,7 @@ Error TcpClient::ReconnectAndResendGetDataRequest(SocketDescriptor* sd, const Fi
}
}
Error TcpClient::ReceiveResponce(SocketDescriptor sd) const noexcept {
Error TcpConsumerClient::ReceiveResponce(SocketDescriptor sd) const noexcept {
Error err;
GenericNetworkResponse response;
......@@ -55,7 +55,7 @@ Error TcpClient::ReceiveResponce(SocketDescriptor sd) const noexcept {
return nullptr;
}
Error TcpClient::QueryCacheHasData(SocketDescriptor* sd, const FileInfo* info, bool try_reconnect) const noexcept {
Error TcpConsumerClient::QueryCacheHasData(SocketDescriptor* sd, const FileInfo* info, bool try_reconnect) const noexcept {
Error err;
err = SendGetDataRequest(*sd, info);
if (err && try_reconnect) {
......@@ -68,7 +68,7 @@ Error TcpClient::QueryCacheHasData(SocketDescriptor* sd, const FileInfo* info, b
return ReceiveResponce(*sd);
}
Error TcpClient::ReceiveData(SocketDescriptor sd, const FileInfo* info, FileData* data) const noexcept {
Error TcpConsumerClient::ReceiveData(SocketDescriptor sd, const FileInfo* info, FileData* data) const noexcept {
Error err;
uint8_t* data_array = nullptr;
try {
......@@ -88,7 +88,7 @@ Error TcpClient::ReceiveData(SocketDescriptor sd, const FileInfo* info, FileData
return err;
}
Error TcpClient::GetData(const FileInfo* info, FileData* data) {
Error TcpConsumerClient::GetData(const FileInfo* info, FileData* data) {
Error err;
bool reused;
auto sd = connection_pool__->GetFreeConnection(info->source, &reused, &err);
......
......@@ -7,9 +7,9 @@
namespace asapo {
class TcpClient : public NetClient {
class TcpConsumerClient : public NetClient {
public:
explicit TcpClient();
explicit TcpConsumerClient();
Error GetData(const FileInfo* info, FileData* data) override;
std::unique_ptr<IO> io__;
std::unique_ptr<TcpConnectionPool> connection_pool__;
......
#include <gmock/gmock.h>
#include "asapo/consumer/data_broker.h"
#include "../src/server_data_broker.h"
#include "asapo/consumer/consumer.h"
#include "../src/consumer_impl.h"
#include "asapo/common/error.h"
using asapo::DataBrokerFactory;
using asapo::DataBroker;
using asapo::ServerDataBroker;
using asapo::ConsumerFactory;
using asapo::Consumer;
using asapo::ConsumerImpl;
using asapo::Error;
using ::testing::Eq;
......@@ -16,7 +16,7 @@ using ::testing::Test;
namespace {
class DataBrokerFactoryTests : public Test {
class ConsumerFactoryTests : public Test {
public:
Error error;
void SetUp() override {
......@@ -25,12 +25,17 @@ class DataBrokerFactoryTests : public Test {
};
TEST_F(DataBrokerFactoryTests, CreateServerDataSource) {
TEST_F(ConsumerFactoryTests, CreateServerDataSource) {
auto data_broker = DataBrokerFactory::CreateServerBroker("server", "path", false, asapo::SourceCredentials{asapo::SourceType::kProcessed,"beamtime_id", "", "", "token"}, &error);
auto consumer = ConsumerFactory::CreateConsumer("server",
"path",
false,
asapo::SourceCredentials{asapo::SourceType::kProcessed,
"beamtime_id", "", "", "token"},
&error);
ASSERT_THAT(error, Eq(nullptr));
ASSERT_THAT(dynamic_cast<ServerDataBroker*>(data_broker.get()), Ne(nullptr));
ASSERT_THAT(dynamic_cast<ConsumerImpl*>(consumer.get()), Ne(nullptr));
}
......
......@@ -4,7 +4,7 @@
#include "asapo/io/io.h"
#include "asapo/unittests/MockIO.h"
#include "mocking.h"
#include "../src/tcp_client.h"
#include "../src/tcp_consumer_client.h"
#include "../../../../common/cpp/src/system_io/system_io.h"
#include "asapo/common/networking.h"
......@@ -13,7 +13,7 @@ using asapo::FileInfo;
using asapo::FileData;
using asapo::MockIO;
using asapo::SimpleError;
using asapo::TcpClient;
using asapo::TcpConsumerClient;
using asapo::MockTCPConnectionPool;
......@@ -34,7 +34,7 @@ using ::testing::DoAll;
namespace {
TEST(TcpClient, Constructor) {
auto client = std::unique_ptr<TcpClient> {new TcpClient()};
auto client = std::unique_ptr<TcpConsumerClient> {new TcpConsumerClient()};
ASSERT_THAT(dynamic_cast<asapo::SystemIO*>(client->io__.get()), Ne(nullptr));
ASSERT_THAT(dynamic_cast<asapo::TcpConnectionPool*>(client->connection_pool__.get()), Ne(nullptr));
}
......@@ -55,7 +55,7 @@ ACTION_P(A_WriteSendDataResponse, error_code) {
class TcpClientTests : public Test {
public:
std::unique_ptr<TcpClient> client = std::unique_ptr<TcpClient> {new TcpClient()};
std::unique_ptr<TcpConsumerClient> client = std::unique_ptr<TcpConsumerClient> {new TcpConsumerClient()};
NiceMock<MockIO> mock_io;
NiceMock<MockTCPConnectionPool> mock_connection_pool;
FileInfo info;
......
......@@ -57,8 +57,8 @@ cdef extern from "asapo/asapo_consumer.h" namespace "asapo":
NetworkConnectionType NetworkConnectionType_kFabric "asapo::NetworkConnectionType::kFabric"
cdef extern from "asapo/asapo_consumer.h" namespace "asapo" nogil:
cdef cppclass DataBroker:
DataBroker() except +
cdef cppclass Consumer:
Consumer() except +
void SetTimeout(uint64_t timeout_ms)
void ForceNoRdma()
NetworkConnectionType CurrentConnectionType()
......@@ -84,9 +84,9 @@ cdef extern from "asapo/asapo_consumer.h" namespace "asapo" nogil:
void InterruptCurrentOperation()
cdef extern from "asapo/asapo_consumer.h" namespace "asapo" nogil:
cdef cppclass DataBrokerFactory:
DataBrokerFactory() except +
unique_ptr[DataBroker] CreateServerBroker(string server_name,string source_path,bool has_filesystem,SourceCredentials source,Error* error)
cdef cppclass ConsumerFactory:
ConsumerFactory() except +
unique_ptr[Consumer] CreateConsumer(string server_name,string source_path,bool has_filesystem,SourceCredentials source,Error* error)
cdef extern from "asapo/asapo_consumer.h" namespace "asapo":
......
......@@ -102,8 +102,8 @@ cdef throw_exception(Error& err, res = None):
else:
raise AsapoConsumerError(error_string)
cdef class PyDataBroker:
cdef unique_ptr[DataBroker] c_broker
cdef class PyConsumer:
cdef unique_ptr[Consumer] c_consumer
def _op(self, op, group_id, stream, meta_only, uint64_t id):
cdef FileInfo info
cdef string b_group_id = _bytes(group_id)
......@@ -114,13 +114,13 @@ cdef class PyDataBroker:
cdef np.npy_intp dims[1]
if op == "next":
with nogil:
err = self.c_broker.get().GetNext(&info, b_group_id,b_stream, p_data)
err = self.c_consumer.get().GetNext(&info, b_group_id,b_stream, p_data)
elif op == "last":
with nogil:
err = self.c_broker.get().GetLast(&info, b_stream, p_data)
err = self.c_consumer.get().GetLast(&info, b_stream, p_data)
elif op == "id":
with nogil:
err = self.c_broker.get().GetById(id, &info, b_stream, p_data)
err = self.c_consumer.get().GetById(id, &info, b_stream, p_data)
if err:
throw_exception(err)
info_str = _str(info.Json())
......@@ -146,7 +146,7 @@ cdef class PyDataBroker:
cdef Error err
cdef FileData data
with nogil:
err = self.c_broker.get().RetrieveData(&info, &data)
err = self.c_consumer.get().RetrieveData(&info, &data)
if err:
throw_exception(err)
cdef np.npy_intp dims[1]
......@@ -162,17 +162,17 @@ cdef class PyDataBroker:
cdef uint64_t size
cdef string b_stream = _bytes(stream)
with nogil:
size = self.c_broker.get().GetCurrentSize(b_stream,&err)
size = self.c_consumer.get().GetCurrentSize(b_stream,&err)
err_str = _str(GetErrorString(&err))
if err:
throw_exception(err)
return size
def set_timeout(self,timeout):
self.c_broker.get().SetTimeout(timeout)
self.c_consumer.get().SetTimeout(timeout)
def force_no_rdma(self):
self.c_broker.get().ForceNoRdma()
self.c_consumer.get().ForceNoRdma()
def current_connection_type(self):
cdef NetworkConnectionType connection_type = self.c_broker.get().CurrentConnectionType()
cdef NetworkConnectionType connection_type = self.c_consumer.get().CurrentConnectionType()
cdef int cased = <int>connection_type
cdef string result = "Unknown"
if cased == <int>NetworkConnectionType_kUndefined:
......@@ -188,7 +188,7 @@ cdef class PyDataBroker:
cdef Error err
cdef uint64_t id = value
with nogil:
err = self.c_broker.get().SetLastReadMarker(id,b_group_id,b_stream)
err = self.c_consumer.get().SetLastReadMarker(id,b_group_id,b_stream)
if err:
throw_exception(err)
return
......@@ -197,7 +197,7 @@ cdef class PyDataBroker:
cdef string b_stream = _bytes(stream)
cdef Error err
with nogil:
err = self.c_broker.get().ResetLastReadMarker(b_group_id,b_stream)
err = self.c_consumer.get().ResetLastReadMarker(b_group_id,b_stream)
if err:
throw_exception(err)
return
......@@ -205,7 +205,7 @@ cdef class PyDataBroker:
cdef Error err
cdef string group_id
with nogil:
group_id = self.c_broker.get().GenerateNewGroupId(&err)
group_id = self.c_consumer.get().GenerateNewGroupId(&err)
if err:
throw_exception(err)
return _str(group_id)
......@@ -214,7 +214,7 @@ cdef class PyDataBroker:
cdef vector[StreamInfo] streams
cdef string b_from_stream = _bytes(from_stream)
with nogil:
streams = self.c_broker.get().GetStreamList(b_from_stream,&err)
streams = self.c_consumer.get().GetStreamList(b_from_stream,&err)
if err:
throw_exception(err)
list = []
......@@ -226,7 +226,7 @@ cdef class PyDataBroker:
cdef string b_stream = _bytes(stream)
cdef Error err
with nogil:
err = self.c_broker.get().Acknowledge(b_group_id,id,b_stream)
err = self.c_consumer.get().Acknowledge(b_group_id,id,b_stream)
if err:
throw_exception(err)
def neg_acknowledge(self, group_id, uint64_t id, uint64_t delay_sec, stream = "default"):
......@@ -234,12 +234,12 @@ cdef class PyDataBroker:
cdef string b_stream = _bytes(stream)
cdef Error err
with nogil:
err = self.c_broker.get().NegativeAcknowledge(b_group_id,id,delay_sec,b_stream)
err = self.c_consumer.get().NegativeAcknowledge(b_group_id,id,delay_sec,b_stream)
if err:
throw_exception(err)
def set_resend_nacs(self,bool resend, uint64_t delay_sec, uint64_t resend_attempts):
with nogil:
self.c_broker.get().SetResendNacs(resend,delay_sec,resend_attempts)
self.c_consumer.get().SetResendNacs(resend,delay_sec,resend_attempts)
def get_last_acknowledged_tuple_id(self, group_id, stream = "default"):
cdef string b_group_id = _bytes(group_id)
......@@ -247,7 +247,7 @@ cdef class PyDataBroker:
cdef Error err
cdef uint64_t id
with nogil:
id = self.c_broker.get().GetLastAcknowledgedTulpeId(b_group_id,b_stream,&err)
id = self.c_consumer.get().GetLastAcknowledgedTulpeId(b_group_id,b_stream,&err)
if err:
throw_exception(err)
return id
......@@ -258,7 +258,7 @@ cdef class PyDataBroker:
cdef string b_stream = _bytes(stream)
cdef IdList ids
with nogil:
ids = self.c_broker.get().GetUnacknowledgedTupleIds(b_group_id, b_stream, from_id, to_id, &err)
ids = self.c_consumer.get().GetUnacknowledgedTupleIds(b_group_id, b_stream, from_id, to_id, &err)
if err:
throw_exception(err)
list = []
......@@ -272,7 +272,7 @@ cdef class PyDataBroker:
cdef Error err
cdef FileInfos file_infos
with nogil:
file_infos = self.c_broker.get().QueryImages(b_query,b_stream,&err)
file_infos = self.c_consumer.get().QueryImages(b_query,b_stream,&err)
if err:
throw_exception(err)
json_list = []
......@@ -287,13 +287,13 @@ cdef class PyDataBroker:
cdef Error err
if op == "next":
with nogil:
dataset = self.c_broker.get().GetNextDataset(b_group_id,b_stream, min_size, &err)
dataset = self.c_consumer.get().GetNextDataset(b_group_id,b_stream, min_size, &err)
elif op == "last":
with nogil:
dataset = self.c_broker.get().GetLastDataset(b_stream, min_size, &err)
dataset = self.c_consumer.get().GetLastDataset(b_stream, min_size, &err)
elif op == "id":
with nogil:
dataset = self.c_broker.get().GetDatasetById(id, b_stream, min_size, &err)
dataset = self.c_consumer.get().GetDatasetById(id, b_stream, min_size, &err)
json_list = []
for fi in dataset.content:
json_list.append(json.loads(_str(fi.Json())))
......@@ -311,20 +311,20 @@ cdef class PyDataBroker:
cdef Error err
cdef string meta_str
with nogil:
meta_str = self.c_broker.get().GetBeamtimeMeta(&err)
meta_str = self.c_consumer.get().GetBeamtimeMeta(&err)
if err:
throw_exception(err)
meta = json.loads(_str(meta_str))
del meta['_id']
return meta
def interrupt_current_operation(self):
self.c_broker.get().InterruptCurrentOperation()
cdef class __PyDataBrokerFactory:
cdef DataBrokerFactory c_factory
self.c_consumer.get().InterruptCurrentOperation()
cdef class __PyConsumerFactory:
cdef ConsumerFactory c_factory
def __cinit__(self):
with nogil:
self.c_factory = DataBrokerFactory()
def create_server_broker(self,server_name,source_path,has_filesystem,beamtime_id,data_source,token,timeout):
self.c_factory = ConsumerFactory()
def create_consumer(self,server_name,source_path,has_filesystem,beamtime_id,data_source,token,timeout):
cdef string b_server_name = _bytes(server_name)
cdef string b_source_path = _bytes(source_path)
cdef bool b_has_filesystem = has_filesystem
......@@ -333,15 +333,15 @@ cdef class __PyDataBrokerFactory:
source.user_token = _bytes(token)
source.data_source = _bytes(data_source)
cdef Error err
broker = PyDataBroker()
consumer = PyConsumer()
with nogil:
broker.c_broker = self.c_factory.CreateServerBroker(b_server_name,b_source_path,b_has_filesystem,source,&err)
consumer.c_consumer = self.c_factory.CreateConsumer(b_server_name,b_source_path,b_has_filesystem,source,&err)
if err:
throw_exception(err)
broker.c_broker.get().SetTimeout(timeout)
return broker
consumer.c_consumer.get().SetTimeout(timeout)
return consumer
def create_server_broker(server_name,source_path,has_filesystem,beamtime_id,data_source,token,timeout_ms):
def create_consumer(server_name,source_path,has_filesystem,beamtime_id,data_source,token,timeout_ms):
"""
:param server_name: Server endpoint (hostname:port)
:type server_name: string
......@@ -349,11 +349,11 @@ def create_server_broker(server_name,source_path,has_filesystem,beamtime_id,data
:type source_path: string
:param has_filesystem: True if the source_path is accessible locally, otherwise will use file transfer service to get data
:type has_filesystem: bool
:return: Broker object and error. (None,err) if case of error, (broker, None) if success
:rtype: Tuple with broker object and error.
:return: consumer object and error. (None,err) if case of error, (consumer, None) if success
:rtype: Tuple with consumer object and error.
"""
factory = __PyDataBrokerFactory()
return factory.create_server_broker(server_name,source_path,has_filesystem, beamtime_id,data_source,token,timeout_ms)
factory = __PyConsumerFactory()
return factory.create_consumer(server_name,source_path,has_filesystem, beamtime_id,data_source,token,timeout_ms)
__version__ = "@PYTHON_ASAPO_VERSION@@ASAPO_VERSION_COMMIT@"
find_package(Threads)
add_subdirectory(getnext_broker)
add_subdirectory(getnext)
if(BUILD_EXAMPLES AND BUILD_PYTHON)
add_subdirectory(getnext_broker_python)
add_subdirectory(getnext_python)
endif()
set(TARGET_NAME getnext_broker)
set(SOURCE_FILES getnext_broker.cpp)
set(TARGET_NAME getnext)
set(SOURCE_FILES getnext.cpp)
if (BUILD_EXAMPLES)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment