diff --git a/common/cpp/include/request/request_pool.h b/common/cpp/include/request/request_pool.h index fe38451ea706f1b3d6c0185f6bf70d9a27667079..796e37df3d4dd73c0719b214712873806d7e84e1 100644 --- a/common/cpp/include/request/request_pool.h +++ b/common/cpp/include/request/request_pool.h @@ -24,10 +24,10 @@ class RequestPool { explicit RequestPool(uint8_t n_threads, RequestHandlerFactory* request_handler_factory, const AbstractLogger* log); VIRTUAL Error AddRequest(GenericRequestPtr request); VIRTUAL Error AddRequests(GenericRequests requests); - ~RequestPool(); - uint64_t NRequestsInQueue(); - private: + VIRTUAL uint64_t NRequestsInPool(); + VIRTUAL Error WaitRequestsFinished(uint64_t timeout_ms); + private: const AbstractLogger* log__; RequestHandlerFactory* request_handler_factory__; std::vector<std::thread> threads_; @@ -41,6 +41,7 @@ class RequestPool { GenericRequestPtr GetRequestFromQueue(); void PutRequestBackToQueue(GenericRequestPtr request); uint64_t shared_counter_{0}; + uint64_t requests_in_progress_{0}; }; diff --git a/common/cpp/src/request/request_pool.cpp b/common/cpp/src/request/request_pool.cpp index f84f56c32b201584ede66b25e54b0c9fe680c88d..81eb356d910d2b1732329bf902d8b396b4b816b4 100644 --- a/common/cpp/src/request/request_pool.cpp +++ b/common/cpp/src/request/request_pool.cpp @@ -42,9 +42,11 @@ void RequestPool::ProcessRequest(const std::unique_ptr<RequestHandler>& request_ ThreadInformation* thread_info) { request_handler->PrepareProcessingRequestLocked(); auto request = GetRequestFromQueue(); + requests_in_progress_++; thread_info->lock.unlock(); auto success = request_handler->ProcessRequestUnlocked(request.get()); thread_info->lock.lock(); + requests_in_progress_--; request_handler->TearDownProcessingRequestLocked(success); if (!success) { std::this_thread::sleep_for(std::chrono::milliseconds(1000)); @@ -81,9 +83,9 @@ RequestPool::~RequestPool() { } } } -uint64_t RequestPool::NRequestsInQueue() { +uint64_t RequestPool::NRequestsInPool() { std::lock_guard<std::mutex> lock{mutex_}; - return request_queue_.size(); + return request_queue_.size()+requests_in_progress_; } Error RequestPool::AddRequests(GenericRequests requests) { std::unique_lock<std::mutex> lock(mutex_); @@ -97,4 +99,21 @@ Error RequestPool::AddRequests(GenericRequests requests) { } +Error RequestPool::WaitRequestsFinished(uint64_t timeout_ms) { + uint64_t elapsed_ms = 0; + while (true) { + auto n_requests = NRequestsInPool(); + if (n_requests == 0) { + break; + } + if (elapsed_ms >= timeout_ms) { + return IOErrorTemplates::kTimeout.Generate(); + } + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + elapsed_ms += 100; + } + return nullptr; +} + + } diff --git a/common/cpp/unittests/request/mocking.h b/common/cpp/unittests/request/mocking.h index 94cd423596eb9db0dfc5e024050703c642e6bb39..9e9ef6d5c13fc5b32fda21a53d8b22b95e8af306 100644 --- a/common/cpp/unittests/request/mocking.h +++ b/common/cpp/unittests/request/mocking.h @@ -18,6 +18,7 @@ class MockRequestHandler : public RequestHandler { MOCK_METHOD1(ProcessRequestUnlocked_t, bool (const GenericRequest* request)); bool ProcessRequestUnlocked(GenericRequest* request) override { + std::this_thread::sleep_for(std::chrono::milliseconds(50)); return ProcessRequestUnlocked_t(request); } diff --git a/common/cpp/unittests/request/test_request_pool.cpp b/common/cpp/unittests/request/test_request_pool.cpp index 1f71c1d39cf1d41586cf762dee1d5da51a9ec4a3..02e1dd05738af9c26279ec062c5c432efbd35c04 100644 --- a/common/cpp/unittests/request/test_request_pool.cpp +++ b/common/cpp/unittests/request/test_request_pool.cpp @@ -93,11 +93,13 @@ TEST_F(RequestPoolTests, AddRequestDoesNotGoFurtherWhenNotReady) { ASSERT_THAT(err, Eq(nullptr)); } -TEST_F(RequestPoolTests, NRequestsInQueue) { - auto nreq = pool.NRequestsInQueue(); +TEST_F(RequestPoolTests, NRequestsInPoolInitial) { + auto nreq = pool.NRequestsInPool(); ASSERT_THAT(nreq, Eq(0)); } + + void ExpectSend(MockRequestHandler* mock_handler, int ntimes = 1) { EXPECT_CALL(*mock_handler, ReadyProcessRequest()).Times(ntimes).WillRepeatedly(Return(true)); EXPECT_CALL(*mock_handler, PrepareProcessingRequestLocked()).Times(ntimes); @@ -106,7 +108,6 @@ void ExpectSend(MockRequestHandler* mock_handler, int ntimes = 1) { } - TEST_F(RequestPoolTests, AddRequestCallsSend) { ExpectSend(mock_request_handler); @@ -117,6 +118,30 @@ TEST_F(RequestPoolTests, AddRequestCallsSend) { ASSERT_THAT(err, Eq(nullptr)); } +TEST_F(RequestPoolTests, NRequestsInPool) { + + pool.AddRequest(std::move(request)); + auto nreq = pool.NRequestsInPool(); + + ASSERT_THAT(nreq, Eq(1)); +} + +TEST_F(RequestPoolTests, NRequestsInPoolAccountsForRequestsInProgress) { + ExpectSend(mock_request_handler,1); + + pool.AddRequest(std::move(request)); + + std::this_thread::sleep_for(std::chrono::milliseconds(30)); + + auto nreq1 = pool.NRequestsInPool(); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + auto nreq2 = pool.NRequestsInPool(); + + ASSERT_THAT(nreq1, Eq(1)); + ASSERT_THAT(nreq2, Eq(0)); +} + TEST_F(RequestPoolTests, AddRequestCallsSendTwoRequests) { @@ -124,13 +149,11 @@ TEST_F(RequestPoolTests, AddRequestCallsSendTwoRequests) { ExpectSend(mock_request_handler, 2); - - auto err1 = pool.AddRequest(std::move(request)); request.reset(request2); auto err2 = pool.AddRequest(std::move(request)); - std::this_thread::sleep_for(std::chrono::milliseconds(30)); + std::this_thread::sleep_for(std::chrono::milliseconds(80)); ASSERT_THAT(err1, Eq(nullptr)); ASSERT_THAT(err2, Eq(nullptr)); } @@ -148,15 +171,43 @@ TEST_F(RequestPoolTests, AddRequestsOk) { auto err = pool.AddRequests(std::move(requests)); - std::this_thread::sleep_for(std::chrono::milliseconds(30)); + std::this_thread::sleep_for(std::chrono::milliseconds(80)); ASSERT_THAT(err, Eq(nullptr)); } - - TEST_F(RequestPoolTests, FinishProcessingThreads) { EXPECT_CALL(mock_logger, Debug(HasSubstr("finishing thread"))).Times(nthreads); } +TEST_F(RequestPoolTests, WaitRequestsFinished) { + ExpectSend(mock_request_handler); + + pool.AddRequest(std::move(request)); + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + + auto err = pool.WaitRequestsFinished(1000); + + auto nreq = pool.NRequestsInPool(); + + ASSERT_THAT(nreq, Eq(0)); + ASSERT_THAT(err, Eq(nullptr)); + +} + +TEST_F(RequestPoolTests, WaitRequestsFinishedTimeout) { + ExpectSend(mock_request_handler); + + pool.AddRequest(std::move(request)); + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + + auto err = pool.WaitRequestsFinished(0); + + auto nreq = pool.NRequestsInPool(); + + ASSERT_THAT(nreq, Eq(1)); + ASSERT_THAT(err, Eq(asapo::IOErrorTemplates::kTimeout)); + +} + } diff --git a/examples/pipeline/in_to_out/in_to_out.cpp b/examples/pipeline/in_to_out/in_to_out.cpp index 0a15cac5f0d3d194b063cc1784e0cf6e733fad25..fa5e530f2d487318c78c9294676a6a2e55368613 100644 --- a/examples/pipeline/in_to_out/in_to_out.cpp +++ b/examples/pipeline/in_to_out/in_to_out.cpp @@ -199,23 +199,6 @@ std::unique_ptr<asapo::Producer> CreateProducer(const Args& args) { return producer; } -void WaitProducerThreadsFinished(const Args& args, int nfiles) { - int elapsed_ms = 0; - while (true) { - if (files_sent == nfiles) { - break; - } - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - elapsed_ms += 100; - if (elapsed_ms > args.timeout_ms_producer) { - std::cerr << "Stream out exit on timeout " << std::endl; - break; - } - } -} - - - int main(int argc, char* argv[]) { asapo::ExitAfterPrintVersionIfNeeded("GetNext Broker Example", argc, argv); Args args; @@ -245,7 +228,9 @@ int main(int argc, char* argv[]) { int nerrors; auto nfiles = ProcessAllData(args, producer, &duration_ms, &nerrors); - WaitProducerThreadsFinished(args, nfiles); + if (producer->WaitRequestsFinished(args.timeout_ms_producer)!=nullptr) { + std::cerr << "Stream out exit on timeout " << std::endl; + } auto duration_streamout = std::chrono::duration_cast<std::chrono::milliseconds>(streamout_finish - streamout_start); std::cout << "Stream in " << std::endl; diff --git a/examples/pipeline/in_to_out_python/in_to_out.py b/examples/pipeline/in_to_out_python/in_to_out.py index e2e2d603173f1ed7d21bce94a4fcce936736bd61..b8a51975c9849877aaa597abe41956e8e5c14bfc 100644 --- a/examples/pipeline/in_to_out_python/in_to_out.py +++ b/examples/pipeline/in_to_out_python/in_to_out.py @@ -22,13 +22,6 @@ def callback(header,err): n_send = n_send + 1 lock.release() -def wait_send(n_files, timeout_s): - elapsed = 0 - while elapsed < timeout_s: - if n_send == n_files: - break - time.sleep(1) - source, path, beamtime,stream_in, stream_out, token, timeout_s,timeout_s_producer,nthreads, transfer_data = sys.argv[1:] timeout_s=int(timeout_s) timeout_s_producer=int(timeout_s_producer) @@ -60,8 +53,7 @@ while True: except asapo_producer.AsapoProducerError: break - -wait_send(n_recv,timeout_s_producer) +producer.wait_requests_finished(timeout_s_producer*1000) print ("Processed "+str(n_recv)+" file(s)") print ("Sent "+str(n_send)+" file(s)") diff --git a/examples/producer/dummy-data-producer/dummy_data_producer.cpp b/examples/producer/dummy-data-producer/dummy_data_producer.cpp index 1d5484184978bf4686de49ff2246d3dcfecc7c73..ecab2603da73a18f94962ea0b96a932cb4659265 100644 --- a/examples/producer/dummy-data-producer/dummy_data_producer.cpp +++ b/examples/producer/dummy-data-producer/dummy_data_producer.cpp @@ -191,25 +191,6 @@ std::unique_ptr<asapo::Producer> CreateProducer(const Args& args) { return producer; } -void WaitThreadsFinished(const Args& args) { - uint64_t elapsed_ms = 0; - while (true) { - mutex.lock(); - if (iterations_remained <= 0) { - mutex.unlock(); - break; - } - mutex.unlock(); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - elapsed_ms += 100; - if (elapsed_ms > args.timeout_sec * 1000) { - std::cerr << "Producer exit on timeout " << std::endl; - exit(EXIT_FAILURE); - } - } - -} - void PrintOutput(const Args& args, const system_clock::time_point& start) { system_clock::time_point t2 = system_clock::now(); double duration_sec = std::chrono::duration_cast<std::chrono::milliseconds>( t2 - start ).count() / 1000.0; @@ -220,8 +201,6 @@ void PrintOutput(const Args& args, const system_clock::time_point& start) { std::cout << "Bandwidth " << size_gb / duration_sec / 8 << " GBytes/s" << std::endl; } - - int main (int argc, char* argv[]) { Args args; ProcessCommandArguments(argc, argv, &args); @@ -241,7 +220,16 @@ int main (int argc, char* argv[]) { return EXIT_FAILURE; } - WaitThreadsFinished(args); + auto err = producer->WaitRequestsFinished(args.timeout_sec*1000); + if (err) { + std::cerr << "Producer exit on timeout " << std::endl; + exit(EXIT_FAILURE); + } + + if (iterations_remained!=0) { + std::cerr << "Producer did not send all data " << std::endl; + exit(EXIT_FAILURE); + } PrintOutput(args, start_time); return EXIT_SUCCESS; diff --git a/producer/api/cpp/include/producer/producer.h b/producer/api/cpp/include/producer/producer.h index b87140ae9814685e0a0a86c370534ebeab0e7552..d72b99774f7faa58016e8d2f28e3cb9e99f243d9 100644 --- a/producer/api/cpp/include/producer/producer.h +++ b/producer/api/cpp/include/producer/producer.h @@ -65,6 +65,11 @@ class Producer { virtual void EnableRemoteLog(bool enable) = 0; //! Set beamtime id which producer will use to send data virtual Error SetCredentials(SourceCredentials source_cred) = 0; + //! Set get current size of the requests queue + virtual uint64_t GetRequestsQueueSize() = 0; + //! Wait until all current requests are processed or timeout + virtual Error WaitRequestsFinished(uint64_t timeout_ms) = 0; + }; } diff --git a/producer/api/cpp/include/producer/producer_error.h b/producer/api/cpp/include/producer/producer_error.h index 12c5440ad418a0d3b8cece74f2afa78dcdd13b41..0dc331258da6d07b3d42a73bba22ba74e29659d4 100644 --- a/producer/api/cpp/include/producer/producer_error.h +++ b/producer/api/cpp/include/producer/producer_error.h @@ -9,7 +9,8 @@ enum class ProducerErrorType { kInternalServerError, kRequestPoolIsFull, kLocalIOError, - kWrongInput + kWrongInput, + kTimeout }; using ProducerErrorTemplate = ServiceErrorTemplate<ProducerErrorType, ErrorType::kProducerError>; @@ -32,6 +33,11 @@ auto const kInternalServerError = ProducerErrorTemplate { "Internal server error", ProducerErrorType::kInternalServerError }; +auto const kTimeout = ProducerErrorTemplate { + "Timeout", ProducerErrorType::kTimeout +}; + + }; } diff --git a/producer/api/cpp/src/producer_impl.cpp b/producer/api/cpp/src/producer_impl.cpp index e19bac0702835f552e66d5c80543759773843f26..4874437a9b00d7616fddb8271f730dd9c8fe3412 100644 --- a/producer/api/cpp/src/producer_impl.cpp +++ b/producer/api/cpp/src/producer_impl.cpp @@ -185,4 +185,17 @@ Error ProducerImpl::SendData_(const EventHeader& event_header, return Send(std::move(event_header), std::move(data_wrapped), "", ingest_mode, callback, false); } +uint64_t ProducerImpl::GetRequestsQueueSize() { + return request_pool__->NRequestsInPool(); +}; + +Error ProducerImpl::WaitRequestsFinished(uint64_t timeout_ms) { + if (request_pool__->WaitRequestsFinished(timeout_ms)!=nullptr) { + return ProducerErrorTemplates::kTimeout.Generate("waiting to finish processing requests"); + } else { + return nullptr; + } +} + + } \ No newline at end of file diff --git a/producer/api/cpp/src/producer_impl.h b/producer/api/cpp/src/producer_impl.h index d8b939181baececf9d36926dc58772bcdb8e7a37..bfa66f818146dfe193063eed4e16f7a8233e3eef 100644 --- a/producer/api/cpp/src/producer_impl.h +++ b/producer/api/cpp/src/producer_impl.h @@ -39,6 +39,8 @@ class ProducerImpl : public Producer { Error SetCredentials(SourceCredentials source_cred) override; Error SendMetaData(const std::string& metadata, RequestCallback callback) override; + uint64_t GetRequestsQueueSize() override; + Error WaitRequestsFinished(uint64_t timeout_ms) override; private: Error Send(const EventHeader& event_header, FileData data, std::string full_path, uint64_t ingest_mode, diff --git a/producer/api/cpp/unittests/mocking.h b/producer/api/cpp/unittests/mocking.h index e009cd0945d6817f702f879a4bd3266778e90330..d2bae4754d975546d889f11162f06ca1577b8e0f 100644 --- a/producer/api/cpp/unittests/mocking.h +++ b/producer/api/cpp/unittests/mocking.h @@ -33,6 +33,16 @@ class MockRequestPull : public RequestPool { return asapo::Error{AddRequest_t(request.get())}; } MOCK_METHOD1(AddRequest_t, asapo::ErrorInterface * (GenericRequest*)); + MOCK_METHOD0(NRequestsInPool, uint64_t ()); + + MOCK_METHOD1(WaitRequestsFinished_t, asapo::ErrorInterface * (uint64_t timeout_ms)); + + asapo::Error WaitRequestsFinished(uint64_t timeout_ms) override { + return asapo::Error{WaitRequestsFinished_t(timeout_ms)}; + } + + + }; diff --git a/producer/api/cpp/unittests/test_producer_impl.cpp b/producer/api/cpp/unittests/test_producer_impl.cpp index dbaeb7f27b855eaaacf557f238b183886a502188..55d948d756a5abf05a6ed4e1d33819585f299ec5 100644 --- a/producer/api/cpp/unittests/test_producer_impl.cpp +++ b/producer/api/cpp/unittests/test_producer_impl.cpp @@ -344,4 +344,21 @@ TEST_F(ProducerImplTests, ErrorSendingWrongIngestMode) { } +TEST_F(ProducerImplTests, GetQueueSize) { + EXPECT_CALL(mock_pull, NRequestsInPool()).WillOnce(Return(10)); + + auto size = producer.GetRequestsQueueSize(); + + ASSERT_THAT(size, Eq(10)); +} + +TEST_F(ProducerImplTests, WaitRequestsFinished) { + EXPECT_CALL(mock_pull, WaitRequestsFinished_t(_)).WillOnce(Return(asapo::IOErrorTemplates::kTimeout.Generate().release())); + + auto err = producer.WaitRequestsFinished(100); + + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kTimeout)); +} + + } diff --git a/producer/api/python/asapo_producer.pxd b/producer/api/python/asapo_producer.pxd index 350e6001048ed0e42be14fae4b2e3e9a353124d4..8a826680f8775a44c3f616007887fa07cc807b64 100644 --- a/producer/api/python/asapo_producer.pxd +++ b/producer/api/python/asapo_producer.pxd @@ -93,6 +93,8 @@ cdef extern from "asapo_producer.h" namespace "asapo": Error SendFile(const EventHeader& event_header, string full_path, uint64_t ingest_mode,RequestCallback callback) Error SendData_(const EventHeader& event_header, void* data, uint64_t ingest_mode,RequestCallback callback) void SetLogLevel(LogLevel level) + uint64_t GetRequestsQueueSize() + Error WaitRequestsFinished(uint64_t timeout_ms) cdef extern from "asapo_producer.h" namespace "asapo": uint64_t kDefaultIngestMode diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in index 1e60d18d8b90470ef061c539b049f303de01641b..76aef7a5453e167bea00286de0edbae74fbb1c34 100644 --- a/producer/api/python/asapo_producer.pyx.in +++ b/producer/api/python/asapo_producer.pyx.in @@ -178,6 +178,19 @@ cdef class PyProducer: if err: throw_exception(err) return + def get_requests_queue_size(self): + return self.c_producer.get().GetRequestsQueueSize() + def wait_requests_finished(self,timeout_ms): + """ + :param timeout_ms: timeout in milliseconds + :type timeout_ms: int + :raises: AsapoProducerError + """ + + err = self.c_producer.get().WaitRequestsFinished(timeout_ms) + if err: + throw_exception(err) + return cdef void c_callback_python(self,py_callback, GenericRequestHeader header, Error& err): if py_callback != None: info_str = _str(header.Json()) diff --git a/tests/automatic/producer/python_api/producer_api.py b/tests/automatic/producer/python_api/producer_api.py index c65000b9d007295ce2c1c32746172f8cca4e59b2..8e8582a157760408813524d5846cf35e57870481 100644 --- a/tests/automatic/producer/python_api/producer_api.py +++ b/tests/automatic/producer/python_api/producer_api.py @@ -55,6 +55,13 @@ producer.send_data(5, stream+"/"+"file6",b"hello", producer.send_data(6, stream+"/"+"file7",None, ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, callback = callback) +producer.wait_requests_finished(25000) +n = producer.get_requests_queue_size() +if n!=0: + print("number of remaining requestst should be zero, got ",n) + sys.exit(1) + + # create with error try: @@ -67,6 +74,5 @@ else: -time.sleep(25)