From c20e160f679400cfc88f9de82b2d7ba2d175c16e Mon Sep 17 00:00:00 2001 From: Sergey Yakubov <sergey.yakubov@desy.de> Date: Wed, 13 Nov 2019 14:49:08 +0100 Subject: [PATCH] add functions to get requests pool size and wait until pool is empty --- common/cpp/include/request/request_pool.h | 7 +- common/cpp/src/request/request_pool.cpp | 23 ++++++- common/cpp/unittests/request/mocking.h | 1 + .../unittests/request/test_request_pool.cpp | 69 ++++++++++++++++--- examples/pipeline/in_to_out/in_to_out.cpp | 21 +----- .../pipeline/in_to_out_python/in_to_out.py | 10 +-- .../dummy_data_producer.cpp | 32 +++------ producer/api/cpp/include/producer/producer.h | 5 ++ .../api/cpp/include/producer/producer_error.h | 8 ++- producer/api/cpp/src/producer_impl.cpp | 13 ++++ producer/api/cpp/src/producer_impl.h | 2 + producer/api/cpp/unittests/mocking.h | 10 +++ .../api/cpp/unittests/test_producer_impl.cpp | 17 +++++ producer/api/python/asapo_producer.pxd | 2 + producer/api/python/asapo_producer.pyx.in | 13 ++++ .../producer/python_api/producer_api.py | 8 ++- 16 files changed, 176 insertions(+), 65 deletions(-) diff --git a/common/cpp/include/request/request_pool.h b/common/cpp/include/request/request_pool.h index fe38451ea..796e37df3 100644 --- a/common/cpp/include/request/request_pool.h +++ b/common/cpp/include/request/request_pool.h @@ -24,10 +24,10 @@ class RequestPool { explicit RequestPool(uint8_t n_threads, RequestHandlerFactory* request_handler_factory, const AbstractLogger* log); VIRTUAL Error AddRequest(GenericRequestPtr request); VIRTUAL Error AddRequests(GenericRequests requests); - ~RequestPool(); - uint64_t NRequestsInQueue(); - private: + VIRTUAL uint64_t NRequestsInPool(); + VIRTUAL Error WaitRequestsFinished(uint64_t timeout_ms); + private: const AbstractLogger* log__; RequestHandlerFactory* request_handler_factory__; std::vector<std::thread> threads_; @@ -41,6 +41,7 @@ class RequestPool { GenericRequestPtr GetRequestFromQueue(); void PutRequestBackToQueue(GenericRequestPtr request); uint64_t shared_counter_{0}; + uint64_t requests_in_progress_{0}; }; diff --git a/common/cpp/src/request/request_pool.cpp b/common/cpp/src/request/request_pool.cpp index f84f56c32..81eb356d9 100644 --- a/common/cpp/src/request/request_pool.cpp +++ b/common/cpp/src/request/request_pool.cpp @@ -42,9 +42,11 @@ void RequestPool::ProcessRequest(const std::unique_ptr<RequestHandler>& request_ ThreadInformation* thread_info) { request_handler->PrepareProcessingRequestLocked(); auto request = GetRequestFromQueue(); + requests_in_progress_++; thread_info->lock.unlock(); auto success = request_handler->ProcessRequestUnlocked(request.get()); thread_info->lock.lock(); + requests_in_progress_--; request_handler->TearDownProcessingRequestLocked(success); if (!success) { std::this_thread::sleep_for(std::chrono::milliseconds(1000)); @@ -81,9 +83,9 @@ RequestPool::~RequestPool() { } } } -uint64_t RequestPool::NRequestsInQueue() { +uint64_t RequestPool::NRequestsInPool() { std::lock_guard<std::mutex> lock{mutex_}; - return request_queue_.size(); + return request_queue_.size()+requests_in_progress_; } Error RequestPool::AddRequests(GenericRequests requests) { std::unique_lock<std::mutex> lock(mutex_); @@ -97,4 +99,21 @@ Error RequestPool::AddRequests(GenericRequests requests) { } +Error RequestPool::WaitRequestsFinished(uint64_t timeout_ms) { + uint64_t elapsed_ms = 0; + while (true) { + auto n_requests = NRequestsInPool(); + if (n_requests == 0) { + break; + } + if (elapsed_ms >= timeout_ms) { + return IOErrorTemplates::kTimeout.Generate(); + } + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + elapsed_ms += 100; + } + return nullptr; +} + + } diff --git a/common/cpp/unittests/request/mocking.h b/common/cpp/unittests/request/mocking.h index 94cd42359..9e9ef6d5c 100644 --- a/common/cpp/unittests/request/mocking.h +++ b/common/cpp/unittests/request/mocking.h @@ -18,6 +18,7 @@ class MockRequestHandler : public RequestHandler { MOCK_METHOD1(ProcessRequestUnlocked_t, bool (const GenericRequest* request)); bool ProcessRequestUnlocked(GenericRequest* request) override { + std::this_thread::sleep_for(std::chrono::milliseconds(50)); return ProcessRequestUnlocked_t(request); } diff --git a/common/cpp/unittests/request/test_request_pool.cpp b/common/cpp/unittests/request/test_request_pool.cpp index 1f71c1d39..02e1dd057 100644 --- a/common/cpp/unittests/request/test_request_pool.cpp +++ b/common/cpp/unittests/request/test_request_pool.cpp @@ -93,11 +93,13 @@ TEST_F(RequestPoolTests, AddRequestDoesNotGoFurtherWhenNotReady) { ASSERT_THAT(err, Eq(nullptr)); } -TEST_F(RequestPoolTests, NRequestsInQueue) { - auto nreq = pool.NRequestsInQueue(); +TEST_F(RequestPoolTests, NRequestsInPoolInitial) { + auto nreq = pool.NRequestsInPool(); ASSERT_THAT(nreq, Eq(0)); } + + void ExpectSend(MockRequestHandler* mock_handler, int ntimes = 1) { EXPECT_CALL(*mock_handler, ReadyProcessRequest()).Times(ntimes).WillRepeatedly(Return(true)); EXPECT_CALL(*mock_handler, PrepareProcessingRequestLocked()).Times(ntimes); @@ -106,7 +108,6 @@ void ExpectSend(MockRequestHandler* mock_handler, int ntimes = 1) { } - TEST_F(RequestPoolTests, AddRequestCallsSend) { ExpectSend(mock_request_handler); @@ -117,6 +118,30 @@ TEST_F(RequestPoolTests, AddRequestCallsSend) { ASSERT_THAT(err, Eq(nullptr)); } +TEST_F(RequestPoolTests, NRequestsInPool) { + + pool.AddRequest(std::move(request)); + auto nreq = pool.NRequestsInPool(); + + ASSERT_THAT(nreq, Eq(1)); +} + +TEST_F(RequestPoolTests, NRequestsInPoolAccountsForRequestsInProgress) { + ExpectSend(mock_request_handler,1); + + pool.AddRequest(std::move(request)); + + std::this_thread::sleep_for(std::chrono::milliseconds(30)); + + auto nreq1 = pool.NRequestsInPool(); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + auto nreq2 = pool.NRequestsInPool(); + + ASSERT_THAT(nreq1, Eq(1)); + ASSERT_THAT(nreq2, Eq(0)); +} + TEST_F(RequestPoolTests, AddRequestCallsSendTwoRequests) { @@ -124,13 +149,11 @@ TEST_F(RequestPoolTests, AddRequestCallsSendTwoRequests) { ExpectSend(mock_request_handler, 2); - - auto err1 = pool.AddRequest(std::move(request)); request.reset(request2); auto err2 = pool.AddRequest(std::move(request)); - std::this_thread::sleep_for(std::chrono::milliseconds(30)); + std::this_thread::sleep_for(std::chrono::milliseconds(80)); ASSERT_THAT(err1, Eq(nullptr)); ASSERT_THAT(err2, Eq(nullptr)); } @@ -148,15 +171,43 @@ TEST_F(RequestPoolTests, AddRequestsOk) { auto err = pool.AddRequests(std::move(requests)); - std::this_thread::sleep_for(std::chrono::milliseconds(30)); + std::this_thread::sleep_for(std::chrono::milliseconds(80)); ASSERT_THAT(err, Eq(nullptr)); } - - TEST_F(RequestPoolTests, FinishProcessingThreads) { EXPECT_CALL(mock_logger, Debug(HasSubstr("finishing thread"))).Times(nthreads); } +TEST_F(RequestPoolTests, WaitRequestsFinished) { + ExpectSend(mock_request_handler); + + pool.AddRequest(std::move(request)); + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + + auto err = pool.WaitRequestsFinished(1000); + + auto nreq = pool.NRequestsInPool(); + + ASSERT_THAT(nreq, Eq(0)); + ASSERT_THAT(err, Eq(nullptr)); + +} + +TEST_F(RequestPoolTests, WaitRequestsFinishedTimeout) { + ExpectSend(mock_request_handler); + + pool.AddRequest(std::move(request)); + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + + auto err = pool.WaitRequestsFinished(0); + + auto nreq = pool.NRequestsInPool(); + + ASSERT_THAT(nreq, Eq(1)); + ASSERT_THAT(err, Eq(asapo::IOErrorTemplates::kTimeout)); + +} + } diff --git a/examples/pipeline/in_to_out/in_to_out.cpp b/examples/pipeline/in_to_out/in_to_out.cpp index 0a15cac5f..fa5e530f2 100644 --- a/examples/pipeline/in_to_out/in_to_out.cpp +++ b/examples/pipeline/in_to_out/in_to_out.cpp @@ -199,23 +199,6 @@ std::unique_ptr<asapo::Producer> CreateProducer(const Args& args) { return producer; } -void WaitProducerThreadsFinished(const Args& args, int nfiles) { - int elapsed_ms = 0; - while (true) { - if (files_sent == nfiles) { - break; - } - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - elapsed_ms += 100; - if (elapsed_ms > args.timeout_ms_producer) { - std::cerr << "Stream out exit on timeout " << std::endl; - break; - } - } -} - - - int main(int argc, char* argv[]) { asapo::ExitAfterPrintVersionIfNeeded("GetNext Broker Example", argc, argv); Args args; @@ -245,7 +228,9 @@ int main(int argc, char* argv[]) { int nerrors; auto nfiles = ProcessAllData(args, producer, &duration_ms, &nerrors); - WaitProducerThreadsFinished(args, nfiles); + if (producer->WaitRequestsFinished(args.timeout_ms_producer)!=nullptr) { + std::cerr << "Stream out exit on timeout " << std::endl; + } auto duration_streamout = std::chrono::duration_cast<std::chrono::milliseconds>(streamout_finish - streamout_start); std::cout << "Stream in " << std::endl; diff --git a/examples/pipeline/in_to_out_python/in_to_out.py b/examples/pipeline/in_to_out_python/in_to_out.py index e2e2d6031..b8a51975c 100644 --- a/examples/pipeline/in_to_out_python/in_to_out.py +++ b/examples/pipeline/in_to_out_python/in_to_out.py @@ -22,13 +22,6 @@ def callback(header,err): n_send = n_send + 1 lock.release() -def wait_send(n_files, timeout_s): - elapsed = 0 - while elapsed < timeout_s: - if n_send == n_files: - break - time.sleep(1) - source, path, beamtime,stream_in, stream_out, token, timeout_s,timeout_s_producer,nthreads, transfer_data = sys.argv[1:] timeout_s=int(timeout_s) timeout_s_producer=int(timeout_s_producer) @@ -60,8 +53,7 @@ while True: except asapo_producer.AsapoProducerError: break - -wait_send(n_recv,timeout_s_producer) +producer.wait_requests_finished(timeout_s_producer*1000) print ("Processed "+str(n_recv)+" file(s)") print ("Sent "+str(n_send)+" file(s)") diff --git a/examples/producer/dummy-data-producer/dummy_data_producer.cpp b/examples/producer/dummy-data-producer/dummy_data_producer.cpp index 1d5484184..ecab2603d 100644 --- a/examples/producer/dummy-data-producer/dummy_data_producer.cpp +++ b/examples/producer/dummy-data-producer/dummy_data_producer.cpp @@ -191,25 +191,6 @@ std::unique_ptr<asapo::Producer> CreateProducer(const Args& args) { return producer; } -void WaitThreadsFinished(const Args& args) { - uint64_t elapsed_ms = 0; - while (true) { - mutex.lock(); - if (iterations_remained <= 0) { - mutex.unlock(); - break; - } - mutex.unlock(); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - elapsed_ms += 100; - if (elapsed_ms > args.timeout_sec * 1000) { - std::cerr << "Producer exit on timeout " << std::endl; - exit(EXIT_FAILURE); - } - } - -} - void PrintOutput(const Args& args, const system_clock::time_point& start) { system_clock::time_point t2 = system_clock::now(); double duration_sec = std::chrono::duration_cast<std::chrono::milliseconds>( t2 - start ).count() / 1000.0; @@ -220,8 +201,6 @@ void PrintOutput(const Args& args, const system_clock::time_point& start) { std::cout << "Bandwidth " << size_gb / duration_sec / 8 << " GBytes/s" << std::endl; } - - int main (int argc, char* argv[]) { Args args; ProcessCommandArguments(argc, argv, &args); @@ -241,7 +220,16 @@ int main (int argc, char* argv[]) { return EXIT_FAILURE; } - WaitThreadsFinished(args); + auto err = producer->WaitRequestsFinished(args.timeout_sec*1000); + if (err) { + std::cerr << "Producer exit on timeout " << std::endl; + exit(EXIT_FAILURE); + } + + if (iterations_remained!=0) { + std::cerr << "Producer did not send all data " << std::endl; + exit(EXIT_FAILURE); + } PrintOutput(args, start_time); return EXIT_SUCCESS; diff --git a/producer/api/cpp/include/producer/producer.h b/producer/api/cpp/include/producer/producer.h index b87140ae9..d72b99774 100644 --- a/producer/api/cpp/include/producer/producer.h +++ b/producer/api/cpp/include/producer/producer.h @@ -65,6 +65,11 @@ class Producer { virtual void EnableRemoteLog(bool enable) = 0; //! Set beamtime id which producer will use to send data virtual Error SetCredentials(SourceCredentials source_cred) = 0; + //! Set get current size of the requests queue + virtual uint64_t GetRequestsQueueSize() = 0; + //! Wait until all current requests are processed or timeout + virtual Error WaitRequestsFinished(uint64_t timeout_ms) = 0; + }; } diff --git a/producer/api/cpp/include/producer/producer_error.h b/producer/api/cpp/include/producer/producer_error.h index 12c5440ad..0dc331258 100644 --- a/producer/api/cpp/include/producer/producer_error.h +++ b/producer/api/cpp/include/producer/producer_error.h @@ -9,7 +9,8 @@ enum class ProducerErrorType { kInternalServerError, kRequestPoolIsFull, kLocalIOError, - kWrongInput + kWrongInput, + kTimeout }; using ProducerErrorTemplate = ServiceErrorTemplate<ProducerErrorType, ErrorType::kProducerError>; @@ -32,6 +33,11 @@ auto const kInternalServerError = ProducerErrorTemplate { "Internal server error", ProducerErrorType::kInternalServerError }; +auto const kTimeout = ProducerErrorTemplate { + "Timeout", ProducerErrorType::kTimeout +}; + + }; } diff --git a/producer/api/cpp/src/producer_impl.cpp b/producer/api/cpp/src/producer_impl.cpp index e19bac070..4874437a9 100644 --- a/producer/api/cpp/src/producer_impl.cpp +++ b/producer/api/cpp/src/producer_impl.cpp @@ -185,4 +185,17 @@ Error ProducerImpl::SendData_(const EventHeader& event_header, return Send(std::move(event_header), std::move(data_wrapped), "", ingest_mode, callback, false); } +uint64_t ProducerImpl::GetRequestsQueueSize() { + return request_pool__->NRequestsInPool(); +}; + +Error ProducerImpl::WaitRequestsFinished(uint64_t timeout_ms) { + if (request_pool__->WaitRequestsFinished(timeout_ms)!=nullptr) { + return ProducerErrorTemplates::kTimeout.Generate("waiting to finish processing requests"); + } else { + return nullptr; + } +} + + } \ No newline at end of file diff --git a/producer/api/cpp/src/producer_impl.h b/producer/api/cpp/src/producer_impl.h index d8b939181..bfa66f818 100644 --- a/producer/api/cpp/src/producer_impl.h +++ b/producer/api/cpp/src/producer_impl.h @@ -39,6 +39,8 @@ class ProducerImpl : public Producer { Error SetCredentials(SourceCredentials source_cred) override; Error SendMetaData(const std::string& metadata, RequestCallback callback) override; + uint64_t GetRequestsQueueSize() override; + Error WaitRequestsFinished(uint64_t timeout_ms) override; private: Error Send(const EventHeader& event_header, FileData data, std::string full_path, uint64_t ingest_mode, diff --git a/producer/api/cpp/unittests/mocking.h b/producer/api/cpp/unittests/mocking.h index e009cd094..d2bae4754 100644 --- a/producer/api/cpp/unittests/mocking.h +++ b/producer/api/cpp/unittests/mocking.h @@ -33,6 +33,16 @@ class MockRequestPull : public RequestPool { return asapo::Error{AddRequest_t(request.get())}; } MOCK_METHOD1(AddRequest_t, asapo::ErrorInterface * (GenericRequest*)); + MOCK_METHOD0(NRequestsInPool, uint64_t ()); + + MOCK_METHOD1(WaitRequestsFinished_t, asapo::ErrorInterface * (uint64_t timeout_ms)); + + asapo::Error WaitRequestsFinished(uint64_t timeout_ms) override { + return asapo::Error{WaitRequestsFinished_t(timeout_ms)}; + } + + + }; diff --git a/producer/api/cpp/unittests/test_producer_impl.cpp b/producer/api/cpp/unittests/test_producer_impl.cpp index dbaeb7f27..55d948d75 100644 --- a/producer/api/cpp/unittests/test_producer_impl.cpp +++ b/producer/api/cpp/unittests/test_producer_impl.cpp @@ -344,4 +344,21 @@ TEST_F(ProducerImplTests, ErrorSendingWrongIngestMode) { } +TEST_F(ProducerImplTests, GetQueueSize) { + EXPECT_CALL(mock_pull, NRequestsInPool()).WillOnce(Return(10)); + + auto size = producer.GetRequestsQueueSize(); + + ASSERT_THAT(size, Eq(10)); +} + +TEST_F(ProducerImplTests, WaitRequestsFinished) { + EXPECT_CALL(mock_pull, WaitRequestsFinished_t(_)).WillOnce(Return(asapo::IOErrorTemplates::kTimeout.Generate().release())); + + auto err = producer.WaitRequestsFinished(100); + + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kTimeout)); +} + + } diff --git a/producer/api/python/asapo_producer.pxd b/producer/api/python/asapo_producer.pxd index 350e60010..8a826680f 100644 --- a/producer/api/python/asapo_producer.pxd +++ b/producer/api/python/asapo_producer.pxd @@ -93,6 +93,8 @@ cdef extern from "asapo_producer.h" namespace "asapo": Error SendFile(const EventHeader& event_header, string full_path, uint64_t ingest_mode,RequestCallback callback) Error SendData_(const EventHeader& event_header, void* data, uint64_t ingest_mode,RequestCallback callback) void SetLogLevel(LogLevel level) + uint64_t GetRequestsQueueSize() + Error WaitRequestsFinished(uint64_t timeout_ms) cdef extern from "asapo_producer.h" namespace "asapo": uint64_t kDefaultIngestMode diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in index 1e60d18d8..76aef7a54 100644 --- a/producer/api/python/asapo_producer.pyx.in +++ b/producer/api/python/asapo_producer.pyx.in @@ -178,6 +178,19 @@ cdef class PyProducer: if err: throw_exception(err) return + def get_requests_queue_size(self): + return self.c_producer.get().GetRequestsQueueSize() + def wait_requests_finished(self,timeout_ms): + """ + :param timeout_ms: timeout in milliseconds + :type timeout_ms: int + :raises: AsapoProducerError + """ + + err = self.c_producer.get().WaitRequestsFinished(timeout_ms) + if err: + throw_exception(err) + return cdef void c_callback_python(self,py_callback, GenericRequestHeader header, Error& err): if py_callback != None: info_str = _str(header.Json()) diff --git a/tests/automatic/producer/python_api/producer_api.py b/tests/automatic/producer/python_api/producer_api.py index c65000b9d..8e8582a15 100644 --- a/tests/automatic/producer/python_api/producer_api.py +++ b/tests/automatic/producer/python_api/producer_api.py @@ -55,6 +55,13 @@ producer.send_data(5, stream+"/"+"file6",b"hello", producer.send_data(6, stream+"/"+"file7",None, ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, callback = callback) +producer.wait_requests_finished(25000) +n = producer.get_requests_queue_size() +if n!=0: + print("number of remaining requestst should be zero, got ",n) + sys.exit(1) + + # create with error try: @@ -67,6 +74,5 @@ else: -time.sleep(25) -- GitLab