diff --git a/common/cpp/include/asapo/common/internal/version.h.in b/common/cpp/include/asapo/common/internal/version.h.in index 22f7de8e83814a9c7a693cddfa20cbdbb37d1544..79fcea4136e61555e17ee7fc03f5d14d7f66a54a 100644 --- a/common/cpp/include/asapo/common/internal/version.h.in +++ b/common/cpp/include/asapo/common/internal/version.h.in @@ -37,6 +37,7 @@ inline int VersionToNumber(const std::string& version) { } Error ExtractVersionFromResponse(const std::string &response, + const std::string &client, std::string* server_info, bool* supported); } diff --git a/common/cpp/src/version/version.cpp b/common/cpp/src/version/version.cpp index 1f9d2a39285946f5f3a3a2322f6c54b800c31bf3..57600be0ceb58b7e3515959b643194018d573310 100644 --- a/common/cpp/src/version/version.cpp +++ b/common/cpp/src/version/version.cpp @@ -4,6 +4,7 @@ namespace asapo { Error ExtractVersionFromResponse(const std::string &response, + const std::string &client, std::string* server_info, bool* supported) { JsonStringParser parser(response); @@ -16,7 +17,7 @@ Error ExtractVersionFromResponse(const std::string &response, } if (server_info) { *server_info = - "Server version: " + server_version + ", protocol on server: " + current_client_protocol; + "Server version: " + server_version + ", " + client + " protocol on server: " + current_client_protocol; } if (supported) { *supported = client_supported == "yes"; diff --git a/consumer/api/cpp/src/consumer_impl.cpp b/consumer/api/cpp/src/consumer_impl.cpp index 6ac36b34e5290de72dabab928d7c1eb35ec95d72..7d3d565e5db6a80a6382e31b6c39adaf7d9f504c 100644 --- a/consumer/api/cpp/src/consumer_impl.cpp +++ b/consumer/api/cpp/src/consumer_impl.cpp @@ -924,7 +924,7 @@ Error ConsumerImpl::GetServerVersionInfo(std::string* server_info, bool* support if (err) { return err; } - return ExtractVersionFromResponse(output.string_output,server_info,supported); + return ExtractVersionFromResponse(output.string_output,"consumer",server_info,supported); } Error ConsumerImpl::GetVersionInfo(std::string* client_info, std::string* server_info, bool* supported) { diff --git a/consumer/api/python/asapo_consumer.pxd b/consumer/api/python/asapo_consumer.pxd index f0c9fa99c9dc5885b8bee77d9227d89db15d4724..fa755a375a83d05f4469cb4fad4030ea0130caf2 100644 --- a/consumer/api/python/asapo_consumer.pxd +++ b/consumer/api/python/asapo_consumer.pxd @@ -87,6 +87,7 @@ cdef extern from "asapo/asapo_consumer.h" namespace "asapo" nogil: vector[StreamInfo] GetStreamList(string from_stream, StreamFilter filter, Error* err) void SetResendNacs(bool resend, uint64_t delay_ms, uint64_t resend_attempts) void InterruptCurrentOperation() + Error GetVersionInfo(string* client_info,string* server_info, bool* supported) cdef extern from "asapo/asapo_consumer.h" namespace "asapo" nogil: cdef cppclass ConsumerFactory: @@ -103,6 +104,8 @@ cdef extern from "asapo/asapo_consumer.h" namespace "asapo": ErrorTemplateInterface kLocalIOError "asapo::ConsumerErrorTemplates::kLocalIOError" ErrorTemplateInterface kWrongInput "asapo::ConsumerErrorTemplates::kWrongInput" ErrorTemplateInterface kPartialData "asapo::ConsumerErrorTemplates::kPartialData" + ErrorTemplateInterface kUnsupportedClient "asapo::ConsumerErrorTemplates::kUnsupportedClient" + cdef cppclass ConsumerErrorData: uint64_t id diff --git a/consumer/api/python/asapo_consumer.pyx.in b/consumer/api/python/asapo_consumer.pyx.in index 88d5be838750d5e531875823885e41a3da7369c9..1db15ed4a72531c1bb9403c632a4ea9231f4da58 100644 --- a/consumer/api/python/asapo_consumer.pyx.in +++ b/consumer/api/python/asapo_consumer.pyx.in @@ -46,6 +46,9 @@ class AsapoUnavailableServiceError(AsapoConsumerError): class AsapoLocalIOError(AsapoConsumerError): pass +class AsapoUnsupportedClientError(AsapoConsumerError): + pass + class AsapoStreamFinishedError(AsapoConsumerError): def __init__(self,message,id_max=None,next_stream=None): AsapoConsumerError.__init__(self,message) @@ -99,6 +102,8 @@ cdef throw_exception(Error& err, res = None): raise AsapoUnavailableServiceError(error_string) elif err == kInterruptedTransaction: raise AsapoInterruptedTransactionError(error_string) + elif err == kUnsupportedClient: + raise AsapoUnsupportedClientError(error_string) else: raise AsapoConsumerError(error_string) @@ -211,6 +216,21 @@ cdef class PyConsumer: if err: throw_exception(err) return + def get_version_info(self, from_server = "true"): + cdef string client_info,server_info + cdef bool supported + cdef string* p_server_info = &server_info if from_server else <string*>NULL + cdef bool* p_supported = &supported if from_server else <bool*>NULL + cdef Error err + with nogil: + err = self.c_consumer.get().GetVersionInfo(&client_info,p_server_info,p_supported) + if err: + throw_exception(err) + version = {} + if from_server: + return {'client': _str(client_info), 'server': _str(server_info), 'supported': supported} + else: + return {'client': _str(client_info)} def reset_lastread_marker(self,group_id, stream = "default"): cdef string b_group_id = _bytes(group_id) cdef string b_stream = _bytes(stream) diff --git a/producer/api/cpp/src/producer_impl.cpp b/producer/api/cpp/src/producer_impl.cpp index 6779fbacb0e8f88707bdf6d2d1d526024baf294f..ba3422ad66376aac5c93a2983b7422e30b721557 100644 --- a/producer/api/cpp/src/producer_impl.cpp +++ b/producer/api/cpp/src/producer_impl.cpp @@ -409,14 +409,15 @@ Error ProducerImpl::GetVersionInfo(std::string* client_info, std::string* server Error ProducerImpl::GetServerVersionInfo(std::string* server_info, bool* supported) const { - auto endpoint = endpoint_ +"/asapo-discovery/"+kProducerProtocol.GetDiscoveryVersion()+"/version?client=producer&protocol="+kProducerProtocol.GetVersion(); + auto endpoint = endpoint_ +"/asapo-discovery/"+kProducerProtocol.GetDiscoveryVersion()+ + "/version?client=producer&protocol="+kProducerProtocol.GetVersion(); HttpCode code; Error err; auto response = httpclient__->Get(endpoint, &code, &err); if (err) { return err; } - return ExtractVersionFromResponse(response,server_info,supported); + return ExtractVersionFromResponse(response,"producer",server_info,supported); } } \ No newline at end of file diff --git a/producer/api/python/asapo_producer.pxd b/producer/api/python/asapo_producer.pxd index f35a341c933c0dd447ea318ef8b0bae1936c3e94..cd627b6203aa0e9322625f893d7203dfd392c6a1 100644 --- a/producer/api/python/asapo_producer.pxd +++ b/producer/api/python/asapo_producer.pxd @@ -22,6 +22,7 @@ cdef extern from "asapo/asapo_producer.h" namespace "asapo": ErrorTemplateInterface kLocalIOError "asapo::ProducerErrorTemplates::kLocalIOError" ErrorTemplateInterface kServerWarning "asapo::ProducerErrorTemplates::kServerWarning" ErrorTemplateInterface kRequestPoolIsFull "asapo::ProducerErrorTemplates::kRequestPoolIsFull" + ErrorTemplateInterface kUnsupportedClient "asapo::ProducerErrorTemplates::kUnsupportedClient" cdef extern from "asapo/asapo_producer.h" namespace "asapo": cppclass MessageData: @@ -104,6 +105,7 @@ cdef extern from "asapo/asapo_producer.h" namespace "asapo" nogil: Error SendStreamFinishedFlag(string stream, uint64_t last_id, string next_stream, RequestCallback callback) StreamInfo GetStreamInfo(string stream, uint64_t timeout_ms, Error* err) StreamInfo GetLastStream(uint64_t timeout_ms, Error* err) + Error GetVersionInfo(string* client_info,string* server_info, bool* supported) cdef extern from "asapo/asapo_producer.h" namespace "asapo": diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in index e40d7a5c9e233f7fab8e14802e2cf828ea0f2e23..5391c8ce8ea41e3eada8ad182b8e4cc755d3feb1 100644 --- a/producer/api/python/asapo_producer.pyx.in +++ b/producer/api/python/asapo_producer.pyx.in @@ -54,6 +54,9 @@ class AsapoServerWarning(AsapoProducerError): class AsapoRequestsPoolIsFull(AsapoProducerError): pass +class AsapoUnsupportedClientError(AsapoProducerError): + pass + cdef python_exception_from_error(Error& err): error_string = _str(err.get().Explain()) if err == kTimeout: @@ -66,6 +69,8 @@ cdef python_exception_from_error(Error& err): return AsapoServerWarning(error_string) elif err == kRequestPoolIsFull: return AsapoRequestsPoolIsFull(error_string) + elif err == kUnsupportedClient: + raise AsapoUnsupportedClientError(error_string) else: return AsapoProducerError(error_string) @@ -110,7 +115,21 @@ cdef class PyProducer: print("wrong loglevel mode: "+ level) return self.c_producer.get().SetLogLevel(log_level) - + def get_version_info(self, from_server = "true"): + cdef string client_info,server_info + cdef bool supported + cdef string* p_server_info = &server_info if from_server else <string*>NULL + cdef bool* p_supported = &supported if from_server else <bool*>NULL + cdef Error err + with nogil: + err = self.c_producer.get().GetVersionInfo(&client_info,p_server_info,p_supported) + if err: + throw_exception(err) + version = {} + if from_server: + return {'client': _str(client_info), 'server': _str(server_info), 'supported': supported} + else: + return {'client': _str(client_info)} def __send_np_array(self, id, exposed_path,data, user_meta=None,dataset=None,stream="default",ingest_mode = DEFAULT_INGEST_MODE,callback=None): cdef MessageHeader message_header = self.create_message_header(id,exposed_path,user_meta,dataset,ingest_mode) if data is None: diff --git a/tests/automatic/consumer/consumer_api/consumer_api.cpp b/tests/automatic/consumer/consumer_api/consumer_api.cpp index 91d802d18e75632333eaac2a0fa657047faa8eee..a6f889c2abb8d5943c6c68bb8153a7db9e8e16ea 100644 --- a/tests/automatic/consumer/consumer_api/consumer_api.cpp +++ b/tests/automatic/consumer/consumer_api/consumer_api.cpp @@ -31,6 +31,14 @@ void TestSingle(const std::unique_ptr<asapo::Consumer>& consumer, const std::str asapo::MessageMeta fi; asapo::Error err; + std::string client,server; + bool supported; + err = consumer->GetVersionInfo(&client,&server,&supported); + M_AssertTrue(err == nullptr, "Version OK"); + M_AssertTrue(supported, "client supported by server"); + M_AssertTrue(!client.empty(), "client version"); + M_AssertTrue(!server.empty(), "server version"); + err = consumer->GetNext(group_id, &fi, nullptr, "default"); if (err) { std::cout << err->Explain() << std::endl; diff --git a/tests/automatic/consumer/consumer_api_python/consumer_api.py b/tests/automatic/consumer/consumer_api_python/consumer_api.py index b150e5fb57b5fb14c76e78d1662d6525d43e014b..1b5bf5a153d00bfbd4336529a2ee9a9b9823799f 100644 --- a/tests/automatic/consumer/consumer_api_python/consumer_api.py +++ b/tests/automatic/consumer/consumer_api_python/consumer_api.py @@ -29,6 +29,13 @@ def assert_usermetadata(meta, name): print('meta: ', json.dumps(meta, indent=4, sort_keys=True)) sys.exit(1) +def assert_version(version): + print("asserting version ",version) + ok = version['supported'] and version['client'] and version['server'] + if not ok: + sys.exit(1) + + def assert_eq(val, expected, name): print("asserting eq for " + name) @@ -48,6 +55,10 @@ def check_file_transfer_service(consumer, group_id): def check_single(consumer, group_id): global thread_res + + version = consumer.get_version_info() + assert_version(version) + _, meta = consumer.get_next(group_id, meta_only=True) assert_metaname(meta, "1", "get next1") assert_usermetadata(meta, "get next1") diff --git a/tests/automatic/producer/python_api/producer_api.py b/tests/automatic/producer/python_api/producer_api.py index 11859cc789bf989e00d866f9d45a06f741106a54..2b419865037cb5cc806da97d3bff01743045c5df 100644 --- a/tests/automatic/producer/python_api/producer_api.py +++ b/tests/automatic/producer/python_api/producer_api.py @@ -36,11 +36,21 @@ def callback(payload, err): print("successfuly sent: ", payload) lock.release() +def assert_version(version): + print("asserting version ",version) + ok = version['supported'] and version['client'] and version['server'] + if not ok: + sys.exit(1) producer = asapo_producer.create_producer(endpoint,'processed', beamtime, 'auto', data_source, token, nthreads, 60000) producer.set_log_level("debug") + +version = producer.get_version_info() +assert_version(version) + + # send single file producer.send_file(1, local_path="./file1", exposed_path="processed/" + data_source + "/" + "file1", user_meta='{"test_key":"test_val"}', callback=callback)