Skip to content
Snippets Groups Projects
Commit c20e160f authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

add functions to get requests pool size and wait until pool is empty

parent e5c5c003
No related branches found
No related tags found
No related merge requests found
Showing
with 176 additions and 65 deletions
......@@ -24,10 +24,10 @@ class RequestPool {
explicit RequestPool(uint8_t n_threads, RequestHandlerFactory* request_handler_factory, const AbstractLogger* log);
VIRTUAL Error AddRequest(GenericRequestPtr request);
VIRTUAL Error AddRequests(GenericRequests requests);
~RequestPool();
uint64_t NRequestsInQueue();
private:
VIRTUAL uint64_t NRequestsInPool();
VIRTUAL Error WaitRequestsFinished(uint64_t timeout_ms);
private:
const AbstractLogger* log__;
RequestHandlerFactory* request_handler_factory__;
std::vector<std::thread> threads_;
......@@ -41,6 +41,7 @@ class RequestPool {
GenericRequestPtr GetRequestFromQueue();
void PutRequestBackToQueue(GenericRequestPtr request);
uint64_t shared_counter_{0};
uint64_t requests_in_progress_{0};
};
......
......@@ -42,9 +42,11 @@ void RequestPool::ProcessRequest(const std::unique_ptr<RequestHandler>& request_
ThreadInformation* thread_info) {
request_handler->PrepareProcessingRequestLocked();
auto request = GetRequestFromQueue();
requests_in_progress_++;
thread_info->lock.unlock();
auto success = request_handler->ProcessRequestUnlocked(request.get());
thread_info->lock.lock();
requests_in_progress_--;
request_handler->TearDownProcessingRequestLocked(success);
if (!success) {
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
......@@ -81,9 +83,9 @@ RequestPool::~RequestPool() {
}
}
}
uint64_t RequestPool::NRequestsInQueue() {
uint64_t RequestPool::NRequestsInPool() {
std::lock_guard<std::mutex> lock{mutex_};
return request_queue_.size();
return request_queue_.size()+requests_in_progress_;
}
Error RequestPool::AddRequests(GenericRequests requests) {
std::unique_lock<std::mutex> lock(mutex_);
......@@ -97,4 +99,21 @@ Error RequestPool::AddRequests(GenericRequests requests) {
}
Error RequestPool::WaitRequestsFinished(uint64_t timeout_ms) {
uint64_t elapsed_ms = 0;
while (true) {
auto n_requests = NRequestsInPool();
if (n_requests == 0) {
break;
}
if (elapsed_ms >= timeout_ms) {
return IOErrorTemplates::kTimeout.Generate();
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
elapsed_ms += 100;
}
return nullptr;
}
}
......@@ -18,6 +18,7 @@ class MockRequestHandler : public RequestHandler {
MOCK_METHOD1(ProcessRequestUnlocked_t, bool (const GenericRequest* request));
bool ProcessRequestUnlocked(GenericRequest* request) override {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
return ProcessRequestUnlocked_t(request);
}
......
......@@ -93,11 +93,13 @@ TEST_F(RequestPoolTests, AddRequestDoesNotGoFurtherWhenNotReady) {
ASSERT_THAT(err, Eq(nullptr));
}
TEST_F(RequestPoolTests, NRequestsInQueue) {
auto nreq = pool.NRequestsInQueue();
TEST_F(RequestPoolTests, NRequestsInPoolInitial) {
auto nreq = pool.NRequestsInPool();
ASSERT_THAT(nreq, Eq(0));
}
void ExpectSend(MockRequestHandler* mock_handler, int ntimes = 1) {
EXPECT_CALL(*mock_handler, ReadyProcessRequest()).Times(ntimes).WillRepeatedly(Return(true));
EXPECT_CALL(*mock_handler, PrepareProcessingRequestLocked()).Times(ntimes);
......@@ -106,7 +108,6 @@ void ExpectSend(MockRequestHandler* mock_handler, int ntimes = 1) {
}
TEST_F(RequestPoolTests, AddRequestCallsSend) {
ExpectSend(mock_request_handler);
......@@ -117,6 +118,30 @@ TEST_F(RequestPoolTests, AddRequestCallsSend) {
ASSERT_THAT(err, Eq(nullptr));
}
TEST_F(RequestPoolTests, NRequestsInPool) {
pool.AddRequest(std::move(request));
auto nreq = pool.NRequestsInPool();
ASSERT_THAT(nreq, Eq(1));
}
TEST_F(RequestPoolTests, NRequestsInPoolAccountsForRequestsInProgress) {
ExpectSend(mock_request_handler,1);
pool.AddRequest(std::move(request));
std::this_thread::sleep_for(std::chrono::milliseconds(30));
auto nreq1 = pool.NRequestsInPool();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
auto nreq2 = pool.NRequestsInPool();
ASSERT_THAT(nreq1, Eq(1));
ASSERT_THAT(nreq2, Eq(0));
}
TEST_F(RequestPoolTests, AddRequestCallsSendTwoRequests) {
......@@ -124,13 +149,11 @@ TEST_F(RequestPoolTests, AddRequestCallsSendTwoRequests) {
ExpectSend(mock_request_handler, 2);
auto err1 = pool.AddRequest(std::move(request));
request.reset(request2);
auto err2 = pool.AddRequest(std::move(request));
std::this_thread::sleep_for(std::chrono::milliseconds(30));
std::this_thread::sleep_for(std::chrono::milliseconds(80));
ASSERT_THAT(err1, Eq(nullptr));
ASSERT_THAT(err2, Eq(nullptr));
}
......@@ -148,15 +171,43 @@ TEST_F(RequestPoolTests, AddRequestsOk) {
auto err = pool.AddRequests(std::move(requests));
std::this_thread::sleep_for(std::chrono::milliseconds(30));
std::this_thread::sleep_for(std::chrono::milliseconds(80));
ASSERT_THAT(err, Eq(nullptr));
}
TEST_F(RequestPoolTests, FinishProcessingThreads) {
EXPECT_CALL(mock_logger, Debug(HasSubstr("finishing thread"))).Times(nthreads);
}
TEST_F(RequestPoolTests, WaitRequestsFinished) {
ExpectSend(mock_request_handler);
pool.AddRequest(std::move(request));
std::this_thread::sleep_for(std::chrono::milliseconds(20));
auto err = pool.WaitRequestsFinished(1000);
auto nreq = pool.NRequestsInPool();
ASSERT_THAT(nreq, Eq(0));
ASSERT_THAT(err, Eq(nullptr));
}
TEST_F(RequestPoolTests, WaitRequestsFinishedTimeout) {
ExpectSend(mock_request_handler);
pool.AddRequest(std::move(request));
std::this_thread::sleep_for(std::chrono::milliseconds(20));
auto err = pool.WaitRequestsFinished(0);
auto nreq = pool.NRequestsInPool();
ASSERT_THAT(nreq, Eq(1));
ASSERT_THAT(err, Eq(asapo::IOErrorTemplates::kTimeout));
}
}
......@@ -199,23 +199,6 @@ std::unique_ptr<asapo::Producer> CreateProducer(const Args& args) {
return producer;
}
void WaitProducerThreadsFinished(const Args& args, int nfiles) {
int elapsed_ms = 0;
while (true) {
if (files_sent == nfiles) {
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
elapsed_ms += 100;
if (elapsed_ms > args.timeout_ms_producer) {
std::cerr << "Stream out exit on timeout " << std::endl;
break;
}
}
}
int main(int argc, char* argv[]) {
asapo::ExitAfterPrintVersionIfNeeded("GetNext Broker Example", argc, argv);
Args args;
......@@ -245,7 +228,9 @@ int main(int argc, char* argv[]) {
int nerrors;
auto nfiles = ProcessAllData(args, producer, &duration_ms, &nerrors);
WaitProducerThreadsFinished(args, nfiles);
if (producer->WaitRequestsFinished(args.timeout_ms_producer)!=nullptr) {
std::cerr << "Stream out exit on timeout " << std::endl;
}
auto duration_streamout = std::chrono::duration_cast<std::chrono::milliseconds>(streamout_finish - streamout_start);
std::cout << "Stream in " << std::endl;
......
......@@ -22,13 +22,6 @@ def callback(header,err):
n_send = n_send + 1
lock.release()
def wait_send(n_files, timeout_s):
elapsed = 0
while elapsed < timeout_s:
if n_send == n_files:
break
time.sleep(1)
source, path, beamtime,stream_in, stream_out, token, timeout_s,timeout_s_producer,nthreads, transfer_data = sys.argv[1:]
timeout_s=int(timeout_s)
timeout_s_producer=int(timeout_s_producer)
......@@ -60,8 +53,7 @@ while True:
except asapo_producer.AsapoProducerError:
break
wait_send(n_recv,timeout_s_producer)
producer.wait_requests_finished(timeout_s_producer*1000)
print ("Processed "+str(n_recv)+" file(s)")
print ("Sent "+str(n_send)+" file(s)")
......@@ -191,25 +191,6 @@ std::unique_ptr<asapo::Producer> CreateProducer(const Args& args) {
return producer;
}
void WaitThreadsFinished(const Args& args) {
uint64_t elapsed_ms = 0;
while (true) {
mutex.lock();
if (iterations_remained <= 0) {
mutex.unlock();
break;
}
mutex.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
elapsed_ms += 100;
if (elapsed_ms > args.timeout_sec * 1000) {
std::cerr << "Producer exit on timeout " << std::endl;
exit(EXIT_FAILURE);
}
}
}
void PrintOutput(const Args& args, const system_clock::time_point& start) {
system_clock::time_point t2 = system_clock::now();
double duration_sec = std::chrono::duration_cast<std::chrono::milliseconds>( t2 - start ).count() / 1000.0;
......@@ -220,8 +201,6 @@ void PrintOutput(const Args& args, const system_clock::time_point& start) {
std::cout << "Bandwidth " << size_gb / duration_sec / 8 << " GBytes/s" << std::endl;
}
int main (int argc, char* argv[]) {
Args args;
ProcessCommandArguments(argc, argv, &args);
......@@ -241,7 +220,16 @@ int main (int argc, char* argv[]) {
return EXIT_FAILURE;
}
WaitThreadsFinished(args);
auto err = producer->WaitRequestsFinished(args.timeout_sec*1000);
if (err) {
std::cerr << "Producer exit on timeout " << std::endl;
exit(EXIT_FAILURE);
}
if (iterations_remained!=0) {
std::cerr << "Producer did not send all data " << std::endl;
exit(EXIT_FAILURE);
}
PrintOutput(args, start_time);
return EXIT_SUCCESS;
......
......@@ -65,6 +65,11 @@ class Producer {
virtual void EnableRemoteLog(bool enable) = 0;
//! Set beamtime id which producer will use to send data
virtual Error SetCredentials(SourceCredentials source_cred) = 0;
//! Set get current size of the requests queue
virtual uint64_t GetRequestsQueueSize() = 0;
//! Wait until all current requests are processed or timeout
virtual Error WaitRequestsFinished(uint64_t timeout_ms) = 0;
};
}
......
......@@ -9,7 +9,8 @@ enum class ProducerErrorType {
kInternalServerError,
kRequestPoolIsFull,
kLocalIOError,
kWrongInput
kWrongInput,
kTimeout
};
using ProducerErrorTemplate = ServiceErrorTemplate<ProducerErrorType, ErrorType::kProducerError>;
......@@ -32,6 +33,11 @@ auto const kInternalServerError = ProducerErrorTemplate {
"Internal server error", ProducerErrorType::kInternalServerError
};
auto const kTimeout = ProducerErrorTemplate {
"Timeout", ProducerErrorType::kTimeout
};
};
}
......
......@@ -185,4 +185,17 @@ Error ProducerImpl::SendData_(const EventHeader& event_header,
return Send(std::move(event_header), std::move(data_wrapped), "", ingest_mode, callback, false);
}
uint64_t ProducerImpl::GetRequestsQueueSize() {
return request_pool__->NRequestsInPool();
};
Error ProducerImpl::WaitRequestsFinished(uint64_t timeout_ms) {
if (request_pool__->WaitRequestsFinished(timeout_ms)!=nullptr) {
return ProducerErrorTemplates::kTimeout.Generate("waiting to finish processing requests");
} else {
return nullptr;
}
}
}
\ No newline at end of file
......@@ -39,6 +39,8 @@ class ProducerImpl : public Producer {
Error SetCredentials(SourceCredentials source_cred) override;
Error SendMetaData(const std::string& metadata, RequestCallback callback) override;
uint64_t GetRequestsQueueSize() override;
Error WaitRequestsFinished(uint64_t timeout_ms) override;
private:
Error Send(const EventHeader& event_header, FileData data, std::string full_path, uint64_t ingest_mode,
......
......@@ -33,6 +33,16 @@ class MockRequestPull : public RequestPool {
return asapo::Error{AddRequest_t(request.get())};
}
MOCK_METHOD1(AddRequest_t, asapo::ErrorInterface * (GenericRequest*));
MOCK_METHOD0(NRequestsInPool, uint64_t ());
MOCK_METHOD1(WaitRequestsFinished_t, asapo::ErrorInterface * (uint64_t timeout_ms));
asapo::Error WaitRequestsFinished(uint64_t timeout_ms) override {
return asapo::Error{WaitRequestsFinished_t(timeout_ms)};
}
};
......
......@@ -344,4 +344,21 @@ TEST_F(ProducerImplTests, ErrorSendingWrongIngestMode) {
}
TEST_F(ProducerImplTests, GetQueueSize) {
EXPECT_CALL(mock_pull, NRequestsInPool()).WillOnce(Return(10));
auto size = producer.GetRequestsQueueSize();
ASSERT_THAT(size, Eq(10));
}
TEST_F(ProducerImplTests, WaitRequestsFinished) {
EXPECT_CALL(mock_pull, WaitRequestsFinished_t(_)).WillOnce(Return(asapo::IOErrorTemplates::kTimeout.Generate().release()));
auto err = producer.WaitRequestsFinished(100);
ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kTimeout));
}
}
......@@ -93,6 +93,8 @@ cdef extern from "asapo_producer.h" namespace "asapo":
Error SendFile(const EventHeader& event_header, string full_path, uint64_t ingest_mode,RequestCallback callback)
Error SendData_(const EventHeader& event_header, void* data, uint64_t ingest_mode,RequestCallback callback)
void SetLogLevel(LogLevel level)
uint64_t GetRequestsQueueSize()
Error WaitRequestsFinished(uint64_t timeout_ms)
cdef extern from "asapo_producer.h" namespace "asapo":
uint64_t kDefaultIngestMode
......
......@@ -178,6 +178,19 @@ cdef class PyProducer:
if err:
throw_exception(err)
return
def get_requests_queue_size(self):
return self.c_producer.get().GetRequestsQueueSize()
def wait_requests_finished(self,timeout_ms):
"""
:param timeout_ms: timeout in milliseconds
:type timeout_ms: int
:raises: AsapoProducerError
"""
err = self.c_producer.get().WaitRequestsFinished(timeout_ms)
if err:
throw_exception(err)
return
cdef void c_callback_python(self,py_callback, GenericRequestHeader header, Error& err):
if py_callback != None:
info_str = _str(header.Json())
......
......@@ -55,6 +55,13 @@ producer.send_data(5, stream+"/"+"file6",b"hello",
producer.send_data(6, stream+"/"+"file7",None,
ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, callback = callback)
producer.wait_requests_finished(25000)
n = producer.get_requests_queue_size()
if n!=0:
print("number of remaining requestst should be zero, got ",n)
sys.exit(1)
# create with error
try:
......@@ -67,6 +74,5 @@ else:
time.sleep(25)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment