diff --git a/common/cpp/src/data_structs/data_structs.cpp b/common/cpp/src/data_structs/data_structs.cpp index c23d24ab69cb62c150855e00b3c66b4af1521425..eb5e22a80ea53a32714617c35f6cb9db471a1615 100644 --- a/common/cpp/src/data_structs/data_structs.cpp +++ b/common/cpp/src/data_structs/data_structs.cpp @@ -12,16 +12,16 @@ namespace asapo { std::string FileInfo::Json() const { auto nanoseconds_from_epoch = std::chrono::time_point_cast<std::chrono::nanoseconds>(modify_date). time_since_epoch().count(); - + std::string x = name; //todo: change this - use / when sending file from windows #ifdef WIN32 - std::string old{kPathSeparator}; + std::string old {kPathSeparator}; std::string rep = old + kPathSeparator; int pos = 0; while ((pos = x.find(old, pos)) != std::string::npos) { - x.replace(pos, old.length(), rep); - pos += rep.length(); + x.replace(pos, old.length(), rep); + pos += rep.length(); } #endif diff --git a/common/cpp/src/system_io/system_io_linux.cpp b/common/cpp/src/system_io/system_io_linux.cpp index a072e24981609790b2291f92b4f3d98cd7af1982..c786841fe3c04f6a6105987557bf17b8a351baf0 100644 --- a/common/cpp/src/system_io/system_io_linux.cpp +++ b/common/cpp/src/system_io/system_io_linux.cpp @@ -6,6 +6,7 @@ #include <sys/stat.h> #include <algorithm> #include <netinet/in.h> +#include <netinet/tcp.h> #include <arpa/inet.h> #include <fcntl.h> #include <iostream> @@ -15,7 +16,6 @@ #include "system_io.h" - using std::string; using std::vector; using std::chrono::system_clock; @@ -212,7 +212,9 @@ void SystemIO::ApplyNetworkOptions(SocketDescriptor socket_fd, Error* err) const setsockopt(socket_fd, SOL_SOCKET, SO_SNDBUF, (char*)&kNetBufferSize, sizeof(kNetBufferSize)) != 0 || setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(int)) != 0 || - setsockopt(socket_fd, SOL_SOCKET, SO_REUSEPORT, &flag, sizeof(int)) != 0 + setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(int)) != 0 || + setsockopt(socket_fd, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(int)) != 0 + ) { *err = GetLastError(); } diff --git a/common/cpp/unittests/data_structs/test_data_structs.cpp b/common/cpp/unittests/data_structs/test_data_structs.cpp index 8b8c41d4a182b9a24ccaa3d48438d8b6a7beb0b2..e9fee1f8c860a18c43f3bea59a782494e12cf0bb 100644 --- a/common/cpp/unittests/data_structs/test_data_structs.cpp +++ b/common/cpp/unittests/data_structs/test_data_structs.cpp @@ -44,14 +44,14 @@ TEST(FileInFo, Defaults) { TEST(FileInFo, CorrectConvertToJson) { auto finfo = PrepareFileInfo(); std::string json = finfo.Json(); - printf("%s\n",json.c_str()); + printf("%s\n", json.c_str()); if (asapo::kPathSeparator == '/') { - ASSERT_THAT(json, Eq( - R"({"_id":1,"size":100,"name":"folder/test","lastchange":1000000,"source":"host:1234","buf_id":-1})")); + ASSERT_THAT(json, Eq( + R"({"_id":1,"size":100,"name":"folder/test","lastchange":1000000,"source":"host:1234","buf_id":-1})")); } else { - ASSERT_THAT(json, Eq( - R"({"_id":1,"size":100,"name":"folder\\test","lastchange":1000000,"source":"host:1234","buf_id":-1})")); - } + ASSERT_THAT(json, Eq( + R"({"_id":1,"size":100,"name":"folder\\test","lastchange":1000000,"source":"host:1234","buf_id":-1})")); + } } TEST(FileInFo, CorrectConvertFromJsonReturnsError) { diff --git a/deploy/nomad_jobs/receiver.json.tpl b/deploy/nomad_jobs/receiver.json.tpl index 3795e8561c024f1b624c78493939752619d0a667..1d669fc6f34ee405fd55feeacd7e4fe6e2a10b06 100644 --- a/deploy/nomad_jobs/receiver.json.tpl +++ b/deploy/nomad_jobs/receiver.json.tpl @@ -17,6 +17,6 @@ "Tag": "{{ env "NOMAD_ADDR_recv" }}", "WriteToDisk":true, "WriteToDb":true, - "LogLevel" : "info", + "LogLevel": "{{ keyOrDefault "receiver_log_level" "info" }}", "RootFolder" : "/var/lib/receiver/data" } diff --git a/examples/worker/getnext_broker/getnext_broker.cpp b/examples/worker/getnext_broker/getnext_broker.cpp index c9a497d61ef7bf279b03e5c0f3c0c63d9ab18dc9..2355199e042f3829fd7e122681553b767889c92c 100644 --- a/examples/worker/getnext_broker/getnext_broker.cpp +++ b/examples/worker/getnext_broker/getnext_broker.cpp @@ -95,7 +95,7 @@ int ReadAllData(const Params& params, uint64_t* duration_ms, int* nerrors, int* int main(int argc, char* argv[]) { asapo::ExitAfterPrintVersionIfNeeded("GetNext Broker Example", argc, argv); - if (argc != 8) { + if (argc != 9) { std::cout << "Usage: " + std::string{argv[0]} + " <server> <files_path> <run_name> <nthreads> <token> <timeout ms> <metaonly>" << diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 55d918a7c01e0b9443d020fc34128b382b6d0266..30598b059e866cf2fb886e6f3e9ce32167f2b2cd 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -3,3 +3,4 @@ add_subdirectory(automatic) configure_files(${CMAKE_CURRENT_SOURCE_DIR}/manual/tests_via_nomad ${CMAKE_CURRENT_BINARY_DIR}/manual/tests_via_nomad @ONLY) +add_subdirectory(manual/performance_broker_receiver) diff --git a/tests/manual/performance_broker_receiver/CMakeLists.txt b/tests/manual/performance_broker_receiver/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..40714bf99702539d450e5245c8d3bae8a9f6e2cd --- /dev/null +++ b/tests/manual/performance_broker_receiver/CMakeLists.txt @@ -0,0 +1,17 @@ +set(TARGET_NAME getlast_broker) +set(SOURCE_FILES getlast_broker.cpp) + + +add_executable(${TARGET_NAME} ${SOURCE_FILES}) +target_link_libraries(${TARGET_NAME} asapo-worker) + +#use expression generator to get rid of VS adding Debug/Release folders +set_target_properties(${TARGET_NAME} PROPERTIES RUNTIME_OUTPUT_DIRECTORY + ${CMAKE_CURRENT_BINARY_DIR}$<$<CONFIG:Debug>:> + ) + +get_target_property(VAR ${TARGET_NAME} RUNTIME_OUTPUT_DIRECTORY) +add_dependencies(${TARGET_NAME} asapo-broker) + + + diff --git a/tests/manual/performance_broker_receiver/getlast_broker.cpp b/tests/manual/performance_broker_receiver/getlast_broker.cpp new file mode 100644 index 0000000000000000000000000000000000000000..9bdb7e7ad5f833fea1a9d1f8c51078ca8bf85467 --- /dev/null +++ b/tests/manual/performance_broker_receiver/getlast_broker.cpp @@ -0,0 +1,132 @@ +#include <iostream> +#include <memory> +#include <vector> +#include <algorithm> +#include <thread> +#include <chrono> +#include <iomanip> +#include <numeric> + +#include "asapo_worker.h" + +using std::chrono::high_resolution_clock; +using asapo::Error; + +struct Params { + std::string server; + std::string file_path; + std::string beamtime_id; + std::string token; + int timeout_ms; + int nthreads; + bool read_data; +}; + +void WaitThreads(std::vector<std::thread>* threads) { + for (auto& thread : *threads) { + thread.join(); + } +} + +int ProcessError(const Error& err) { + if (err == nullptr) return 0; + std::cout << err->Explain() << std::endl; + return err->GetErrorType() == asapo::ErrorType::kTimeOut ? 0 : 1; +} + +std::vector<std::thread> StartThreads(const Params& params, + std::vector<int>* nfiles, + std::vector<int>* errors, + std::vector<int>* nbuf) { + auto exec_next = [¶ms, nfiles, errors, nbuf](int i) { + asapo::FileInfo fi; + Error err; + auto broker = asapo::DataBrokerFactory::CreateServerBroker(params.server, params.file_path, params.beamtime_id, + params.token, &err); + broker->SetTimeout(params.timeout_ms); + asapo::FileData data; + + auto start = high_resolution_clock::now(); + while (std::chrono::duration_cast<std::chrono::milliseconds>(high_resolution_clock::now() - start).count() < + params.timeout_ms) { + err = broker->GetLast(&fi, params.read_data ? &data : nullptr); + if (err == nullptr) { + (*nbuf)[i] += fi.buf_id == 0 ? 0 : 1; + if (params.read_data && (*nfiles)[i] < 10 && fi.size < 10) { + if (data != nullptr) { + data[9] = 0; + std::cout << "Received: " << reinterpret_cast<char const*>(data.get()) << std::endl; + } + } + } else { + (*errors)[i] += ProcessError(err); + if (err->GetErrorType() == asapo::ErrorType::kTimeOut) { + break; + } + } + (*nfiles)[i]++; + } + }; + + std::vector<std::thread> threads; + for (int i = 0; i < params.nthreads; i++) { + threads.emplace_back(std::thread(exec_next, i)); + } + return threads; +} + +int ReadAllData(const Params& params, uint64_t* duration_ms, int* nerrors, int* nbuf) { + asapo::FileInfo fi; + high_resolution_clock::time_point t1 = high_resolution_clock::now(); + + std::vector<int> nfiles(params.nthreads, 0); + std::vector<int> errors(params.nthreads, 0); + std::vector<int> nfiles_frombuf(params.nthreads, 0); + + auto threads = StartThreads(params, &nfiles, &errors, &nfiles_frombuf); + WaitThreads(&threads); + + int n_total = std::accumulate(nfiles.begin(), nfiles.end(), 0); + *nerrors = std::accumulate(errors.begin(), errors.end(), 0); + *nbuf = std::accumulate(nfiles_frombuf.begin(), nfiles_frombuf.end(), 0); + + + high_resolution_clock::time_point t2 = high_resolution_clock::now(); + auto duration_read = std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1); + *duration_ms = duration_read.count(); + return n_total; +} + +int main(int argc, char* argv[]) { + asapo::ExitAfterPrintVersionIfNeeded("GetLast Broker Example", argc, argv); + if (argc != 8) { + std::cout << "Usage: " + std::string{argv[0]} + + " <server> <files_path> <run_name> <nthreads> <token> <timeout ms> <metaonly>" + << + std::endl; + exit(EXIT_FAILURE); + } + Params params; + params.server = std::string{argv[1]}; + params.file_path = std::string{argv[2]}; + params.beamtime_id = std::string{argv[3]}; + params.nthreads = atoi(argv[4]); + params.token = std::string{argv[5]}; + params.timeout_ms = atoi(argv[6]); + params.read_data = atoi(argv[7]) != 1; + + uint64_t duration_ms; + int nerrors, nbuf; + auto nfiles = ReadAllData(params, &duration_ms, &nerrors, &nbuf); + + std::cout << "Processed " << nfiles << " file(s)" << std::endl; + std::cout << "Successfully: " << nfiles - nerrors << std::endl; + if (params.read_data) { + std::cout << " from memory buffer: " << nbuf << std::endl; + std::cout << " from filesystem: " << nfiles - nerrors - nbuf << std::endl; + } + std::cout << "Errors : " << nerrors << std::endl; + std::cout << "Elapsed : " << duration_ms << "ms" << std::endl; + std::cout << "Rate : " << 1000.0f * nfiles / (duration_ms) << " Hz" << std::endl; + return nerrors == 0 ? 0 : 1; +} diff --git a/tests/manual/python_tests/test_p.py b/tests/manual/python_tests/test_p.py index b3b771270595c9b6541af62d497892de6c4908ed..a679a30b0c3134f198af3982c3363674662d19bc 100644 --- a/tests/manual/python_tests/test_p.py +++ b/tests/manual/python_tests/test_p.py @@ -11,16 +11,10 @@ if not broker: print("Cannot create broker: " + err) sys.exit(1) -first = True +last_id = 0 while True: - if first: - array, meta, err = broker.get_last(meta_only=False) - first = False - else: - array, meta, err = broker.get_next(meta_only=False) - if err: - print ('err: ', err) - break - print ('data:', array.tostring().strip()) - print ('filename: ', meta['name']) - print ('meta: ', json.dumps(meta, indent=4, sort_keys=True)) + array, meta, err = broker.get_last(meta_only=False) +# id = meta['_id'] +# if id != last_id: +# print ("file content:",array.tostring().strip().decode("utf-8")) +# last_id = id