Skip to content
Snippets Groups Projects
Commit 2119b20a authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

add python api, tests

parent c8f0b704
No related branches found
No related tags found
No related merge requests found
Showing
with 191 additions and 84 deletions
......@@ -131,6 +131,14 @@ class Consumer {
*/
virtual std::string GetBeamtimeMeta(Error* err) = 0;
//! Get stream metadata.
/*!
\param stream - stream to use
\param err - return nullptr of operation succeed, error otherwise.
\return stream metadata.
*/
virtual std::string GetStreamMeta(const std::string& stream, Error* err) = 0;
//! Receive next available message.
/*!
\param info - where to store message metadata. Can be set to nullptr only message data is needed.
......
......@@ -292,7 +292,7 @@ Error ConsumerImpl::GetRecordFromServer(std::string* response, std::string group
std::string request_suffix = OpToUriCmd(op);
std::string request_group = OpToUriCmd(op);
std::string request_api = UriPrefix(std::move(stream), "", "");
std::string request_api = BrokerApiUri(std::move(stream), "", "");
uint64_t elapsed_ms = 0;
Error no_data_error;
while (true) {
......@@ -576,7 +576,7 @@ Error ConsumerImpl::ResetLastReadMarker(std::string group_id, std::string stream
Error ConsumerImpl::SetLastReadMarker(std::string group_id, uint64_t value, std::string stream) {
RequestInfo ri;
ri.api = UriPrefix(std::move(stream), std::move(group_id), "resetcounter");
ri.api = BrokerApiUri(std::move(stream), std::move(group_id), "resetcounter");
ri.extra_params = "&value=" + std::to_string(value);
ri.post = true;
......@@ -606,7 +606,7 @@ Error ConsumerImpl::GetRecordFromServerById(uint64_t id, std::string* response,
}
RequestInfo ri;
ri.api = UriPrefix(std::move(stream), std::move(group_id), std::to_string(id));
ri.api = BrokerApiUri(std::move(stream), std::move(group_id), std::to_string(id));
if (dataset) {
......@@ -621,11 +621,19 @@ Error ConsumerImpl::GetRecordFromServerById(uint64_t id, std::string* response,
std::string ConsumerImpl::GetBeamtimeMeta(Error* err) {
RequestInfo ri;
ri.api = UriPrefix("default", "0", "meta/0");
ri.api = BrokerApiUri("default", "0", "meta/0");
return BrokerRequestWithTimeout(ri, err);
}
std::string ConsumerImpl::GetStreamMeta(const std::string& stream, Error* err) {
RequestInfo ri;
ri.api = BrokerApiUri(stream, "0", "meta/1");
return BrokerRequestWithTimeout(ri, err);
}
DataSet DecodeDatasetFromResponse(std::string response, Error* err) {
DataSet res;
if (!res.SetFromJson(std::move(response))) {
......@@ -643,7 +651,7 @@ MessageMetas ConsumerImpl::QueryMessages(std::string query, std::string stream,
}
RequestInfo ri;
ri.api = UriPrefix(std::move(stream), "0", "querymessages");
ri.api = BrokerApiUri(std::move(stream), "0", "querymessages");
ri.post = true;
ri.body = std::move(query);
......@@ -741,7 +749,7 @@ StreamInfos ConsumerImpl::GetStreamList(std::string from, StreamFilter filter, E
RequestInfo ConsumerImpl::GetStreamListRequest(const std::string& from, const StreamFilter& filter) const {
RequestInfo ri;
ri.api = UriPrefix("0", "", "streams");
ri.api = BrokerApiUri("0", "", "streams");
ri.post = false;
if (!from.empty()) {
ri.extra_params = "&from=" + httpclient__->UrlEscape(from);
......@@ -809,7 +817,7 @@ Error ConsumerImpl::Acknowledge(std::string group_id, uint64_t id, std::string s
return ConsumerErrorTemplates::kWrongInput.Generate("empty stream");
}
RequestInfo ri;
ri.api = UriPrefix(std::move(stream), std::move(group_id), std::to_string(id));
ri.api = BrokerApiUri(std::move(stream), std::move(group_id), std::to_string(id));
ri.post = true;
ri.body = "{\"Op\":\"ackmessage\"}";
......@@ -828,7 +836,7 @@ IdList ConsumerImpl::GetUnacknowledgedMessages(std::string group_id,
return {};
}
RequestInfo ri;
ri.api = UriPrefix(std::move(stream), std::move(group_id), "nacks");
ri.api = BrokerApiUri(std::move(stream), std::move(group_id), "nacks");
ri.extra_params = "&from=" + std::to_string(from_id) + "&to=" + std::to_string(to_id);
auto json_string = BrokerRequestWithTimeout(ri, error);
......@@ -851,7 +859,7 @@ uint64_t ConsumerImpl::GetLastAcknowledgedMessage(std::string group_id, std::str
return 0;
}
RequestInfo ri;
ri.api = UriPrefix(std::move(stream), std::move(group_id), "lastack");
ri.api = BrokerApiUri(std::move(stream), std::move(group_id), "lastack");
auto json_string = BrokerRequestWithTimeout(ri, error);
if (*error) {
......@@ -884,7 +892,7 @@ Error ConsumerImpl::NegativeAcknowledge(std::string group_id,
return ConsumerErrorTemplates::kWrongInput.Generate("empty stream");
}
RequestInfo ri;
ri.api = UriPrefix(std::move(stream), std::move(group_id), std::to_string(id));
ri.api = BrokerApiUri(std::move(stream), std::move(group_id), std::to_string(id));
ri.post = true;
ri.body = R"({"Op":"negackmessage","Params":{"DelayMs":)" + std::to_string(delay_ms) + "}}";
......@@ -926,7 +934,7 @@ uint64_t ConsumerImpl::ParseGetCurrentCountResponce(Error* err, const std::strin
RequestInfo ConsumerImpl::GetSizeRequestForSingleMessagesStream(std::string& stream) const {
RequestInfo ri;
ri.api = UriPrefix(std::move(stream), "", "size");
ri.api = BrokerApiUri(std::move(stream), "", "size");
return ri;
}
......@@ -966,7 +974,7 @@ Error ConsumerImpl::GetVersionInfo(std::string* client_info, std::string* server
RequestInfo ConsumerImpl::GetDeleteStreamRequest(std::string stream, DeleteStreamOptions options) const {
RequestInfo ri;
ri.api = UriPrefix(std::move(stream), "", "delete");
ri.api = BrokerApiUri(std::move(stream), "", "delete");
ri.post = true;
ri.body = options.Json();
return ri;
......@@ -979,7 +987,7 @@ Error ConsumerImpl::DeleteStream(std::string stream, DeleteStreamOptions options
return err;
}
std::string ConsumerImpl::UriPrefix( std::string stream, std::string group, std::string suffix) const {
std::string ConsumerImpl::BrokerApiUri(std::string stream, std::string group, std::string suffix) const {
auto stream_encoded = httpclient__->UrlEscape(std::move(stream));
auto group_encoded = group.size() > 0 ? httpclient__->UrlEscape(std::move(group)) : "";
auto uri = "/" + kConsumerProtocol.GetBrokerVersion() + "/beamtime/" + source_credentials_.beamtime_id + "/"
......@@ -995,4 +1003,5 @@ std::string ConsumerImpl::UriPrefix( std::string stream, std::string group, std:
}
}
\ No newline at end of file
......@@ -79,6 +79,7 @@ class ConsumerImpl final : public asapo::Consumer {
std::string GenerateNewGroupId(Error* err) override;
std::string GetBeamtimeMeta(Error* err) override;
std::string GetStreamMeta(const std::string& stream, Error* err) override;
uint64_t GetCurrentSize(std::string stream, Error* err) override;
uint64_t GetCurrentDatasetCount(std::string stream, bool include_incomplete, Error* err) override;
......@@ -150,7 +151,7 @@ class ConsumerImpl final : public asapo::Consumer {
uint64_t GetCurrentCount(std::string stream, const RequestInfo& ri, Error* err);
RequestInfo GetStreamListRequest(const std::string& from, const StreamFilter& filter) const;
Error GetServerVersionInfo(std::string* server_info, bool* supported) ;
std::string UriPrefix( std::string stream, std::string group, std::string suffix) const;
std::string BrokerApiUri(std::string stream, std::string group, std::string suffix) const;
std::string endpoint_;
std::string current_broker_uri_;
......
......@@ -792,7 +792,7 @@ TEST_F(ConsumerImplTests, GetByIdReturnsEndOfStreamWhenIdTooLarge) {
ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kEndOfStream));
}
TEST_F(ConsumerImplTests, GetMetaDataOK) {
TEST_F(ConsumerImplTests, GetBeamtimeMetaDataOK) {
MockGetBrokerUri();
consumer->SetTimeout(100);
......@@ -812,6 +812,25 @@ TEST_F(ConsumerImplTests, GetMetaDataOK) {
}
TEST_F(ConsumerImplTests, GetStreamMetaDataOK) {
MockGetBrokerUri();
consumer->SetTimeout(100);
EXPECT_CALL(mock_http_client, Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded +
"/" + expected_stream_encoded + "/0/meta/1?token="
+ expected_token, _,
_)).WillOnce(DoAll(
SetArgPointee<1>(HttpCode::OK),
SetArgPointee<2>(nullptr),
Return(expected_metadata)));
asapo::Error err;
auto res = consumer->GetStreamMeta(expected_stream, &err);
ASSERT_THAT(err, Eq(nullptr));
ASSERT_THAT(res, Eq(expected_metadata));
}
TEST_F(ConsumerImplTests, QueryMessagesReturnError) {
MockGetBrokerUri();
......
......@@ -82,6 +82,7 @@ cdef extern from "asapo/asapo_consumer.h" namespace "asapo" nogil:
IdList GetUnacknowledgedMessages(string group_id, uint64_t from_id, uint64_t to_id, string stream, Error* error)
string GenerateNewGroupId(Error* err)
string GetBeamtimeMeta(Error* err)
string GetStreamMeta(string stream,Error* err)
MessageMetas QueryMessages(string query, string stream, Error* err)
DataSet GetNextDataset(string group_id, uint64_t min_size, string stream, Error* err)
DataSet GetLastDataset(uint64_t min_size, string stream, Error* err)
......
......@@ -377,6 +377,18 @@ cdef class PyConsumer:
meta = json.loads(_str(meta_str))
del meta['_id']
return meta
def get_stream_meta(self, stream = 'default'):
cdef Error err
cdef string b_stream = _bytes(stream)
cdef string meta_str
with nogil:
meta_str = self.c_consumer.get().GetStreamMeta(b_stream,&err)
if err:
throw_exception(err)
meta = json.loads(_str(meta_str))
del meta['_id']
return meta
def interrupt_current_operation(self):
self.c_consumer.get().InterruptCurrentOperation()
cdef class __PyConsumerFactory:
......
cmake_minimum_required(VERSION 2.8)
project(asapo-consume)
set(CMAKE_CXX_STANDARD 11)
IF(CMAKE_C_COMPILER_ID STREQUAL "GNU")
SET( CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -static-libgcc -static-libstdc++")
ENDIF()
find_package (Threads)
set(TARGET_NAME ${CMAKE_PROJECT_NAME})
set(SOURCE_FILES consume.cpp)
link_directories(asapo/lib)
add_executable(${TARGET_NAME} ${SOURCE_FILES})
target_include_directories(${TARGET_NAME} PUBLIC asapo/include)
target_link_libraries(${TARGET_NAME} asapo-consumer curl ${CMAKE_THREAD_LIBS_INIT})
#include "asapo_consumer.h"
void exit_if_error(std::string error_string, const asapo::Error& err) {
if (err) {
std::cerr << error_string << err << std::endl;
exit(EXIT_FAILURE);
}
}
int main(int argc, char* argv[]) {
asapo::Error err;
auto endpoint = "asapo-services2:8400";
auto beamtime = "asapo_test";
auto token = "KmUDdacgBzaOD3NIJvN1NmKGqWKtx0DK-NyPjdpeWkc=";
auto consumer = asapo::ConsumerFactory::CreateConsumer(endpoint, "", true, asapo::SourceCredentials{beamtime, "", "", token}, &err);
exit_if_error("Cannot create consumer", err);
consumer->SetTimeout((uint64_t) 1000);
auto group_id = consumer->GenerateNewGroupId(&err);
exit_if_error("Cannot create group id", err);
asapo::MessageMeta fi;
asapo::MessageData data;
err = consumer->GetLast(&fi, group_id, &data);
exit_if_error("Cannot get next record", err);
std::cout << "id: " << fi.id << std::endl;
std::cout << "file name: " << fi.name << std::endl;
std::cout << "file content: " << reinterpret_cast<char const*>(data.get()) << std::endl;
return EXIT_SUCCESS;
}
......@@ -57,7 +57,9 @@ export PYTHONPATH=$2:$3:${PYTHONPATH}
$1 $4 127.0.0.1:8400 $source_path $beamtime_id $data_source_in $data_source_out $token $timeout $timeout_producer $nthreads 1 > out
cat out
cat out | grep "Processed 3 file(s)"
cat out | grep "Sent 3 file(s)"
cat out | grep "Sent 5 file(s)"
cat out | grep bt_meta
cat out | grep st_meta
echo "db.data_default.find({"_id":1})" | mongo ${outdatabase_name} | tee /dev/stderr | grep "file1_${data_source_out}"
......
......@@ -34,8 +34,10 @@ set PYTHONPATH=%2;%3
"%1" "%4" 127.0.0.1:8400 %source_path% %beamtime_id% %data_source_in% %data_source_out% %token% %timeout% %timeout_producer% %nthreads% 1 > out
type out
findstr /I /L /C:"Processed 3 file(s)" out || goto :error
findstr /I /L /C:"Processed 5 file(s)" out || goto :error
findstr /I /L /C:"Sent 3 file(s)" out || goto :error
findstr /I /L /C:"bt_meta" out || goto :error
findstr /I /L /C:"st_meta" out || goto :error
echo db.data_default.find({"_id":1}) | %mongo_exe% %outdatabase_name% | findstr /c:"file1_%data_source_out%" || goto :error
......
......@@ -36,6 +36,10 @@ group_id = consumer.generate_group_id()
n_recv = 0
producer.send_beamtime_meta('{"data":"bt_meta"}', callback = callback)
producer.send_stream_meta('{"data":"st_meta"}',stream = 'stream_in', callback = callback)
if transfer_data:
ingest_mode = asapo_producer.DEFAULT_INGEST_MODE
else:
......@@ -55,5 +59,12 @@ while True:
producer.wait_requests_finished(timeout_s_producer*1000)
consumer = asapo_consumer.create_consumer(source,path, True,beamtime,stream_out,token,timeout_s*1000)
bt_meta = consumer.get_beamtime_meta()
st_meta = consumer.get_stream_meta('stream_in')
print ('bt_meta:',bt_meta)
print ('st_meta:',st_meta)
print ("Processed "+str(n_recv)+" file(s)")
print ("Sent "+str(n_send)+" file(s)")
......@@ -131,7 +131,9 @@ class Producer {
\param callback - callback function
\return Error - will be nullptr on success
*/
virtual Error SendStreamMetadata(const std::string& stream, const std::string& metadata, MetaIngestMode mode,
virtual Error SendStreamMetadata(const std::string& metadata,
MetaIngestMode mode,
const std::string& stream,
RequestCallback callback) = 0;
//! Set internal log level
......
......@@ -475,20 +475,22 @@ Error ProducerImpl::DeleteStream(std::string stream, uint64_t timeout_ms, Delete
}
Error ProducerImpl::SendBeamtimeMetadata(const std::string& metadata, MetaIngestMode mode, RequestCallback callback) {
return SendMeta("", metadata, mode, callback);
return SendMeta(metadata, mode, "", callback);
}
Error ProducerImpl::SendStreamMetadata(const std::string& stream,
const std::string& metadata,
Error ProducerImpl::SendStreamMetadata(const std::string& metadata,
MetaIngestMode mode,
const std::string& stream,
RequestCallback callback) {
if (stream.empty()) {
return ProducerErrorTemplates::kWrongInput.Generate("stream is empty");
}
return SendMeta(stream, metadata, mode, callback);
return SendMeta(metadata, mode, stream, callback);
}
Error ProducerImpl::SendMeta(std::string stream, const std::string& metadata, MetaIngestMode mode,
Error ProducerImpl::SendMeta(const std::string& metadata,
MetaIngestMode mode,
std::string stream,
RequestCallback callback) {
GenericRequestHeader request_header{kOpcodeTransferMetaData, 0, metadata.size(), 0,
stream.empty() ? "beamtime_global.meta" : stream + ".meta",
......
......@@ -68,7 +68,9 @@ class ProducerImpl : public Producer {
Error SendMetadata(const std::string& metadata, RequestCallback callback) override;
Error SendBeamtimeMetadata(const std::string& metadata, MetaIngestMode mode, RequestCallback callback) override;
Error SendStreamMetadata(const std::string& stream, const std::string& metadata, MetaIngestMode mode,
Error SendStreamMetadata(const std::string& metadata,
MetaIngestMode mode,
const std::string& stream,
RequestCallback callback) override;
uint64_t GetRequestsQueueSize() override;
......@@ -76,7 +78,10 @@ class ProducerImpl : public Producer {
uint64_t GetRequestsQueueVolumeMb() override;
void SetRequestsQueueLimits(uint64_t size, uint64_t volume) override;
private:
Error SendMeta(const std::string stream, const std::string& metadata, MetaIngestMode mode, RequestCallback callback);
Error SendMeta(const std::string& metadata,
MetaIngestMode mode,
std::string stream,
RequestCallback callback);
StreamInfo StreamRequest(StreamRequestOp op, std::string stream, uint64_t timeout_ms, Error* err) const;
Error Send(const MessageHeader& message_header, std::string stream, MessageData data, std::string full_path,
uint64_t ingest_mode,
......
......@@ -363,7 +363,7 @@ TEST_F(ProducerImplTests, OKAddingSendStreamDataRequest) {
auto mode = asapo::MetaIngestMode{asapo::MetaIngestOp::kInsert, false};
auto err = producer.SendStreamMetadata(expected_stream, expected_metadata, mode, nullptr);
auto err = producer.SendStreamMetadata(expected_metadata, mode, expected_stream, nullptr);
ASSERT_THAT(err, Eq(nullptr));
}
......
......@@ -46,6 +46,15 @@ cdef extern from "asapo/asapo_producer.h" namespace "asapo":
LogLevel LogLevel_Debug "asapo::LogLevel::Debug"
LogLevel LogLevel_Warning "asapo::LogLevel::Warning"
cdef extern from "asapo/asapo_producer.h" namespace "asapo":
cppclass MetaIngestOp:
pass
MetaIngestOp kInsert "asapo::MetaIngestOp::kInsert"
MetaIngestOp kReplace "asapo::MetaIngestOp::kReplace"
MetaIngestOp kUpdate "asapo::MetaIngestOp::kUpdate"
struct MetaIngestMode:
MetaIngestOp op
bool upsert
cdef extern from "asapo/asapo_producer.h" namespace "asapo":
cppclass SourceType:
......@@ -110,6 +119,8 @@ cdef extern from "asapo/asapo_producer.h" namespace "asapo" nogil:
StreamInfo GetLastStream(uint64_t timeout_ms, Error* err)
Error GetVersionInfo(string* client_info,string* server_info, bool* supported)
Error DeleteStream(string stream, uint64_t timeout_ms, DeleteStreamOptions options)
Error SendBeamtimeMetadata(string metadata, MetaIngestMode mode, RequestCallback callback)
Error SendStreamMetadata(string metadata, MetaIngestMode mode, string stream, RequestCallback callback)
cdef extern from "asapo/asapo_producer.h" namespace "asapo":
......
......@@ -34,6 +34,15 @@ cdef bytes _bytes(s):
else:
raise TypeError("Could not convert to unicode.")
cdef MetaIngestOp mode_to_c(mode):
if mode == 'replace':
return kReplace
elif mode == 'update':
return kUpdate
elif mode == 'insert':
return kInsert
else:
raise TypeError("Could not convert to unicode.")
class AsapoProducerError(Exception):
......@@ -174,7 +183,54 @@ cdef class PyProducer:
if callback != None:
Py_XINCREF(<PyObject*>callback)
return
def send_stream_meta(self, metadata, mode = 'replace', upsert = True, stream='default', callback=None):
"""
:param stream: stream name, default "default"
:type stream: string
:param metadata: beamtime metadata in JSON format
:type metadata: string
:param mode: send mode
:type mode: string
:param upsert: send mode
:type upsert: bool
:param callback: callback function, default None
:type callback: callback(info,err), where info - json string with event header that was used to send data and response, err - error string or None
:raises:
AsapoWrongInputError: wrong input (authorization, ...)
AsapoProducerError: other error
"""
cdef MetaIngestMode mode_c
mode_c.op = mode_to_c(mode)
mode_c.upsert = upsert
err = self.c_producer.get().SendStreamMetadata(_bytes(metadata), mode_c,_bytes(stream),
unwrap_callback(<RequestCallbackCython>self.c_callback, <void*>self,<void*>callback if callback != None else NULL))
if err:
throw_exception(err)
if callback != None:
Py_XINCREF(<PyObject*>callback)
def send_beamtime_meta(self, metadata, mode = 'replace', upsert = True, callback=None):
"""
:param metadata: beamtime metadata in JSON format
:type metadata: string
:param mode: send mode
:type mode: string
:param upsert: send mode
:type upsert: bool
:param callback: callback function, default None
:type callback: callback(info,err), where info - json string with event header that was used to send data and response, err - error string or None
:raises:
AsapoWrongInputError: wrong input (authorization, ...)
AsapoProducerError: other error
"""
cdef MetaIngestMode mode_c
mode_c.op = mode_to_c(mode)
mode_c.upsert = upsert
err = self.c_producer.get().SendBeamtimeMetadata(_bytes(metadata), mode_c,
unwrap_callback(<RequestCallbackCython>self.c_callback, <void*>self,<void*>callback if callback != None else NULL))
if err:
throw_exception(err)
if callback != None:
Py_XINCREF(<PyObject*>callback)
def send(self, uint64_t id, exposed_path, data, user_meta=None, dataset=None, ingest_mode = DEFAULT_INGEST_MODE, stream = "default", callback=None):
"""
:param id: unique data id
......
......@@ -28,6 +28,10 @@ done
echo 'db.data_streamfts.insert({"_id":'1',"size":0,"name":"'1'","timestamp":1000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name}
echo 'db.meta.insert({"_id":"bt","data":"test_bt"})' | mongo ${database_name}
echo 'db.meta.insert({"_id":"st_test","data":"test_st"})' | mongo ${database_name}
for i in `seq 1 5`;
do
echo 'db.data_stream1.insert({"_id":'$i',"size":6,"name":"'1$i'","timestamp":2000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name}
......
......@@ -14,6 +14,10 @@ for /l %%x in (1, 1, 5) do echo db.data_default.insert({"_id":%%x,"size":6,"name
echo db.data_streamfts.insert({"_id":1,"size":0,"name":"1","timestamp":1000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}}) | %mongo_exe% %database_name% || goto :error
echo 'db.meta.insert({"_id":"bt","data":"test_bt"})' | %mongo_exe% %database_name%
echo 'db.meta.insert({"_id":"st_test","data":"test_st"})' | %mongo_exe% %database_name%
for /l %%x in (1, 1, 5) do echo db.data_stream1.insert({"_id":%%x,"size":6,"name":"1%%x","timestamp":2000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}}) | %mongo_exe% %database_name% || goto :error
for /l %%x in (1, 1, 5) do echo db.data_stream2.insert({"_id":%%x,"size":6,"name":"2%%x","timestamp":3000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}}) | %mongo_exe% %database_name% || goto :error
......
......@@ -232,7 +232,21 @@ def check_single(consumer, group_id):
else:
exit_on_noerr("wrong query")
# delete stream
# metadata
bt_meta = consumer.get_beamtime_meta()
assert_eq(bt_meta['data'], 'test_bt', "beamtime meta ")
st_meta = consumer.get_stream_meta("test")
assert_eq(st_meta['data'], 'test_st', "stream meta ")
try:
consumer.get_stream_meta("notexist")
except asapo_consumer.AsapoNoDataError as err:
print(err)
pass
else:
exit_on_noerr("should be wrong input on non existing stream")
# delete stream
consumer.delete_stream(stream='default')
try:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment