diff --git a/common/cpp/include/request/request_pool.h b/common/cpp/include/request/request_pool.h index 1f3ea7a94003ffe2a9686ef5d707e3e510bef055..2c0be787d7b1f5d04dec730e2c5a29dff8254804 100644 --- a/common/cpp/include/request/request_pool.h +++ b/common/cpp/include/request/request_pool.h @@ -28,7 +28,7 @@ class RequestPool { VIRTUAL uint64_t NRequestsInPool(); VIRTUAL Error WaitRequestsFinished(uint64_t timeout_ms); VIRTUAL void StopThreads(); - private: + private: const AbstractLogger* log__; RequestHandlerFactory* request_handler_factory__; std::vector<std::thread> threads_; diff --git a/common/cpp/src/request/request_pool.cpp b/common/cpp/src/request/request_pool.cpp index 938ae8eee4f4aa2c9b1b55062dadb5801c24aa5d..7d9b6c03bbe95d66d8f84b2faf3c2e89441017dc 100644 --- a/common/cpp/src/request/request_pool.cpp +++ b/common/cpp/src/request/request_pool.cpp @@ -76,7 +76,7 @@ RequestPool::~RequestPool() { uint64_t RequestPool::NRequestsInPool() { std::lock_guard<std::mutex> lock{mutex_}; - return request_queue_.size()+requests_in_progress_; + return request_queue_.size() + requests_in_progress_; } Error RequestPool::AddRequests(GenericRequests requests) { std::unique_lock<std::mutex> lock(mutex_); diff --git a/common/cpp/unittests/request/test_request_pool.cpp b/common/cpp/unittests/request/test_request_pool.cpp index 02e1dd05738af9c26279ec062c5c432efbd35c04..91c5cb790ce5c392dc3df1797999c21eb83be0f5 100644 --- a/common/cpp/unittests/request/test_request_pool.cpp +++ b/common/cpp/unittests/request/test_request_pool.cpp @@ -127,7 +127,7 @@ TEST_F(RequestPoolTests, NRequestsInPool) { } TEST_F(RequestPoolTests, NRequestsInPoolAccountsForRequestsInProgress) { - ExpectSend(mock_request_handler,1); + ExpectSend(mock_request_handler, 1); pool.AddRequest(std::move(request)); @@ -209,5 +209,13 @@ TEST_F(RequestPoolTests, WaitRequestsFinishedTimeout) { } +TEST_F(RequestPoolTests, StopThreads) { + EXPECT_CALL(mock_logger, Debug(HasSubstr("finishing thread"))).Times(nthreads); + + pool.StopThreads(); + + Mock::VerifyAndClearExpectations(&mock_logger); +} + } diff --git a/examples/pipeline/in_to_out/in_to_out.cpp b/examples/pipeline/in_to_out/in_to_out.cpp index fa5e530f2d487318c78c9294676a6a2e55368613..5007d229636c848c7a455c887c78ad553d0c1188 100644 --- a/examples/pipeline/in_to_out/in_to_out.cpp +++ b/examples/pipeline/in_to_out/in_to_out.cpp @@ -228,7 +228,7 @@ int main(int argc, char* argv[]) { int nerrors; auto nfiles = ProcessAllData(args, producer, &duration_ms, &nerrors); - if (producer->WaitRequestsFinished(args.timeout_ms_producer)!=nullptr) { + if (producer->WaitRequestsFinished(args.timeout_ms_producer) != nullptr) { std::cerr << "Stream out exit on timeout " << std::endl; } auto duration_streamout = std::chrono::duration_cast<std::chrono::milliseconds>(streamout_finish - streamout_start); diff --git a/examples/pipeline/in_to_out_python/in_to_out.py b/examples/pipeline/in_to_out_python/in_to_out.py index 27290b573ebb6f6093a7646d7ca4919703afc0db..b8a51975c9849877aaa597abe41956e8e5c14bfc 100644 --- a/examples/pipeline/in_to_out_python/in_to_out.py +++ b/examples/pipeline/in_to_out_python/in_to_out.py @@ -53,11 +53,7 @@ while True: except asapo_producer.AsapoProducerError: break -try: - producer.wait_requests_finished(timeout_s_producer*1000) -except: - print("waiting requestst finished failed") - +producer.wait_requests_finished(timeout_s_producer*1000) print ("Processed "+str(n_recv)+" file(s)") print ("Sent "+str(n_send)+" file(s)") diff --git a/examples/producer/dummy-data-producer/dummy_data_producer.cpp b/examples/producer/dummy-data-producer/dummy_data_producer.cpp index ecab2603da73a18f94962ea0b96a932cb4659265..e2b0343859b676244ad6924b29f3d3b7cc765ef6 100644 --- a/examples/producer/dummy-data-producer/dummy_data_producer.cpp +++ b/examples/producer/dummy-data-producer/dummy_data_producer.cpp @@ -220,13 +220,13 @@ int main (int argc, char* argv[]) { return EXIT_FAILURE; } - auto err = producer->WaitRequestsFinished(args.timeout_sec*1000); + auto err = producer->WaitRequestsFinished(args.timeout_sec * 1000); if (err) { - std::cerr << "Producer exit on timeout " << std::endl; - exit(EXIT_FAILURE); + std::cerr << "Producer exit on timeout " << std::endl; + exit(EXIT_FAILURE); } - if (iterations_remained!=0) { + if (iterations_remained != 0) { std::cerr << "Producer did not send all data " << std::endl; exit(EXIT_FAILURE); } diff --git a/producer/api/cpp/include/producer/producer.h b/producer/api/cpp/include/producer/producer.h index c7d944fac4e2557b5cc7c61e2a0a5f7fdb15addf..d0daa41606149858a578bcd66df65981c6a17a8a 100644 --- a/producer/api/cpp/include/producer/producer.h +++ b/producer/api/cpp/include/producer/producer.h @@ -68,9 +68,9 @@ class Producer { virtual void EnableRemoteLog(bool enable) = 0; //! Set beamtime id which producer will use to send data virtual Error SetCredentials(SourceCredentials source_cred) = 0; - //! Set get current size of the requests queue + //! Set get current size of the requests queue virtual uint64_t GetRequestsQueueSize() = 0; - //! Wait until all current requests are processed or timeout + //! Wait until all current requests are processed or timeout virtual Error WaitRequestsFinished(uint64_t timeout_ms) = 0; }; diff --git a/producer/api/cpp/src/producer_impl.cpp b/producer/api/cpp/src/producer_impl.cpp index 6026feb2c21ba19e0d3256f9efb18243579abb17..d2f933380189d7658ad73d434b5f6e864db8bdac 100644 --- a/producer/api/cpp/src/producer_impl.cpp +++ b/producer/api/cpp/src/producer_impl.cpp @@ -190,7 +190,7 @@ uint64_t ProducerImpl::GetRequestsQueueSize() { }; Error ProducerImpl::WaitRequestsFinished(uint64_t timeout_ms) { - if (request_pool__->WaitRequestsFinished(timeout_ms)!=nullptr) { + if (request_pool__->WaitRequestsFinished(timeout_ms) != nullptr) { return ProducerErrorTemplates::kTimeout.Generate("waiting to finish processing requests"); } else { return nullptr; diff --git a/producer/api/cpp/unittests/mocking.h b/producer/api/cpp/unittests/mocking.h index d2bae4754d975546d889f11162f06ca1577b8e0f..c3dae1c6eab435e4c589c080b23faccee91b3654 100644 --- a/producer/api/cpp/unittests/mocking.h +++ b/producer/api/cpp/unittests/mocking.h @@ -38,8 +38,8 @@ class MockRequestPull : public RequestPool { MOCK_METHOD1(WaitRequestsFinished_t, asapo::ErrorInterface * (uint64_t timeout_ms)); asapo::Error WaitRequestsFinished(uint64_t timeout_ms) override { - return asapo::Error{WaitRequestsFinished_t(timeout_ms)}; - } + return asapo::Error{WaitRequestsFinished_t(timeout_ms)}; + } diff --git a/producer/api/cpp/unittests/test_producer_impl.cpp b/producer/api/cpp/unittests/test_producer_impl.cpp index 5a4650dd47b10f7940f30cd48d0b538388f02698..0b3d35eb2f61f869bbff260264963db8d160b3cb 100644 --- a/producer/api/cpp/unittests/test_producer_impl.cpp +++ b/producer/api/cpp/unittests/test_producer_impl.cpp @@ -353,7 +353,8 @@ TEST_F(ProducerImplTests, GetQueueSize) { } TEST_F(ProducerImplTests, WaitRequestsFinished) { - EXPECT_CALL(mock_pull, WaitRequestsFinished_t(_)).WillOnce(Return(asapo::IOErrorTemplates::kTimeout.Generate().release())); + EXPECT_CALL(mock_pull, WaitRequestsFinished_t(_)).WillOnce(Return( + asapo::IOErrorTemplates::kTimeout.Generate().release())); auto err = producer.WaitRequestsFinished(100); diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in index b0904e31241478088cdd95ad8771da70792461ee..5adb34b538e5b7b3b05a2b94c9df2643be478332 100644 --- a/producer/api/python/asapo_producer.pyx.in +++ b/producer/api/python/asapo_producer.pyx.in @@ -94,7 +94,7 @@ cdef class PyProducer: event_header.file_size = data.nbytes err = self.c_producer.get().SendData__(event_header, data_pointer_nparray(data), ingest_mode, unwrap_callback_with_memory(<RequestCallbackCythonMemory>self.c_callback_ndarr, - <void*>self,<void*>callback if callback != None else NULL, <void*>data)) + <void*>self,<void*>callback, <void*>data)) if err: throw_exception(err) if data is not None: @@ -121,7 +121,7 @@ cdef class PyProducer: event_header.file_size = len(data) err = self.c_producer.get().SendData__(event_header, data_pointer_bytes(data), ingest_mode, unwrap_callback_with_memory(<RequestCallbackCythonMemory>self.c_callback_bytesaddr, - <void*>self,<void*>callback if callback != None else NULL, <void*>data)) + <void*>self,<void*>callback, <void*>data)) if err: throw_exception(err) Py_XINCREF(<PyObject*>data) @@ -189,7 +189,6 @@ cdef class PyProducer: :type timeout_ms: int :raises: AsapoProducerError """ - cdef Error err cdef uint64_t timeout = timeout_ms with nogil: @@ -224,7 +223,8 @@ cdef class PyProducer: self.c_callback_python(py_callback,header,err) def cleanup(self): with nogil: - self.c_producer.get().StopThreads__() + if self.c_producer.get() is not NULL: + self.c_producer.get().StopThreads__() @staticmethod def __create_producer(endpoint,beamtime_id,stream,token,nthreads): pyProd = PyProducer() diff --git a/producer/api/python/asapo_wrappers.h b/producer/api/python/asapo_wrappers.h index 5e0add7c47b0423a9675a9f38d3b56a84e163fb8..2bb1c2e4082de80d2698956b56077ab4389e21dc 100644 --- a/producer/api/python/asapo_wrappers.h +++ b/producer/api/python/asapo_wrappers.h @@ -4,7 +4,6 @@ #include <memory> #include <functional> - namespace asapo { inline std::string GetErrorString(asapo::Error* err) { diff --git a/tests/automatic/producer/python_api/check_linux.sh b/tests/automatic/producer/python_api/check_linux.sh index e7654d39438bd8a4b457a1e77d3c00f1778706cd..9d77846fb0068c311838abed990f119966e3b6db 100644 --- a/tests/automatic/producer/python_api/check_linux.sh +++ b/tests/automatic/producer/python_api/check_linux.sh @@ -36,6 +36,6 @@ echo test > file1 sleep 1 -$1 $3 $stream $beamtime_id "127.0.0.1:8400" > out +$1 $3 $stream $beamtime_id "127.0.0.1:8400" > out || cat out cat out cat out | grep "successfuly sent" | wc -l | grep 7 \ No newline at end of file diff --git a/tests/automatic/producer/python_api/producer_api.py b/tests/automatic/producer/python_api/producer_api.py index 88a5457bc97aa9862015a16df9f36fc8e40e81c7..0253decb57e074f044120fe849aa41f509b24ad9 100644 --- a/tests/automatic/producer/python_api/producer_api.py +++ b/tests/automatic/producer/python_api/producer_api.py @@ -7,7 +7,6 @@ import numpy as np import threading lock = threading.Lock() - stream = sys.argv[1] beamtime = sys.argv[2] endpoint = sys.argv[3] @@ -15,8 +14,6 @@ endpoint = sys.argv[3] token = "" nthreads = 8 - - def callback(header,err): lock.acquire() # to print if err is not None: @@ -27,12 +24,14 @@ def callback(header,err): producer = asapo_producer.create_producer(endpoint,beamtime, stream, token, nthreads) - producer.set_log_level("info") #send single file producer.send_file(1, local_path = "./file1", exposed_path = stream+"/"+"file1", user_meta = '{"test_key":"test_val"}', callback = callback) +#send single file without callback +producer.send_file(1, local_path = "./file1", exposed_path = stream+"/"+"file1", user_meta = '{"test_key":"test_val"}',callback=None) + #send subsets producer.send_file(2, local_path = "./file1", exposed_path = stream+"/"+"file2",subset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback) producer.send_file(3, local_path = "./file1", exposed_path = stream+"/"+"file3",subset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback) @@ -55,15 +54,11 @@ producer.send_data(5, stream+"/"+"file6",b"hello", producer.send_data(6, stream+"/"+"file7",None, ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, callback = callback) -try: - producer.wait_requests_finished(50000) - n = producer.get_requests_queue_size() - if n!=0: - print("number of remaining requestst should be zero, got ",n) - sys.exit(1) -except: - print("waiting requests finished failed") - sys.exit(1) +producer.wait_requests_finished(50000) +n = producer.get_requests_queue_size() +if n!=0: + print("number of remaining requestst should be zero, got ",n) + sys.exit(1) # create with error diff --git a/tests/manual/python_tests/producer_wait_threads/asapo_producer.so b/tests/manual/python_tests/producer_wait_threads/asapo_producer.so new file mode 120000 index 0000000000000000000000000000000000000000..cd186e18092ad6fd8de0e7ab30f6b7a4b5417775 --- /dev/null +++ b/tests/manual/python_tests/producer_wait_threads/asapo_producer.so @@ -0,0 +1 @@ +/home/yakubov/projects/asapo/cmake-build-debug/producer/api/python/asapo_producer.so \ No newline at end of file diff --git a/tests/manual/python_tests/producer_wait_threads/file1 b/tests/manual/python_tests/producer_wait_threads/file1 new file mode 100644 index 0000000000000000000000000000000000000000..9daeafb9864cf43055ae93beb0afd6c7d144bfa4 --- /dev/null +++ b/tests/manual/python_tests/producer_wait_threads/file1 @@ -0,0 +1 @@ +test diff --git a/tests/manual/python_tests/producer_wait_threads/producer_api.py b/tests/manual/python_tests/producer_wait_threads/producer_api.py new file mode 100644 index 0000000000000000000000000000000000000000..de4be60ec77da6b68afc998db4751313d03d2415 --- /dev/null +++ b/tests/manual/python_tests/producer_wait_threads/producer_api.py @@ -0,0 +1,76 @@ +from __future__ import print_function + +import asapo_producer +import sys +import time +import numpy as np +import threading +lock = threading.Lock() + +stream = "python" +beamtime = "asapo_test" +endpoint = "127.0.0.1:8400" + +token = "" +nthreads = 8 + +def callback(header,err): + lock.acquire() # to print + if err is not None: + print("could not sent: ",header,err) + else: + print ("successfuly sent: ",header) + lock.release() + +producer = asapo_producer.create_producer(endpoint,beamtime, stream, token, nthreads) + +producer.set_log_level("info") + +#send single file +producer.send_file(1, local_path = "./file1", exposed_path = stream+"/"+"file1", user_meta = '{"test_key":"test_val"}', callback = callback) + +#send single file without callback +producer.send_file(1, local_path = "./file1", exposed_path = stream+"/"+"file1", user_meta = '{"test_key":"test_val"}',callback=None) + +#send subsets +producer.send_file(2, local_path = "./file1", exposed_path = stream+"/"+"file2",subset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback) +producer.send_file(3, local_path = "./file1", exposed_path = stream+"/"+"file3",subset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback) + +#send meta only +producer.send_file(3, local_path = "./not_exist",exposed_path = "./whatever", + ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, callback = callback) + +data = np.arange(10,dtype=np.float64) + +#send data from array +producer.send_data(4, stream+"/"+"file5",data, + ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback) + +#send data from string +producer.send_data(5, stream+"/"+"file6",b"hello", + ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback) + +#send metadata only +producer.send_data(6, stream+"/"+"file7",None, + ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, callback = callback) + +producer.wait_requests_finished(1000) +n = producer.get_requests_queue_size() +if n!=0: + print("number of remaining requestst should be zero, got ",n) + sys.exit(1) + + +# create with error +try: + producer = asapo_producer.create_producer(endpoint,beamtime, stream, token, 0) +except Exception as e: + print(e) +else: + print("should be error") + sys.exit(1) + + + + + diff --git a/tests/manual/python_tests/producer_wait_threads/test.py b/tests/manual/python_tests/producer_wait_threads/test.py new file mode 100644 index 0000000000000000000000000000000000000000..3a0206d73ac7b556e751a3e7a848ea19c44b7513 --- /dev/null +++ b/tests/manual/python_tests/producer_wait_threads/test.py @@ -0,0 +1,60 @@ +from __future__ import print_function +import threading +import asapo_producer +import sys +import time +import numpy as np +lock = threading.Lock() + +stream = "python" +beamtime = "asapo_test" +endpoint = "127.0.0.1:8400" + +token = "" +nthreads = 8 + +def callback(header,err): + global lock + lock.acquire() # to print + if err is not None: + print("could not sent: ",header,err) + else: + print ("successfuly sent: ",header) + lock.release() + +producer = asapo_producer.create_producer(endpoint,beamtime, stream, token, nthreads) + +producer.set_log_level("info") + +#send single file +producer.send_file(1, local_path = "./file1", exposed_path = stream+"/"+"file1", user_meta = '{"test_key":"test_val"}', callback = callback) + +#send single file without callback +producer.send_file(1, local_path = "./file1", exposed_path = stream+"/"+"file1", user_meta = '{"test_key":"test_val"}') + + +#send subsets +producer.send_file(2, local_path = "./file1", exposed_path = stream+"/"+"file2",subset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback) +producer.send_file(3, local_path = "./file1", exposed_path = stream+"/"+"file3",subset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback) + +#send meta only +producer.send_file(3, local_path = "./not_exist",exposed_path = "./whatever", + ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, callback = callback) + +data = np.arange(10,dtype=np.float64) + +#send data from array +producer.send_data(4, stream+"/"+"file5",data, + ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback) + +#send data from string +producer.send_data(5, stream+"/"+"file6",b"hello", + ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback) + +#send metadata only +producer.send_data(6, stream+"/"+"file7",None, + ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, callback = callback) + +producer.wait_requests_finished(1) + +