From 71e628f4a977132851c016bef4ea58b489e8a813 Mon Sep 17 00:00:00 2001
From: Sergey Yakubov <sergey.yakubov@desy.de>
Date: Fri, 15 Nov 2019 13:03:32 +0100
Subject: [PATCH] fix segfault

---
 common/cpp/include/request/request_pool.h     |  2 +-
 common/cpp/src/request/request_pool.cpp       |  2 +-
 .../unittests/request/test_request_pool.cpp   | 10 ++-
 examples/pipeline/in_to_out/in_to_out.cpp     |  2 +-
 .../pipeline/in_to_out_python/in_to_out.py    |  6 +-
 .../dummy_data_producer.cpp                   |  8 +-
 producer/api/cpp/include/producer/producer.h  |  4 +-
 producer/api/cpp/src/producer_impl.cpp        |  2 +-
 producer/api/cpp/unittests/mocking.h          |  4 +-
 .../api/cpp/unittests/test_producer_impl.cpp  |  3 +-
 producer/api/python/asapo_producer.pyx.in     |  8 +-
 producer/api/python/asapo_wrappers.h          |  1 -
 .../producer/python_api/check_linux.sh        |  2 +-
 .../producer/python_api/producer_api.py       | 21 ++---
 .../producer_wait_threads/asapo_producer.so   |  1 +
 .../python_tests/producer_wait_threads/file1  |  1 +
 .../producer_wait_threads/producer_api.py     | 76 +++++++++++++++++++
 .../producer_wait_threads/test.py             | 60 +++++++++++++++
 18 files changed, 175 insertions(+), 38 deletions(-)
 create mode 120000 tests/manual/python_tests/producer_wait_threads/asapo_producer.so
 create mode 100644 tests/manual/python_tests/producer_wait_threads/file1
 create mode 100644 tests/manual/python_tests/producer_wait_threads/producer_api.py
 create mode 100644 tests/manual/python_tests/producer_wait_threads/test.py

diff --git a/common/cpp/include/request/request_pool.h b/common/cpp/include/request/request_pool.h
index 1f3ea7a94..2c0be787d 100644
--- a/common/cpp/include/request/request_pool.h
+++ b/common/cpp/include/request/request_pool.h
@@ -28,7 +28,7 @@ class RequestPool {
     VIRTUAL uint64_t NRequestsInPool();
     VIRTUAL Error WaitRequestsFinished(uint64_t timeout_ms);
     VIRTUAL void StopThreads();
- private:
+  private:
     const AbstractLogger* log__;
     RequestHandlerFactory* request_handler_factory__;
     std::vector<std::thread> threads_;
diff --git a/common/cpp/src/request/request_pool.cpp b/common/cpp/src/request/request_pool.cpp
index 938ae8eee..7d9b6c03b 100644
--- a/common/cpp/src/request/request_pool.cpp
+++ b/common/cpp/src/request/request_pool.cpp
@@ -76,7 +76,7 @@ RequestPool::~RequestPool() {
 
 uint64_t RequestPool::NRequestsInPool() {
     std::lock_guard<std::mutex> lock{mutex_};
-    return request_queue_.size()+requests_in_progress_;
+    return request_queue_.size() + requests_in_progress_;
 }
 Error RequestPool::AddRequests(GenericRequests requests) {
     std::unique_lock<std::mutex> lock(mutex_);
diff --git a/common/cpp/unittests/request/test_request_pool.cpp b/common/cpp/unittests/request/test_request_pool.cpp
index 02e1dd057..91c5cb790 100644
--- a/common/cpp/unittests/request/test_request_pool.cpp
+++ b/common/cpp/unittests/request/test_request_pool.cpp
@@ -127,7 +127,7 @@ TEST_F(RequestPoolTests, NRequestsInPool) {
 }
 
 TEST_F(RequestPoolTests, NRequestsInPoolAccountsForRequestsInProgress) {
-    ExpectSend(mock_request_handler,1);
+    ExpectSend(mock_request_handler, 1);
 
     pool.AddRequest(std::move(request));
 
@@ -209,5 +209,13 @@ TEST_F(RequestPoolTests, WaitRequestsFinishedTimeout) {
 
 }
 
+TEST_F(RequestPoolTests, StopThreads) {
+    EXPECT_CALL(mock_logger, Debug(HasSubstr("finishing thread"))).Times(nthreads);
+
+    pool.StopThreads();
+
+    Mock::VerifyAndClearExpectations(&mock_logger);
+}
+
 
 }
diff --git a/examples/pipeline/in_to_out/in_to_out.cpp b/examples/pipeline/in_to_out/in_to_out.cpp
index fa5e530f2..5007d2296 100644
--- a/examples/pipeline/in_to_out/in_to_out.cpp
+++ b/examples/pipeline/in_to_out/in_to_out.cpp
@@ -228,7 +228,7 @@ int main(int argc, char* argv[]) {
     int nerrors;
     auto nfiles = ProcessAllData(args, producer, &duration_ms, &nerrors);
 
-    if (producer->WaitRequestsFinished(args.timeout_ms_producer)!=nullptr) {
+    if (producer->WaitRequestsFinished(args.timeout_ms_producer) != nullptr) {
         std::cerr << "Stream out exit on timeout " << std::endl;
     }
     auto duration_streamout = std::chrono::duration_cast<std::chrono::milliseconds>(streamout_finish - streamout_start);
diff --git a/examples/pipeline/in_to_out_python/in_to_out.py b/examples/pipeline/in_to_out_python/in_to_out.py
index 27290b573..b8a51975c 100644
--- a/examples/pipeline/in_to_out_python/in_to_out.py
+++ b/examples/pipeline/in_to_out_python/in_to_out.py
@@ -53,11 +53,7 @@ while True:
     except  asapo_producer.AsapoProducerError:
         break
 
-try:
-    producer.wait_requests_finished(timeout_s_producer*1000)
-except:
-    print("waiting requestst finished failed")
-
+producer.wait_requests_finished(timeout_s_producer*1000)
 
 print ("Processed "+str(n_recv)+" file(s)")
 print ("Sent "+str(n_send)+" file(s)")
diff --git a/examples/producer/dummy-data-producer/dummy_data_producer.cpp b/examples/producer/dummy-data-producer/dummy_data_producer.cpp
index ecab2603d..e2b034385 100644
--- a/examples/producer/dummy-data-producer/dummy_data_producer.cpp
+++ b/examples/producer/dummy-data-producer/dummy_data_producer.cpp
@@ -220,13 +220,13 @@ int main (int argc, char* argv[]) {
         return EXIT_FAILURE;
     }
 
-    auto err = producer->WaitRequestsFinished(args.timeout_sec*1000);
+    auto err = producer->WaitRequestsFinished(args.timeout_sec * 1000);
     if (err) {
-            std::cerr << "Producer exit on timeout " << std::endl;
-            exit(EXIT_FAILURE);
+        std::cerr << "Producer exit on timeout " << std::endl;
+        exit(EXIT_FAILURE);
     }
 
-    if (iterations_remained!=0) {
+    if (iterations_remained != 0) {
         std::cerr << "Producer did not send all data " << std::endl;
         exit(EXIT_FAILURE);
     }
diff --git a/producer/api/cpp/include/producer/producer.h b/producer/api/cpp/include/producer/producer.h
index c7d944fac..d0daa4160 100644
--- a/producer/api/cpp/include/producer/producer.h
+++ b/producer/api/cpp/include/producer/producer.h
@@ -68,9 +68,9 @@ class Producer {
     virtual void EnableRemoteLog(bool enable) = 0;
     //! Set beamtime id which producer will use to send data
     virtual Error SetCredentials(SourceCredentials source_cred) = 0;
-  //! Set get current size of the requests queue
+    //! Set get current size of the requests queue
     virtual  uint64_t  GetRequestsQueueSize() = 0;
-  //! Wait until all current requests are processed or timeout
+    //! Wait until all current requests are processed or timeout
     virtual Error WaitRequestsFinished(uint64_t timeout_ms) = 0;
 
 };
diff --git a/producer/api/cpp/src/producer_impl.cpp b/producer/api/cpp/src/producer_impl.cpp
index 6026feb2c..d2f933380 100644
--- a/producer/api/cpp/src/producer_impl.cpp
+++ b/producer/api/cpp/src/producer_impl.cpp
@@ -190,7 +190,7 @@ uint64_t  ProducerImpl::GetRequestsQueueSize() {
 };
 
 Error ProducerImpl::WaitRequestsFinished(uint64_t timeout_ms) {
-    if (request_pool__->WaitRequestsFinished(timeout_ms)!=nullptr) {
+    if (request_pool__->WaitRequestsFinished(timeout_ms) != nullptr) {
         return ProducerErrorTemplates::kTimeout.Generate("waiting to finish processing requests");
     } else {
         return nullptr;
diff --git a/producer/api/cpp/unittests/mocking.h b/producer/api/cpp/unittests/mocking.h
index d2bae4754..c3dae1c6e 100644
--- a/producer/api/cpp/unittests/mocking.h
+++ b/producer/api/cpp/unittests/mocking.h
@@ -38,8 +38,8 @@ class MockRequestPull : public RequestPool {
     MOCK_METHOD1(WaitRequestsFinished_t, asapo::ErrorInterface * (uint64_t timeout_ms));
 
     asapo::Error WaitRequestsFinished(uint64_t timeout_ms) override {
-      return asapo::Error{WaitRequestsFinished_t(timeout_ms)};
-  }
+        return asapo::Error{WaitRequestsFinished_t(timeout_ms)};
+    }
 
 
 
diff --git a/producer/api/cpp/unittests/test_producer_impl.cpp b/producer/api/cpp/unittests/test_producer_impl.cpp
index 5a4650dd4..0b3d35eb2 100644
--- a/producer/api/cpp/unittests/test_producer_impl.cpp
+++ b/producer/api/cpp/unittests/test_producer_impl.cpp
@@ -353,7 +353,8 @@ TEST_F(ProducerImplTests, GetQueueSize) {
 }
 
 TEST_F(ProducerImplTests, WaitRequestsFinished) {
-    EXPECT_CALL(mock_pull, WaitRequestsFinished_t(_)).WillOnce(Return(asapo::IOErrorTemplates::kTimeout.Generate().release()));
+    EXPECT_CALL(mock_pull, WaitRequestsFinished_t(_)).WillOnce(Return(
+                asapo::IOErrorTemplates::kTimeout.Generate().release()));
 
     auto err  = producer.WaitRequestsFinished(100);
 
diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in
index b0904e312..5adb34b53 100644
--- a/producer/api/python/asapo_producer.pyx.in
+++ b/producer/api/python/asapo_producer.pyx.in
@@ -94,7 +94,7 @@ cdef class PyProducer:
             event_header.file_size = data.nbytes
         err = self.c_producer.get().SendData__(event_header, data_pointer_nparray(data), ingest_mode,
             unwrap_callback_with_memory(<RequestCallbackCythonMemory>self.c_callback_ndarr,
-             <void*>self,<void*>callback if callback != None else NULL, <void*>data))
+             <void*>self,<void*>callback, <void*>data))
         if err:
             throw_exception(err)
         if data is not None:
@@ -121,7 +121,7 @@ cdef class PyProducer:
         event_header.file_size = len(data)
         err = self.c_producer.get().SendData__(event_header, data_pointer_bytes(data), ingest_mode,
             unwrap_callback_with_memory(<RequestCallbackCythonMemory>self.c_callback_bytesaddr,
-             <void*>self,<void*>callback if callback != None else NULL, <void*>data))
+             <void*>self,<void*>callback, <void*>data))
         if err:
             throw_exception(err)
         Py_XINCREF(<PyObject*>data)
@@ -189,7 +189,6 @@ cdef class PyProducer:
          :type timeout_ms: int
          :raises: AsapoProducerError
         """
-
         cdef Error err
         cdef uint64_t timeout = timeout_ms
         with nogil:
@@ -224,7 +223,8 @@ cdef class PyProducer:
         self.c_callback_python(py_callback,header,err)
     def cleanup(self):
         with  nogil:
-            self.c_producer.get().StopThreads__()
+            if self.c_producer.get() is not NULL:
+                self.c_producer.get().StopThreads__()
     @staticmethod
     def __create_producer(endpoint,beamtime_id,stream,token,nthreads):
         pyProd = PyProducer()
diff --git a/producer/api/python/asapo_wrappers.h b/producer/api/python/asapo_wrappers.h
index 5e0add7c4..2bb1c2e40 100644
--- a/producer/api/python/asapo_wrappers.h
+++ b/producer/api/python/asapo_wrappers.h
@@ -4,7 +4,6 @@
 #include <memory>
 #include <functional>
 
-
 namespace asapo {
 
 inline std::string GetErrorString(asapo::Error* err) {
diff --git a/tests/automatic/producer/python_api/check_linux.sh b/tests/automatic/producer/python_api/check_linux.sh
index e7654d394..9d77846fb 100644
--- a/tests/automatic/producer/python_api/check_linux.sh
+++ b/tests/automatic/producer/python_api/check_linux.sh
@@ -36,6 +36,6 @@ echo test > file1
 
 sleep 1
 
-$1 $3 $stream $beamtime_id  "127.0.0.1:8400" > out
+$1 $3 $stream $beamtime_id  "127.0.0.1:8400" > out || cat out
 cat out
 cat out | grep "successfuly sent" | wc -l | grep 7
\ No newline at end of file
diff --git a/tests/automatic/producer/python_api/producer_api.py b/tests/automatic/producer/python_api/producer_api.py
index 88a5457bc..0253decb5 100644
--- a/tests/automatic/producer/python_api/producer_api.py
+++ b/tests/automatic/producer/python_api/producer_api.py
@@ -7,7 +7,6 @@ import numpy as np
 import threading
 lock = threading.Lock()
 
-
 stream = sys.argv[1]
 beamtime = sys.argv[2]
 endpoint = sys.argv[3]
@@ -15,8 +14,6 @@ endpoint = sys.argv[3]
 token = ""
 nthreads = 8
 
-
-
 def callback(header,err):
     lock.acquire() # to print
     if err is not None:
@@ -27,12 +24,14 @@ def callback(header,err):
 
 producer  = asapo_producer.create_producer(endpoint,beamtime, stream, token, nthreads)
 
-
 producer.set_log_level("info")
 
 #send single file
 producer.send_file(1, local_path = "./file1", exposed_path = stream+"/"+"file1", user_meta = '{"test_key":"test_val"}', callback = callback)
 
+#send single file without callback
+producer.send_file(1, local_path = "./file1", exposed_path = stream+"/"+"file1", user_meta = '{"test_key":"test_val"}',callback=None)
+
 #send subsets
 producer.send_file(2, local_path = "./file1", exposed_path = stream+"/"+"file2",subset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback)
 producer.send_file(3, local_path = "./file1", exposed_path = stream+"/"+"file3",subset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback)
@@ -55,15 +54,11 @@ producer.send_data(5, stream+"/"+"file6",b"hello",
 producer.send_data(6, stream+"/"+"file7",None,
                          ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, callback = callback)
 
-try:
-    producer.wait_requests_finished(50000)
-    n = producer.get_requests_queue_size()
-    if n!=0:
-        print("number of remaining requestst should be zero, got ",n)
-        sys.exit(1)
-except:
-    print("waiting requests finished failed")
-    sys.exit(1)
+producer.wait_requests_finished(50000)
+n = producer.get_requests_queue_size()
+if n!=0:
+	print("number of remaining requestst should be zero, got ",n)
+	sys.exit(1)
 
 
 # create with error
diff --git a/tests/manual/python_tests/producer_wait_threads/asapo_producer.so b/tests/manual/python_tests/producer_wait_threads/asapo_producer.so
new file mode 120000
index 000000000..cd186e180
--- /dev/null
+++ b/tests/manual/python_tests/producer_wait_threads/asapo_producer.so
@@ -0,0 +1 @@
+/home/yakubov/projects/asapo/cmake-build-debug/producer/api/python/asapo_producer.so
\ No newline at end of file
diff --git a/tests/manual/python_tests/producer_wait_threads/file1 b/tests/manual/python_tests/producer_wait_threads/file1
new file mode 100644
index 000000000..9daeafb98
--- /dev/null
+++ b/tests/manual/python_tests/producer_wait_threads/file1
@@ -0,0 +1 @@
+test
diff --git a/tests/manual/python_tests/producer_wait_threads/producer_api.py b/tests/manual/python_tests/producer_wait_threads/producer_api.py
new file mode 100644
index 000000000..de4be60ec
--- /dev/null
+++ b/tests/manual/python_tests/producer_wait_threads/producer_api.py
@@ -0,0 +1,76 @@
+from __future__ import print_function
+
+import asapo_producer
+import sys
+import time
+import numpy as np
+import threading
+lock = threading.Lock()
+
+stream = "python"
+beamtime = "asapo_test"
+endpoint = "127.0.0.1:8400"
+
+token = ""
+nthreads = 8
+
+def callback(header,err):
+    lock.acquire() # to print
+    if err is not None:
+        print("could not sent: ",header,err)
+    else:
+        print ("successfuly sent: ",header)
+    lock.release()
+
+producer  = asapo_producer.create_producer(endpoint,beamtime, stream, token, nthreads)
+
+producer.set_log_level("info")
+
+#send single file
+producer.send_file(1, local_path = "./file1", exposed_path = stream+"/"+"file1", user_meta = '{"test_key":"test_val"}', callback = callback)
+
+#send single file without callback
+producer.send_file(1, local_path = "./file1", exposed_path = stream+"/"+"file1", user_meta = '{"test_key":"test_val"}',callback=None)
+
+#send subsets
+producer.send_file(2, local_path = "./file1", exposed_path = stream+"/"+"file2",subset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback)
+producer.send_file(3, local_path = "./file1", exposed_path = stream+"/"+"file3",subset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback)
+
+#send meta only
+producer.send_file(3, local_path = "./not_exist",exposed_path = "./whatever",
+                         ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, callback = callback)
+
+data = np.arange(10,dtype=np.float64)
+
+#send data from array
+producer.send_data(4, stream+"/"+"file5",data,
+                         ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback)
+
+#send data from string
+producer.send_data(5, stream+"/"+"file6",b"hello",
+                         ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback)
+
+#send metadata only
+producer.send_data(6, stream+"/"+"file7",None,
+                         ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, callback = callback)
+
+producer.wait_requests_finished(1000)
+n = producer.get_requests_queue_size()
+if n!=0:
+	print("number of remaining requestst should be zero, got ",n)
+	sys.exit(1)
+
+
+# create with error
+try:
+    producer  = asapo_producer.create_producer(endpoint,beamtime, stream, token, 0)
+except Exception as e:
+    print(e)
+else:
+    print("should be error")
+    sys.exit(1)
+
+
+
+
+
diff --git a/tests/manual/python_tests/producer_wait_threads/test.py b/tests/manual/python_tests/producer_wait_threads/test.py
new file mode 100644
index 000000000..3a0206d73
--- /dev/null
+++ b/tests/manual/python_tests/producer_wait_threads/test.py
@@ -0,0 +1,60 @@
+from __future__ import print_function
+import threading
+import asapo_producer
+import sys
+import time
+import numpy as np
+lock = threading.Lock()
+
+stream = "python"
+beamtime = "asapo_test"
+endpoint = "127.0.0.1:8400"
+
+token = ""
+nthreads = 8
+
+def callback(header,err):
+    global lock
+    lock.acquire() # to print
+    if err is not None:
+        print("could not sent: ",header,err)
+    else:
+        print ("successfuly sent: ",header)
+    lock.release()
+
+producer  = asapo_producer.create_producer(endpoint,beamtime, stream, token, nthreads)
+
+producer.set_log_level("info")
+
+#send single file
+producer.send_file(1, local_path = "./file1", exposed_path = stream+"/"+"file1", user_meta = '{"test_key":"test_val"}', callback = callback)
+
+#send single file without callback
+producer.send_file(1, local_path = "./file1", exposed_path = stream+"/"+"file1", user_meta = '{"test_key":"test_val"}')
+
+
+#send subsets
+producer.send_file(2, local_path = "./file1", exposed_path = stream+"/"+"file2",subset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback)
+producer.send_file(3, local_path = "./file1", exposed_path = stream+"/"+"file3",subset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback)
+
+#send meta only
+producer.send_file(3, local_path = "./not_exist",exposed_path = "./whatever",
+                         ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, callback = callback)
+
+data = np.arange(10,dtype=np.float64)
+
+#send data from array
+producer.send_data(4, stream+"/"+"file5",data,
+                         ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback)
+
+#send data from string
+producer.send_data(5, stream+"/"+"file6",b"hello",
+                         ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback)
+
+#send metadata only
+producer.send_data(6, stream+"/"+"file7",None,
+                         ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, callback = callback)
+
+producer.wait_requests_finished(1)
+
+
-- 
GitLab