diff --git a/consumer/api/cpp/include/asapo/consumer/consumer.h b/consumer/api/cpp/include/asapo/consumer/consumer.h index e6afb18b8d9de09bd6ce72abf5ec671385fde755..dbd5d816b186855816c6e752a982dfbc0e4cf814 100644 --- a/consumer/api/cpp/include/asapo/consumer/consumer.h +++ b/consumer/api/cpp/include/asapo/consumer/consumer.h @@ -131,6 +131,14 @@ class Consumer { */ virtual std::string GetBeamtimeMeta(Error* err) = 0; + //! Get stream metadata. + /*! + \param stream - stream to use + \param err - return nullptr of operation succeed, error otherwise. + \return stream metadata. + */ + virtual std::string GetStreamMeta(const std::string& stream, Error* err) = 0; + //! Receive next available message. /*! \param info - where to store message metadata. Can be set to nullptr only message data is needed. diff --git a/consumer/api/cpp/src/consumer_impl.cpp b/consumer/api/cpp/src/consumer_impl.cpp index 00ca9b96158f167159ff5817927a059a2f862fec..6dbeeaaef2b7784875e3a0fb57520be94438077e 100644 --- a/consumer/api/cpp/src/consumer_impl.cpp +++ b/consumer/api/cpp/src/consumer_impl.cpp @@ -292,7 +292,7 @@ Error ConsumerImpl::GetRecordFromServer(std::string* response, std::string group std::string request_suffix = OpToUriCmd(op); std::string request_group = OpToUriCmd(op); - std::string request_api = UriPrefix(std::move(stream), "", ""); + std::string request_api = BrokerApiUri(std::move(stream), "", ""); uint64_t elapsed_ms = 0; Error no_data_error; while (true) { @@ -576,7 +576,7 @@ Error ConsumerImpl::ResetLastReadMarker(std::string group_id, std::string stream Error ConsumerImpl::SetLastReadMarker(std::string group_id, uint64_t value, std::string stream) { RequestInfo ri; - ri.api = UriPrefix(std::move(stream), std::move(group_id), "resetcounter"); + ri.api = BrokerApiUri(std::move(stream), std::move(group_id), "resetcounter"); ri.extra_params = "&value=" + std::to_string(value); ri.post = true; @@ -606,7 +606,7 @@ Error ConsumerImpl::GetRecordFromServerById(uint64_t id, std::string* response, } RequestInfo ri; - ri.api = UriPrefix(std::move(stream), std::move(group_id), std::to_string(id)); + ri.api = BrokerApiUri(std::move(stream), std::move(group_id), std::to_string(id)); if (dataset) { @@ -621,11 +621,19 @@ Error ConsumerImpl::GetRecordFromServerById(uint64_t id, std::string* response, std::string ConsumerImpl::GetBeamtimeMeta(Error* err) { RequestInfo ri; - ri.api = UriPrefix("default", "0", "meta/0"); + ri.api = BrokerApiUri("default", "0", "meta/0"); return BrokerRequestWithTimeout(ri, err); } +std::string ConsumerImpl::GetStreamMeta(const std::string& stream, Error* err) { + RequestInfo ri; + ri.api = BrokerApiUri(stream, "0", "meta/1"); + + return BrokerRequestWithTimeout(ri, err); +} + + DataSet DecodeDatasetFromResponse(std::string response, Error* err) { DataSet res; if (!res.SetFromJson(std::move(response))) { @@ -643,7 +651,7 @@ MessageMetas ConsumerImpl::QueryMessages(std::string query, std::string stream, } RequestInfo ri; - ri.api = UriPrefix(std::move(stream), "0", "querymessages"); + ri.api = BrokerApiUri(std::move(stream), "0", "querymessages"); ri.post = true; ri.body = std::move(query); @@ -741,7 +749,7 @@ StreamInfos ConsumerImpl::GetStreamList(std::string from, StreamFilter filter, E RequestInfo ConsumerImpl::GetStreamListRequest(const std::string& from, const StreamFilter& filter) const { RequestInfo ri; - ri.api = UriPrefix("0", "", "streams"); + ri.api = BrokerApiUri("0", "", "streams"); ri.post = false; if (!from.empty()) { ri.extra_params = "&from=" + httpclient__->UrlEscape(from); @@ -809,7 +817,7 @@ Error ConsumerImpl::Acknowledge(std::string group_id, uint64_t id, std::string s return ConsumerErrorTemplates::kWrongInput.Generate("empty stream"); } RequestInfo ri; - ri.api = UriPrefix(std::move(stream), std::move(group_id), std::to_string(id)); + ri.api = BrokerApiUri(std::move(stream), std::move(group_id), std::to_string(id)); ri.post = true; ri.body = "{\"Op\":\"ackmessage\"}"; @@ -828,7 +836,7 @@ IdList ConsumerImpl::GetUnacknowledgedMessages(std::string group_id, return {}; } RequestInfo ri; - ri.api = UriPrefix(std::move(stream), std::move(group_id), "nacks"); + ri.api = BrokerApiUri(std::move(stream), std::move(group_id), "nacks"); ri.extra_params = "&from=" + std::to_string(from_id) + "&to=" + std::to_string(to_id); auto json_string = BrokerRequestWithTimeout(ri, error); @@ -851,7 +859,7 @@ uint64_t ConsumerImpl::GetLastAcknowledgedMessage(std::string group_id, std::str return 0; } RequestInfo ri; - ri.api = UriPrefix(std::move(stream), std::move(group_id), "lastack"); + ri.api = BrokerApiUri(std::move(stream), std::move(group_id), "lastack"); auto json_string = BrokerRequestWithTimeout(ri, error); if (*error) { @@ -884,7 +892,7 @@ Error ConsumerImpl::NegativeAcknowledge(std::string group_id, return ConsumerErrorTemplates::kWrongInput.Generate("empty stream"); } RequestInfo ri; - ri.api = UriPrefix(std::move(stream), std::move(group_id), std::to_string(id)); + ri.api = BrokerApiUri(std::move(stream), std::move(group_id), std::to_string(id)); ri.post = true; ri.body = R"({"Op":"negackmessage","Params":{"DelayMs":)" + std::to_string(delay_ms) + "}}"; @@ -926,7 +934,7 @@ uint64_t ConsumerImpl::ParseGetCurrentCountResponce(Error* err, const std::strin RequestInfo ConsumerImpl::GetSizeRequestForSingleMessagesStream(std::string& stream) const { RequestInfo ri; - ri.api = UriPrefix(std::move(stream), "", "size"); + ri.api = BrokerApiUri(std::move(stream), "", "size"); return ri; } @@ -966,7 +974,7 @@ Error ConsumerImpl::GetVersionInfo(std::string* client_info, std::string* server RequestInfo ConsumerImpl::GetDeleteStreamRequest(std::string stream, DeleteStreamOptions options) const { RequestInfo ri; - ri.api = UriPrefix(std::move(stream), "", "delete"); + ri.api = BrokerApiUri(std::move(stream), "", "delete"); ri.post = true; ri.body = options.Json(); return ri; @@ -979,7 +987,7 @@ Error ConsumerImpl::DeleteStream(std::string stream, DeleteStreamOptions options return err; } -std::string ConsumerImpl::UriPrefix( std::string stream, std::string group, std::string suffix) const { +std::string ConsumerImpl::BrokerApiUri(std::string stream, std::string group, std::string suffix) const { auto stream_encoded = httpclient__->UrlEscape(std::move(stream)); auto group_encoded = group.size() > 0 ? httpclient__->UrlEscape(std::move(group)) : ""; auto uri = "/" + kConsumerProtocol.GetBrokerVersion() + "/beamtime/" + source_credentials_.beamtime_id + "/" @@ -995,4 +1003,5 @@ std::string ConsumerImpl::UriPrefix( std::string stream, std::string group, std: } + } \ No newline at end of file diff --git a/consumer/api/cpp/src/consumer_impl.h b/consumer/api/cpp/src/consumer_impl.h index 2804309ae2446656e4c49224b8f917ba695c526a..52b7fa900682232ec946d0ea0bf8b03876608bfd 100644 --- a/consumer/api/cpp/src/consumer_impl.h +++ b/consumer/api/cpp/src/consumer_impl.h @@ -79,6 +79,7 @@ class ConsumerImpl final : public asapo::Consumer { std::string GenerateNewGroupId(Error* err) override; std::string GetBeamtimeMeta(Error* err) override; + std::string GetStreamMeta(const std::string& stream, Error* err) override; uint64_t GetCurrentSize(std::string stream, Error* err) override; uint64_t GetCurrentDatasetCount(std::string stream, bool include_incomplete, Error* err) override; @@ -150,7 +151,7 @@ class ConsumerImpl final : public asapo::Consumer { uint64_t GetCurrentCount(std::string stream, const RequestInfo& ri, Error* err); RequestInfo GetStreamListRequest(const std::string& from, const StreamFilter& filter) const; Error GetServerVersionInfo(std::string* server_info, bool* supported) ; - std::string UriPrefix( std::string stream, std::string group, std::string suffix) const; + std::string BrokerApiUri(std::string stream, std::string group, std::string suffix) const; std::string endpoint_; std::string current_broker_uri_; diff --git a/consumer/api/cpp/unittests/test_consumer_impl.cpp b/consumer/api/cpp/unittests/test_consumer_impl.cpp index 517809314ec2da0d45d9e34fcc967042174d2eeb..bb279cff58b4e7e3ae99908eee85bc19186a0576 100644 --- a/consumer/api/cpp/unittests/test_consumer_impl.cpp +++ b/consumer/api/cpp/unittests/test_consumer_impl.cpp @@ -792,7 +792,7 @@ TEST_F(ConsumerImplTests, GetByIdReturnsEndOfStreamWhenIdTooLarge) { ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kEndOfStream)); } -TEST_F(ConsumerImplTests, GetMetaDataOK) { +TEST_F(ConsumerImplTests, GetBeamtimeMetaDataOK) { MockGetBrokerUri(); consumer->SetTimeout(100); @@ -812,6 +812,25 @@ TEST_F(ConsumerImplTests, GetMetaDataOK) { } +TEST_F(ConsumerImplTests, GetStreamMetaDataOK) { + MockGetBrokerUri(); + consumer->SetTimeout(100); + + EXPECT_CALL(mock_http_client, Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + + "/" + expected_stream_encoded + "/0/meta/1?token=" + + expected_token, _, + _)).WillOnce(DoAll( + SetArgPointee<1>(HttpCode::OK), + SetArgPointee<2>(nullptr), + Return(expected_metadata))); + + asapo::Error err; + auto res = consumer->GetStreamMeta(expected_stream, &err); + + ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(res, Eq(expected_metadata)); +} + TEST_F(ConsumerImplTests, QueryMessagesReturnError) { MockGetBrokerUri(); diff --git a/consumer/api/python/asapo_consumer.pxd b/consumer/api/python/asapo_consumer.pxd index e2c7846b82f3dbee3e570a7a04a0667016b1c411..1347d886237eb27133ff20d87f2a11d3e1e7a54f 100644 --- a/consumer/api/python/asapo_consumer.pxd +++ b/consumer/api/python/asapo_consumer.pxd @@ -82,6 +82,7 @@ cdef extern from "asapo/asapo_consumer.h" namespace "asapo" nogil: IdList GetUnacknowledgedMessages(string group_id, uint64_t from_id, uint64_t to_id, string stream, Error* error) string GenerateNewGroupId(Error* err) string GetBeamtimeMeta(Error* err) + string GetStreamMeta(string stream,Error* err) MessageMetas QueryMessages(string query, string stream, Error* err) DataSet GetNextDataset(string group_id, uint64_t min_size, string stream, Error* err) DataSet GetLastDataset(uint64_t min_size, string stream, Error* err) diff --git a/consumer/api/python/asapo_consumer.pyx.in b/consumer/api/python/asapo_consumer.pyx.in index 6825c2e3ec5d029cba9f505d20ff3080a45ae3cb..61f06611e40af83d605cf3a9323c7f275ef8df24 100644 --- a/consumer/api/python/asapo_consumer.pyx.in +++ b/consumer/api/python/asapo_consumer.pyx.in @@ -377,6 +377,18 @@ cdef class PyConsumer: meta = json.loads(_str(meta_str)) del meta['_id'] return meta + def get_stream_meta(self, stream = 'default'): + cdef Error err + cdef string b_stream = _bytes(stream) + cdef string meta_str + with nogil: + meta_str = self.c_consumer.get().GetStreamMeta(b_stream,&err) + if err: + throw_exception(err) + meta = json.loads(_str(meta_str)) + del meta['_id'] + return meta + def interrupt_current_operation(self): self.c_consumer.get().InterruptCurrentOperation() cdef class __PyConsumerFactory: diff --git a/examples/consumer/simple-consumer/CMakeLists.txt b/examples/consumer/simple-consumer/CMakeLists.txt deleted file mode 100644 index 99f2a9ee5b9e8de947d4d0b212e7b27b29d73a07..0000000000000000000000000000000000000000 --- a/examples/consumer/simple-consumer/CMakeLists.txt +++ /dev/null @@ -1,21 +0,0 @@ -cmake_minimum_required(VERSION 2.8) - -project(asapo-consume) - -set(CMAKE_CXX_STANDARD 11) - -IF(CMAKE_C_COMPILER_ID STREQUAL "GNU") - SET( CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -static-libgcc -static-libstdc++") -ENDIF() - -find_package (Threads) - -set(TARGET_NAME ${CMAKE_PROJECT_NAME}) - -set(SOURCE_FILES consume.cpp) - -link_directories(asapo/lib) - -add_executable(${TARGET_NAME} ${SOURCE_FILES}) -target_include_directories(${TARGET_NAME} PUBLIC asapo/include) -target_link_libraries(${TARGET_NAME} asapo-consumer curl ${CMAKE_THREAD_LIBS_INIT}) diff --git a/examples/consumer/simple-consumer/consume.cpp b/examples/consumer/simple-consumer/consume.cpp deleted file mode 100644 index 501fd2af5113052bf978372ff50183bc4b1fe4f7..0000000000000000000000000000000000000000 --- a/examples/consumer/simple-consumer/consume.cpp +++ /dev/null @@ -1,35 +0,0 @@ -#include "asapo_consumer.h" - -void exit_if_error(std::string error_string, const asapo::Error& err) { - if (err) { - std::cerr << error_string << err << std::endl; - exit(EXIT_FAILURE); - } -} - -int main(int argc, char* argv[]) { - asapo::Error err; - - auto endpoint = "asapo-services2:8400"; - auto beamtime = "asapo_test"; - auto token = "KmUDdacgBzaOD3NIJvN1NmKGqWKtx0DK-NyPjdpeWkc="; - - auto consumer = asapo::ConsumerFactory::CreateConsumer(endpoint, "", true, asapo::SourceCredentials{beamtime, "", "", token}, &err); - exit_if_error("Cannot create consumer", err); - consumer->SetTimeout((uint64_t) 1000); - - auto group_id = consumer->GenerateNewGroupId(&err); - exit_if_error("Cannot create group id", err); - - asapo::MessageMeta fi; - asapo::MessageData data; - - err = consumer->GetLast(&fi, group_id, &data); - exit_if_error("Cannot get next record", err); - - std::cout << "id: " << fi.id << std::endl; - std::cout << "file name: " << fi.name << std::endl; - std::cout << "file content: " << reinterpret_cast<char const*>(data.get()) << std::endl; - return EXIT_SUCCESS; -} - diff --git a/examples/pipeline/in_to_out_python/check_linux.sh b/examples/pipeline/in_to_out_python/check_linux.sh index 04c4fef60afd82c449055f992e913efaa46f60c5..c2d2fb48b33293d587a0b5dae855962f8e289977 100644 --- a/examples/pipeline/in_to_out_python/check_linux.sh +++ b/examples/pipeline/in_to_out_python/check_linux.sh @@ -57,7 +57,9 @@ export PYTHONPATH=$2:$3:${PYTHONPATH} $1 $4 127.0.0.1:8400 $source_path $beamtime_id $data_source_in $data_source_out $token $timeout $timeout_producer $nthreads 1 > out cat out cat out | grep "Processed 3 file(s)" -cat out | grep "Sent 3 file(s)" +cat out | grep "Sent 5 file(s)" +cat out | grep bt_meta +cat out | grep st_meta echo "db.data_default.find({"_id":1})" | mongo ${outdatabase_name} | tee /dev/stderr | grep "file1_${data_source_out}" diff --git a/examples/pipeline/in_to_out_python/check_windows.bat b/examples/pipeline/in_to_out_python/check_windows.bat index 44d24a150d7af85f80ed349772f287b5821a1cb3..39aca5e1ae3f78893034667e9cae3b83513549f7 100644 --- a/examples/pipeline/in_to_out_python/check_windows.bat +++ b/examples/pipeline/in_to_out_python/check_windows.bat @@ -34,8 +34,10 @@ set PYTHONPATH=%2;%3 "%1" "%4" 127.0.0.1:8400 %source_path% %beamtime_id% %data_source_in% %data_source_out% %token% %timeout% %timeout_producer% %nthreads% 1 > out type out -findstr /I /L /C:"Processed 3 file(s)" out || goto :error +findstr /I /L /C:"Processed 5 file(s)" out || goto :error findstr /I /L /C:"Sent 3 file(s)" out || goto :error +findstr /I /L /C:"bt_meta" out || goto :error +findstr /I /L /C:"st_meta" out || goto :error echo db.data_default.find({"_id":1}) | %mongo_exe% %outdatabase_name% | findstr /c:"file1_%data_source_out%" || goto :error diff --git a/examples/pipeline/in_to_out_python/in_to_out.py b/examples/pipeline/in_to_out_python/in_to_out.py index 93e7328e040c8f6eceb1a37dcc555b8c6f6702c7..94d3802d786eb53b30626f0ac7ea07cb2c9b2f9e 100644 --- a/examples/pipeline/in_to_out_python/in_to_out.py +++ b/examples/pipeline/in_to_out_python/in_to_out.py @@ -36,6 +36,10 @@ group_id = consumer.generate_group_id() n_recv = 0 +producer.send_beamtime_meta('{"data":"bt_meta"}', callback = callback) +producer.send_stream_meta('{"data":"st_meta"}',stream = 'stream_in', callback = callback) + + if transfer_data: ingest_mode = asapo_producer.DEFAULT_INGEST_MODE else: @@ -55,5 +59,12 @@ while True: producer.wait_requests_finished(timeout_s_producer*1000) +consumer = asapo_consumer.create_consumer(source,path, True,beamtime,stream_out,token,timeout_s*1000) +bt_meta = consumer.get_beamtime_meta() +st_meta = consumer.get_stream_meta('stream_in') +print ('bt_meta:',bt_meta) +print ('st_meta:',st_meta) + + print ("Processed "+str(n_recv)+" file(s)") print ("Sent "+str(n_send)+" file(s)") diff --git a/producer/api/cpp/include/asapo/producer/producer.h b/producer/api/cpp/include/asapo/producer/producer.h index 5b684de318a317efbbb53b64230f92a756851ba2..21424aae9e02933d4faf5f358d1d805b02dfec81 100644 --- a/producer/api/cpp/include/asapo/producer/producer.h +++ b/producer/api/cpp/include/asapo/producer/producer.h @@ -131,7 +131,9 @@ class Producer { \param callback - callback function \return Error - will be nullptr on success */ - virtual Error SendStreamMetadata(const std::string& stream, const std::string& metadata, MetaIngestMode mode, + virtual Error SendStreamMetadata(const std::string& metadata, + MetaIngestMode mode, + const std::string& stream, RequestCallback callback) = 0; //! Set internal log level diff --git a/producer/api/cpp/src/producer_impl.cpp b/producer/api/cpp/src/producer_impl.cpp index a57ac7a46caf7f02be9f0e60e1eba240cfd76ca3..a8a213dfa7a54d88b4e31487fab026f986f3bed7 100644 --- a/producer/api/cpp/src/producer_impl.cpp +++ b/producer/api/cpp/src/producer_impl.cpp @@ -475,20 +475,22 @@ Error ProducerImpl::DeleteStream(std::string stream, uint64_t timeout_ms, Delete } Error ProducerImpl::SendBeamtimeMetadata(const std::string& metadata, MetaIngestMode mode, RequestCallback callback) { - return SendMeta("", metadata, mode, callback); + return SendMeta(metadata, mode, "", callback); } -Error ProducerImpl::SendStreamMetadata(const std::string& stream, - const std::string& metadata, +Error ProducerImpl::SendStreamMetadata(const std::string& metadata, MetaIngestMode mode, + const std::string& stream, RequestCallback callback) { if (stream.empty()) { return ProducerErrorTemplates::kWrongInput.Generate("stream is empty"); } - return SendMeta(stream, metadata, mode, callback); + return SendMeta(metadata, mode, stream, callback); } -Error ProducerImpl::SendMeta(std::string stream, const std::string& metadata, MetaIngestMode mode, +Error ProducerImpl::SendMeta(const std::string& metadata, + MetaIngestMode mode, + std::string stream, RequestCallback callback) { GenericRequestHeader request_header{kOpcodeTransferMetaData, 0, metadata.size(), 0, stream.empty() ? "beamtime_global.meta" : stream + ".meta", diff --git a/producer/api/cpp/src/producer_impl.h b/producer/api/cpp/src/producer_impl.h index ab59ff81ad32227715bfed176217059dc9c4c471..c4c7f3634a9a21e8ca42d5999242c214769e634e 100644 --- a/producer/api/cpp/src/producer_impl.h +++ b/producer/api/cpp/src/producer_impl.h @@ -68,7 +68,9 @@ class ProducerImpl : public Producer { Error SendMetadata(const std::string& metadata, RequestCallback callback) override; Error SendBeamtimeMetadata(const std::string& metadata, MetaIngestMode mode, RequestCallback callback) override; - Error SendStreamMetadata(const std::string& stream, const std::string& metadata, MetaIngestMode mode, + Error SendStreamMetadata(const std::string& metadata, + MetaIngestMode mode, + const std::string& stream, RequestCallback callback) override; uint64_t GetRequestsQueueSize() override; @@ -76,7 +78,10 @@ class ProducerImpl : public Producer { uint64_t GetRequestsQueueVolumeMb() override; void SetRequestsQueueLimits(uint64_t size, uint64_t volume) override; private: - Error SendMeta(const std::string stream, const std::string& metadata, MetaIngestMode mode, RequestCallback callback); + Error SendMeta(const std::string& metadata, + MetaIngestMode mode, + std::string stream, + RequestCallback callback); StreamInfo StreamRequest(StreamRequestOp op, std::string stream, uint64_t timeout_ms, Error* err) const; Error Send(const MessageHeader& message_header, std::string stream, MessageData data, std::string full_path, uint64_t ingest_mode, diff --git a/producer/api/cpp/unittests/test_producer_impl.cpp b/producer/api/cpp/unittests/test_producer_impl.cpp index 0eeea7f6979ba0699cfd6855e77ff30bc713525e..1fc2dea861192734e7bb5a3f6c5d1ad6787f428a 100644 --- a/producer/api/cpp/unittests/test_producer_impl.cpp +++ b/producer/api/cpp/unittests/test_producer_impl.cpp @@ -363,7 +363,7 @@ TEST_F(ProducerImplTests, OKAddingSendStreamDataRequest) { auto mode = asapo::MetaIngestMode{asapo::MetaIngestOp::kInsert, false}; - auto err = producer.SendStreamMetadata(expected_stream, expected_metadata, mode, nullptr); + auto err = producer.SendStreamMetadata(expected_metadata, mode, expected_stream, nullptr); ASSERT_THAT(err, Eq(nullptr)); } diff --git a/producer/api/python/asapo_producer.pxd b/producer/api/python/asapo_producer.pxd index f78491a2fe48610944078ed7dc8d0996da170b97..421e41f200f72f02fcfb4df7fdd9661d6a4d61ae 100644 --- a/producer/api/python/asapo_producer.pxd +++ b/producer/api/python/asapo_producer.pxd @@ -46,6 +46,15 @@ cdef extern from "asapo/asapo_producer.h" namespace "asapo": LogLevel LogLevel_Debug "asapo::LogLevel::Debug" LogLevel LogLevel_Warning "asapo::LogLevel::Warning" +cdef extern from "asapo/asapo_producer.h" namespace "asapo": + cppclass MetaIngestOp: + pass + MetaIngestOp kInsert "asapo::MetaIngestOp::kInsert" + MetaIngestOp kReplace "asapo::MetaIngestOp::kReplace" + MetaIngestOp kUpdate "asapo::MetaIngestOp::kUpdate" + struct MetaIngestMode: + MetaIngestOp op + bool upsert cdef extern from "asapo/asapo_producer.h" namespace "asapo": cppclass SourceType: @@ -110,6 +119,8 @@ cdef extern from "asapo/asapo_producer.h" namespace "asapo" nogil: StreamInfo GetLastStream(uint64_t timeout_ms, Error* err) Error GetVersionInfo(string* client_info,string* server_info, bool* supported) Error DeleteStream(string stream, uint64_t timeout_ms, DeleteStreamOptions options) + Error SendBeamtimeMetadata(string metadata, MetaIngestMode mode, RequestCallback callback) + Error SendStreamMetadata(string metadata, MetaIngestMode mode, string stream, RequestCallback callback) cdef extern from "asapo/asapo_producer.h" namespace "asapo": diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in index 24d97cb5c3b5a51ed5b3ce05b2e529beaa9f6de4..218358a9a59bc943e5b10993416746d963e54ea3 100644 --- a/producer/api/python/asapo_producer.pyx.in +++ b/producer/api/python/asapo_producer.pyx.in @@ -34,6 +34,15 @@ cdef bytes _bytes(s): else: raise TypeError("Could not convert to unicode.") +cdef MetaIngestOp mode_to_c(mode): + if mode == 'replace': + return kReplace + elif mode == 'update': + return kUpdate + elif mode == 'insert': + return kInsert + else: + raise TypeError("Could not convert to unicode.") class AsapoProducerError(Exception): @@ -174,7 +183,54 @@ cdef class PyProducer: if callback != None: Py_XINCREF(<PyObject*>callback) return - + def send_stream_meta(self, metadata, mode = 'replace', upsert = True, stream='default', callback=None): + """ + :param stream: stream name, default "default" + :type stream: string + :param metadata: beamtime metadata in JSON format + :type metadata: string + :param mode: send mode + :type mode: string + :param upsert: send mode + :type upsert: bool + :param callback: callback function, default None + :type callback: callback(info,err), where info - json string with event header that was used to send data and response, err - error string or None + :raises: + AsapoWrongInputError: wrong input (authorization, ...) + AsapoProducerError: other error + """ + cdef MetaIngestMode mode_c + mode_c.op = mode_to_c(mode) + mode_c.upsert = upsert + err = self.c_producer.get().SendStreamMetadata(_bytes(metadata), mode_c,_bytes(stream), + unwrap_callback(<RequestCallbackCython>self.c_callback, <void*>self,<void*>callback if callback != None else NULL)) + if err: + throw_exception(err) + if callback != None: + Py_XINCREF(<PyObject*>callback) + def send_beamtime_meta(self, metadata, mode = 'replace', upsert = True, callback=None): + """ + :param metadata: beamtime metadata in JSON format + :type metadata: string + :param mode: send mode + :type mode: string + :param upsert: send mode + :type upsert: bool + :param callback: callback function, default None + :type callback: callback(info,err), where info - json string with event header that was used to send data and response, err - error string or None + :raises: + AsapoWrongInputError: wrong input (authorization, ...) + AsapoProducerError: other error + """ + cdef MetaIngestMode mode_c + mode_c.op = mode_to_c(mode) + mode_c.upsert = upsert + err = self.c_producer.get().SendBeamtimeMetadata(_bytes(metadata), mode_c, + unwrap_callback(<RequestCallbackCython>self.c_callback, <void*>self,<void*>callback if callback != None else NULL)) + if err: + throw_exception(err) + if callback != None: + Py_XINCREF(<PyObject*>callback) def send(self, uint64_t id, exposed_path, data, user_meta=None, dataset=None, ingest_mode = DEFAULT_INGEST_MODE, stream = "default", callback=None): """ :param id: unique data id diff --git a/tests/automatic/consumer/consumer_api_python/check_linux.sh b/tests/automatic/consumer/consumer_api_python/check_linux.sh index 85fb840de7746600ec25e512a8af8dc50c0d2919..072a55de699e43b9f343aed00b612f54177e465c 100644 --- a/tests/automatic/consumer/consumer_api_python/check_linux.sh +++ b/tests/automatic/consumer/consumer_api_python/check_linux.sh @@ -28,6 +28,10 @@ done echo 'db.data_streamfts.insert({"_id":'1',"size":0,"name":"'1'","timestamp":1000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name} +echo 'db.meta.insert({"_id":"bt","data":"test_bt"})' | mongo ${database_name} +echo 'db.meta.insert({"_id":"st_test","data":"test_st"})' | mongo ${database_name} + + for i in `seq 1 5`; do echo 'db.data_stream1.insert({"_id":'$i',"size":6,"name":"'1$i'","timestamp":2000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}})' | mongo ${database_name} diff --git a/tests/automatic/consumer/consumer_api_python/check_windows.bat b/tests/automatic/consumer/consumer_api_python/check_windows.bat index 68824528f2e442d9d63594785785a7e0b38ea9cf..9522114bef59c749796241ae9d32d9b3837bdc2c 100644 --- a/tests/automatic/consumer/consumer_api_python/check_windows.bat +++ b/tests/automatic/consumer/consumer_api_python/check_windows.bat @@ -14,6 +14,10 @@ for /l %%x in (1, 1, 5) do echo db.data_default.insert({"_id":%%x,"size":6,"name echo db.data_streamfts.insert({"_id":1,"size":0,"name":"1","timestamp":1000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}}) | %mongo_exe% %database_name% || goto :error +echo 'db.meta.insert({"_id":"bt","data":"test_bt"})' | %mongo_exe% %database_name% +echo 'db.meta.insert({"_id":"st_test","data":"test_st"})' | %mongo_exe% %database_name% + + for /l %%x in (1, 1, 5) do echo db.data_stream1.insert({"_id":%%x,"size":6,"name":"1%%x","timestamp":2000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}}) | %mongo_exe% %database_name% || goto :error for /l %%x in (1, 1, 5) do echo db.data_stream2.insert({"_id":%%x,"size":6,"name":"2%%x","timestamp":3000,"source":"none","buf_id":0,"dataset_substream":0,"meta":{"test":10}}) | %mongo_exe% %database_name% || goto :error diff --git a/tests/automatic/consumer/consumer_api_python/consumer_api.py b/tests/automatic/consumer/consumer_api_python/consumer_api.py index 54baf5d799b45790d3ca803aeaa3f78927c29ed6..18a8dd3814210a20761dd626bf46325519fdb00c 100644 --- a/tests/automatic/consumer/consumer_api_python/consumer_api.py +++ b/tests/automatic/consumer/consumer_api_python/consumer_api.py @@ -232,7 +232,21 @@ def check_single(consumer, group_id): else: exit_on_noerr("wrong query") - # delete stream + # metadata + bt_meta = consumer.get_beamtime_meta() + assert_eq(bt_meta['data'], 'test_bt', "beamtime meta ") + st_meta = consumer.get_stream_meta("test") + assert_eq(st_meta['data'], 'test_st', "stream meta ") + + try: + consumer.get_stream_meta("notexist") + except asapo_consumer.AsapoNoDataError as err: + print(err) + pass + else: + exit_on_noerr("should be wrong input on non existing stream") + +# delete stream consumer.delete_stream(stream='default') try: