Skip to content
Snippets Groups Projects
Commit 7e9eccc2 authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

update getlat test for datasets

parent 8e74a6a4
Branches
Tags v3.0-3
No related merge requests found
......@@ -6,17 +6,16 @@
#include <chrono>
#include <iomanip>
#include <numeric>
#include <mutex>
#include "asapo_worker.h"
#include <mutex>
using std::chrono::system_clock;
using asapo::Error;
std::string group_id = "";
std::mutex lock;
using std::chrono::system_clock;
using asapo::Error;
struct Params {
std::string server;
std::string file_path;
......@@ -25,6 +24,7 @@ struct Params {
int timeout_ms;
int nthreads;
bool read_data;
bool datasets;
};
void WaitThreads(std::vector<std::thread>* threads) {
......@@ -42,47 +42,57 @@ int ProcessError(const Error& err) {
std::vector<std::thread> StartThreads(const Params& params,
std::vector<int>* nfiles,
std::vector<int>* errors,
std::vector<int>* nbuf) {
auto exec_next = [&params, nfiles, errors, nbuf](int i) {
asapo::FileInfo fi;
Error err;
auto broker = asapo::DataBrokerFactory::CreateServerBroker(params.server, params.file_path, params.beamtime_id,
params.token, &err);
broker->SetTimeout(params.timeout_ms);
asapo::FileData data;
lock.lock();
if (group_id.empty()) {
group_id = broker->GenerateNewGroupId(&err);
if (err) {
(*errors)[i] += ProcessError(err);
return;
}
}
lock.unlock();
auto start = system_clock::now();
while (std::chrono::duration_cast<std::chrono::milliseconds>(system_clock::now() - start).count() <
params.timeout_ms) {
err = broker->GetLast(&fi, group_id, params.read_data ? &data : nullptr);
if (err == nullptr) {
(*nbuf)[i] += fi.buf_id == 0 ? 0 : 1;
if (params.read_data && (*nfiles)[i] < 10 && fi.size < 10) {
if (data != nullptr) {
data[9] = 0;
std::cout << "Received: " << reinterpret_cast<char const*>(data.get()) << std::endl;
}
}
} else {
(*errors)[i] += ProcessError(err);
if (err == asapo::IOErrorTemplates::kTimeout) {
break;
}
}
(*nfiles)[i]++;
}
std::vector<int>* nbuf,
std::vector<int>* nfiles_total) {
auto exec_next = [&params, nfiles, errors, nbuf, nfiles_total](int i) {
asapo::FileInfo fi;
Error err;
auto broker = asapo::DataBrokerFactory::CreateServerBroker(params.server, params.file_path, params.beamtime_id,
params.token, &err);
broker->SetTimeout((uint64_t) params.timeout_ms);
asapo::FileData data;
lock.lock();
if (group_id.empty()) {
group_id = broker->GenerateNewGroupId(&err);
if (err) {
(*errors)[i] += ProcessError(err);
return;
}
}
lock.unlock();
auto start = system_clock::now();
while (std::chrono::duration_cast<std::chrono::milliseconds>(system_clock::now() - start).count() <
params.timeout_ms) {
if (params.datasets) {
auto dataset = broker->GetLastDataset(group_id, &err);
if (err == nullptr) {
for (auto& fi : dataset.content) {
(*nbuf)[i] += fi.buf_id == 0 ? 0 : 1;
(*nfiles_total)[i]++;
}
}
} else {
err = broker->GetLast(&fi, group_id, params.read_data ? &data : nullptr);
if (err == nullptr) {
(*nbuf)[i] += fi.buf_id == 0 ? 0 : 1;
if (params.read_data && (*nfiles)[i] < 10 && fi.size < 10) {
data[9] = 0;
std::cout << "Received: " << reinterpret_cast<char const*>(data.get()) << std::endl;
}
}
}
if (err) {
(*errors)[i] += ProcessError(err);
if (err == asapo::IOErrorTemplates::kTimeout) {
break;
}
}
(*nfiles)[i]++;
}
};
std::vector<std::thread> threads;
......@@ -92,21 +102,22 @@ std::vector<std::thread> StartThreads(const Params& params,
return threads;
}
int ReadAllData(const Params& params, uint64_t* duration_ms, int* nerrors, int* nbuf) {
int ReadAllData(const Params& params, uint64_t* duration_ms, int* nerrors, int* nbuf, int* nfiles_total) {
asapo::FileInfo fi;
system_clock::time_point t1 = system_clock::now();
std::vector<int> nfiles(params.nthreads, 0);
std::vector<int> errors(params.nthreads, 0);
std::vector<int> nfiles_frombuf(params.nthreads, 0);
std::vector<int> nfiles_total_in_datasets(params.nthreads, 0);
auto threads = StartThreads(params, &nfiles, &errors, &nfiles_frombuf);
auto threads = StartThreads(params, &nfiles, &errors, &nfiles_frombuf, &nfiles_total_in_datasets);
WaitThreads(&threads);
int n_total = std::accumulate(nfiles.begin(), nfiles.end(), 0);
*nerrors = std::accumulate(errors.begin(), errors.end(), 0);
*nbuf = std::accumulate(nfiles_frombuf.begin(), nfiles_frombuf.end(), 0);
*nfiles_total = std::accumulate(nfiles_total_in_datasets.begin(), nfiles_total_in_datasets.end(), 0);
system_clock::time_point t2 = system_clock::now();
auto duration_read = std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1);
......@@ -116,14 +127,15 @@ int ReadAllData(const Params& params, uint64_t* duration_ms, int* nerrors, int*
int main(int argc, char* argv[]) {
asapo::ExitAfterPrintVersionIfNeeded("GetLast Broker Example", argc, argv);
if (argc != 8) {
Params params;
params.datasets = false;
if (argc != 8 && argc != 9) {
std::cout << "Usage: " + std::string{argv[0]}
+ " <server> <files_path> <run_name> <nthreads> <token> <timeout ms> <metaonly>"
+ " <server> <files_path> <run_name> <nthreads> <token> <timeout ms> <metaonly> [use datasets]"
<<
std::endl;
exit(EXIT_FAILURE);
}
Params params;
params.server = std::string{argv[1]};
params.file_path = std::string{argv[2]};
params.beamtime_id = std::string{argv[3]};
......@@ -131,14 +143,18 @@ int main(int argc, char* argv[]) {
params.token = std::string{argv[5]};
params.timeout_ms = atoi(argv[6]);
params.read_data = atoi(argv[7]) != 1;
if (argc == 9) {
params.datasets = atoi(argv[8]) == 1;
}
uint64_t duration_ms;
int nerrors, nbuf;
auto nfiles = ReadAllData(params, &duration_ms, &nerrors, &nbuf);
std::cout << "Processed " << nfiles << " file(s)" << std::endl;
int nerrors, nbuf, nfiles_total;
auto nfiles = ReadAllData(params, &duration_ms, &nerrors, &nbuf, &nfiles_total);
std::cout << "Processed " << nfiles << (params.datasets ? " dataset(s)" : " file(s)") << std::endl;
if (params.datasets) {
std::cout << " with " << nfiles_total << " file(s)" << std::endl;
}
std::cout << "Successfully: " << nfiles - nerrors << std::endl;
if (params.read_data) {
if (params.read_data && !params.datasets) {
std::cout << " from memory buffer: " << nbuf << std::endl;
std::cout << " from filesystem: " << nfiles - nerrors - nbuf << std::endl;
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment