Skip to content
Snippets Groups Projects
Commit 4dd55501 authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

Merge pull request #63 in ASAPO/asapo from feature_producer-api to develop

* commit 'c08f7ff1':
  update producer errors
  fix segfault
  stop producer threads at python exit
  try-except block for wait_requests_finished, nogil
  add functions to get requests pool size and wait until pool is empty
parents e5c5c003 c08f7ff1
No related branches found
No related tags found
No related merge requests found
Showing
with 288 additions and 116 deletions
......@@ -24,9 +24,10 @@ class RequestPool {
explicit RequestPool(uint8_t n_threads, RequestHandlerFactory* request_handler_factory, const AbstractLogger* log);
VIRTUAL Error AddRequest(GenericRequestPtr request);
VIRTUAL Error AddRequests(GenericRequests requests);
~RequestPool();
uint64_t NRequestsInQueue();
VIRTUAL uint64_t NRequestsInPool();
VIRTUAL Error WaitRequestsFinished(uint64_t timeout_ms);
VIRTUAL void StopThreads();
private:
const AbstractLogger* log__;
RequestHandlerFactory* request_handler_factory__;
......@@ -41,6 +42,7 @@ class RequestPool {
GenericRequestPtr GetRequestFromQueue();
void PutRequestBackToQueue(GenericRequestPtr request);
uint64_t shared_counter_{0};
uint64_t requests_in_progress_{0};
};
......
......@@ -42,9 +42,11 @@ void RequestPool::ProcessRequest(const std::unique_ptr<RequestHandler>& request_
ThreadInformation* thread_info) {
request_handler->PrepareProcessingRequestLocked();
auto request = GetRequestFromQueue();
requests_in_progress_++;
thread_info->lock.unlock();
auto success = request_handler->ProcessRequestUnlocked(request.get());
thread_info->lock.lock();
requests_in_progress_--;
request_handler->TearDownProcessingRequestLocked(success);
if (!success) {
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
......@@ -69,21 +71,12 @@ void RequestPool::ThreadHandler(uint64_t id) {
}
RequestPool::~RequestPool() {
mutex_.lock();
quit_ = true;
mutex_.unlock();
condition_.notify_all();
for(size_t i = 0; i < threads_.size(); i++) {
if(threads_[i].joinable()) {
log__->Debug("finishing thread " + std::to_string(i));
threads_[i].join();
}
}
StopThreads();
}
uint64_t RequestPool::NRequestsInQueue() {
uint64_t RequestPool::NRequestsInPool() {
std::lock_guard<std::mutex> lock{mutex_};
return request_queue_.size();
return request_queue_.size() + requests_in_progress_;
}
Error RequestPool::AddRequests(GenericRequests requests) {
std::unique_lock<std::mutex> lock(mutex_);
......@@ -97,4 +90,35 @@ Error RequestPool::AddRequests(GenericRequests requests) {
}
Error RequestPool::WaitRequestsFinished(uint64_t timeout_ms) {
uint64_t elapsed_ms = 0;
while (true) {
auto n_requests = NRequestsInPool();
if (n_requests == 0) {
break;
}
if (elapsed_ms >= timeout_ms) {
return IOErrorTemplates::kTimeout.Generate();
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
elapsed_ms += 100;
}
return nullptr;
}
void RequestPool::StopThreads() {
mutex_.lock();
quit_ = true;
mutex_.unlock();
condition_.notify_all();
for(size_t i = 0; i < threads_.size(); i++) {
if(threads_[i].joinable()) {
log__->Debug("finishing thread " + std::to_string(i));
threads_[i].join();
}
}
}
}
......@@ -18,6 +18,7 @@ class MockRequestHandler : public RequestHandler {
MOCK_METHOD1(ProcessRequestUnlocked_t, bool (const GenericRequest* request));
bool ProcessRequestUnlocked(GenericRequest* request) override {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
return ProcessRequestUnlocked_t(request);
}
......
......@@ -93,11 +93,13 @@ TEST_F(RequestPoolTests, AddRequestDoesNotGoFurtherWhenNotReady) {
ASSERT_THAT(err, Eq(nullptr));
}
TEST_F(RequestPoolTests, NRequestsInQueue) {
auto nreq = pool.NRequestsInQueue();
TEST_F(RequestPoolTests, NRequestsInPoolInitial) {
auto nreq = pool.NRequestsInPool();
ASSERT_THAT(nreq, Eq(0));
}
void ExpectSend(MockRequestHandler* mock_handler, int ntimes = 1) {
EXPECT_CALL(*mock_handler, ReadyProcessRequest()).Times(ntimes).WillRepeatedly(Return(true));
EXPECT_CALL(*mock_handler, PrepareProcessingRequestLocked()).Times(ntimes);
......@@ -106,7 +108,6 @@ void ExpectSend(MockRequestHandler* mock_handler, int ntimes = 1) {
}
TEST_F(RequestPoolTests, AddRequestCallsSend) {
ExpectSend(mock_request_handler);
......@@ -117,6 +118,30 @@ TEST_F(RequestPoolTests, AddRequestCallsSend) {
ASSERT_THAT(err, Eq(nullptr));
}
TEST_F(RequestPoolTests, NRequestsInPool) {
pool.AddRequest(std::move(request));
auto nreq = pool.NRequestsInPool();
ASSERT_THAT(nreq, Eq(1));
}
TEST_F(RequestPoolTests, NRequestsInPoolAccountsForRequestsInProgress) {
ExpectSend(mock_request_handler, 1);
pool.AddRequest(std::move(request));
std::this_thread::sleep_for(std::chrono::milliseconds(30));
auto nreq1 = pool.NRequestsInPool();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
auto nreq2 = pool.NRequestsInPool();
ASSERT_THAT(nreq1, Eq(1));
ASSERT_THAT(nreq2, Eq(0));
}
TEST_F(RequestPoolTests, AddRequestCallsSendTwoRequests) {
......@@ -124,13 +149,11 @@ TEST_F(RequestPoolTests, AddRequestCallsSendTwoRequests) {
ExpectSend(mock_request_handler, 2);
auto err1 = pool.AddRequest(std::move(request));
request.reset(request2);
auto err2 = pool.AddRequest(std::move(request));
std::this_thread::sleep_for(std::chrono::milliseconds(30));
std::this_thread::sleep_for(std::chrono::milliseconds(80));
ASSERT_THAT(err1, Eq(nullptr));
ASSERT_THAT(err2, Eq(nullptr));
}
......@@ -148,14 +171,50 @@ TEST_F(RequestPoolTests, AddRequestsOk) {
auto err = pool.AddRequests(std::move(requests));
std::this_thread::sleep_for(std::chrono::milliseconds(30));
std::this_thread::sleep_for(std::chrono::milliseconds(80));
ASSERT_THAT(err, Eq(nullptr));
}
TEST_F(RequestPoolTests, FinishProcessingThreads) {
EXPECT_CALL(mock_logger, Debug(HasSubstr("finishing thread"))).Times(nthreads);
}
TEST_F(RequestPoolTests, WaitRequestsFinished) {
ExpectSend(mock_request_handler);
pool.AddRequest(std::move(request));
std::this_thread::sleep_for(std::chrono::milliseconds(20));
auto err = pool.WaitRequestsFinished(1000);
auto nreq = pool.NRequestsInPool();
ASSERT_THAT(nreq, Eq(0));
ASSERT_THAT(err, Eq(nullptr));
}
TEST_F(RequestPoolTests, WaitRequestsFinishedTimeout) {
ExpectSend(mock_request_handler);
pool.AddRequest(std::move(request));
std::this_thread::sleep_for(std::chrono::milliseconds(20));
auto err = pool.WaitRequestsFinished(0);
TEST_F(RequestPoolTests, FinishProcessingThreads) {
auto nreq = pool.NRequestsInPool();
ASSERT_THAT(nreq, Eq(1));
ASSERT_THAT(err, Eq(asapo::IOErrorTemplates::kTimeout));
}
TEST_F(RequestPoolTests, StopThreads) {
EXPECT_CALL(mock_logger, Debug(HasSubstr("finishing thread"))).Times(nthreads);
pool.StopThreads();
Mock::VerifyAndClearExpectations(&mock_logger);
}
......
......@@ -199,23 +199,6 @@ std::unique_ptr<asapo::Producer> CreateProducer(const Args& args) {
return producer;
}
void WaitProducerThreadsFinished(const Args& args, int nfiles) {
int elapsed_ms = 0;
while (true) {
if (files_sent == nfiles) {
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
elapsed_ms += 100;
if (elapsed_ms > args.timeout_ms_producer) {
std::cerr << "Stream out exit on timeout " << std::endl;
break;
}
}
}
int main(int argc, char* argv[]) {
asapo::ExitAfterPrintVersionIfNeeded("GetNext Broker Example", argc, argv);
Args args;
......@@ -245,7 +228,9 @@ int main(int argc, char* argv[]) {
int nerrors;
auto nfiles = ProcessAllData(args, producer, &duration_ms, &nerrors);
WaitProducerThreadsFinished(args, nfiles);
if (producer->WaitRequestsFinished(args.timeout_ms_producer) != nullptr) {
std::cerr << "Stream out exit on timeout " << std::endl;
}
auto duration_streamout = std::chrono::duration_cast<std::chrono::milliseconds>(streamout_finish - streamout_start);
std::cout << "Stream in " << std::endl;
......
......@@ -22,13 +22,6 @@ def callback(header,err):
n_send = n_send + 1
lock.release()
def wait_send(n_files, timeout_s):
elapsed = 0
while elapsed < timeout_s:
if n_send == n_files:
break
time.sleep(1)
source, path, beamtime,stream_in, stream_out, token, timeout_s,timeout_s_producer,nthreads, transfer_data = sys.argv[1:]
timeout_s=int(timeout_s)
timeout_s_producer=int(timeout_s_producer)
......@@ -60,8 +53,7 @@ while True:
except asapo_producer.AsapoProducerError:
break
wait_send(n_recv,timeout_s_producer)
producer.wait_requests_finished(timeout_s_producer*1000)
print ("Processed "+str(n_recv)+" file(s)")
print ("Sent "+str(n_send)+" file(s)")
......@@ -191,25 +191,6 @@ std::unique_ptr<asapo::Producer> CreateProducer(const Args& args) {
return producer;
}
void WaitThreadsFinished(const Args& args) {
uint64_t elapsed_ms = 0;
while (true) {
mutex.lock();
if (iterations_remained <= 0) {
mutex.unlock();
break;
}
mutex.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
elapsed_ms += 100;
if (elapsed_ms > args.timeout_sec * 1000) {
std::cerr << "Producer exit on timeout " << std::endl;
exit(EXIT_FAILURE);
}
}
}
void PrintOutput(const Args& args, const system_clock::time_point& start) {
system_clock::time_point t2 = system_clock::now();
double duration_sec = std::chrono::duration_cast<std::chrono::milliseconds>( t2 - start ).count() / 1000.0;
......@@ -220,8 +201,6 @@ void PrintOutput(const Args& args, const system_clock::time_point& start) {
std::cout << "Bandwidth " << size_gb / duration_sec / 8 << " GBytes/s" << std::endl;
}
int main (int argc, char* argv[]) {
Args args;
ProcessCommandArguments(argc, argv, &args);
......@@ -241,7 +220,16 @@ int main (int argc, char* argv[]) {
return EXIT_FAILURE;
}
WaitThreadsFinished(args);
auto err = producer->WaitRequestsFinished(args.timeout_sec * 1000);
if (err) {
std::cerr << "Producer exit on timeout " << std::endl;
exit(EXIT_FAILURE);
}
if (iterations_remained != 0) {
std::cerr << "Producer did not send all data " << std::endl;
exit(EXIT_FAILURE);
}
PrintOutput(args, start_time);
return EXIT_SUCCESS;
......
......@@ -36,9 +36,12 @@ class Producer {
//! Sends data to the receiver - same as SendData - memory should not be freed until send is finished
//! used e.g. for Python bindings
virtual Error SendData_(const EventHeader& event_header, void* data, uint64_t ingest_mode,
RequestCallback callback) = 0;
virtual Error SendData__(const EventHeader& event_header, void* data, uint64_t ingest_mode,
RequestCallback callback) = 0;
//! Stop processing threads
//! used e.g. for Python bindings
virtual void StopThreads__() = 0;
//! Sends files to the receiver
/*!
......@@ -65,6 +68,11 @@ class Producer {
virtual void EnableRemoteLog(bool enable) = 0;
//! Set beamtime id which producer will use to send data
virtual Error SetCredentials(SourceCredentials source_cred) = 0;
//! Set get current size of the requests queue
virtual uint64_t GetRequestsQueueSize() = 0;
//! Wait until all current requests are processed or timeout
virtual Error WaitRequestsFinished(uint64_t timeout_ms) = 0;
};
}
......
......@@ -9,7 +9,8 @@ enum class ProducerErrorType {
kInternalServerError,
kRequestPoolIsFull,
kLocalIOError,
kWrongInput
kWrongInput,
kTimeout
};
using ProducerErrorTemplate = ServiceErrorTemplate<ProducerErrorType, ErrorType::kProducerError>;
......@@ -32,6 +33,11 @@ auto const kInternalServerError = ProducerErrorTemplate {
"Internal server error", ProducerErrorType::kInternalServerError
};
auto const kTimeout = ProducerErrorTemplate {
"Timeout", ProducerErrorType::kTimeout
};
};
}
......
#include "producer/producer.h"
#include "producer_impl.h"
#include "producer/producer_error.h"
std::unique_ptr<asapo::Producer> asapo::Producer::Create(const std::string& endpoint, uint8_t n_processing_threads,
asapo::RequestHandlerType type, SourceCredentials source_cred, Error* err) {
if (n_processing_threads > kMaxProcessingThreads || n_processing_threads == 0) {
*err = TextError("Set number of processing threads > 0 and <= " + std::to_string(kMaxProcessingThreads));
*err = ProducerErrorTemplates::kWrongInput.Generate("Set number of processing threads > 0 and <= " + std::to_string(
kMaxProcessingThreads));
return nullptr;
}
......
......@@ -171,10 +171,10 @@ Error ProducerImpl::SendMetaData(const std::string& metadata, RequestCallback ca
}
Error ProducerImpl::SendData_(const EventHeader& event_header,
void* data,
uint64_t ingest_mode,
RequestCallback callback) {
Error ProducerImpl::SendData__(const EventHeader& event_header,
void* data,
uint64_t ingest_mode,
RequestCallback callback) {
FileData data_wrapped = FileData{(uint8_t*)data};
if (auto err = CheckData(ingest_mode, event_header, &data_wrapped)) {
......@@ -185,4 +185,20 @@ Error ProducerImpl::SendData_(const EventHeader& event_header,
return Send(std::move(event_header), std::move(data_wrapped), "", ingest_mode, callback, false);
}
uint64_t ProducerImpl::GetRequestsQueueSize() {
return request_pool__->NRequestsInPool();
};
Error ProducerImpl::WaitRequestsFinished(uint64_t timeout_ms) {
if (request_pool__->WaitRequestsFinished(timeout_ms) != nullptr) {
return ProducerErrorTemplates::kTimeout.Generate("waiting to finish processing requests");
} else {
return nullptr;
}
}
void ProducerImpl::StopThreads__() {
request_pool__->StopThreads();
}
}
\ No newline at end of file
......@@ -28,9 +28,9 @@ class ProducerImpl : public Producer {
void EnableLocalLog(bool enable) override;
void EnableRemoteLog(bool enable) override;
Error SendData(const EventHeader& event_header, FileData data, uint64_t ingest_mode, RequestCallback callback) override;
Error SendData_(const EventHeader& event_header, void* data , uint64_t ingest_mode,
RequestCallback callback) override;
Error SendData__(const EventHeader& event_header, void* data , uint64_t ingest_mode,
RequestCallback callback) override;
void StopThreads__() override;
Error SendFile(const EventHeader& event_header, std::string full_path, uint64_t ingest_mode,
RequestCallback callback) override;
AbstractLogger* log__;
......@@ -39,6 +39,8 @@ class ProducerImpl : public Producer {
Error SetCredentials(SourceCredentials source_cred) override;
Error SendMetaData(const std::string& metadata, RequestCallback callback) override;
uint64_t GetRequestsQueueSize() override;
Error WaitRequestsFinished(uint64_t timeout_ms) override;
private:
Error Send(const EventHeader& event_header, FileData data, std::string full_path, uint64_t ingest_mode,
......
......@@ -33,6 +33,16 @@ class MockRequestPull : public RequestPool {
return asapo::Error{AddRequest_t(request.get())};
}
MOCK_METHOD1(AddRequest_t, asapo::ErrorInterface * (GenericRequest*));
MOCK_METHOD0(NRequestsInPool, uint64_t ());
MOCK_METHOD1(WaitRequestsFinished_t, asapo::ErrorInterface * (uint64_t timeout_ms));
asapo::Error WaitRequestsFinished(uint64_t timeout_ms) override {
return asapo::Error{WaitRequestsFinished_t(timeout_ms)};
}
};
......
......@@ -46,7 +46,7 @@ TEST(CreateProducer, TooManyThreads) {
std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("", asapo::kMaxProcessingThreads + 1,
asapo::RequestHandlerType::kTcp, SourceCredentials{"bt", "", ""}, &err);
ASSERT_THAT(producer, Eq(nullptr));
ASSERT_THAT(err, Ne(nullptr));
ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput));
}
......@@ -55,7 +55,7 @@ TEST(CreateProducer, ZeroThreads) {
std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("", 0,
asapo::RequestHandlerType::kTcp, SourceCredentials{"bt", "", ""}, &err);
ASSERT_THAT(producer, Eq(nullptr));
ASSERT_THAT(err, Ne(nullptr));
ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput));
}
......
......@@ -136,7 +136,7 @@ TEST_F(ProducerImplTests, ErrorIfNoData) {
TEST_F(ProducerImplTests, ErrorIfNoDataSend_) {
asapo::EventHeader event_header{1, 100, expected_fullpath};
auto err = producer.SendData_(event_header, nullptr, asapo::kDefaultIngestMode, nullptr);
auto err = producer.SendData__(event_header, nullptr, asapo::kDefaultIngestMode, nullptr);
ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput));
}
......@@ -215,7 +215,7 @@ TEST_F(ProducerImplTests, OKSendingSendDataRequesUnmanagedMemory) {
nullptr));
asapo::EventHeader event_header{expected_id, expected_size, expected_name, expected_metadata};
auto err = producer.SendData_(event_header, nullptr, expected_ingest_mode, nullptr);
auto err = producer.SendData__(event_header, nullptr, expected_ingest_mode, nullptr);
ASSERT_THAT(err, Eq(nullptr));
}
......@@ -344,4 +344,22 @@ TEST_F(ProducerImplTests, ErrorSendingWrongIngestMode) {
}
TEST_F(ProducerImplTests, GetQueueSize) {
EXPECT_CALL(mock_pull, NRequestsInPool()).WillOnce(Return(10));
auto size = producer.GetRequestsQueueSize();
ASSERT_THAT(size, Eq(10));
}
TEST_F(ProducerImplTests, WaitRequestsFinished) {
EXPECT_CALL(mock_pull, WaitRequestsFinished_t(_)).WillOnce(Return(
asapo::IOErrorTemplates::kTimeout.Generate().release()));
auto err = producer.WaitRequestsFinished(100);
ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kTimeout));
}
}
......@@ -18,10 +18,9 @@ cdef extern from "asapo_producer.h" namespace "asapo":
cdef bool operator==(Error lhs, ErrorTemplateInterface rhs)
cdef extern from "asapo_producer.h" namespace "asapo":
ErrorTemplateInterface kInternalServerError "asapo::ProducerErrorTemplates::kInternalServerError"
ErrorTemplateInterface kCannotSendDataToReceivers "asapo::ProducerErrorTemplates::kCannotSendDataToReceivers"
ErrorTemplateInterface kRequestPoolIsFull "asapo::ProducerErrorTemplates::kRequestPoolIsFull"
ErrorTemplateInterface kTimeout "asapo::ProducerErrorTemplates::kTimeout"
ErrorTemplateInterface kWrongInput "asapo::ProducerErrorTemplates::kWrongInput"
ErrorTemplateInterface kLocalIOError "asapo::ProducerErrorTemplates::kLocalIOError"
cdef extern from "asapo_producer.h" namespace "asapo":
cppclass FileData:
......@@ -86,13 +85,16 @@ cdef extern from "asapo_wrappers.h" namespace "asapo":
RequestCallback unwrap_callback_with_memory(RequestCallbackCythonMemory, void*,void*,void*)
cdef extern from "asapo_producer.h" namespace "asapo":
cdef extern from "asapo_producer.h" namespace "asapo" nogil:
cppclass Producer:
@staticmethod
unique_ptr[Producer] Create(string endpoint,uint8_t nthreads,RequestHandlerType type, SourceCredentials source,Error* error)
Error SendFile(const EventHeader& event_header, string full_path, uint64_t ingest_mode,RequestCallback callback)
Error SendData_(const EventHeader& event_header, void* data, uint64_t ingest_mode,RequestCallback callback)
Error SendData__(const EventHeader& event_header, void* data, uint64_t ingest_mode,RequestCallback callback)
void StopThreads__()
void SetLogLevel(LogLevel level)
uint64_t GetRequestsQueueSize()
Error WaitRequestsFinished(uint64_t timeout_ms)
cdef extern from "asapo_producer.h" namespace "asapo":
uint64_t kDefaultIngestMode
......
......@@ -7,6 +7,7 @@ import json
from cpython.version cimport PY_MAJOR_VERSION
from libcpp.memory cimport unique_ptr
from cpython.ref cimport PyObject,Py_XINCREF,Py_XDECREF
import atexit
np.import_array()
......@@ -40,15 +41,25 @@ class AsapoProducerError(Exception):
class AsapoWrongInputError(AsapoProducerError):
pass
#todo: more types
cdef python_exception_from_error(Error& err):
return AsapoProducerError(err.get().Explain())
class AsapoLocalIOError(AsapoProducerError):
pass
class AsapoTimeOutError(AsapoProducerError):
pass
cdef python_exception_from_error(Error& err):
if err == kTimeout:
return AsapoTimeOutError(err.get().Explain())
elif err == kWrongInput:
return AsapoWrongInputError(err.get().Explain())
elif err == kLocalIOError:
return AsapoLocalIOError(err.get().Explain())
else:
return AsapoProducerError(err.get().Explain())
cdef throw_exception(Error& err):
raise python_exception_from_error(err)
cdef void* data_pointer_nparray(data):
if data is None:
return <void*>NULL
......@@ -64,6 +75,8 @@ cdef void* data_pointer_bytes(data):
cdef class PyProducer:
cdef unique_ptr[Producer] c_producer
def __init__(self):
atexit.register(self.cleanup)
def set_log_level(self,level):
cdef LogLevel log_level
log_level = LogLevel_Info
......@@ -89,9 +102,9 @@ cdef class PyProducer:
event_header.file_size = 0
else:
event_header.file_size = data.nbytes
err = self.c_producer.get().SendData_(event_header, data_pointer_nparray(data), ingest_mode,
err = self.c_producer.get().SendData__(event_header, data_pointer_nparray(data), ingest_mode,
unwrap_callback_with_memory(<RequestCallbackCythonMemory>self.c_callback_ndarr,
<void*>self,<void*>callback if callback != None else NULL, <void*>data))
<void*>self,<void*>callback, <void*>data))
if err:
throw_exception(err)
if data is not None:
......@@ -116,9 +129,9 @@ cdef class PyProducer:
def __send_bytes(self,int id, exposed_path,data, user_meta=None,subset=None,ingest_mode = DEFAULT_INGEST_MODE,callback=None):
cdef EventHeader event_header = self.create_event_header(id,exposed_path,user_meta,subset,ingest_mode)
event_header.file_size = len(data)
err = self.c_producer.get().SendData_(event_header, data_pointer_bytes(data), ingest_mode,
err = self.c_producer.get().SendData__(event_header, data_pointer_bytes(data), ingest_mode,
unwrap_callback_with_memory(<RequestCallbackCythonMemory>self.c_callback_bytesaddr,
<void*>self,<void*>callback if callback != None else NULL, <void*>data))
<void*>self,<void*>callback, <void*>data))
if err:
throw_exception(err)
Py_XINCREF(<PyObject*>data)
......@@ -140,8 +153,9 @@ cdef class PyProducer:
:type ingest_mode: int
:param callback: callback function, default None
:type callback: callback(info,err), where info - json string with event header that was used to send data, err - error string or None
:raises: AsapoProducerError
:rtype: string
:raises:
AsapoWrongInputError: wrong input (authorization, meta, ...)
AsapoProducerError: actually should not happen
"""
if type(data) == np.ndarray or data == None:
......@@ -167,8 +181,10 @@ cdef class PyProducer:
:type ingest_mode: int
:param callback: callback function, default None
:type callback: callback(info,err), where info - json string with event header that was used to send data, err - error string or None
:raises: AsapoProducerError
:rtype: string
:raises:
AsapoWrongInputError: wrong input (authorization, meta, ...)
AsapoLocalIOError: problems reading file to send
AsapoProducerError: actually should not happen
"""
cdef EventHeader event_header = self.create_event_header(id,exposed_path,user_meta,subset,ingest_mode)
......@@ -178,6 +194,22 @@ cdef class PyProducer:
if err:
throw_exception(err)
return
def get_requests_queue_size(self):
return self.c_producer.get().GetRequestsQueueSize()
def wait_requests_finished(self,timeout_ms):
"""
:param timeout_ms: timeout in milliseconds
:type timeout_ms: int
:raises:
AsapoTimeoutError: requests not finished for a given timeout
"""
cdef Error err
cdef uint64_t timeout = timeout_ms
with nogil:
err = self.c_producer.get().WaitRequestsFinished(timeout)
if err:
throw_exception(err)
return
cdef void c_callback_python(self,py_callback, GenericRequestHeader header, Error& err):
if py_callback != None:
info_str = _str(header.Json())
......@@ -203,7 +235,10 @@ cdef class PyProducer:
if bytes_array is not None:
Py_XDECREF(<PyObject*>bytes_array)
self.c_callback_python(py_callback,header,err)
def cleanup(self):
with nogil:
if self.c_producer.get() is not NULL:
self.c_producer.get().StopThreads__()
@staticmethod
def __create_producer(endpoint,beamtime_id,stream,token,nthreads):
pyProd = PyProducer()
......@@ -218,6 +253,21 @@ cdef class PyProducer:
return pyProd
def create_producer(endpoint,beamtime_id,stream,token,nthreads):
"""
:param endpoint: server endpoint (url:port)
:type endpoint: string
:param beamtime_id: beamtime id
:type beamtime_id: string
:param stream: stream to producer data to
:type stream: string
:param token: authorization token
:type token: string
:param nthreads: ingest mode flag
:type nthreads: int
:raises:
AsapoWrongInputError: wrong input (number of threads, ,,,)
AsapoProducerError: actually should not happen
"""
return PyProducer.__create_producer(_bytes(endpoint),_bytes(beamtime_id),_bytes(stream),_bytes(token),nthreads)
......
......@@ -4,7 +4,6 @@
#include <memory>
#include <functional>
namespace asapo {
inline std::string GetErrorString(asapo::Error* err) {
......@@ -22,7 +21,7 @@ RequestCallback unwrap_callback(RequestCallbackCython callback, void* c_self, vo
return nullptr;
}
RequestCallback wrapper = [ = ](GenericRequestHeader header, Error err) -> void {
callback(c_self, py_func, header, std::move(err));
callback(c_self, py_func, std::move(header), std::move(err));
};
return wrapper;
}
......@@ -30,7 +29,7 @@ RequestCallback unwrap_callback(RequestCallbackCython callback, void* c_self, vo
RequestCallback unwrap_callback_with_memory(RequestCallbackCythonMemory callback, void* c_self, void* py_func,
void* nd_array) {
RequestCallback wrapper = [ = ](GenericRequestHeader header, Error err) -> void {
callback(c_self, py_func, nd_array, header, std::move(err));
callback(c_self, py_func, nd_array, std::move(header), std::move(err));
};
return wrapper;
}
......
......@@ -36,6 +36,7 @@ echo test > file1
sleep 1
$1 $3 $stream $beamtime_id "127.0.0.1:8400" > out
$1 $3 $stream $beamtime_id "127.0.0.1:8400" > out || cat out
cat out
cat out | grep "successfuly sent" | wc -l | grep 7
\ No newline at end of file
cat out | grep "successfuly sent" | wc -l | grep 7
cat out | grep "local i/o error"
\ No newline at end of file
......@@ -7,7 +7,6 @@ import numpy as np
import threading
lock = threading.Lock()
stream = sys.argv[1]
beamtime = sys.argv[2]
endpoint = sys.argv[3]
......@@ -15,8 +14,6 @@ endpoint = sys.argv[3]
token = ""
nthreads = 8
def callback(header,err):
lock.acquire() # to print
if err is not None:
......@@ -27,12 +24,14 @@ def callback(header,err):
producer = asapo_producer.create_producer(endpoint,beamtime, stream, token, nthreads)
producer.set_log_level("info")
#send single file
producer.send_file(1, local_path = "./file1", exposed_path = stream+"/"+"file1", user_meta = '{"test_key":"test_val"}', callback = callback)
#send single file without callback
producer.send_file(1, local_path = "./file1", exposed_path = stream+"/"+"file1", user_meta = '{"test_key":"test_val"}',callback=None)
#send subsets
producer.send_file(2, local_path = "./file1", exposed_path = stream+"/"+"file2",subset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback)
producer.send_file(3, local_path = "./file1", exposed_path = stream+"/"+"file3",subset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback)
......@@ -55,11 +54,20 @@ producer.send_data(5, stream+"/"+"file6",b"hello",
producer.send_data(6, stream+"/"+"file7",None,
ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, callback = callback)
#send single file/wrong filename
producer.send_file(1, local_path = "./file2", exposed_path = stream+"/"+"file1", callback = callback)
producer.wait_requests_finished(50000)
n = producer.get_requests_queue_size()
if n!=0:
print("number of remaining requestst should be zero, got ",n)
sys.exit(1)
# create with error
try:
producer = asapo_producer.create_producer(endpoint,beamtime, stream, token, 0)
except Exception as e:
except asapo_producer.AsapoWrongInputError as e:
print(e)
else:
print("should be error")
......@@ -67,6 +75,5 @@ else:
time.sleep(25)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment