diff --git a/tests/manual/performance_broker_receiver/getlast_broker.cpp b/tests/manual/performance_broker_receiver/getlast_broker.cpp index d339a65fc9487d2ecf391da6663168f821be743a..29ee0e4a0a08e8b7d3010f5060f1f7d1aae5d07c 100644 --- a/tests/manual/performance_broker_receiver/getlast_broker.cpp +++ b/tests/manual/performance_broker_receiver/getlast_broker.cpp @@ -6,17 +6,16 @@ #include <chrono> #include <iomanip> #include <numeric> +#include <mutex> #include "asapo_worker.h" -#include <mutex> +using std::chrono::system_clock; +using asapo::Error; std::string group_id = ""; std::mutex lock; -using std::chrono::system_clock; -using asapo::Error; - struct Params { std::string server; std::string file_path; @@ -25,6 +24,7 @@ struct Params { int timeout_ms; int nthreads; bool read_data; + bool datasets; }; void WaitThreads(std::vector<std::thread>* threads) { @@ -42,47 +42,57 @@ int ProcessError(const Error& err) { std::vector<std::thread> StartThreads(const Params& params, std::vector<int>* nfiles, std::vector<int>* errors, - std::vector<int>* nbuf) { - auto exec_next = [¶ms, nfiles, errors, nbuf](int i) { - asapo::FileInfo fi; - Error err; - auto broker = asapo::DataBrokerFactory::CreateServerBroker(params.server, params.file_path, params.beamtime_id, - params.token, &err); - broker->SetTimeout(params.timeout_ms); - asapo::FileData data; - - lock.lock(); - - if (group_id.empty()) { - group_id = broker->GenerateNewGroupId(&err); - if (err) { - (*errors)[i] += ProcessError(err); - return; - } - } - - lock.unlock(); - - auto start = system_clock::now(); - while (std::chrono::duration_cast<std::chrono::milliseconds>(system_clock::now() - start).count() < - params.timeout_ms) { - err = broker->GetLast(&fi, group_id, params.read_data ? &data : nullptr); - if (err == nullptr) { - (*nbuf)[i] += fi.buf_id == 0 ? 0 : 1; - if (params.read_data && (*nfiles)[i] < 10 && fi.size < 10) { - if (data != nullptr) { - data[9] = 0; - std::cout << "Received: " << reinterpret_cast<char const*>(data.get()) << std::endl; - } - } - } else { - (*errors)[i] += ProcessError(err); - if (err == asapo::IOErrorTemplates::kTimeout) { - break; - } - } - (*nfiles)[i]++; - } + std::vector<int>* nbuf, + std::vector<int>* nfiles_total) { + auto exec_next = [¶ms, nfiles, errors, nbuf, nfiles_total](int i) { + asapo::FileInfo fi; + Error err; + auto broker = asapo::DataBrokerFactory::CreateServerBroker(params.server, params.file_path, params.beamtime_id, + params.token, &err); + broker->SetTimeout((uint64_t) params.timeout_ms); + asapo::FileData data; + + lock.lock(); + + if (group_id.empty()) { + group_id = broker->GenerateNewGroupId(&err); + if (err) { + (*errors)[i] += ProcessError(err); + return; + } + } + + lock.unlock(); + + auto start = system_clock::now(); + while (std::chrono::duration_cast<std::chrono::milliseconds>(system_clock::now() - start).count() < + params.timeout_ms) { + if (params.datasets) { + auto dataset = broker->GetLastDataset(group_id, &err); + if (err == nullptr) { + for (auto& fi : dataset.content) { + (*nbuf)[i] += fi.buf_id == 0 ? 0 : 1; + (*nfiles_total)[i]++; + } + } + } else { + err = broker->GetLast(&fi, group_id, params.read_data ? &data : nullptr); + if (err == nullptr) { + (*nbuf)[i] += fi.buf_id == 0 ? 0 : 1; + if (params.read_data && (*nfiles)[i] < 10 && fi.size < 10) { + data[9] = 0; + std::cout << "Received: " << reinterpret_cast<char const*>(data.get()) << std::endl; + } + } + } + if (err) { + (*errors)[i] += ProcessError(err); + if (err == asapo::IOErrorTemplates::kTimeout) { + break; + } + } + (*nfiles)[i]++; + } }; std::vector<std::thread> threads; @@ -92,21 +102,22 @@ std::vector<std::thread> StartThreads(const Params& params, return threads; } -int ReadAllData(const Params& params, uint64_t* duration_ms, int* nerrors, int* nbuf) { +int ReadAllData(const Params& params, uint64_t* duration_ms, int* nerrors, int* nbuf, int* nfiles_total) { asapo::FileInfo fi; system_clock::time_point t1 = system_clock::now(); std::vector<int> nfiles(params.nthreads, 0); std::vector<int> errors(params.nthreads, 0); std::vector<int> nfiles_frombuf(params.nthreads, 0); + std::vector<int> nfiles_total_in_datasets(params.nthreads, 0); - auto threads = StartThreads(params, &nfiles, &errors, &nfiles_frombuf); + auto threads = StartThreads(params, &nfiles, &errors, &nfiles_frombuf, &nfiles_total_in_datasets); WaitThreads(&threads); int n_total = std::accumulate(nfiles.begin(), nfiles.end(), 0); *nerrors = std::accumulate(errors.begin(), errors.end(), 0); *nbuf = std::accumulate(nfiles_frombuf.begin(), nfiles_frombuf.end(), 0); - + *nfiles_total = std::accumulate(nfiles_total_in_datasets.begin(), nfiles_total_in_datasets.end(), 0); system_clock::time_point t2 = system_clock::now(); auto duration_read = std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1); @@ -116,14 +127,15 @@ int ReadAllData(const Params& params, uint64_t* duration_ms, int* nerrors, int* int main(int argc, char* argv[]) { asapo::ExitAfterPrintVersionIfNeeded("GetLast Broker Example", argc, argv); - if (argc != 8) { + Params params; + params.datasets = false; + if (argc != 8 && argc != 9) { std::cout << "Usage: " + std::string{argv[0]} - + " <server> <files_path> <run_name> <nthreads> <token> <timeout ms> <metaonly>" + + " <server> <files_path> <run_name> <nthreads> <token> <timeout ms> <metaonly> [use datasets]" << std::endl; exit(EXIT_FAILURE); } - Params params; params.server = std::string{argv[1]}; params.file_path = std::string{argv[2]}; params.beamtime_id = std::string{argv[3]}; @@ -131,14 +143,18 @@ int main(int argc, char* argv[]) { params.token = std::string{argv[5]}; params.timeout_ms = atoi(argv[6]); params.read_data = atoi(argv[7]) != 1; - + if (argc == 9) { + params.datasets = atoi(argv[8]) == 1; + } uint64_t duration_ms; - int nerrors, nbuf; - auto nfiles = ReadAllData(params, &duration_ms, &nerrors, &nbuf); - - std::cout << "Processed " << nfiles << " file(s)" << std::endl; + int nerrors, nbuf, nfiles_total; + auto nfiles = ReadAllData(params, &duration_ms, &nerrors, &nbuf, &nfiles_total); + std::cout << "Processed " << nfiles << (params.datasets ? " dataset(s)" : " file(s)") << std::endl; + if (params.datasets) { + std::cout << " with " << nfiles_total << " file(s)" << std::endl; + } std::cout << "Successfully: " << nfiles - nerrors << std::endl; - if (params.read_data) { + if (params.read_data && !params.datasets) { std::cout << " from memory buffer: " << nbuf << std::endl; std::cout << " from filesystem: " << nfiles - nerrors - nbuf << std::endl; }