diff --git a/common/cpp/include/common/networking.h b/common/cpp/include/common/networking.h
index d5a5edb5976179df45b8b82e23e77ff00d8737d5..9ce7f962e1dd3eacbc4d9d54daefc22826d89017 100644
--- a/common/cpp/include/common/networking.h
+++ b/common/cpp/include/common/networking.h
@@ -61,8 +61,17 @@ struct GenericRequestHeader {
     uint64_t meta_size;
     CustomRequestData custom_data;
     char message[kMaxMessageSize];
+    std::string Json() {
+        std::string s = "{\"id\":" + std::to_string(data_id) + ","
+                        "\"buffer\":\"" + std::string(message)+"\""
+                        + "}";
+        return s;
+    };
+
 };
+
+
 struct GenericNetworkResponse {
     Opcode op_code;
     NetworkErrorCode error_code;
diff --git a/producer/api/cpp/include/producer/producer_error.h b/producer/api/cpp/include/producer/producer_error.h
index 856af4d5e99502c19a47402d0f32a4be3772c617..3ceded7777a114ace0ae55d101cffc446ee887d6 100644
--- a/producer/api/cpp/include/producer/producer_error.h
+++ b/producer/api/cpp/include/producer/producer_error.h
@@ -10,6 +10,7 @@ enum class ProducerErrorType {
     kConnectionNotReady,
     kFileTooLarge,
     kFileNameTooLong,
+    kEmptyFileName,
     kBeamtimeIdTooLong,
     kBeamtimeAlreadySet,
     kFileIdAlreadyInUse,
@@ -51,6 +52,11 @@ auto const kFileNameTooLong = ProducerErrorTemplate {
     "filename too long", ProducerErrorType::kFileNameTooLong
 };
+auto const kEmptyFileName = ProducerErrorTemplate {
+    "empty filename", ProducerErrorType::kEmptyFileName
+};
+
+
 auto const kCredentialsTooLong = ProducerErrorTemplate {
     "beamtime id too long", ProducerErrorType::kBeamtimeIdTooLong
 };
diff --git a/producer/api/cpp/src/producer_impl.cpp b/producer/api/cpp/src/producer_impl.cpp
index ba82eb7b1fd34a33fb638ed8bdf68163f0728887..78d5c3fb3fcb40f30cfb4ee9b7d5d8dd3e30e329 100644
--- a/producer/api/cpp/src/producer_impl.cpp
+++ b/producer/api/cpp/src/producer_impl.cpp
@@ -66,6 +66,10 @@ Error CheckProducerRequest(const EventHeader& event_header, uint64_t injest_mode
         return ProducerErrorTemplates::kFileNameTooLong.Generate();
     }
 
+    if (event_header.file_name.empty() ) {
+        return ProducerErrorTemplates::kEmptyFileName.Generate();
+    }
+
     if (event_header.subset_id > 0 && event_header.subset_size == 0) {
         return ProducerErrorTemplates::kErrorSubsetSize.Generate();
     }
@@ -101,6 +105,10 @@ Error ProducerImpl::SendData(const EventHeader& event_header, FileData data,
 
 Error ProducerImpl::SendFile(const EventHeader& event_header, std::string full_path, uint64_t injest_mode,
                              RequestCallback callback) {
+    if (full_path.empty()) {
+        return ProducerErrorTemplates::kEmptyFileName.Generate();
+    }
+
     return Send(event_header, nullptr, std::move(full_path), injest_mode, callback);
 }
diff --git a/producer/api/cpp/src/producer_request.cpp b/producer/api/cpp/src/producer_request.cpp
index 734e3fbc66e600575f17038026e30ecd796dbda3..6a7857f33ff4ba9f733d54b438206a5e7aa8986c 100644
--- a/producer/api/cpp/src/producer_request.cpp
+++ b/producer/api/cpp/src/producer_request.cpp
@@ -3,7 +3,7 @@
 namespace asapo {
 
 Error ProducerRequest::ReadDataFromFileIfNeeded(const IO* io) {
-    if (data != nullptr || original_filepath.empty()) {
+    if (data != nullptr || original_filepath.empty() || !NeedSendData()) {
         return nullptr;
     }
     Error err;
@@ -24,4 +24,11 @@ ProducerRequest::ProducerRequest(std::string source_credentials,
     callback{callback} {
 }
 
+bool ProducerRequest::NeedSendData() const {
+    if (header.op_code == kOpcodeTransferData || header.op_code == kOpcodeTransferSubsetData) {
+        return header.custom_data[kPosInjestMode] & IngestModeFlags::kTransferData;
+    }
+    return true;
+}
+
 }
\ No newline at end of file
diff --git
a/producer/api/cpp/src/producer_request.h b/producer/api/cpp/src/producer_request.h index 28beafab7bcfd7bf6aad6a0253e2968f858427fd..dba048b346fcbc192cd925cfcfe767eaa2cdb2b7 100644 --- a/producer/api/cpp/src/producer_request.h +++ b/producer/api/cpp/src/producer_request.h @@ -21,6 +21,7 @@ class ProducerRequest : public GenericRequest { std::string original_filepath; RequestCallback callback; Error ReadDataFromFileIfNeeded(const IO* io); + bool NeedSendData() const; }; } diff --git a/producer/api/cpp/src/request_handler_tcp.cpp b/producer/api/cpp/src/request_handler_tcp.cpp index 5ad76ffe915bea68691008842b6130f375d73cf6..21c26ef9ab032ecb3ec62fb9592339936ebd4155 100644 --- a/producer/api/cpp/src/request_handler_tcp.cpp +++ b/producer/api/cpp/src/request_handler_tcp.cpp @@ -48,14 +48,6 @@ Error RequestHandlerTcp::ConnectToReceiver(const std::string& beamtime_id, const return nullptr; } -bool NeedSendData(const ProducerRequest* request) { - if (request->header.op_code == kOpcodeTransferData || request->header.op_code == kOpcodeTransferSubsetData) { - return request->header.custom_data[kPosInjestMode] & IngestModeFlags::kTransferData; - } - - return true; -} - Error RequestHandlerTcp::SendRequestContent(const ProducerRequest* request) { Error io_error; io__->Send(sd_, &(request->header), sizeof(request->header), &io_error); @@ -63,7 +55,7 @@ Error RequestHandlerTcp::SendRequestContent(const ProducerRequest* request) { return io_error; } - if (NeedSendData(request)) { + if (request->NeedSendData()) { io__->Send(sd_, (void*) request->data.get(), (size_t)request->header.data_size, &io_error); } @@ -209,7 +201,10 @@ Error RequestHandlerTcp::ProcessRequestUnlocked(GenericRequest* request) { auto producer_request = static_cast<ProducerRequest*>(request); auto err = producer_request->ReadDataFromFileIfNeeded(io__.get()); if (err) { - return err; + if (producer_request->callback) { + producer_request->callback(producer_request->header, std::move(err)); + } + return nullptr; } if (NeedRebalance()) { diff --git a/producer/api/cpp/unittests/test_producer.cpp b/producer/api/cpp/unittests/test_producer.cpp index 11359b693c1c0cbbd24a15abd58367272ad4d410..8e2e836024c71e5f9e89f56626ece6962b970dcd 100644 --- a/producer/api/cpp/unittests/test_producer.cpp +++ b/producer/api/cpp/unittests/test_producer.cpp @@ -53,7 +53,7 @@ TEST(CreateProducer, TooManyThreads) { TEST(CreateProducer, ZeroThreads) { asapo::Error err; std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("", 0, - asapo::RequestHandlerType::kTcp, SourceCredentials{"bt", "", ""}, &err); + asapo::RequestHandlerType::kTcp, SourceCredentials{"bt", "", ""}, &err); ASSERT_THAT(producer, Eq(nullptr)); ASSERT_THAT(err, Ne(nullptr)); } @@ -65,7 +65,7 @@ TEST(Producer, SimpleWorkflowWihoutConnection) { SourceCredentials{"bt", "", ""}, &err); - asapo::EventHeader event_header{1, 1, ""}; + asapo::EventHeader event_header{1, 1, "test"}; auto err_send = producer->SendData(event_header, nullptr, asapo::kDefaultIngestMode, nullptr); std::this_thread::sleep_for(std::chrono::milliseconds(100)); diff --git a/producer/api/cpp/unittests/test_producer_impl.cpp b/producer/api/cpp/unittests/test_producer_impl.cpp index e3f071ff696111b152ffdacd6527eb3e75415109..90a73651394a26f16cd45dbb9005401196ff1099 100644 --- a/producer/api/cpp/unittests/test_producer_impl.cpp +++ b/producer/api/cpp/unittests/test_producer_impl.cpp @@ -90,7 +90,7 @@ class ProducerImplTests : public testing::Test { TEST_F(ProducerImplTests, SendReturnsError) { EXPECT_CALL(mock_pull, 
AddRequest_t(_)).WillOnce(Return( asapo::ProducerErrorTemplates::kRequestPoolIsFull.Generate().release())); - asapo::EventHeader event_header{1, 1, ""}; + asapo::EventHeader event_header{1, 1, "test"}; auto err = producer.SendData(event_header, nullptr, asapo::kDefaultIngestMode, nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kRequestPoolIsFull)); } @@ -102,6 +102,14 @@ TEST_F(ProducerImplTests, ErrorIfFileNameTooLong) { ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kFileNameTooLong)); } +TEST_F(ProducerImplTests, ErrorIfFileEmpty) { + std::string long_string(asapo::kMaxMessageSize + 100, 'a'); + asapo::EventHeader event_header{1, 1, ""}; + auto err = producer.SendData(event_header, nullptr, asapo::kDefaultIngestMode, nullptr); + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kEmptyFileName)); +} + + TEST_F(ProducerImplTests, ErrorIfSizeTooLarge) { EXPECT_CALL(mock_logger, Error(testing::HasSubstr("error checking"))); asapo::EventHeader event_header{1, asapo::ProducerImpl::kMaxChunkSize + 1, ""}; @@ -111,7 +119,7 @@ TEST_F(ProducerImplTests, ErrorIfSizeTooLarge) { TEST_F(ProducerImplTests, ErrorIfSubsetSizeNotDefined) { EXPECT_CALL(mock_logger, Error(testing::HasSubstr("subset size"))); - asapo::EventHeader event_header{1, asapo::ProducerImpl::kMaxChunkSize, "", "", 1}; + asapo::EventHeader event_header{1, asapo::ProducerImpl::kMaxChunkSize, "test", "", 1}; auto err = producer.SendData(event_header, nullptr, asapo::kDefaultIngestMode, nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kErrorSubsetSize)); } @@ -196,6 +204,32 @@ TEST_F(ProducerImplTests, OKAddingSendMetaDataRequest) { ASSERT_THAT(err, Eq(nullptr)); } + +TEST_F(ProducerImplTests, ErrorSendingEmptyFileName) { + producer.SetCredentials(expected_credentials); + + EXPECT_CALL(mock_pull, AddRequest_t(_)).Times(0); + + asapo::EventHeader event_header{expected_id, 0, expected_name}; + auto err = producer.SendFile(event_header, "", expected_injest_mode, nullptr); + + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kEmptyFileName)); + +} + +TEST_F(ProducerImplTests, ErrorSendingEmptyRelativeFileName) { + producer.SetCredentials(expected_credentials); + + EXPECT_CALL(mock_pull, AddRequest_t(_)).Times(0); + + asapo::EventHeader event_header{expected_id, 0, ""}; + auto err = producer.SendFile(event_header, expected_fullpath, expected_injest_mode, nullptr); + + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kEmptyFileName)); + +} + + TEST_F(ProducerImplTests, OKSendingSendFileRequest) { producer.SetCredentials(expected_credentials); diff --git a/producer/api/cpp/unittests/test_request_handler_filesystem.cpp b/producer/api/cpp/unittests/test_request_handler_filesystem.cpp index bce8b8a9255d34e81caf7ea196cee42ea5e95cc6..603bbf49e07e1b5b9de5ec269cf473c5bda66c21 100644 --- a/producer/api/cpp/unittests/test_request_handler_filesystem.cpp +++ b/producer/api/cpp/unittests/test_request_handler_filesystem.cpp @@ -71,6 +71,10 @@ class RequestHandlerFilesystemTests : public testing::Test { asapo::RequestHandlerFilesystem request_handler{expected_destination, expected_thread_id}; void SetUp() override { + request.header.custom_data[asapo::kPosInjestMode] = asapo::kDefaultIngestMode; + request_filesend.header.custom_data[asapo::kPosInjestMode] = asapo::kDefaultIngestMode; + request_nocallback.header.custom_data[asapo::kPosInjestMode] = asapo::kDefaultIngestMode; + request_handler.log__ = &mock_logger; request_handler.io__.reset(&mock_io); } diff --git a/producer/api/cpp/unittests/test_request_handler_tcp.cpp 
b/producer/api/cpp/unittests/test_request_handler_tcp.cpp index 70cf1706954d123af94654137071e4368abb056c..0c9caf4bb6f30a3970c82fda3b256e8e384bca6c 100644 --- a/producer/api/cpp/unittests/test_request_handler_tcp.cpp +++ b/producer/api/cpp/unittests/test_request_handler_tcp.cpp @@ -15,6 +15,8 @@ #include "mocking.h" +#include <functional> + namespace { using ::testing::Return; @@ -62,16 +64,23 @@ class RequestHandlerTcpTests : public testing::Test { asapo::Error callback_err; asapo::GenericRequestHeader header{expected_op_code, expected_file_id, expected_file_size, expected_meta_size, expected_file_name}; - bool called = false; + bool callback_called = false; asapo::GenericRequestHeader callback_header; + + asapo::ProducerRequest request{expected_beamtime_id, header, nullptr, expected_metadata, "", [this](asapo::GenericRequestHeader header, asapo::Error err) { - called = true; + callback_called = true; callback_err = std::move(err); callback_header = header; }}; std::string expected_origin_fullpath = std::string("origin/") + expected_file_name; - asapo::ProducerRequest request_filesend{expected_beamtime_id, header, nullptr, expected_metadata, expected_origin_fullpath, nullptr}; + asapo::ProducerRequest request_filesend{expected_beamtime_id, header, nullptr, expected_metadata, + expected_origin_fullpath, [this](asapo::GenericRequestHeader header, asapo::Error err) { + callback_called = true; + callback_err = std::move(err); + callback_header = header; + }}; asapo::ProducerRequest request_nocallback{expected_beamtime_id, header, nullptr, expected_metadata, "", nullptr}; @@ -626,7 +635,7 @@ void RequestHandlerTcpTests::AssertImmediatelyCallBack(asapo::NetworkErrorCode e request_handler.PrepareProcessingRequestLocked(); auto err = request_handler.ProcessRequestUnlocked(&request); ASSERT_THAT(callback_err, Eq(err_template)); - ASSERT_THAT(called, Eq(true)); + ASSERT_THAT(callback_called, Eq(true)); ASSERT_THAT(err, Eq(nullptr)); } @@ -650,7 +659,7 @@ TEST_F(RequestHandlerTcpTests, SendEmptyCallBack) { auto err = request_handler.ProcessRequestUnlocked(&request_nocallback); ASSERT_THAT(err, Eq(nullptr)); - ASSERT_THAT(called, Eq(false)); + ASSERT_THAT(callback_called, Eq(false)); } TEST_F(RequestHandlerTcpTests, FileRequestErrorOnReadData) { @@ -665,7 +674,10 @@ TEST_F(RequestHandlerTcpTests, FileRequestErrorOnReadData) { )); auto err = request_handler.ProcessRequestUnlocked(&request_filesend); - ASSERT_THAT(err, Eq(asapo::IOErrorTemplates::kUnknownIOError)); + ASSERT_THAT(callback_err, Eq(asapo::IOErrorTemplates::kUnknownIOError)); + ASSERT_THAT(callback_called, Eq(true)); + ASSERT_THAT(err, Eq(nullptr)); + } @@ -689,6 +701,8 @@ TEST_F(RequestHandlerTcpTests, FileRequestOK) { ASSERT_THAT(err, Eq(nullptr)); } + + TEST_F(RequestHandlerTcpTests, SendOK) { ExpectOKConnect(true); ExpectOKAuthorize(true); @@ -700,7 +714,7 @@ TEST_F(RequestHandlerTcpTests, SendOK) { ASSERT_THAT(err, Eq(nullptr)); ASSERT_THAT(callback_err, Eq(nullptr)); - ASSERT_THAT(called, Eq(true)); + ASSERT_THAT(callback_called, Eq(true)); ASSERT_THAT(callback_header.data_size, Eq(header.data_size)); ASSERT_THAT(callback_header.op_code, Eq(header.op_code)); ASSERT_THAT(callback_header.data_id, Eq(header.data_id)); @@ -743,5 +757,24 @@ TEST_F(RequestHandlerTcpTests, SendMetaOnlyOK) { ASSERT_THAT(callback_header.custom_data[asapo::kPosInjestMode], Eq(injest_mode)); } +TEST_F(RequestHandlerTcpTests, SendMetaOnlyForFileReadOK) { + ExpectOKConnect(true); + ExpectOKAuthorize(true); + ExpectOKSendHeader(true); + ExpectOKSendMetaData(true); 
+ ExpectOKReceive(); + + request_handler.PrepareProcessingRequestLocked(); + + EXPECT_CALL(mock_io, GetDataFromFile_t(_,_,_)).Times(0); + + auto injest_mode = asapo::IngestModeFlags::kTransferMetaDataOnly; + + request_filesend.header.custom_data[asapo::kPosInjestMode] = injest_mode; + auto err = request_handler.ProcessRequestUnlocked(&request_filesend); + ASSERT_THAT(err, Eq(nullptr)); +} + + } diff --git a/producer/api/python/CMakeLists_Linux.cmake b/producer/api/python/CMakeLists_Linux.cmake index f01baa6ecffa9cc89f7a0c9e2794b0095d9830b9..95f3fb3a03b5497d63b3e2df11553c9dd7f1f68c 100644 --- a/producer/api/python/CMakeLists_Linux.cmake +++ b/producer/api/python/CMakeLists_Linux.cmake @@ -16,10 +16,10 @@ set (ASAPO_PRODUCER_INCLUDE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/../cpp/include) configure_files(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR} @ONLY) ADD_CUSTOM_TARGET(python-lib2-producer ALL - COMMAND python setup.py build_ext --inplace) + COMMAND python setup.py build_ext --inplace --force ) ADD_CUSTOM_TARGET(python-lib3-producer ALL - COMMAND python3 setup.py build_ext --inplace) + COMMAND python3 setup.py build_ext --inplace --force) ADD_DEPENDENCIES(python-lib2-producer asapo-producer) ADD_DEPENDENCIES(python-lib3-producer asapo-producer) diff --git a/producer/api/python/asapo_producer.pxd b/producer/api/python/asapo_producer.pxd index 3dcbdd19dbb2c239653db22ae61d2fd052a7fd22..abc6b065c977dea5585eba1cbc217180461ab4bc 100644 --- a/producer/api/python/asapo_producer.pxd +++ b/producer/api/python/asapo_producer.pxd @@ -26,6 +26,16 @@ cdef extern from "asapo_producer.h" namespace "asapo": RequestHandlerType RequestHandlerType_Tcp "asapo::RequestHandlerType::kTcp" +cdef extern from "asapo_producer.h" namespace "asapo": + cppclass LogLevel: + pass + LogLevel LogLevel_None "asapo::LogLevel::None" + LogLevel LogLevel_Error "asapo::LogLevel::Error" + LogLevel LogLevel_Info "asapo::LogLevel::Info" + LogLevel LogLevel_Debug "asapo::LogLevel::Debug" + LogLevel LogLevel_Warning "asapo::LogLevel::Warning" + + cdef extern from "asapo_producer.h" namespace "asapo": struct SourceCredentials: string beamtime_id @@ -52,7 +62,7 @@ cdef extern from "asapo_producer.h" namespace "asapo": cdef extern from "asapo_producer.h" namespace "asapo": struct GenericRequestHeader: - pass + string Json() cdef extern from "asapo_producer.h" namespace "asapo": cppclass RequestCallback: @@ -60,10 +70,9 @@ cdef extern from "asapo_producer.h" namespace "asapo": cdef extern from "asapo_wrappers.h" namespace "asapo": - cdef cppclass function_wrapper: - ctypedef void (*cy_callback) (void*, GenericRequestHeader, Error) - @staticmethod - RequestCallback make_std_function(cy_callback, void*) + cppclass RequestCallbackCython: + pass + RequestCallback unwrap_callback(RequestCallbackCython, void*,void*) cdef extern from "asapo_producer.h" namespace "asapo": @@ -71,4 +80,12 @@ cdef extern from "asapo_producer.h" namespace "asapo": @staticmethod unique_ptr[Producer] Create(string endpoint,uint8_t nthreads,RequestHandlerType type, SourceCredentials source,Error* error) Error SendFile(const EventHeader& event_header, string full_path, uint64_t injest_mode,RequestCallback callback) + void SetLogLevel(LogLevel level) + +cdef extern from "asapo_producer.h" namespace "asapo": + uint64_t kDefaultIngestMode + enum IngestModeFlags: + kTransferData + kTransferMetaDataOnly + kStoreInFilesystem diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in index 
4ef65b11312c3f330fec63d05d5717443392cc53..d5d14af015287174d305d15cbdafd1860a50cd8c 100644 --- a/producer/api/python/asapo_producer.pyx.in +++ b/producer/api/python/asapo_producer.pyx.in @@ -9,6 +9,12 @@ from libcpp.memory cimport unique_ptr np.import_array() +DEFAULT_INJEST_MODE = kDefaultIngestMode +INJEST_MODE_TRANSFER_DATA = kTransferData +INJEST_MODE_TRANSFER_METADATA_ONLY = kTransferMetaDataOnly +INJEST_MODE_STORE_IN_FILESYSTEM = kStoreInFilesystem + + cdef extern from "numpy/ndarraytypes.h": void PyArray_ENABLEFLAGS(np.ndarray arr, int flags) @@ -27,26 +33,53 @@ cdef bytes _bytes(s): else: raise TypeError("Could not convert to unicode.") -ctypedef void (*cb_type)(void*, GenericRequestHeader, Error) - - cdef class PyProducer: cdef unique_ptr[Producer] c_producer - cdef object py_callback - def send_file(self,string fname): + def set_log_level(self,level): + cdef LogLevel log_level + log_level = LogLevel_Info + if level == "debug" : + log_level = LogLevel_Debug + elif level == "info" : + log_level = LogLevel_Info + elif level == "none" : + log_level = LogLevel_None + elif level == "warn" : + log_level = LogLevel_Warning + else: + print("wrong loglevel mode: "+level) + return + self.c_producer.get().SetLogLevel(log_level) + def send_file(self,int id,string local_path,string exposed_path,user_meta=None,subset=None,injest_mode = DEFAULT_INJEST_MODE,callback=None): cdef EventHeader event_header - err = self.c_producer.get().SendFile(event_header, _bytes(fname), 1, - function_wrapper.make_std_function(<cb_type>self.c_callback, <void*>self)) + event_header.file_id = id + event_header.file_size = 0 + event_header.file_name = exposed_path + event_header.user_metadata = user_meta if user_meta!=None else "" + if subset == None: + event_header.subset_id = 0 + event_header.subset_size = 0 + else: + event_header.subset_id = subset[0] + event_header.subset_size = subset[1] + err = self.c_producer.get().SendFile(event_header, _bytes(local_path), injest_mode, + unwrap_callback(<RequestCallbackCython>self.c_callback, <void*>self,<void*>callback if callback != None else NULL)) cdef err_str = GetErrorString(&err) if err_str.strip(): return err_str else: - return "ok" - cdef void c_callback(self,GenericRequestHeader header, Error err) with gil: - self.py_callback(1,2) - return + return None + cdef void c_callback(self,py_callback,GenericRequestHeader header, Error err) with gil: + info_str = _str(header.Json()) + info = json.loads(info_str) + cdef err_str = GetErrorString(&err) + if err_str.strip(): + py_err = err_str + else: + py_err = None + py_callback(info,py_err) @staticmethod - def create_producer(endpoint,beamtime_id,stream,token,nthreads,py_callback): + def create_producer(endpoint,beamtime_id,stream,token,nthreads): pyProd = PyProducer() cdef Error err cdef SourceCredentials source @@ -54,15 +87,14 @@ cdef class PyProducer: source.user_token = token source.stream = stream pyProd.c_producer = Producer.Create(endpoint,nthreads,RequestHandlerType_Tcp,source,&err) - pyProd.py_callback = py_callback cdef err_str = GetErrorString(&err) if err_str.strip(): return None,err_str else: return pyProd,None -def create_producer(endpoint,beamtime_id,stream,token,nthreads,py_callback): - return PyProducer.create_producer(_bytes(endpoint),_bytes(beamtime_id),_bytes(stream),_bytes(token),nthreads,py_callback) +def create_producer(endpoint,beamtime_id,stream,token,nthreads): + return PyProducer.create_producer(_bytes(endpoint),_bytes(beamtime_id),_bytes(stream),_bytes(token),nthreads) __version__ = 
"@ASAPO_VERSION_PYTHON@" diff --git a/producer/api/python/asapo_wrappers.h b/producer/api/python/asapo_wrappers.h index 034e0fc875ae62f7018e800960ed576b4fe09695..ffaa4b1844274bb2e1f77f21be93149131c6b84c 100644 --- a/producer/api/python/asapo_wrappers.h +++ b/producer/api/python/asapo_wrappers.h @@ -14,20 +14,17 @@ inline std::string GetErrorString(asapo::Error* err) { return ""; } -using cy_callback = void (*)(void*, GenericRequestHeader header, Error err); - -class function_wrapper { - public: - static - RequestCallback make_std_function(cy_callback callback, void* c_self) - { - RequestCallback wrapper = [=](GenericRequestHeader header, Error err) -> void - { - callback(c_self, header, std::move(err)); - }; - return wrapper; +using RequestCallbackCython = void (*)(void*, void*, GenericRequestHeader header, Error err); + +RequestCallback unwrap_callback(RequestCallbackCython callback, void* c_self, void* py_func) { + if (py_func == NULL) { + return nullptr; } -}; + RequestCallback wrapper = [ = ](GenericRequestHeader header, Error err) -> void { + callback(c_self, py_func, header, std::move(err)); + }; + return wrapper; +} } diff --git a/tests/manual/python_tests/producer/authorizer.json.tpl b/tests/manual/python_tests/producer/authorizer.json.tpl new file mode 100644 index 0000000000000000000000000000000000000000..7c3a796d20b0bdb556fc1d88f82e13922b5d140d --- /dev/null +++ b/tests/manual/python_tests/producer/authorizer.json.tpl @@ -0,0 +1,10 @@ +{ + "Port": {{ env "NOMAD_PORT_authorizer" }}, + "LogLevel":"debug", + "AlwaysAllowedBeamtimes":[{"BeamtimeId":"asapo_test","Beamline":"test"}, + {"BeamtimeId":"asapo_test1","Beamline":"test1"}, + {"BeamtimeId":"asapo_test2","Beamline":"test2"}], + "SecretFile":"auth_secret.key" +} + + diff --git a/tests/manual/python_tests/producer/authorizer.nmd b/tests/manual/python_tests/producer/authorizer.nmd new file mode 100644 index 0000000000000000000000000000000000000000..8b32105cf2e4644e97b6fda5e4aba06c6c269822 --- /dev/null +++ b/tests/manual/python_tests/producer/authorizer.nmd @@ -0,0 +1,55 @@ +job "authorizer" { + datacenters = ["dc1"] + + type = "service" + + group "group" { + count = 1 + + task "authorizer" { + driver = "raw_exec" + + config { + command = "/home/yakubov/projects/asapo/cmake-build-debug/authorizer/asapo-authorizer", + args = ["-config","${NOMAD_TASK_DIR}/authorizer.json"] + } + + resources { + cpu = 500 # 500 MHz + memory = 256 # 256MB + network { + port "authorizer" { + static = "5007" + } + } + } + + service { + name = "authorizer" + port = "authorizer" + check { + name = "alive" + type = "http" + path = "/health-check" + interval = "10s" + timeout = "2s" + initial_status = "passing" + } + } + + template { + source = "/home/yakubov/projects/asapo/cmake-build-debug/tests/automatic/full_chain/simple_chain/authorizer.json.tpl" + destination = "local/authorizer.json" + change_mode = "signal" + change_signal = "SIGHUP" + } + + template { + source = "/home/yakubov/projects/asapo/cmake-build-debug/tests/automatic/full_chain/simple_chain/auth_secret.key" + destination = "auth_secret.key" + change_mode = "signal" + change_signal = "SIGHUP" + } + } + } +} diff --git a/tests/manual/python_tests/producer/clean_db.sh b/tests/manual/python_tests/producer/clean_db.sh new file mode 100755 index 0000000000000000000000000000000000000000..326ee510db2ef352129f53a9be80f846a463c988 --- /dev/null +++ b/tests/manual/python_tests/producer/clean_db.sh @@ -0,0 +1,6 @@ +#!/usr/bin/env bash + +beamtime_id=asapo_test1 + + +echo "db.dropDatabase()" | mongo 
${beamtime_id}_detector diff --git a/tests/manual/python_tests/producer/discovery.json.tpl b/tests/manual/python_tests/producer/discovery.json.tpl new file mode 100644 index 0000000000000000000000000000000000000000..25cba67824339d7c81a9d39b4a89d8356d4ea9bf --- /dev/null +++ b/tests/manual/python_tests/producer/discovery.json.tpl @@ -0,0 +1,10 @@ +{ + "Mode": "consul", + "Receiver": { + "MaxConnections": 32 + }, + "Port": {{ env "NOMAD_PORT_discovery" }}, + "LogLevel":"debug" +} + + diff --git a/tests/manual/python_tests/producer/discovery.nmd b/tests/manual/python_tests/producer/discovery.nmd new file mode 100644 index 0000000000000000000000000000000000000000..f95980bae862ec43bf59383d85b307bf36dc3bcf --- /dev/null +++ b/tests/manual/python_tests/producer/discovery.nmd @@ -0,0 +1,49 @@ +job "discovery" { + datacenters = ["dc1"] + + type = "service" + + group "group" { + count = 1 + + task "discovery" { + driver = "raw_exec" + + config { + command = "/home/yakubov/projects/asapo/cmake-build-debug/discovery/asapo-discovery", + args = ["-config","${NOMAD_TASK_DIR}/discovery.json"] + } + + resources { + cpu = 500 # 500 MHz + memory = 256 # 256MB + network { + port "discovery" { + static = "5006" + } + } + } + + service { + name = "discovery" + port = "discovery" + check { + name = "alive" + type = "http" + path = "/receivers" + interval = "10s" + timeout = "2s" + initial_status = "passing" + } + } + + template { + source = "/home/yakubov/projects/asapo/tests/manual/python_tests/producer/discovery.json.tpl" + destination = "local/discovery.json" + change_mode = "signal" + change_signal = "SIGHUP" + } + + } + } +} diff --git a/tests/manual/python_tests/producer/file1 b/tests/manual/python_tests/producer/file1 new file mode 100644 index 0000000000000000000000000000000000000000..9daeafb9864cf43055ae93beb0afd6c7d144bfa4 --- /dev/null +++ b/tests/manual/python_tests/producer/file1 @@ -0,0 +1 @@ +test diff --git a/tests/manual/python_tests/producer/nginx.conf.tpl b/tests/manual/python_tests/producer/nginx.conf.tpl new file mode 100644 index 0000000000000000000000000000000000000000..a545307b376e004d2a0d6e5cbad696996b6a4136 --- /dev/null +++ b/tests/manual/python_tests/producer/nginx.conf.tpl @@ -0,0 +1,42 @@ +worker_processes 1; +daemon off; + +events { + worker_connections 1024; +} + +http { +# include mime.types; +# default_type application/octet-stream; + +# sendfile on; +# tcp_nopush on; + +# keepalive_timeout 0; +# keepalive_timeout 65; + + resolver 127.0.0.1:8600 valid=1s; + server { + listen {{ env "NOMAD_PORT_nginx" }}; + set $discovery_endpoint discovery.service.asapo; + set $authorizer_endpoint authorizer.service.asapo; + # set $fluentd_endpoint localhost; + location /discovery/ { + rewrite ^/discovery(/.*) $1 break; + proxy_pass http://$discovery_endpoint:5006$uri; + } + location /logs/ { + # rewrite ^/logs(/.*) $1 break; + proxy_pass http://localhost:9880/asapo; + } + location /authorizer/ { + rewrite ^/authorizer(/.*) $1 break; + proxy_pass http://$authorizer_endpoint:5007$uri; + } + + + location /nginx-health { + return 200 "healthy\n"; + } + } +} diff --git a/tests/manual/python_tests/producer/nginx.nmd b/tests/manual/python_tests/producer/nginx.nmd new file mode 100644 index 0000000000000000000000000000000000000000..b424e53874d17c7b0612106225f0250fa274fac4 --- /dev/null +++ b/tests/manual/python_tests/producer/nginx.nmd @@ -0,0 +1,63 @@ +job "nginx" { + datacenters = ["dc1"] + + type = "service" + + update { + max_parallel = 1 + min_healthy_time = "10s" + healthy_deadline = "3m" + 
auto_revert = false + } + + group "nginx" { + count = 1 + + restart { + attempts = 2 + interval = "30m" + delay = "15s" + mode = "fail" + } + + task "nginx" { + driver = "raw_exec" + + config { + command = "nginx", + args = ["-c","${NOMAD_TASK_DIR}/nginx.conf"] + } + + resources { + cpu = 500 # 500 MHz + memory = 256 # 256MB + network { + mbits = 10 + port "nginx" { + static = 8400 + } + } + } + + service { + port = "nginx" + name = "nginx" + check { + name = "alive" + type = "http" + path = "/nginx-health" + timeout = "2s" + interval = "10s" + } + } + + template { + source = "/home/yakubov/projects/asapo/cmake-build-debug/tests/automatic/full_chain/simple_chain/nginx.conf.tpl" + destination = "local/nginx.conf" + change_mode = "restart" + } + + + } + } +} diff --git a/tests/manual/python_tests/producer/receiver.json.tpl b/tests/manual/python_tests/producer/receiver.json.tpl new file mode 100644 index 0000000000000000000000000000000000000000..b361ba8510e66deffe308481b949ed36bce309e3 --- /dev/null +++ b/tests/manual/python_tests/producer/receiver.json.tpl @@ -0,0 +1,22 @@ +{ + "MonitorDbAddress":"localhost:8086", + "MonitorDbName": "db_test", + "BrokerDbAddress":"localhost:27017", + "DataServer": { + "NThreads": 2, + "ListenPort": {{ env "NOMAD_PORT_recv_ds" }} + }, + "DataCache": { + "Use": true, + "SizeGB": 1, + "ReservedShare": 10 + }, + "AuthorizationServer": "localhost:8400/authorizer", + "AuthorizationInterval": 10000, + "ListenPort": {{ env "NOMAD_PORT_recv" }}, + "Tag": "{{ env "NOMAD_ADDR_recv" }}", + "WriteToDisk": true, + "WriteToDb": true, + "LogLevel" : "debug", + "RootFolder" : "/tmp/asapo/receiver/files" +} diff --git a/tests/manual/python_tests/producer/receiver.nmd b/tests/manual/python_tests/producer/receiver.nmd new file mode 100644 index 0000000000000000000000000000000000000000..75fbca5749a82d13f1d3d44ad90b0afb2cec68e0 --- /dev/null +++ b/tests/manual/python_tests/producer/receiver.nmd @@ -0,0 +1,47 @@ +job "receiver" { + datacenters = ["dc1"] + + type = "service" + + group "group" { + count = 1 + + task "receiver" { + driver = "raw_exec" + + config { + command = "/home/yakubov/projects/asapo/cmake-build-debug/receiver/receiver", + args = ["${NOMAD_TASK_DIR}/receiver.json"] + } + + resources { + cpu = 500 # 500 MHz + memory = 256 # 256MB + network { + port "recv" {} + port "recv_ds" {} + } + } + + service { + name = "asapo-receiver" + port = "recv" + check { + name = "alive" + type = "tcp" + interval = "10000s" + timeout = "2s" + initial_status = "passing" + } + } + + template { + source = "/home/yakubov/projects/asapo/cmake-build-debug/tests/automatic/full_chain/simple_chain/receiver.json.tpl" + destination = "local/receiver.json" + change_mode = "signal" + change_signal = "SIGHUP" + } + + } + } +} diff --git a/tests/manual/python_tests/producer/run.sh b/tests/manual/python_tests/producer/run.sh index 52c6aec889735e9d99d2377834ac5767ac08c5a2..62b5fec56935b8783c952a3d84d667b82dd27446 100755 --- a/tests/manual/python_tests/producer/run.sh +++ b/tests/manual/python_tests/producer/run.sh @@ -1,4 +1,6 @@ #!/usr/bin/env bash export PYTHONPATH=/home/yakubov/projects/asapo/cmake-build-debug/producer/api/python:${PYTHONPATH} +mkdir -p /tmp/asapo/receiver/files/test1/asapo_test1 + python test.py diff --git a/tests/manual/python_tests/producer/start_services.sh b/tests/manual/python_tests/producer/start_services.sh new file mode 100755 index 0000000000000000000000000000000000000000..bd0128b53c38ed3181131fdde33bc90ba02c3683 --- /dev/null +++ 
b/tests/manual/python_tests/producer/start_services.sh
@@ -0,0 +1,6 @@
+#!/usr/bin/env bash
+
+nomad run authorizer.nmd
+nomad run discovery.nmd
+nomad run nginx.nmd
+nomad run receiver.nmd
diff --git a/tests/manual/python_tests/producer/stop_services.sh b/tests/manual/python_tests/producer/stop_services.sh
new file mode 100755
index 0000000000000000000000000000000000000000..13c664ca5189da0eb0de44907f1b8d5bc0ff2327
--- /dev/null
+++ b/tests/manual/python_tests/producer/stop_services.sh
@@ -0,0 +1,6 @@
+#!/usr/bin/env bash
+
+nomad stop authorizer
+nomad stop discovery
+nomad stop nginx
+nomad stop receiver
diff --git a/tests/manual/python_tests/producer/test.py b/tests/manual/python_tests/producer/test.py
index 9e93ff9768db5bef591f48224bc295aee5f193c4..0f9ed285ed7a7ddef7de7ffb2fc7a6094218ddbb 100644
--- a/tests/manual/python_tests/producer/test.py
+++ b/tests/manual/python_tests/producer/test.py
@@ -2,28 +2,49 @@ from __future__ import print_function
 import asapo_producer
 import sys
-import json
 import time
 
-endpoint = "psana002:8400"
+#import threading
+#lock = threading.Lock()
+
+
+endpoint = "127.0.0.1:8400"
 beamtime = "asapo_test1"
-stream = "stream"
+stream = ""
 token = ""
-nthreads = 1
+nthreads = 8
 
-def callback(i,j):
-    print(i,j)
+def callback(header,err):
+#    lock.acquire() # just example, don't do this if not needed
+    if err is not None:
+        print("could not sent: ",header,err)
+    else:
+        print ("successfuly sent: ",header)
+#    lock.release()
 
+producer, err = asapo_producer.create_producer(endpoint,beamtime, stream, token, nthreads)
+if err is not None:
+    print(err)
+    sys.exit(1)
 
-producer, err = asapo_producer.create_producer(endpoint,beamtime, stream, token, nthreads,callback)
+producer.set_log_level("info")
 
+#send single file
+err = producer.send_file(1, local_path = "./file1", exposed_path = "file1", user_meta = '{"test_key":"test_val"}', callback = callback)
 if err is not None:
     print(err)
-    sys.exit(1)
-else:
-    print(producer)
 
-producer.send_file("")
+#send subsets
+#producer.send_file(1, local_path = "./file1", exposed_path = "file1"",subset=(1,2),user_meta = '{"test_key":"test_val"}', callback = callback)
+#producer.send_file(1, local_path = "./file1", exposed_path = "file1",subset=(1,2),user_meta = '{"test_key":"test_val"}', callback = callback)
+
+#send meta only
+err = producer.send_file(2, local_path = "./file2",exposed_path = "./file2",
+                         injest_mode = asapo_producer.INJEST_MODE_TRANSFER_METADATA_ONLY, callback = callback)
+if err is not None:
+    print(err)
+
+time.sleep(1)
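// Usage sketch (not part of the patch, added for illustration): how a C++
// client observes the two behavioural changes introduced above. An empty
// exposed file name (or empty local path) is now rejected synchronously with
// kEmptyFileName, and a failure to read the local file is reported through
// the request callback instead of being returned by the request handler.
// The endpoint, credentials and file paths are placeholders; Explain() on
// the error object is assumed from the producer error interface.
#include <chrono>
#include <iostream>
#include <thread>

#include "asapo_producer.h"

int main() {
    asapo::Error err;
    auto producer = asapo::Producer::Create("localhost:8400", 1,
                                            asapo::RequestHandlerType::kTcp,
                                            asapo::SourceCredentials{"asapo_test1", "", ""}, &err);
    if (err) {
        std::cout << "cannot create producer: " << err->Explain() << std::endl;
        return 1;
    }

    // Rejected immediately: the exposed file name in the event header is empty.
    asapo::EventHeader empty_name{1, 0, ""};
    err = producer->SendFile(empty_name, "./file1", asapo::kDefaultIngestMode, nullptr);
    if (err) {
        std::cout << "rejected: " << err->Explain() << std::endl;  // "empty filename"
    }

    // Accepted into the request pool; if the local file cannot be read later,
    // the error arrives in the callback together with the request header.
    asapo::EventHeader header{1, 0, "file1"};
    err = producer->SendFile(header, "./file_that_may_not_exist", asapo::kDefaultIngestMode,
                             [](asapo::GenericRequestHeader cb_header, asapo::Error cb_err) {
        if (cb_err) {
            std::cout << "send failed for " << cb_header.Json()
                      << ": " << cb_err->Explain() << std::endl;
        } else {
            std::cout << "sent " << cb_header.Json() << std::endl;
        }
    });

    // Give the background sender thread time to invoke the callback before
    // exiting (mirrors the time.sleep(1) in test.py above).
    std::this_thread::sleep_for(std::chrono::seconds(1));
    return 0;
}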