From 546b29968a1d239fe0dde53b7d71450054e9afd8 Mon Sep 17 00:00:00 2001 From: Sergey Yakubov <sergey.yakubov@desy.de> Date: Fri, 12 Jun 2020 11:39:56 +0200 Subject: [PATCH] fix using non-positive ids --- CHANGELOG.md | 9 +++++++-- consumer/api/cpp/src/server_data_broker.cpp | 4 ++++ consumer/api/cpp/unittests/test_server_broker.cpp | 7 +++++++ consumer/api/python/asapo_consumer.pyx.in | 9 ++++----- producer/api/cpp/src/producer_impl.cpp | 4 ++++ producer/api/cpp/unittests/test_producer_impl.cpp | 5 +++++ producer/api/python/asapo_producer.pxd | 5 ++--- producer/api/python/asapo_producer.pyx.in | 13 ++++++------- tests/automatic/producer/python_api/producer_api.py | 8 ++++++++ 9 files changed, 47 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bea6ad874..63ce33ba3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,10 +1,16 @@ ## 20.06.0 (unreleased) +FEATURES +* implemented acknowledeges - one can acknowledge a data tuple, get last acknowledged tuple id, get list of unacknowledged tuple ids IMPROVEMENTS -* change behavior when trying to get data from a substream that not exist - return EndOfStream instead of WrongInput +* change behavior when trying to get data from a substream that does not exist - return EndOfStream instead of WrongInput +* change behavior of GetLastXX/get_lastXX functions - current pointer is not being set to the end of a substream after this command anymore * substream name added to producer callback output for Python * added simple C++ examples +BUG FIXES +* check data tuple ids should be positive + ## 20.03.0 FEATURES * introduced substreams for producer/consumer @@ -12,7 +18,6 @@ FEATURES * producer accepts "auto" for beamtime, will automatically select a current one for a given beamline * introduced file transfer service - possibility for consumer clients to receive data also in case filesystem is inaccessible - IMPROVEMENTS * switch to MongoDB 4.2 * API documentation is available for C++ and Python diff --git a/consumer/api/cpp/src/server_data_broker.cpp b/consumer/api/cpp/src/server_data_broker.cpp index 7421b46d4..da5a65a4a 100644 --- a/consumer/api/cpp/src/server_data_broker.cpp +++ b/consumer/api/cpp/src/server_data_broker.cpp @@ -448,6 +448,10 @@ uint64_t ServerDataBroker::GetCurrentSize(Error* err) { return GetCurrentSize(kDefaultSubstream, err); } Error ServerDataBroker::GetById(uint64_t id, FileInfo* info, std::string group_id, FileData* data) { + if (id == 0) { + return ConsumerErrorTemplates::kWrongInput.Generate("id should be positive"); + } + return GetById(id, info, std::move(group_id), kDefaultSubstream, data); } diff --git a/consumer/api/cpp/unittests/test_server_broker.cpp b/consumer/api/cpp/unittests/test_server_broker.cpp index 4e3f6ffe7..8d11e7b8a 100644 --- a/consumer/api/cpp/unittests/test_server_broker.cpp +++ b/consumer/api/cpp/unittests/test_server_broker.cpp @@ -1216,6 +1216,13 @@ TEST_F(ServerDataBrokerTests, GetLastAcknowledgeReturnsNoData) { ASSERT_THAT(ind, Eq(0)); } +TEST_F(ServerDataBrokerTests, GetByIdErrorsForId0) { + + auto err = data_broker->GetById(0, &info, expected_group_id, nullptr); + + ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kWrongInput)); +} + } diff --git a/consumer/api/python/asapo_consumer.pyx.in b/consumer/api/python/asapo_consumer.pyx.in index 6af1f7f80..9b59f9503 100644 --- a/consumer/api/python/asapo_consumer.pyx.in +++ b/consumer/api/python/asapo_consumer.pyx.in @@ -130,7 +130,7 @@ cdef class PyDataBroker: return self._op("next",group_id,substream,meta_only,0) def get_last(self, group_id, substream = "default", meta_only = True): return self._op("last",group_id,substream,meta_only,0) - def get_by_id(self,id,group_id, substream = "default",meta_only = True): + def get_by_id(self,uint64_t id,group_id, substream = "default",meta_only = True): return self._op("id",group_id,substream,meta_only,id) def retrieve_data(self,meta): json_str = json.dumps(meta) @@ -200,13 +200,12 @@ cdef class PyDataBroker: list.append(_str(substream)) return list - def acknowledge(self, group_id, id, substream = "default"): + def acknowledge(self, group_id, uint64_t id, substream = "default"): cdef string b_group_id = _bytes(group_id) cdef string b_substream = _bytes(substream) - cdef uint64_t uint64_id = id cdef Error err with nogil: - err = self.c_broker.Acknowledge(b_group_id,uint64_id,b_substream) + err = self.c_broker.Acknowledge(b_group_id,id,b_substream) if err: throw_exception(err) @@ -273,7 +272,7 @@ cdef class PyDataBroker: return self._op_dataset("next",group_id,substream,0) def get_last_dataset(self, group_id, substream = "default"): return self._op_dataset("last",group_id,substream,0) - def get_dataset_by_id(self, id, group_id, substream = "default"): + def get_dataset_by_id(self, uint64_t id, group_id, substream = "default"): return self._op_dataset("id",group_id,substream,id) def get_beamtime_meta(self): cdef Error err diff --git a/producer/api/cpp/src/producer_impl.cpp b/producer/api/cpp/src/producer_impl.cpp index ff32dbb7f..2d2774609 100644 --- a/producer/api/cpp/src/producer_impl.cpp +++ b/producer/api/cpp/src/producer_impl.cpp @@ -73,6 +73,10 @@ Error CheckProducerRequest(const EventHeader& event_header, uint64_t ingest_mode return ProducerErrorTemplates::kWrongInput.Generate("subset dimensions"); } + if (event_header.file_id == 0) { + return ProducerErrorTemplates::kWrongInput.Generate("data tuple id should be positive"); + } + return CheckIngestMode(ingest_mode); } diff --git a/producer/api/cpp/unittests/test_producer_impl.cpp b/producer/api/cpp/unittests/test_producer_impl.cpp index aff2e2ffc..36f94e2fb 100644 --- a/producer/api/cpp/unittests/test_producer_impl.cpp +++ b/producer/api/cpp/unittests/test_producer_impl.cpp @@ -144,6 +144,11 @@ TEST_F(ProducerImplTests, ErrorIfNoDataSend_) { ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } +TEST_F(ProducerImplTests, ErrorIfSendingDataWithZeroId) { + asapo::EventHeader event_header{0, 100, expected_fullpath}; + auto err = producer.SendData(event_header, nullptr, asapo::kTransferMetaDataOnly, nullptr); + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); +} TEST_F(ProducerImplTests, OkIfNoDataWithTransferMetadataOnlyMode) { asapo::EventHeader event_header{1, 100, expected_fullpath}; diff --git a/producer/api/python/asapo_producer.pxd b/producer/api/python/asapo_producer.pxd index b4e2386bc..b862d98eb 100644 --- a/producer/api/python/asapo_producer.pxd +++ b/producer/api/python/asapo_producer.pxd @@ -2,9 +2,8 @@ from libcpp.memory cimport unique_ptr from libcpp.string cimport string from libcpp.vector cimport vector from libcpp cimport bool - -ctypedef unsigned char uint8_t -ctypedef unsigned long uint64_t +from libc.stdint cimport uint8_t +from libc.stdint cimport uint64_t ctypedef unique_ptr[ErrorInterface] Error diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in index b90bf1b44..bca43cb22 100644 --- a/producer/api/python/asapo_producer.pyx.in +++ b/producer/api/python/asapo_producer.pyx.in @@ -106,9 +106,8 @@ cdef class PyProducer: return self.c_producer.get().SetLogLevel(log_level) - def __send_np_array(self,int id, exposed_path,data, user_meta=None,subset=None,substream="default",ingest_mode = DEFAULT_INGEST_MODE,callback=None): + def __send_np_array(self, id, exposed_path,data, user_meta=None,subset=None,substream="default",ingest_mode = DEFAULT_INGEST_MODE,callback=None): cdef EventHeader event_header = self.create_event_header(id,exposed_path,user_meta,subset,ingest_mode) - event_header.file_id = id if data is None: event_header.file_size = 0 else: @@ -127,7 +126,7 @@ cdef class PyProducer: Py_XINCREF(<PyObject*>callback) return - cdef EventHeader create_event_header(self,int id, exposed_path,user_meta,subset,ingest_mode): + cdef EventHeader create_event_header(self,uint64_t id, exposed_path,user_meta,subset,ingest_mode): cdef EventHeader event_header event_header.file_id = id event_header.file_name = _bytes(exposed_path) @@ -140,7 +139,7 @@ cdef class PyProducer: event_header.subset_size = subset[1] return event_header - def __send_bytes(self,int id, exposed_path,data, user_meta=None,subset=None, substream="default", ingest_mode = DEFAULT_INGEST_MODE,callback=None): + def __send_bytes(self, id, exposed_path,data, user_meta=None,subset=None, substream="default", ingest_mode = DEFAULT_INGEST_MODE,callback=None): cdef EventHeader event_header = self.create_event_header(id,exposed_path,user_meta,subset,ingest_mode) event_header.file_size = len(data) err = self.c_producer.get().SendData__(event_header,_bytes(substream), data_pointer_bytes(data), ingest_mode, @@ -153,7 +152,7 @@ cdef class PyProducer: Py_XINCREF(<PyObject*>callback) return - def send_data(self, id, exposed_path, data, user_meta=None, subset=None, substream = "default", ingest_mode = DEFAULT_INGEST_MODE, callback=None): + def send_data(self, uint64_t id, exposed_path, data, user_meta=None, subset=None, substream = "default", ingest_mode = DEFAULT_INGEST_MODE, callback=None): """ :param id: unique data id :type id: int @@ -181,7 +180,7 @@ cdef class PyProducer: self.__send_bytes(id,exposed_path,data,user_meta,subset,substream,ingest_mode,callback) else: raise(AsapoProducerError("wrong data type: " + str(type(data)))) - def send_substream_finished_flag(self, substream, last_id, next_substream = None, callback = None): + def send_substream_finished_flag(self, substream, uint64_t last_id, next_substream = None, callback = None): """ :param substream: substream name :type substream: string @@ -200,7 +199,7 @@ cdef class PyProducer: throw_exception(err) - def send_file(self, id, local_path, exposed_path, user_meta=None, subset=None, substream = "default", ingest_mode = DEFAULT_INGEST_MODE, callback=None): + def send_file(self, uint64_t id, local_path, exposed_path, user_meta=None, subset=None, substream = "default", ingest_mode = DEFAULT_INGEST_MODE, callback=None): """ :param id: unique data id :type id: int diff --git a/tests/automatic/producer/python_api/producer_api.py b/tests/automatic/producer/python_api/producer_api.py index 3146c57b8..92df017b5 100644 --- a/tests/automatic/producer/python_api/producer_api.py +++ b/tests/automatic/producer/python_api/producer_api.py @@ -73,6 +73,14 @@ else: print("should be error sending non-cont array") sys.exit(1) +try: + producer.send_file(0, local_path = "./not_exist",exposed_path = "./whatever", + ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, callback = callback) +except asapo_producer.AsapoWrongInputError as e: + print(e) +else: + print("should be error sending id 0 ") + sys.exit(1) #send single file once again producer.send_file(1, local_path = "./file1", exposed_path = stream+"/"+"file1", user_meta = '{"test_key":"test_val"}', callback = callback) -- GitLab