diff --git a/common/cpp/include/request/request_pool.h b/common/cpp/include/request/request_pool.h index fe38451ea706f1b3d6c0185f6bf70d9a27667079..2c0be787d7b1f5d04dec730e2c5a29dff8254804 100644 --- a/common/cpp/include/request/request_pool.h +++ b/common/cpp/include/request/request_pool.h @@ -24,9 +24,10 @@ class RequestPool { explicit RequestPool(uint8_t n_threads, RequestHandlerFactory* request_handler_factory, const AbstractLogger* log); VIRTUAL Error AddRequest(GenericRequestPtr request); VIRTUAL Error AddRequests(GenericRequests requests); - ~RequestPool(); - uint64_t NRequestsInQueue(); + VIRTUAL uint64_t NRequestsInPool(); + VIRTUAL Error WaitRequestsFinished(uint64_t timeout_ms); + VIRTUAL void StopThreads(); private: const AbstractLogger* log__; RequestHandlerFactory* request_handler_factory__; @@ -41,6 +42,7 @@ class RequestPool { GenericRequestPtr GetRequestFromQueue(); void PutRequestBackToQueue(GenericRequestPtr request); uint64_t shared_counter_{0}; + uint64_t requests_in_progress_{0}; }; diff --git a/common/cpp/src/request/request_pool.cpp b/common/cpp/src/request/request_pool.cpp index f84f56c32b201584ede66b25e54b0c9fe680c88d..7d9b6c03bbe95d66d8f84b2faf3c2e89441017dc 100644 --- a/common/cpp/src/request/request_pool.cpp +++ b/common/cpp/src/request/request_pool.cpp @@ -42,9 +42,11 @@ void RequestPool::ProcessRequest(const std::unique_ptr<RequestHandler>& request_ ThreadInformation* thread_info) { request_handler->PrepareProcessingRequestLocked(); auto request = GetRequestFromQueue(); + requests_in_progress_++; thread_info->lock.unlock(); auto success = request_handler->ProcessRequestUnlocked(request.get()); thread_info->lock.lock(); + requests_in_progress_--; request_handler->TearDownProcessingRequestLocked(success); if (!success) { std::this_thread::sleep_for(std::chrono::milliseconds(1000)); @@ -69,21 +71,12 @@ void RequestPool::ThreadHandler(uint64_t id) { } RequestPool::~RequestPool() { - mutex_.lock(); - quit_ = true; - mutex_.unlock(); - condition_.notify_all(); - - for(size_t i = 0; i < threads_.size(); i++) { - if(threads_[i].joinable()) { - log__->Debug("finishing thread " + std::to_string(i)); - threads_[i].join(); - } - } + StopThreads(); } -uint64_t RequestPool::NRequestsInQueue() { + +uint64_t RequestPool::NRequestsInPool() { std::lock_guard<std::mutex> lock{mutex_}; - return request_queue_.size(); + return request_queue_.size() + requests_in_progress_; } Error RequestPool::AddRequests(GenericRequests requests) { std::unique_lock<std::mutex> lock(mutex_); @@ -97,4 +90,35 @@ Error RequestPool::AddRequests(GenericRequests requests) { } +Error RequestPool::WaitRequestsFinished(uint64_t timeout_ms) { + uint64_t elapsed_ms = 0; + while (true) { + auto n_requests = NRequestsInPool(); + if (n_requests == 0) { + break; + } + if (elapsed_ms >= timeout_ms) { + return IOErrorTemplates::kTimeout.Generate(); + } + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + elapsed_ms += 100; + } + return nullptr; +} + +void RequestPool::StopThreads() { + mutex_.lock(); + quit_ = true; + mutex_.unlock(); + condition_.notify_all(); + + for(size_t i = 0; i < threads_.size(); i++) { + if(threads_[i].joinable()) { + log__->Debug("finishing thread " + std::to_string(i)); + threads_[i].join(); + } + } + +} + } diff --git a/common/cpp/unittests/request/mocking.h b/common/cpp/unittests/request/mocking.h index 94cd423596eb9db0dfc5e024050703c642e6bb39..9e9ef6d5c13fc5b32fda21a53d8b22b95e8af306 100644 --- a/common/cpp/unittests/request/mocking.h +++ b/common/cpp/unittests/request/mocking.h @@ -18,6 +18,7 @@ class MockRequestHandler : public RequestHandler { MOCK_METHOD1(ProcessRequestUnlocked_t, bool (const GenericRequest* request)); bool ProcessRequestUnlocked(GenericRequest* request) override { + std::this_thread::sleep_for(std::chrono::milliseconds(50)); return ProcessRequestUnlocked_t(request); } diff --git a/common/cpp/unittests/request/test_request_pool.cpp b/common/cpp/unittests/request/test_request_pool.cpp index 1f71c1d39cf1d41586cf762dee1d5da51a9ec4a3..91c5cb790ce5c392dc3df1797999c21eb83be0f5 100644 --- a/common/cpp/unittests/request/test_request_pool.cpp +++ b/common/cpp/unittests/request/test_request_pool.cpp @@ -93,11 +93,13 @@ TEST_F(RequestPoolTests, AddRequestDoesNotGoFurtherWhenNotReady) { ASSERT_THAT(err, Eq(nullptr)); } -TEST_F(RequestPoolTests, NRequestsInQueue) { - auto nreq = pool.NRequestsInQueue(); +TEST_F(RequestPoolTests, NRequestsInPoolInitial) { + auto nreq = pool.NRequestsInPool(); ASSERT_THAT(nreq, Eq(0)); } + + void ExpectSend(MockRequestHandler* mock_handler, int ntimes = 1) { EXPECT_CALL(*mock_handler, ReadyProcessRequest()).Times(ntimes).WillRepeatedly(Return(true)); EXPECT_CALL(*mock_handler, PrepareProcessingRequestLocked()).Times(ntimes); @@ -106,7 +108,6 @@ void ExpectSend(MockRequestHandler* mock_handler, int ntimes = 1) { } - TEST_F(RequestPoolTests, AddRequestCallsSend) { ExpectSend(mock_request_handler); @@ -117,6 +118,30 @@ TEST_F(RequestPoolTests, AddRequestCallsSend) { ASSERT_THAT(err, Eq(nullptr)); } +TEST_F(RequestPoolTests, NRequestsInPool) { + + pool.AddRequest(std::move(request)); + auto nreq = pool.NRequestsInPool(); + + ASSERT_THAT(nreq, Eq(1)); +} + +TEST_F(RequestPoolTests, NRequestsInPoolAccountsForRequestsInProgress) { + ExpectSend(mock_request_handler, 1); + + pool.AddRequest(std::move(request)); + + std::this_thread::sleep_for(std::chrono::milliseconds(30)); + + auto nreq1 = pool.NRequestsInPool(); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + auto nreq2 = pool.NRequestsInPool(); + + ASSERT_THAT(nreq1, Eq(1)); + ASSERT_THAT(nreq2, Eq(0)); +} + TEST_F(RequestPoolTests, AddRequestCallsSendTwoRequests) { @@ -124,13 +149,11 @@ TEST_F(RequestPoolTests, AddRequestCallsSendTwoRequests) { ExpectSend(mock_request_handler, 2); - - auto err1 = pool.AddRequest(std::move(request)); request.reset(request2); auto err2 = pool.AddRequest(std::move(request)); - std::this_thread::sleep_for(std::chrono::milliseconds(30)); + std::this_thread::sleep_for(std::chrono::milliseconds(80)); ASSERT_THAT(err1, Eq(nullptr)); ASSERT_THAT(err2, Eq(nullptr)); } @@ -148,14 +171,50 @@ TEST_F(RequestPoolTests, AddRequestsOk) { auto err = pool.AddRequests(std::move(requests)); - std::this_thread::sleep_for(std::chrono::milliseconds(30)); + std::this_thread::sleep_for(std::chrono::milliseconds(80)); + ASSERT_THAT(err, Eq(nullptr)); +} + +TEST_F(RequestPoolTests, FinishProcessingThreads) { + EXPECT_CALL(mock_logger, Debug(HasSubstr("finishing thread"))).Times(nthreads); +} + +TEST_F(RequestPoolTests, WaitRequestsFinished) { + ExpectSend(mock_request_handler); + + pool.AddRequest(std::move(request)); + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + + auto err = pool.WaitRequestsFinished(1000); + + auto nreq = pool.NRequestsInPool(); + + ASSERT_THAT(nreq, Eq(0)); ASSERT_THAT(err, Eq(nullptr)); + } +TEST_F(RequestPoolTests, WaitRequestsFinishedTimeout) { + ExpectSend(mock_request_handler); + + pool.AddRequest(std::move(request)); + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + auto err = pool.WaitRequestsFinished(0); -TEST_F(RequestPoolTests, FinishProcessingThreads) { + auto nreq = pool.NRequestsInPool(); + + ASSERT_THAT(nreq, Eq(1)); + ASSERT_THAT(err, Eq(asapo::IOErrorTemplates::kTimeout)); + +} + +TEST_F(RequestPoolTests, StopThreads) { EXPECT_CALL(mock_logger, Debug(HasSubstr("finishing thread"))).Times(nthreads); + + pool.StopThreads(); + + Mock::VerifyAndClearExpectations(&mock_logger); } diff --git a/examples/pipeline/in_to_out/in_to_out.cpp b/examples/pipeline/in_to_out/in_to_out.cpp index 0a15cac5f0d3d194b063cc1784e0cf6e733fad25..5007d229636c848c7a455c887c78ad553d0c1188 100644 --- a/examples/pipeline/in_to_out/in_to_out.cpp +++ b/examples/pipeline/in_to_out/in_to_out.cpp @@ -199,23 +199,6 @@ std::unique_ptr<asapo::Producer> CreateProducer(const Args& args) { return producer; } -void WaitProducerThreadsFinished(const Args& args, int nfiles) { - int elapsed_ms = 0; - while (true) { - if (files_sent == nfiles) { - break; - } - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - elapsed_ms += 100; - if (elapsed_ms > args.timeout_ms_producer) { - std::cerr << "Stream out exit on timeout " << std::endl; - break; - } - } -} - - - int main(int argc, char* argv[]) { asapo::ExitAfterPrintVersionIfNeeded("GetNext Broker Example", argc, argv); Args args; @@ -245,7 +228,9 @@ int main(int argc, char* argv[]) { int nerrors; auto nfiles = ProcessAllData(args, producer, &duration_ms, &nerrors); - WaitProducerThreadsFinished(args, nfiles); + if (producer->WaitRequestsFinished(args.timeout_ms_producer) != nullptr) { + std::cerr << "Stream out exit on timeout " << std::endl; + } auto duration_streamout = std::chrono::duration_cast<std::chrono::milliseconds>(streamout_finish - streamout_start); std::cout << "Stream in " << std::endl; diff --git a/examples/pipeline/in_to_out_python/in_to_out.py b/examples/pipeline/in_to_out_python/in_to_out.py index e2e2d603173f1ed7d21bce94a4fcce936736bd61..b8a51975c9849877aaa597abe41956e8e5c14bfc 100644 --- a/examples/pipeline/in_to_out_python/in_to_out.py +++ b/examples/pipeline/in_to_out_python/in_to_out.py @@ -22,13 +22,6 @@ def callback(header,err): n_send = n_send + 1 lock.release() -def wait_send(n_files, timeout_s): - elapsed = 0 - while elapsed < timeout_s: - if n_send == n_files: - break - time.sleep(1) - source, path, beamtime,stream_in, stream_out, token, timeout_s,timeout_s_producer,nthreads, transfer_data = sys.argv[1:] timeout_s=int(timeout_s) timeout_s_producer=int(timeout_s_producer) @@ -60,8 +53,7 @@ while True: except asapo_producer.AsapoProducerError: break - -wait_send(n_recv,timeout_s_producer) +producer.wait_requests_finished(timeout_s_producer*1000) print ("Processed "+str(n_recv)+" file(s)") print ("Sent "+str(n_send)+" file(s)") diff --git a/examples/producer/dummy-data-producer/dummy_data_producer.cpp b/examples/producer/dummy-data-producer/dummy_data_producer.cpp index 1d5484184978bf4686de49ff2246d3dcfecc7c73..e2b0343859b676244ad6924b29f3d3b7cc765ef6 100644 --- a/examples/producer/dummy-data-producer/dummy_data_producer.cpp +++ b/examples/producer/dummy-data-producer/dummy_data_producer.cpp @@ -191,25 +191,6 @@ std::unique_ptr<asapo::Producer> CreateProducer(const Args& args) { return producer; } -void WaitThreadsFinished(const Args& args) { - uint64_t elapsed_ms = 0; - while (true) { - mutex.lock(); - if (iterations_remained <= 0) { - mutex.unlock(); - break; - } - mutex.unlock(); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - elapsed_ms += 100; - if (elapsed_ms > args.timeout_sec * 1000) { - std::cerr << "Producer exit on timeout " << std::endl; - exit(EXIT_FAILURE); - } - } - -} - void PrintOutput(const Args& args, const system_clock::time_point& start) { system_clock::time_point t2 = system_clock::now(); double duration_sec = std::chrono::duration_cast<std::chrono::milliseconds>( t2 - start ).count() / 1000.0; @@ -220,8 +201,6 @@ void PrintOutput(const Args& args, const system_clock::time_point& start) { std::cout << "Bandwidth " << size_gb / duration_sec / 8 << " GBytes/s" << std::endl; } - - int main (int argc, char* argv[]) { Args args; ProcessCommandArguments(argc, argv, &args); @@ -241,7 +220,16 @@ int main (int argc, char* argv[]) { return EXIT_FAILURE; } - WaitThreadsFinished(args); + auto err = producer->WaitRequestsFinished(args.timeout_sec * 1000); + if (err) { + std::cerr << "Producer exit on timeout " << std::endl; + exit(EXIT_FAILURE); + } + + if (iterations_remained != 0) { + std::cerr << "Producer did not send all data " << std::endl; + exit(EXIT_FAILURE); + } PrintOutput(args, start_time); return EXIT_SUCCESS; diff --git a/producer/api/cpp/include/producer/producer.h b/producer/api/cpp/include/producer/producer.h index b87140ae9814685e0a0a86c370534ebeab0e7552..d0daa41606149858a578bcd66df65981c6a17a8a 100644 --- a/producer/api/cpp/include/producer/producer.h +++ b/producer/api/cpp/include/producer/producer.h @@ -36,9 +36,12 @@ class Producer { //! Sends data to the receiver - same as SendData - memory should not be freed until send is finished //! used e.g. for Python bindings - virtual Error SendData_(const EventHeader& event_header, void* data, uint64_t ingest_mode, - RequestCallback callback) = 0; + virtual Error SendData__(const EventHeader& event_header, void* data, uint64_t ingest_mode, + RequestCallback callback) = 0; + //! Stop processing threads + //! used e.g. for Python bindings + virtual void StopThreads__() = 0; //! Sends files to the receiver /*! @@ -65,6 +68,11 @@ class Producer { virtual void EnableRemoteLog(bool enable) = 0; //! Set beamtime id which producer will use to send data virtual Error SetCredentials(SourceCredentials source_cred) = 0; + //! Set get current size of the requests queue + virtual uint64_t GetRequestsQueueSize() = 0; + //! Wait until all current requests are processed or timeout + virtual Error WaitRequestsFinished(uint64_t timeout_ms) = 0; + }; } diff --git a/producer/api/cpp/include/producer/producer_error.h b/producer/api/cpp/include/producer/producer_error.h index 12c5440ad418a0d3b8cece74f2afa78dcdd13b41..0dc331258da6d07b3d42a73bba22ba74e29659d4 100644 --- a/producer/api/cpp/include/producer/producer_error.h +++ b/producer/api/cpp/include/producer/producer_error.h @@ -9,7 +9,8 @@ enum class ProducerErrorType { kInternalServerError, kRequestPoolIsFull, kLocalIOError, - kWrongInput + kWrongInput, + kTimeout }; using ProducerErrorTemplate = ServiceErrorTemplate<ProducerErrorType, ErrorType::kProducerError>; @@ -32,6 +33,11 @@ auto const kInternalServerError = ProducerErrorTemplate { "Internal server error", ProducerErrorType::kInternalServerError }; +auto const kTimeout = ProducerErrorTemplate { + "Timeout", ProducerErrorType::kTimeout +}; + + }; } diff --git a/producer/api/cpp/src/producer.cpp b/producer/api/cpp/src/producer.cpp index c219c7ca7d041a71fd158e8a46951ca2e7fe8f40..84191fcf8b0e2a81c70574e612960f16b9a5cb58 100644 --- a/producer/api/cpp/src/producer.cpp +++ b/producer/api/cpp/src/producer.cpp @@ -1,11 +1,13 @@ #include "producer/producer.h" #include "producer_impl.h" +#include "producer/producer_error.h" std::unique_ptr<asapo::Producer> asapo::Producer::Create(const std::string& endpoint, uint8_t n_processing_threads, asapo::RequestHandlerType type, SourceCredentials source_cred, Error* err) { if (n_processing_threads > kMaxProcessingThreads || n_processing_threads == 0) { - *err = TextError("Set number of processing threads > 0 and <= " + std::to_string(kMaxProcessingThreads)); + *err = ProducerErrorTemplates::kWrongInput.Generate("Set number of processing threads > 0 and <= " + std::to_string( + kMaxProcessingThreads)); return nullptr; } diff --git a/producer/api/cpp/src/producer_impl.cpp b/producer/api/cpp/src/producer_impl.cpp index e19bac0702835f552e66d5c80543759773843f26..d2f933380189d7658ad73d434b5f6e864db8bdac 100644 --- a/producer/api/cpp/src/producer_impl.cpp +++ b/producer/api/cpp/src/producer_impl.cpp @@ -171,10 +171,10 @@ Error ProducerImpl::SendMetaData(const std::string& metadata, RequestCallback ca } -Error ProducerImpl::SendData_(const EventHeader& event_header, - void* data, - uint64_t ingest_mode, - RequestCallback callback) { +Error ProducerImpl::SendData__(const EventHeader& event_header, + void* data, + uint64_t ingest_mode, + RequestCallback callback) { FileData data_wrapped = FileData{(uint8_t*)data}; if (auto err = CheckData(ingest_mode, event_header, &data_wrapped)) { @@ -185,4 +185,20 @@ Error ProducerImpl::SendData_(const EventHeader& event_header, return Send(std::move(event_header), std::move(data_wrapped), "", ingest_mode, callback, false); } +uint64_t ProducerImpl::GetRequestsQueueSize() { + return request_pool__->NRequestsInPool(); +}; + +Error ProducerImpl::WaitRequestsFinished(uint64_t timeout_ms) { + if (request_pool__->WaitRequestsFinished(timeout_ms) != nullptr) { + return ProducerErrorTemplates::kTimeout.Generate("waiting to finish processing requests"); + } else { + return nullptr; + } +} + +void ProducerImpl::StopThreads__() { + request_pool__->StopThreads(); +} + } \ No newline at end of file diff --git a/producer/api/cpp/src/producer_impl.h b/producer/api/cpp/src/producer_impl.h index d8b939181baececf9d36926dc58772bcdb8e7a37..e39dba3f3b5fa951cdd65cbe7c0b9033fb6c1abb 100644 --- a/producer/api/cpp/src/producer_impl.h +++ b/producer/api/cpp/src/producer_impl.h @@ -28,9 +28,9 @@ class ProducerImpl : public Producer { void EnableLocalLog(bool enable) override; void EnableRemoteLog(bool enable) override; Error SendData(const EventHeader& event_header, FileData data, uint64_t ingest_mode, RequestCallback callback) override; - Error SendData_(const EventHeader& event_header, void* data , uint64_t ingest_mode, - RequestCallback callback) override; - + Error SendData__(const EventHeader& event_header, void* data , uint64_t ingest_mode, + RequestCallback callback) override; + void StopThreads__() override; Error SendFile(const EventHeader& event_header, std::string full_path, uint64_t ingest_mode, RequestCallback callback) override; AbstractLogger* log__; @@ -39,6 +39,8 @@ class ProducerImpl : public Producer { Error SetCredentials(SourceCredentials source_cred) override; Error SendMetaData(const std::string& metadata, RequestCallback callback) override; + uint64_t GetRequestsQueueSize() override; + Error WaitRequestsFinished(uint64_t timeout_ms) override; private: Error Send(const EventHeader& event_header, FileData data, std::string full_path, uint64_t ingest_mode, diff --git a/producer/api/cpp/unittests/mocking.h b/producer/api/cpp/unittests/mocking.h index e009cd0945d6817f702f879a4bd3266778e90330..c3dae1c6eab435e4c589c080b23faccee91b3654 100644 --- a/producer/api/cpp/unittests/mocking.h +++ b/producer/api/cpp/unittests/mocking.h @@ -33,6 +33,16 @@ class MockRequestPull : public RequestPool { return asapo::Error{AddRequest_t(request.get())}; } MOCK_METHOD1(AddRequest_t, asapo::ErrorInterface * (GenericRequest*)); + MOCK_METHOD0(NRequestsInPool, uint64_t ()); + + MOCK_METHOD1(WaitRequestsFinished_t, asapo::ErrorInterface * (uint64_t timeout_ms)); + + asapo::Error WaitRequestsFinished(uint64_t timeout_ms) override { + return asapo::Error{WaitRequestsFinished_t(timeout_ms)}; + } + + + }; diff --git a/producer/api/cpp/unittests/test_producer.cpp b/producer/api/cpp/unittests/test_producer.cpp index 7c640338fc6b4ff23e2b138e77e32ed515cb5962..4788882c0031a7548f1e140fa8159bf76cf7b01c 100644 --- a/producer/api/cpp/unittests/test_producer.cpp +++ b/producer/api/cpp/unittests/test_producer.cpp @@ -46,7 +46,7 @@ TEST(CreateProducer, TooManyThreads) { std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("", asapo::kMaxProcessingThreads + 1, asapo::RequestHandlerType::kTcp, SourceCredentials{"bt", "", ""}, &err); ASSERT_THAT(producer, Eq(nullptr)); - ASSERT_THAT(err, Ne(nullptr)); + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } @@ -55,7 +55,7 @@ TEST(CreateProducer, ZeroThreads) { std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("", 0, asapo::RequestHandlerType::kTcp, SourceCredentials{"bt", "", ""}, &err); ASSERT_THAT(producer, Eq(nullptr)); - ASSERT_THAT(err, Ne(nullptr)); + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } diff --git a/producer/api/cpp/unittests/test_producer_impl.cpp b/producer/api/cpp/unittests/test_producer_impl.cpp index dbaeb7f27b855eaaacf557f238b183886a502188..0b3d35eb2f61f869bbff260264963db8d160b3cb 100644 --- a/producer/api/cpp/unittests/test_producer_impl.cpp +++ b/producer/api/cpp/unittests/test_producer_impl.cpp @@ -136,7 +136,7 @@ TEST_F(ProducerImplTests, ErrorIfNoData) { TEST_F(ProducerImplTests, ErrorIfNoDataSend_) { asapo::EventHeader event_header{1, 100, expected_fullpath}; - auto err = producer.SendData_(event_header, nullptr, asapo::kDefaultIngestMode, nullptr); + auto err = producer.SendData__(event_header, nullptr, asapo::kDefaultIngestMode, nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } @@ -215,7 +215,7 @@ TEST_F(ProducerImplTests, OKSendingSendDataRequesUnmanagedMemory) { nullptr)); asapo::EventHeader event_header{expected_id, expected_size, expected_name, expected_metadata}; - auto err = producer.SendData_(event_header, nullptr, expected_ingest_mode, nullptr); + auto err = producer.SendData__(event_header, nullptr, expected_ingest_mode, nullptr); ASSERT_THAT(err, Eq(nullptr)); } @@ -344,4 +344,22 @@ TEST_F(ProducerImplTests, ErrorSendingWrongIngestMode) { } +TEST_F(ProducerImplTests, GetQueueSize) { + EXPECT_CALL(mock_pull, NRequestsInPool()).WillOnce(Return(10)); + + auto size = producer.GetRequestsQueueSize(); + + ASSERT_THAT(size, Eq(10)); +} + +TEST_F(ProducerImplTests, WaitRequestsFinished) { + EXPECT_CALL(mock_pull, WaitRequestsFinished_t(_)).WillOnce(Return( + asapo::IOErrorTemplates::kTimeout.Generate().release())); + + auto err = producer.WaitRequestsFinished(100); + + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kTimeout)); +} + + } diff --git a/producer/api/python/asapo_producer.pxd b/producer/api/python/asapo_producer.pxd index 350e6001048ed0e42be14fae4b2e3e9a353124d4..ce6912c336152fd8a8a6d75979f4da16b68359df 100644 --- a/producer/api/python/asapo_producer.pxd +++ b/producer/api/python/asapo_producer.pxd @@ -18,10 +18,9 @@ cdef extern from "asapo_producer.h" namespace "asapo": cdef bool operator==(Error lhs, ErrorTemplateInterface rhs) cdef extern from "asapo_producer.h" namespace "asapo": - ErrorTemplateInterface kInternalServerError "asapo::ProducerErrorTemplates::kInternalServerError" - ErrorTemplateInterface kCannotSendDataToReceivers "asapo::ProducerErrorTemplates::kCannotSendDataToReceivers" - ErrorTemplateInterface kRequestPoolIsFull "asapo::ProducerErrorTemplates::kRequestPoolIsFull" + ErrorTemplateInterface kTimeout "asapo::ProducerErrorTemplates::kTimeout" ErrorTemplateInterface kWrongInput "asapo::ProducerErrorTemplates::kWrongInput" + ErrorTemplateInterface kLocalIOError "asapo::ProducerErrorTemplates::kLocalIOError" cdef extern from "asapo_producer.h" namespace "asapo": cppclass FileData: @@ -86,13 +85,16 @@ cdef extern from "asapo_wrappers.h" namespace "asapo": RequestCallback unwrap_callback_with_memory(RequestCallbackCythonMemory, void*,void*,void*) -cdef extern from "asapo_producer.h" namespace "asapo": +cdef extern from "asapo_producer.h" namespace "asapo" nogil: cppclass Producer: @staticmethod unique_ptr[Producer] Create(string endpoint,uint8_t nthreads,RequestHandlerType type, SourceCredentials source,Error* error) Error SendFile(const EventHeader& event_header, string full_path, uint64_t ingest_mode,RequestCallback callback) - Error SendData_(const EventHeader& event_header, void* data, uint64_t ingest_mode,RequestCallback callback) + Error SendData__(const EventHeader& event_header, void* data, uint64_t ingest_mode,RequestCallback callback) + void StopThreads__() void SetLogLevel(LogLevel level) + uint64_t GetRequestsQueueSize() + Error WaitRequestsFinished(uint64_t timeout_ms) cdef extern from "asapo_producer.h" namespace "asapo": uint64_t kDefaultIngestMode diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in index 1e60d18d8b90470ef061c539b049f303de01641b..46c2b859ff7efa4234a030364b6a3fd2875ec969 100644 --- a/producer/api/python/asapo_producer.pyx.in +++ b/producer/api/python/asapo_producer.pyx.in @@ -7,6 +7,7 @@ import json from cpython.version cimport PY_MAJOR_VERSION from libcpp.memory cimport unique_ptr from cpython.ref cimport PyObject,Py_XINCREF,Py_XDECREF +import atexit np.import_array() @@ -40,15 +41,25 @@ class AsapoProducerError(Exception): class AsapoWrongInputError(AsapoProducerError): pass -#todo: more types -cdef python_exception_from_error(Error& err): - return AsapoProducerError(err.get().Explain()) +class AsapoLocalIOError(AsapoProducerError): + pass +class AsapoTimeOutError(AsapoProducerError): + pass + +cdef python_exception_from_error(Error& err): + if err == kTimeout: + return AsapoTimeOutError(err.get().Explain()) + elif err == kWrongInput: + return AsapoWrongInputError(err.get().Explain()) + elif err == kLocalIOError: + return AsapoLocalIOError(err.get().Explain()) + else: + return AsapoProducerError(err.get().Explain()) cdef throw_exception(Error& err): raise python_exception_from_error(err) - cdef void* data_pointer_nparray(data): if data is None: return <void*>NULL @@ -64,6 +75,8 @@ cdef void* data_pointer_bytes(data): cdef class PyProducer: cdef unique_ptr[Producer] c_producer + def __init__(self): + atexit.register(self.cleanup) def set_log_level(self,level): cdef LogLevel log_level log_level = LogLevel_Info @@ -89,9 +102,9 @@ cdef class PyProducer: event_header.file_size = 0 else: event_header.file_size = data.nbytes - err = self.c_producer.get().SendData_(event_header, data_pointer_nparray(data), ingest_mode, + err = self.c_producer.get().SendData__(event_header, data_pointer_nparray(data), ingest_mode, unwrap_callback_with_memory(<RequestCallbackCythonMemory>self.c_callback_ndarr, - <void*>self,<void*>callback if callback != None else NULL, <void*>data)) + <void*>self,<void*>callback, <void*>data)) if err: throw_exception(err) if data is not None: @@ -116,9 +129,9 @@ cdef class PyProducer: def __send_bytes(self,int id, exposed_path,data, user_meta=None,subset=None,ingest_mode = DEFAULT_INGEST_MODE,callback=None): cdef EventHeader event_header = self.create_event_header(id,exposed_path,user_meta,subset,ingest_mode) event_header.file_size = len(data) - err = self.c_producer.get().SendData_(event_header, data_pointer_bytes(data), ingest_mode, + err = self.c_producer.get().SendData__(event_header, data_pointer_bytes(data), ingest_mode, unwrap_callback_with_memory(<RequestCallbackCythonMemory>self.c_callback_bytesaddr, - <void*>self,<void*>callback if callback != None else NULL, <void*>data)) + <void*>self,<void*>callback, <void*>data)) if err: throw_exception(err) Py_XINCREF(<PyObject*>data) @@ -140,8 +153,9 @@ cdef class PyProducer: :type ingest_mode: int :param callback: callback function, default None :type callback: callback(info,err), where info - json string with event header that was used to send data, err - error string or None - :raises: AsapoProducerError - :rtype: string + :raises: + AsapoWrongInputError: wrong input (authorization, meta, ...) + AsapoProducerError: actually should not happen """ if type(data) == np.ndarray or data == None: @@ -167,8 +181,10 @@ cdef class PyProducer: :type ingest_mode: int :param callback: callback function, default None :type callback: callback(info,err), where info - json string with event header that was used to send data, err - error string or None - :raises: AsapoProducerError - :rtype: string + :raises: + AsapoWrongInputError: wrong input (authorization, meta, ...) + AsapoLocalIOError: problems reading file to send + AsapoProducerError: actually should not happen """ cdef EventHeader event_header = self.create_event_header(id,exposed_path,user_meta,subset,ingest_mode) @@ -178,6 +194,22 @@ cdef class PyProducer: if err: throw_exception(err) return + def get_requests_queue_size(self): + return self.c_producer.get().GetRequestsQueueSize() + def wait_requests_finished(self,timeout_ms): + """ + :param timeout_ms: timeout in milliseconds + :type timeout_ms: int + :raises: + AsapoTimeoutError: requests not finished for a given timeout + """ + cdef Error err + cdef uint64_t timeout = timeout_ms + with nogil: + err = self.c_producer.get().WaitRequestsFinished(timeout) + if err: + throw_exception(err) + return cdef void c_callback_python(self,py_callback, GenericRequestHeader header, Error& err): if py_callback != None: info_str = _str(header.Json()) @@ -203,7 +235,10 @@ cdef class PyProducer: if bytes_array is not None: Py_XDECREF(<PyObject*>bytes_array) self.c_callback_python(py_callback,header,err) - + def cleanup(self): + with nogil: + if self.c_producer.get() is not NULL: + self.c_producer.get().StopThreads__() @staticmethod def __create_producer(endpoint,beamtime_id,stream,token,nthreads): pyProd = PyProducer() @@ -218,6 +253,21 @@ cdef class PyProducer: return pyProd def create_producer(endpoint,beamtime_id,stream,token,nthreads): + """ + :param endpoint: server endpoint (url:port) + :type endpoint: string + :param beamtime_id: beamtime id + :type beamtime_id: string + :param stream: stream to producer data to + :type stream: string + :param token: authorization token + :type token: string + :param nthreads: ingest mode flag + :type nthreads: int + :raises: + AsapoWrongInputError: wrong input (number of threads, ,,,) + AsapoProducerError: actually should not happen + """ return PyProducer.__create_producer(_bytes(endpoint),_bytes(beamtime_id),_bytes(stream),_bytes(token),nthreads) diff --git a/producer/api/python/asapo_wrappers.h b/producer/api/python/asapo_wrappers.h index 1e7c44240f6b4e5216cf098ff91017a9035562c5..2bb1c2e4082de80d2698956b56077ab4389e21dc 100644 --- a/producer/api/python/asapo_wrappers.h +++ b/producer/api/python/asapo_wrappers.h @@ -4,7 +4,6 @@ #include <memory> #include <functional> - namespace asapo { inline std::string GetErrorString(asapo::Error* err) { @@ -22,7 +21,7 @@ RequestCallback unwrap_callback(RequestCallbackCython callback, void* c_self, vo return nullptr; } RequestCallback wrapper = [ = ](GenericRequestHeader header, Error err) -> void { - callback(c_self, py_func, header, std::move(err)); + callback(c_self, py_func, std::move(header), std::move(err)); }; return wrapper; } @@ -30,7 +29,7 @@ RequestCallback unwrap_callback(RequestCallbackCython callback, void* c_self, vo RequestCallback unwrap_callback_with_memory(RequestCallbackCythonMemory callback, void* c_self, void* py_func, void* nd_array) { RequestCallback wrapper = [ = ](GenericRequestHeader header, Error err) -> void { - callback(c_self, py_func, nd_array, header, std::move(err)); + callback(c_self, py_func, nd_array, std::move(header), std::move(err)); }; return wrapper; } diff --git a/tests/automatic/producer/python_api/check_linux.sh b/tests/automatic/producer/python_api/check_linux.sh index e7654d39438bd8a4b457a1e77d3c00f1778706cd..f3f67ec52c54e6b29f935e87e61b365fb2304de5 100644 --- a/tests/automatic/producer/python_api/check_linux.sh +++ b/tests/automatic/producer/python_api/check_linux.sh @@ -36,6 +36,7 @@ echo test > file1 sleep 1 -$1 $3 $stream $beamtime_id "127.0.0.1:8400" > out +$1 $3 $stream $beamtime_id "127.0.0.1:8400" > out || cat out cat out -cat out | grep "successfuly sent" | wc -l | grep 7 \ No newline at end of file +cat out | grep "successfuly sent" | wc -l | grep 7 +cat out | grep "local i/o error" \ No newline at end of file diff --git a/tests/automatic/producer/python_api/producer_api.py b/tests/automatic/producer/python_api/producer_api.py index c65000b9d007295ce2c1c32746172f8cca4e59b2..8c2fab4ec80bdbe1e788bc8d8f133ee485231696 100644 --- a/tests/automatic/producer/python_api/producer_api.py +++ b/tests/automatic/producer/python_api/producer_api.py @@ -7,7 +7,6 @@ import numpy as np import threading lock = threading.Lock() - stream = sys.argv[1] beamtime = sys.argv[2] endpoint = sys.argv[3] @@ -15,8 +14,6 @@ endpoint = sys.argv[3] token = "" nthreads = 8 - - def callback(header,err): lock.acquire() # to print if err is not None: @@ -27,12 +24,14 @@ def callback(header,err): producer = asapo_producer.create_producer(endpoint,beamtime, stream, token, nthreads) - producer.set_log_level("info") #send single file producer.send_file(1, local_path = "./file1", exposed_path = stream+"/"+"file1", user_meta = '{"test_key":"test_val"}', callback = callback) +#send single file without callback +producer.send_file(1, local_path = "./file1", exposed_path = stream+"/"+"file1", user_meta = '{"test_key":"test_val"}',callback=None) + #send subsets producer.send_file(2, local_path = "./file1", exposed_path = stream+"/"+"file2",subset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback) producer.send_file(3, local_path = "./file1", exposed_path = stream+"/"+"file3",subset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback) @@ -55,11 +54,20 @@ producer.send_data(5, stream+"/"+"file6",b"hello", producer.send_data(6, stream+"/"+"file7",None, ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, callback = callback) +#send single file/wrong filename +producer.send_file(1, local_path = "./file2", exposed_path = stream+"/"+"file1", callback = callback) + +producer.wait_requests_finished(50000) +n = producer.get_requests_queue_size() +if n!=0: + print("number of remaining requestst should be zero, got ",n) + sys.exit(1) + # create with error try: producer = asapo_producer.create_producer(endpoint,beamtime, stream, token, 0) -except Exception as e: +except asapo_producer.AsapoWrongInputError as e: print(e) else: print("should be error") @@ -67,6 +75,5 @@ else: -time.sleep(25) diff --git a/tests/manual/python_tests/producer_wait_threads/asapo_producer.so b/tests/manual/python_tests/producer_wait_threads/asapo_producer.so new file mode 120000 index 0000000000000000000000000000000000000000..cd186e18092ad6fd8de0e7ab30f6b7a4b5417775 --- /dev/null +++ b/tests/manual/python_tests/producer_wait_threads/asapo_producer.so @@ -0,0 +1 @@ +/home/yakubov/projects/asapo/cmake-build-debug/producer/api/python/asapo_producer.so \ No newline at end of file diff --git a/tests/manual/python_tests/producer_wait_threads/producer_api.py b/tests/manual/python_tests/producer_wait_threads/producer_api.py new file mode 100644 index 0000000000000000000000000000000000000000..d4de005f366ccea59d20bbcd2881c02d88354b80 --- /dev/null +++ b/tests/manual/python_tests/producer_wait_threads/producer_api.py @@ -0,0 +1,76 @@ +from __future__ import print_function + +import asapo_producer +import sys +import time +import numpy as np +import threading +lock = threading.Lock() + +stream = "python" +beamtime = "asapo_test" +endpoint = "127.0.0.1:8400" + +token = "" +nthreads = 8 + +def callback(header,err): + lock.acquire() # to print + if err is not None: + print("could not sent: ",header,err) + else: + print ("successfuly sent: ",header) + lock.release() + +producer = asapo_producer.create_producer(endpoint,beamtime, stream, token, nthreads) + +producer.set_log_level("info") + +#send single file +producer.send_file(1, local_path = "./file1", exposed_path = stream+"/"+"file1", user_meta = '{"test_key":"test_val"}', callback = callback) + +#send single file without callback +producer.send_file(1, local_path = "./file1", exposed_path = stream+"/"+"file1", user_meta = '{"test_key":"test_val"}',callback=None) + +#send subsets +producer.send_file(2, local_path = "./file1", exposed_path = stream+"/"+"file2",subset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback) +producer.send_file(3, local_path = "./file1", exposed_path = stream+"/"+"file3",subset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback) + +#send meta only +producer.send_file(3, local_path = "./not_exist",exposed_path = "./whatever", + ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, callback = callback) + +data = np.arange(10,dtype=np.float64) + +#send data from array +producer.send_data(4, stream+"/"+"file5",data, + ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback) + +#send data from string +producer.send_data(5, stream+"/"+"file6",b"hello", + ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback) + +#send metadata only +producer.send_data(6, stream+"/"+"file7",None, + ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, callback = callback) + +producer.wait_requests_finished(1000) +n = producer.get_requests_queue_size() +if n!=0: + print("number of remaining requestst should be zero, got ",n) + sys.exit(1) + + +# create with error +try: + producer = asapo_producer.create_producer(endpoint,beamtime, stream, token, 0) +except Exception as Asapo: + print(e) +else: + print("should be error") + sys.exit(1) + + + + + diff --git a/tests/manual/python_tests/producer_wait_threads/test.py b/tests/manual/python_tests/producer_wait_threads/test.py new file mode 100644 index 0000000000000000000000000000000000000000..3a0206d73ac7b556e751a3e7a848ea19c44b7513 --- /dev/null +++ b/tests/manual/python_tests/producer_wait_threads/test.py @@ -0,0 +1,60 @@ +from __future__ import print_function +import threading +import asapo_producer +import sys +import time +import numpy as np +lock = threading.Lock() + +stream = "python" +beamtime = "asapo_test" +endpoint = "127.0.0.1:8400" + +token = "" +nthreads = 8 + +def callback(header,err): + global lock + lock.acquire() # to print + if err is not None: + print("could not sent: ",header,err) + else: + print ("successfuly sent: ",header) + lock.release() + +producer = asapo_producer.create_producer(endpoint,beamtime, stream, token, nthreads) + +producer.set_log_level("info") + +#send single file +producer.send_file(1, local_path = "./file1", exposed_path = stream+"/"+"file1", user_meta = '{"test_key":"test_val"}', callback = callback) + +#send single file without callback +producer.send_file(1, local_path = "./file1", exposed_path = stream+"/"+"file1", user_meta = '{"test_key":"test_val"}') + + +#send subsets +producer.send_file(2, local_path = "./file1", exposed_path = stream+"/"+"file2",subset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback) +producer.send_file(3, local_path = "./file1", exposed_path = stream+"/"+"file3",subset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback) + +#send meta only +producer.send_file(3, local_path = "./not_exist",exposed_path = "./whatever", + ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, callback = callback) + +data = np.arange(10,dtype=np.float64) + +#send data from array +producer.send_data(4, stream+"/"+"file5",data, + ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback) + +#send data from string +producer.send_data(5, stream+"/"+"file6",b"hello", + ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback) + +#send metadata only +producer.send_data(6, stream+"/"+"file7",None, + ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, callback = callback) + +producer.wait_requests_finished(1) + +