diff --git a/common/cpp/include/common/data_structs.h b/common/cpp/include/common/data_structs.h index 1d32acf4881a6b18ca118e8613ad0dd767ca091a..0361f1043107fed8d26ae6556e57d0b1427f74ce 100644 --- a/common/cpp/include/common/data_structs.h +++ b/common/cpp/include/common/data_structs.h @@ -59,11 +59,10 @@ struct SourceCredentials { enum IngestModeFlags : uint64_t { kTransferData = 1 << 0, kTransferMetaDataOnly = 1 << 1, - kStoreInCache = 1 << 2, - kStoreInFilesystem = 1 << 3, + kStoreInFilesystem = 1 << 2, }; -const uint64_t kDefaultIngestMode = kTransferData | kStoreInCache | kStoreInFilesystem; +const uint64_t kDefaultIngestMode = kTransferData | kStoreInFilesystem; diff --git a/common/cpp/include/common/networking.h b/common/cpp/include/common/networking.h index d9760b2409493939046b6a94bd47e270ea811dee..2ea7c7c520501ededc8dd4a23226aa221c340ec9 100644 --- a/common/cpp/include/common/networking.h +++ b/common/cpp/include/common/networking.h @@ -36,7 +36,9 @@ enum NetworkErrorCode : uint16_t { const std::size_t kMaxMessageSize = 1024; const std::size_t kNCustomParams = 3; using CustomRequestData = uint64_t[kNCustomParams]; - +const std::size_t kPosInjestMode = 0; +const std::size_t kPosDataSetId = 1; +const std::size_t kPosDataSetSize = 2; struct GenericRequestHeader { GenericRequestHeader(Opcode i_op_code = kOpcodeUnknownOp, uint64_t i_data_id = 0, diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index ae70ed32ef8c56c1562e5103f200dfc441234863..beb4a38d87dce9beea4b529653a1452df56b52c0 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -1,4 +1,5 @@ add_subdirectory(worker) +add_subdirectory(pipeline) add_subdirectory(producer) diff --git a/examples/pipeline/CMakeLists.txt b/examples/pipeline/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..2a8c0e70a9fbe718ded70c0ce13c667d1ed840cf --- /dev/null +++ b/examples/pipeline/CMakeLists.txt @@ -0,0 +1,6 @@ + +add_subdirectory(in_to_out) + +#if (UNIX OR CMAKE_BUILD_TYPE STREQUAL "Release") +# add_subdirectory(getnext_broker_python) +#endif() diff --git a/examples/pipeline/in_to_out/CMakeLists.txt b/examples/pipeline/in_to_out/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..a7bfa9e71110407b87868d1c4fa6b9ef00737001 --- /dev/null +++ b/examples/pipeline/in_to_out/CMakeLists.txt @@ -0,0 +1,15 @@ +set(TARGET_NAME pipeline_inout) +set(SOURCE_FILES in_to_out.cpp) + +add_executable(${TARGET_NAME} ${SOURCE_FILES}) +target_link_libraries(${TARGET_NAME} asapo-worker producer-api) + +#use expression generator to get rid of VS adding Debug/Release folders +set_target_properties(${TARGET_NAME} PROPERTIES RUNTIME_OUTPUT_DIRECTORY + ${CMAKE_CURRENT_BINARY_DIR}$<$<CONFIG:Debug>:> + ) + +prepare_asapo() + +add_script_test("${TARGET_NAME}" "${CMAKE_CURRENT_BINARY_DIR}/${TARGET_NAME}" nomem) + diff --git a/examples/pipeline/in_to_out/CMakeLists_separate.in b/examples/pipeline/in_to_out/CMakeLists_separate.in new file mode 100644 index 0000000000000000000000000000000000000000..5bcf1c56fc63ba39f0968b4d6f2e4e67279927b6 --- /dev/null +++ b/examples/pipeline/in_to_out/CMakeLists_separate.in @@ -0,0 +1,34 @@ +cmake_minimum_required(VERSION 2.8) + +project(@TARGET_NAME@) + +set(CMAKE_CXX_STANDARD 11) + +IF(CMAKE_C_COMPILER_ID STREQUAL "GNU") + SET( CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -static-libgcc -static-libstdc++") +ENDIF() + +find_package (Threads) + +if (NOT "$ENV{LIBCURL_DIR}" STREQUAL "") + if (NOT LIBCURL_DIR) + set(LIBCURL_DIR $ENV{LIBCURL_DIR}) + endif() +endif () + + +set (CMAKE_PREFIX_PATH "${LIBCURL_DIR}") +find_package (CURL REQUIRED) +message (STATUS "Found cURL libraries: ${CURL_LIBRARIES}") +message (STATUS "cURL include: ${CURL_INCLUDE_DIRS}") + + +set(TARGET_NAME ${CMAKE_PROJECT_NAME}) + +set(SOURCE_FILES @SOURCE_FILES@) + +link_directories(@CMAKE_INSTALL_PREFIX@/lib) + +add_executable(${TARGET_NAME} ${SOURCE_FILES}) +target_include_directories(${TARGET_NAME} PUBLIC @CMAKE_INSTALL_PREFIX@/include ${CURL_INCLUDE_DIRS}) +target_link_libraries(${TARGET_NAME} asapo-worker ${CURL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT}) diff --git a/examples/pipeline/in_to_out/Makefile.in b/examples/pipeline/in_to_out/Makefile.in new file mode 100644 index 0000000000000000000000000000000000000000..ae61c2ed41f2448088fbda0b24b71bd6ba5e7bd6 --- /dev/null +++ b/examples/pipeline/in_to_out/Makefile.in @@ -0,0 +1,25 @@ +PROGRAM=@TARGET_NAME@ + +CXX=g++ +CXXFLAGS=-std=c++11 +LDFLAGS=-pthread -static-libgcc -static-libstdc++ +LIBS=-L @CMAKE_INSTALL_PREFIX@/lib -lasapo-worker -L ${LIBCURL_DIR}/lib -lcurl +INCLUDE=-I @CMAKE_INSTALL_PREFIX@/include -I ${LIBCURL_DIR}/include +RM=rm -f + +SRCS=@SOURCE_FILES@ +OBJS=$(subst .cpp,.o,$(SRCS)) + +all: $(PROGRAM) + +$(PROGRAM): $(OBJS) + $(CXX) $(LDFLAGS) -o $@ $^ $(LIBS) + +%.o: %.cpp + $(CXX) $(CXXFLAGS) $(INCLUDE) -c -o $@ $< + +clean: + $(RM) $(OBJS) + +distclean: clean + $(RM) $(PROGRAM) diff --git a/examples/pipeline/in_to_out/check_linux.sh b/examples/pipeline/in_to_out/check_linux.sh new file mode 100644 index 0000000000000000000000000000000000000000..6f00ecbb7ab8c66b8e19a5f176a89d3a3eca3705 --- /dev/null +++ b/examples/pipeline/in_to_out/check_linux.sh @@ -0,0 +1,60 @@ +#!/usr/bin/env bash + +source_path=. +beamtime_id=asapo_test +stream_in=detector +stream_out=stream +indatabase_name=${beamtime_id}_${stream_in} +outdatabase_name=${beamtime_id}_${stream_out} +token=IEfwsWa0GXky2S3MkxJSUHJT1sI8DD5teRdjBUXVRxk= + +beamline=test +receiver_root_folder=/tmp/asapo/receiver/files +receiver_folder=${receiver_root_folder}/${beamline}/${beamtime_id} + + +#set -e + +trap Cleanup EXIT + +Cleanup() { + set +e + nomad stop nginx + nomad stop discovery + nomad stop broker + nomad stop receiver + nomad stop authorizer + echo "db.dropDatabase()" | mongo ${indatabase_name} + echo "db.dropDatabase()" | mongo ${outdatabase_name} + rm -rf file1 file2 file3 + rm -rf ${receiver_root_folder} + +} + +nomad run nginx.nmd +nomad run discovery.nmd +nomad run broker.nmd +nomad run receiver.nmd +nomad run authorizer.nmd + +mkdir -p $receiver_folder + +echo hello1 > file1 +echo hello2 > file2 +echo hello3 > file3 + +for i in `seq 1 3`; +do + echo 'db.data.insert({"_id":'$i',"size":6,"name":"'file$i'","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}})' | mongo ${indatabase_name} +done + +sleep 1 + +$1 127.0.0.1:8400 $source_path $beamtime_id $stream_in $stream_out $token 2 1000 1 | tee /dev/stderr | grep "Processed 3 file(s)" + + +cat ${receiver_folder}/file1_${stream_out} | grep hello1 + +cat ${receiver_folder}/file2_${stream_out} | grep hello2 + +cat ${receiver_folder}/file3_${stream_out} | grep hello3 diff --git a/examples/pipeline/in_to_out/check_windows.bat b/examples/pipeline/in_to_out/check_windows.bat new file mode 100644 index 0000000000000000000000000000000000000000..a63e3f77a32ed15d3668053768f64fa04db831c5 --- /dev/null +++ b/examples/pipeline/in_to_out/check_windows.bat @@ -0,0 +1,30 @@ +SET source_path=dummy + +SET beamtime_id=test_run +SET stream=detector +SET database_name=%beamtime_id%_%stream% + +SET mongo_exe="c:\Program Files\MongoDB\Server\3.6\bin\mongo.exe" +set token_test_run=K38Mqc90iRv8fC7prcFHd994mF_wfUiJnWBfIjIzieo= + +c:\opt\consul\nomad run discovery.nmd +c:\opt\consul\nomad run broker.nmd +c:\opt\consul\nomad run nginx.nmd + +ping 1.0.0.0 -n 10 -w 100 > nul + +for /l %%x in (1, 1, 3) do echo db.data.insert({"_id":%%x,"size":100,"name":"%%x","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}}) | %mongo_exe% %database_name% || goto :error + + +"%1" 127.0.0.1:8400 %source_path% %beamtime_id% 1 %token_test_run% 1000 1 | findstr /c:"Processed 3 file" || goto :error +goto :clean + +:error +call :clean +exit /b 1 + +:clean +c:\opt\consul\nomad stop discovery +c:\opt\consul\nomad stop broker +c:\opt\consul\nomad stop nginx +echo db.dropDatabase() | %mongo_exe% %database_name% diff --git a/examples/pipeline/in_to_out/in_to_out.cpp b/examples/pipeline/in_to_out/in_to_out.cpp new file mode 100644 index 0000000000000000000000000000000000000000..c1c39911c9b66b9ab07463b1a2c5cea210110e67 --- /dev/null +++ b/examples/pipeline/in_to_out/in_to_out.cpp @@ -0,0 +1,195 @@ +#include <iostream> +#include <memory> +#include <vector> +#include <algorithm> +#include <thread> +#include <chrono> +#include <iomanip> +#include <numeric> +#include <mutex> +#include <string> +#include <sstream> + + +#include "asapo_worker.h" +#include "asapo_producer.h" + +using std::chrono::system_clock; +using asapo::Error; + +std::string group_id = ""; +std::mutex lock; + +struct Args { + std::string server; + std::string file_path; + std::string beamtime_id; + std::string stream_in; + std::string stream_out; + std::string token; + int timeout_ms; + int nthreads; + bool transfer_data; +}; + +void ProcessAfterSend(asapo::GenericRequestHeader header, asapo::Error err) { + if (err) { + std::cerr << "Data was not successfully send: " << err << std::endl; + return; + } +} + + +void WaitThreads(std::vector<std::thread>* threads) { + for (auto& thread : *threads) { + thread.join(); + } +} + +int ProcessError(const Error& err) { + if (err == nullptr) return 0; + std::cout << err->Explain() << std::endl; + return err == asapo::IOErrorTemplates::kTimeout ? 0 : 1; +} + +std::vector<std::thread> StartThreads(const Args& args, asapo::Producer* producer, + std::vector<int>* nfiles, + std::vector<int>* errors) { + auto exec_next = [&args, nfiles, errors, producer ](int i) { + asapo::FileInfo fi; + Error err; + auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.server, args.file_path, + asapo::SourceCredentials{args.beamtime_id, args.stream_in, args.token}, &err); + + broker->SetTimeout((uint64_t) args.timeout_ms); + + lock.lock(); + + if (group_id.empty()) { + group_id = broker->GenerateNewGroupId(&err); + if (err) { + (*errors)[i] += ProcessError(err); + return; + } + } + + lock.unlock(); + + if (i == 0) { + auto meta = broker->GetBeamtimeMeta(&err); + if (err == nullptr) { + std::cout << meta << std::endl; + } else { + std::cout << "Cannot get metadata: " << err->Explain() << std::endl; + } + } + while (true) { + asapo::FileData data; + err = broker->GetNext(&fi, group_id, args.transfer_data ? &data : nullptr); + if (err) { + (*errors)[i] += ProcessError(err); + if (err == asapo::IOErrorTemplates::kTimeout) { + break; + } + } + + asapo::EventHeader header{fi.id, fi.size, fi.name, fi.metadata}; + + Error err_send; + if (args.transfer_data) { + header.file_name += "_" + args.stream_out; + err_send = producer->SendData(header, std::move(data), asapo::kDefaultIngestMode, ProcessAfterSend); + } else { + header.file_name = args.file_path + "/" + header.file_name; + err_send = producer->SendData(header, nullptr, asapo::IngestModeFlags::kTransferMetaDataOnly, ProcessAfterSend); + std::cout << err_send << std::endl; + + } + + if (err_send) { + std::cout << "Send error: " << err_send->Explain() << std::endl; + } + + (*nfiles)[i]++; + } + }; + + std::vector<std::thread> threads; + for (int i = 0; i < args.nthreads; i++) { + threads.emplace_back(std::thread(exec_next, i)); + } + return threads; +} + +int ProcessAllData(const Args& args, asapo::Producer* producer, uint64_t* duration_ms, int* nerrors) { + asapo::FileInfo fi; + system_clock::time_point t1 = system_clock::now(); + + std::vector<int> nfiles(args.nthreads, 0); + std::vector<int> errors(args.nthreads, 0); + + auto threads = StartThreads(args, producer, &nfiles, &errors); + WaitThreads(&threads); + + int n_total = std::accumulate(nfiles.begin(), nfiles.end(), 0); + *nerrors = std::accumulate(errors.begin(), errors.end(), 0); + + system_clock::time_point t2 = system_clock::now(); + auto duration_read = std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1); + *duration_ms = duration_read.count(); + return n_total; +} + +std::unique_ptr<asapo::Producer> CreateProducer(const Args& args) { + asapo::Error err; + auto producer = asapo::Producer::Create(args.server, args.nthreads, + asapo::RequestHandlerType::kTcp, + asapo::SourceCredentials{args.beamtime_id, args.stream_out, args.token }, &err); + if(err) { + std::cerr << "Cannot start producer. ProducerError: " << err << std::endl; + exit(EXIT_FAILURE); + } + + producer->EnableLocalLog(true); + producer->SetLogLevel(asapo::LogLevel::Debug); + return producer; +} + + +int main(int argc, char* argv[]) { + asapo::ExitAfterPrintVersionIfNeeded("GetNext Broker Example", argc, argv); + Args args; + if (argc != 10) { + std::cout << "Usage: " + std::string{argv[0]} + + " <server> <files_path> <beamtime_id> <stream_in> <stream_out> <nthreads> <token> <timeout ms> <transfer data>" + << + std::endl; + exit(EXIT_FAILURE); + } + args.server = std::string{argv[1]}; + args.file_path = std::string{argv[2]}; + args.beamtime_id = std::string{argv[3]}; + args.stream_in = std::string{argv[4]}; + args.stream_out = std::string{argv[5]}; + args.token = std::string{argv[6]}; + args.nthreads = atoi(argv[7]); + args.timeout_ms = atoi(argv[8]); + args.transfer_data = atoi(argv[9]) == 1; + + auto producer = CreateProducer(args); + + uint64_t duration_ms; + int nerrors; + auto nfiles = ProcessAllData(args, producer.get(), &duration_ms, &nerrors); + std::cout << "Processed " << nfiles << " file(s)" << std::endl; + std::cout << "Successfully: " << nfiles - nerrors << std::endl; + std::cout << "Errors : " << nerrors << std::endl; + std::cout << "Elapsed : " << duration_ms << "ms" << std::endl; + std::cout << "Rate : " << 1000.0f * nfiles / (duration_ms - args.timeout_ms) << std::endl; + + + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + + + return nerrors == 0 ? 0 : 1; +} diff --git a/examples/worker/getnext_broker/getnext_broker.cpp b/examples/worker/getnext_broker/getnext_broker.cpp index 9d78d35990dd45ed4ba8298bffa371315a424e12..31d64038b2eab30ed95b4191de19bf183066198b 100644 --- a/examples/worker/getnext_broker/getnext_broker.cpp +++ b/examples/worker/getnext_broker/getnext_broker.cpp @@ -19,7 +19,7 @@ using asapo::Error; std::string group_id = ""; std::mutex lock; -struct Params { +struct Args { std::string server; std::string file_path; std::string beamtime_id; @@ -43,7 +43,7 @@ int ProcessError(const Error& err) { return err == asapo::IOErrorTemplates::kTimeout ? 0 : 1; } -std::vector<std::thread> StartThreads(const Params& params, +std::vector<std::thread> StartThreads(const Args& params, std::vector<int>* nfiles, std::vector<int>* errors, std::vector<int>* nbuf, @@ -114,7 +114,7 @@ std::vector<std::thread> StartThreads(const Params& params, return threads; } -int ReadAllData(const Params& params, uint64_t* duration_ms, int* nerrors, int* nbuf, int* nfiles_total) { +int ReadAllData(const Args& params, uint64_t* duration_ms, int* nerrors, int* nbuf, int* nfiles_total) { asapo::FileInfo fi; system_clock::time_point t1 = system_clock::now(); @@ -137,7 +137,7 @@ int ReadAllData(const Params& params, uint64_t* duration_ms, int* nerrors, int* return n_total; } -void TryGetStream(Params* args) { +void TryGetStream(Args* args) { std::stringstream test(args->beamtime_id); std::string segment; std::vector<std::string> seglist; @@ -159,7 +159,7 @@ void TryGetStream(Params* args) { int main(int argc, char* argv[]) { asapo::ExitAfterPrintVersionIfNeeded("GetNext Broker Example", argc, argv); - Params params; + Args params; params.datasets = false; if (argc != 8 && argc != 9) { std::cout << "Usage: " + std::string{argv[0]} diff --git a/producer/api/src/producer_impl.cpp b/producer/api/src/producer_impl.cpp index 40fd02ea103e3f36eb217d3478b8a0ab2da0db44..86dd8e7660e0eefde0691282bc552b16423cb2e8 100644 --- a/producer/api/src/producer_impl.cpp +++ b/producer/api/src/producer_impl.cpp @@ -36,10 +36,10 @@ GenericRequestHeader ProducerImpl::GenerateNextSendRequest(const EventHeader& ev event_header.user_metadata.size(), std::move(event_header.file_name)}; if (event_header.subset_id != 0) { request.op_code = kOpcodeTransferSubsetData; - request.custom_data[1] = event_header.subset_id; - request.custom_data[2] = event_header.subset_size; + request.custom_data[kPosDataSetId] = event_header.subset_id; + request.custom_data[kPosDataSetSize] = event_header.subset_size; } - request.custom_data[0] = injest_mode; + request.custom_data[kPosInjestMode] = injest_mode; return request; } diff --git a/producer/api/src/request_handler_tcp.cpp b/producer/api/src/request_handler_tcp.cpp index e8546eb9282148a7544a083d22b383bb43c7fe79..5ad76ffe915bea68691008842b6130f375d73cf6 100644 --- a/producer/api/src/request_handler_tcp.cpp +++ b/producer/api/src/request_handler_tcp.cpp @@ -50,7 +50,7 @@ Error RequestHandlerTcp::ConnectToReceiver(const std::string& beamtime_id, const bool NeedSendData(const ProducerRequest* request) { if (request->header.op_code == kOpcodeTransferData || request->header.op_code == kOpcodeTransferSubsetData) { - return request->header.custom_data[0] & IngestModeFlags::kTransferData; + return request->header.custom_data[kPosInjestMode] & IngestModeFlags::kTransferData; } return true; diff --git a/producer/api/unittests/test_producer_impl.cpp b/producer/api/unittests/test_producer_impl.cpp index f2136eb002bccca0672dc99f525b26a8cccc2b13..2a802c9a5cd0883430b3923722b4e2a7d8465060 100644 --- a/producer/api/unittests/test_producer_impl.cpp +++ b/producer/api/unittests/test_producer_impl.cpp @@ -43,7 +43,7 @@ MATCHER_P9(M_CheckSendDataRequest, op_code, source_credentials, metadata, file_i && (op_code == asapo::kOpcodeTransferSubsetData ? ((asapo::GenericRequestHeader) (arg->header)).custom_data[2] == uint64_t(subset_size) : true) && ((op_code == asapo::kOpcodeTransferSubsetData || op_code == asapo::kOpcodeTransferData) ? - ((asapo::GenericRequestHeader) (arg->header)).custom_data[0] == uint64_t(injest_mode) : true) + ((asapo::GenericRequestHeader) (arg->header)).custom_data[asapo::kPosInjestMode] == uint64_t(injest_mode) : true) && strcmp(((asapo::GenericRequestHeader) (arg->header)).message, message) == 0; } @@ -64,7 +64,7 @@ class ProducerImplTests : public testing::Test { uint64_t expected_id = 10; uint64_t expected_subset_id = 100; uint64_t expected_subset_size = 4; - uint64_t expected_injest_mode = asapo::IngestModeFlags::kTransferData | asapo::IngestModeFlags::kStoreInCache; + uint64_t expected_injest_mode = asapo::IngestModeFlags::kTransferData; char expected_name[asapo::kMaxMessageSize] = "test_name"; asapo::SourceCredentials expected_credentials{ diff --git a/producer/api/unittests/test_request_handler_tcp.cpp b/producer/api/unittests/test_request_handler_tcp.cpp index c0b9cfb4239a2653466606fa8dee59bd2d8e380c..70cf1706954d123af94654137071e4368abb056c 100644 --- a/producer/api/unittests/test_request_handler_tcp.cpp +++ b/producer/api/unittests/test_request_handler_tcp.cpp @@ -109,9 +109,9 @@ class RequestHandlerTcpTests : public testing::Test { void SetUp() override { request_handler.log__ = &mock_logger; request_handler.io__.reset(&mock_io); - request.header.custom_data[0] = asapo::kDefaultIngestMode; - request_filesend.header.custom_data[0] = asapo::kDefaultIngestMode; - request_nocallback.header.custom_data[0] = asapo::kDefaultIngestMode; + request.header.custom_data[asapo::kPosInjestMode] = asapo::kDefaultIngestMode; + request_filesend.header.custom_data[asapo::kPosInjestMode] = asapo::kDefaultIngestMode; + request_nocallback.header.custom_data[asapo::kPosInjestMode] = asapo::kDefaultIngestMode; ON_CALL(mock_discovery_service, RotatedUriList(_)). WillByDefault(Return(receivers_list)); @@ -716,7 +716,7 @@ TEST_F(RequestHandlerTcpTests, SendMetadataIgnoresInjestMode) { ExpectOKReceive(); auto injest_mode = asapo::IngestModeFlags::kTransferMetaDataOnly; - request.header.custom_data[0] = injest_mode; + request.header.custom_data[asapo::kPosInjestMode] = injest_mode; request.header.op_code = asapo::kOpcodeTransferMetaData; request_handler.PrepareProcessingRequestLocked(); @@ -735,12 +735,12 @@ TEST_F(RequestHandlerTcpTests, SendMetaOnlyOK) { auto injest_mode = asapo::IngestModeFlags::kTransferMetaDataOnly; - request.header.custom_data[0] = injest_mode; + request.header.custom_data[asapo::kPosInjestMode] = injest_mode; request_handler.PrepareProcessingRequestLocked(); auto err = request_handler.ProcessRequestUnlocked(&request); ASSERT_THAT(err, Eq(nullptr)); - ASSERT_THAT(callback_header.custom_data[0], Eq(injest_mode)); + ASSERT_THAT(callback_header.custom_data[asapo::kPosInjestMode], Eq(injest_mode)); } diff --git a/receiver/src/request.cpp b/receiver/src/request.cpp index e8e7646891ac5902db565d0e6960c4b6816c48ba..c5bcb69ff66e7f2a2e44ea1dc4c94890fde1af3e 100644 --- a/receiver/src/request.cpp +++ b/receiver/src/request.cpp @@ -47,8 +47,13 @@ Error Request::ReceiveMetaData() { } +bool Request::NeedReceiveData() { + return request_header_.data_size > 0 && + (request_header_.custom_data[asapo::kPosInjestMode] & asapo::kTransferData); +} + Error Request::ReceiveData() { - if (request_header_.data_size == 0) { + if (!NeedReceiveData()) { return nullptr; } diff --git a/receiver/src/request.h b/receiver/src/request.h index b11f825bb1d4f882384e6613586c633faf6694e7..c8cf216fd48565ffb62d699c5505b19c84ed1ff0 100644 --- a/receiver/src/request.h +++ b/receiver/src/request.h @@ -57,7 +57,7 @@ class Request { Error ReceiveData(); Error ReceiveMetaData(); Error ReceiveRequestContent(ReceiverStatistics* statistics); - + bool NeedReceiveData(); const GenericRequestHeader request_header_; const SocketDescriptor socket_fd_; FileData data_buffer_; diff --git a/receiver/src/request_factory.cpp b/receiver/src/request_factory.cpp index d0e1c152244b74ecedc06b31b56662439b779f2c..ef08ad35d797898660f7e0781f092d84712c6cb0 100644 --- a/receiver/src/request_factory.cpp +++ b/receiver/src/request_factory.cpp @@ -4,42 +4,62 @@ namespace asapo { -std::unique_ptr<Request> RequestFactory::GenerateRequest(const GenericRequestHeader& - request_header, SocketDescriptor socket_fd, std::string origin_uri, - Error* err) const noexcept { - *err = nullptr; - auto request = std::unique_ptr<Request> {new Request{request_header, socket_fd, std::move(origin_uri), cache_.get()}}; +bool NeedFileWriteHandler (const GenericRequestHeader& request_header) { + return GetReceiverConfig()->write_to_disk && + (request_header.custom_data[kPosInjestMode] & IngestModeFlags::kStoreInFilesystem); +} + +bool NeedDbHandler (const GenericRequestHeader& request_header) { + return GetReceiverConfig()->write_to_db; +} + +Error RequestFactory::AddHandlersToRequest(std::unique_ptr<Request>& request, + const GenericRequestHeader& request_header) const { + request->AddHandler(&request_handler_authorize_); + switch (request_header.op_code) { case Opcode::kOpcodeTransferData: case Opcode::kOpcodeTransferSubsetData: { - request->AddHandler(&request_handler_authorize_); - if (GetReceiverConfig()->write_to_disk) { + if (NeedFileWriteHandler(request_header)) { request->AddHandler(&request_handler_filewrite_); } - if (GetReceiverConfig()->write_to_db) { + if (NeedDbHandler(request_header)) { request->AddHandler(&request_handler_dbwrite_); } - return request; + break; } case Opcode::kOpcodeTransferMetaData: { - request->AddHandler(&request_handler_authorize_); - if (GetReceiverConfig()->write_to_db) { + if (NeedDbHandler(request_header)) { request->AddHandler(&request_handler_db_meta_write_); } else { - *err = ReceiverErrorTemplates::kReject.Generate("reciever does not support writing to database"); - return nullptr; + return ReceiverErrorTemplates::kReject.Generate("reciever does not support writing to database"); } - return request; + break; } case Opcode::kOpcodeAuthorize: { - request->AddHandler(&request_handler_authorize_); - return request; + // do nothing + break; } default: - *err = ReceiverErrorTemplates::kInvalidOpCode.Generate(); + return ReceiverErrorTemplates::kInvalidOpCode.Generate(); + } + + return nullptr; + +} + +std::unique_ptr<Request> RequestFactory::GenerateRequest(const GenericRequestHeader& + request_header, SocketDescriptor socket_fd, std::string origin_uri, + Error* err) const noexcept { + auto request = std::unique_ptr<Request> {new Request{request_header, socket_fd, std::move(origin_uri), cache_.get()}}; + *err = AddHandlersToRequest(request, request_header); + if (*err) { return nullptr; } + return request; } + + RequestFactory::RequestFactory(SharedCache cache): cache_{cache} { } diff --git a/receiver/src/request_factory.h b/receiver/src/request_factory.h index ec4043b6b5e018c7b31063465b285af57e1eca93..f77a2c39506364344642f27d5d2ac5d6494f6e40 100644 --- a/receiver/src/request_factory.h +++ b/receiver/src/request_factory.h @@ -11,6 +11,7 @@ class RequestFactory { virtual std::unique_ptr<Request> GenerateRequest(const GenericRequestHeader& request_header, SocketDescriptor socket_fd, std::string origin_uri, Error* err) const noexcept; private: + Error AddHandlersToRequest(std::unique_ptr<Request>& request, const GenericRequestHeader& request_header) const; RequestHandlerFileWrite request_handler_filewrite_; RequestHandlerDbWrite request_handler_dbwrite_{kDBDataCollectionName}; RequestHandlerDbMetaWrite request_handler_db_meta_write_{kDBMetaCollectionName}; diff --git a/receiver/src/request_handler_file_write.cpp b/receiver/src/request_handler_file_write.cpp index 7a3009a50edaf482f3afb496a5fe5b80455762d1..260fb1e76358bb662f31043972a32488e537817f 100644 --- a/receiver/src/request_handler_file_write.cpp +++ b/receiver/src/request_handler_file_write.cpp @@ -10,7 +10,7 @@ namespace asapo { Error RequestHandlerFileWrite::ProcessRequest(Request* request) const { auto fsize = request->GetDataSize(); if (fsize <= 0 || fsize > kMaxFileSize) { - return ReceiverErrorTemplates::kBadRequest.Generate(); + return ReceiverErrorTemplates::kBadRequest.Generate("wrong file size"); } auto data = request->GetData(); diff --git a/receiver/unittests/test_request.cpp b/receiver/unittests/test_request.cpp index 73d917796925db7c1e1ee31e595738701136966d..33859865a2233137f6e5cd86bf3e9cbc47759a95 100644 --- a/receiver/unittests/test_request.cpp +++ b/receiver/unittests/test_request.cpp @@ -83,6 +83,7 @@ class RequestTests : public Test { generic_request_header.data_id = data_id_; generic_request_header.meta_size = expected_metadata_size; generic_request_header.op_code = expected_op_code; + generic_request_header.custom_data[asapo::kPosInjestMode] = asapo::kDefaultIngestMode; strcpy(generic_request_header.message, expected_request_message); request.reset(new Request{generic_request_header, socket_fd_, expected_origin_uri, nullptr}); request->io__ = std::unique_ptr<asapo::IO> {&mock_io}; @@ -145,6 +146,22 @@ TEST_F(RequestTests, HandleDoesNotReceiveEmptyData) { ASSERT_THAT(err, Eq(nullptr)); } + +TEST_F(RequestTests, HandleDoesNotReceiveDataWhenMetadataOnlyWasSent) { + generic_request_header.data_size = 10; + generic_request_header.custom_data[asapo::kPosInjestMode] = asapo::kTransferMetaDataOnly; + request->io__.release(); + request.reset(new Request{generic_request_header, socket_fd_, "", nullptr}); + request->io__ = std::unique_ptr<asapo::IO> {&mock_io};; + + ExpectReceiveMetaData(true); + + auto err = request->Handle(stat); + + ASSERT_THAT(err, Eq(nullptr)); +} + + TEST_F(RequestTests, HandleReturnsErrorOnDataReceive) { ExpectReceiveData(false); auto err = request->Handle(stat); diff --git a/receiver/unittests/test_request_factory.cpp b/receiver/unittests/test_request_factory.cpp index c737746152962c9294fb440d2d5e290027ac5933..c9c70754f2df7bfcf5b38ef4d415229ef934f598 100644 --- a/receiver/unittests/test_request_factory.cpp +++ b/receiver/unittests/test_request_factory.cpp @@ -60,6 +60,7 @@ class FactoryTests : public Test { std::string origin_uri{"origin_uri"}; void SetUp() override { generic_request_header.op_code = asapo::Opcode::kOpcodeTransferData; + generic_request_header.custom_data[asapo::kPosInjestMode] = asapo::kDefaultIngestMode; config.write_to_disk = true; config.write_to_db = true; SetReceiverConfig(config, "none"); @@ -99,7 +100,7 @@ TEST_F(FactoryTests, ReturnsDataRequestForAuthorizationCode) { } -TEST_F(FactoryTests, DoNotAddDiskWriterIfNotWanted) { +TEST_F(FactoryTests, DoNotAddDiskWriterIfNotWantedInConfig) { config.write_to_disk = false; SetReceiverConfig(config, "none"); @@ -111,6 +112,15 @@ TEST_F(FactoryTests, DoNotAddDiskWriterIfNotWanted) { ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerDbWrite*>(request->GetListHandlers().back()), Ne(nullptr)); } +TEST_F(FactoryTests, DoNotAddDiskWriterIfNotWantedInRequest) { + generic_request_header.custom_data[asapo::kPosInjestMode] = asapo::IngestModeFlags::kTransferData; + auto request = factory.GenerateRequest(generic_request_header, 1, origin_uri, &err); + ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(request->GetListHandlers().size(), Eq(2)); + ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerAuthorize*>(request->GetListHandlers()[0]), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerDbWrite*>(request->GetListHandlers().back()), Ne(nullptr)); +} + TEST_F(FactoryTests, DoNotAddDbWriterIfNotWanted) { config.write_to_db = false; diff --git a/tests/automatic/system_io/write_data_to_file/write_data_to_file.cpp b/tests/automatic/system_io/write_data_to_file/write_data_to_file.cpp index b629bc158236772c8fdeccdb72a4d6f08439d31d..edc3e5179a01076f067fd4149674afea1a77b3f6 100644 --- a/tests/automatic/system_io/write_data_to_file/write_data_to_file.cpp +++ b/tests/automatic/system_io/write_data_to_file/write_data_to_file.cpp @@ -8,14 +8,14 @@ using asapo::Error; using asapo::FileData; -struct Params { +struct Args { std::string fname; std::string result; std::string message; uint64_t length; }; -Params GetParams(int argc, char* argv[]) { +Args GetParams(int argc, char* argv[]) { if (argc != 4) { std::cout << "Wrong number of arguments" << std::endl; exit(EXIT_FAILURE); @@ -24,11 +24,11 @@ Params GetParams(int argc, char* argv[]) { std::string result{argv[2]}; std::string message{argv[3]}; - return Params{fname, result, message, 3}; + return Args{fname, result, message, 3}; } void AssertGoodResult(const std::unique_ptr<IO>& io, const Error& err, const FileData& data, - const Params& params) { + const Args& params) { if (err) { std::cerr << err << std::endl; exit(EXIT_FAILURE); @@ -39,7 +39,7 @@ void AssertGoodResult(const std::unique_ptr<IO>& io, const Error& err, const Fil asapo::M_AssertContains(std::string(read_data.get(), read_data.get() + params.length), "123"); } -void AssertBadResult(const Error& err, const Params& params) { +void AssertBadResult(const Error& err, const Args& params) { if (err == nullptr) { std::cerr << "Should be error" << std::endl; exit(EXIT_FAILURE); diff --git a/tests/manual/performance_broker_receiver/getlast_broker.cpp b/tests/manual/performance_broker_receiver/getlast_broker.cpp index af57a6e8641516c47c35a528c93db186f87b799e..74ef2c9b1f2e1c8f41060902ef9b485cc4c078f8 100644 --- a/tests/manual/performance_broker_receiver/getlast_broker.cpp +++ b/tests/manual/performance_broker_receiver/getlast_broker.cpp @@ -16,7 +16,7 @@ using asapo::Error; std::string group_id = ""; std::mutex lock; -struct Params { +struct Args { std::string server; std::string file_path; std::string beamtime_id; @@ -39,7 +39,7 @@ int ProcessError(const Error& err) { return err == asapo::IOErrorTemplates::kTimeout ? 0 : 1; } -std::vector<std::thread> StartThreads(const Params& params, +std::vector<std::thread> StartThreads(const Args& params, std::vector<int>* nfiles, std::vector<int>* errors, std::vector<int>* nbuf, @@ -102,7 +102,7 @@ std::vector<std::thread> StartThreads(const Params& params, return threads; } -int ReadAllData(const Params& params, uint64_t* duration_ms, int* nerrors, int* nbuf, int* nfiles_total) { +int ReadAllData(const Args& params, uint64_t* duration_ms, int* nerrors, int* nbuf, int* nfiles_total) { asapo::FileInfo fi; system_clock::time_point t1 = system_clock::now(); @@ -127,7 +127,7 @@ int ReadAllData(const Params& params, uint64_t* duration_ms, int* nerrors, int* int main(int argc, char* argv[]) { asapo::ExitAfterPrintVersionIfNeeded("GetLast Broker Example", argc, argv); - Params params; + Args params; params.datasets = false; if (argc != 8 && argc != 9) { std::cout << "Usage: " + std::string{argv[0]} diff --git a/tests/manual/python_tests/test_p.py b/tests/manual/python_tests/test_p.py index 5f1f8b973e20a1b9e3ede7c58ee3798cc89a88fd..9000bcd7beebcd1f66d41d2a29f84edd39bb01cb 100644 --- a/tests/manual/python_tests/test_p.py +++ b/tests/manual/python_tests/test_p.py @@ -5,16 +5,10 @@ import sys import json import time -<<<<<<< Updated upstream -broker, err = asapo_worker.create_server_broker("psana002:8400", "/tmp", "asapo_test2","", - "yzgAcLmijSLWIm8dBiGNCbc0i42u5HSm-zR6FRqo__Y=", 1000000) -======= source = "psana002:8400" path = "/asapo_shared/asapo/data" beamtime = "asapo_test" token = "KmUDdacgBzaOD3NIJvN1NmKGqWKtx0DK-NyPjdpeWkc=" ->>>>>>> Stashed changes - broker, err = asapo_worker.create_server_broker( source, path, beamtime, token, 1000)