Skip to content
Snippets Groups Projects
Commit 8d4c34e1 authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

refactor examples

parent 8aff293d
No related branches found
No related tags found
No related merge requests found
......@@ -205,7 +205,23 @@ std::unique_ptr<asapo::Producer> CreateProducer(const Args& args) {
return producer;
}
int main(int argc, char* argv[]) {
void OutputResults(const Args& args, int nfiles, int nerrors, uint64_t duration_ms, uint64_t duration_streamout ) {
std::cout << "Data source in " << std::endl;
std::cout << " Processed " << nfiles << " file(s)" << std::endl;
std::cout << " Successfully: " << nfiles - nerrors << std::endl;
std::cout << " Errors : " << nerrors << std::endl;
std::cout << " Elapsed : " << duration_ms - static_cast<unsigned long long int>(args.timeout_ms) << "ms" << std::endl;
std::cout << " Rate : " << 1000.0f * static_cast<float>(nfiles) / (static_cast<float>(duration_ms
- static_cast<unsigned long long int>(args.timeout_ms))) << std::endl;
std::cout << "Data source out " << std::endl;
std::cout << " Sent " << files_sent << " file(s)" << std::endl;
std::cout << " Elapsed : " << duration_streamout << "ms" << std::endl;
std::cout << " Rate : " << 1000.0f * static_cast<float>(static_cast<uint64_t>(files_sent) / duration_streamout) <<
std::endl;
}
Args GetCommandArgs(int argc, char* argv[]) {
Args args;
if (argc != 11) {
std::cout << "Usage: " + std::string{argv[0]}
......@@ -224,7 +240,11 @@ int main(int argc, char* argv[]) {
args.timeout_ms = atoi(argv[8]);
args.timeout_ms_producer = atoi(argv[9]);
args.transfer_data = atoi(argv[10]) == 1;
return args;
}
int main(int argc, char* argv[]) {
auto args = GetCommandArgs(argc, argv);
auto producer = CreateProducer(args);
files_sent = 0;
streamout_timer_started = false;
......@@ -236,22 +256,11 @@ int main(int argc, char* argv[]) {
if (producer->WaitRequestsFinished(static_cast<uint64_t>(args.timeout_ms_producer)) != nullptr) {
std::cerr << "Data source out exit on timeout " << std::endl;
}
auto duration_streamout = std::chrono::duration_cast<std::chrono::milliseconds>(streamout_finish - streamout_start);
std::cout << "Data source in " << std::endl;
std::cout << " Processed " << nfiles << " file(s)" << std::endl;
std::cout << " Successfully: " << nfiles - nerrors << std::endl;
std::cout << " Errors : " << nerrors << std::endl;
std::cout << " Elapsed : " << duration_ms - static_cast<unsigned long long int>(args.timeout_ms) << "ms" << std::endl;
std::cout << " Rate : " << 1000.0f * static_cast<float>(nfiles) / (static_cast<float>(duration_ms
- static_cast<unsigned long long int>(args.timeout_ms))) << std::endl;
std::cout << "Data source out " << std::endl;
std::cout << " Sent " << files_sent << " file(s)" << std::endl;
std::cout << " Elapsed : " << duration_streamout.count() << "ms" << std::endl;
std::cout << " Rate : " << 1000.0f * static_cast<float>(files_sent / duration_streamout.count()) << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
auto duration_streamout = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::milliseconds>
(streamout_finish - streamout_start).count());
OutputResults(args, nfiles, nerrors, duration_ms, duration_streamout);
return (nerrors == 0) && (files_sent == nfiles) ? 0 : 1;
}
......@@ -20,23 +20,26 @@ struct Args {
std::string beamtime_id;
std::string data_source;
std::string token;
size_t number_of_bytes;
size_t data_size;
uint64_t iterations;
uint8_t nthreads;
uint64_t mode;
uint64_t timeout_ms;
uint64_t messages_in_set;
bool write_files;
asapo::SourceType type;
asapo::RequestHandlerType handler;
};
void PrintCommandArguments(const Args& args) {
std::cout << "discovery_service_endpoint: " << args.discovery_service_endpoint << std::endl
<< "beamtime_id: " << args.beamtime_id << std::endl
<< "Package size: " << args.number_of_bytes / 1000 << "k" << std::endl
<< "Package size: " << args.data_size / 1000 << "k" << std::endl
<< "iterations: " << args.iterations << std::endl
<< "nthreads: " << args.nthreads << std::endl
<< "mode: " << args.mode << std::endl
<< "Write files: " << ((args.mode % 100) / 10 == 1) << std::endl
<< "Tcp mode: " << ((args.mode % 10) == 0 ) << std::endl
<< "Write files: " << args.write_files << std::endl
<< "Tcp mode: " << ((args.mode % 10) == 0) << std::endl
<< "Raw: " << (args.mode / 100 == 1) << std::endl
<< "timeout: " << args.timeout_ms << std::endl
<< "messages in set: " << args.messages_in_set << std::endl
......@@ -48,7 +51,7 @@ void TryGetDataSourceAndToken(Args* args) {
std::string segment;
std::vector<std::string> seglist;
while(std::getline(test, segment, '%')) {
while (std::getline(test, segment, '%')) {
seglist.push_back(segment);
}
if (seglist.size() == 1) {
......@@ -65,9 +68,6 @@ void TryGetDataSourceAndToken(Args* args) {
}
void ProcessCommandArguments(int argc, char* argv[], Args* args) {
if (argc != 8 && argc != 9) {
std::cout <<
......@@ -81,7 +81,7 @@ void ProcessCommandArguments(int argc, char* argv[], Args* args) {
args->discovery_service_endpoint = argv[1];
args->beamtime_id = argv[2];
TryGetDataSourceAndToken(args);
args->number_of_bytes = std::stoull(argv[3]) * 1000;
args->data_size = std::stoull(argv[3]) * 1000;
args->iterations = std::stoull(argv[4]);
args->nthreads = static_cast<uint8_t>(std::stoi(argv[5]));
args->mode = std::stoull(argv[6]);
......@@ -91,9 +91,13 @@ void ProcessCommandArguments(int argc, char* argv[], Args* args) {
} else {
args->messages_in_set = 1;
}
args->write_files = (args->mode % 100) / 10 == 0;
args->type = args->mode / 100 == 0 ? asapo::SourceType::kProcessed : asapo::SourceType::kRaw;
args->handler = args->mode % 10 == 0 ? asapo::RequestHandlerType::kTcp
: asapo::RequestHandlerType::kFilesystem;
PrintCommandArguments(*args);
return;
} catch(std::exception& e) {
} catch (std::exception& e) {
std::cerr << "Fail to parse arguments" << std::endl;
std::cerr << e.what() << std::endl;
exit(EXIT_FAILURE);
......@@ -127,12 +131,55 @@ asapo::MessageData CreateMemoryBuffer(size_t size) {
return asapo::MessageData(new uint8_t[size]);
}
asapo::MessageHeader PrepareMessageHeader(uint64_t i, const Args& args) {
std::string message_folder = GetStringFromSourceType(args.type) + asapo::kPathSeparator;
asapo::MessageHeader message_header{i + 1, args.data_size, std::to_string(i + 1)};
std::string meta = "{\"user_meta\":\"test" + std::to_string(i + 1) + "\"}";
if (!args.data_source.empty()) {
message_header.file_name = args.data_source + "/" + message_header.file_name;
}
message_header.file_name = message_folder + message_header.file_name;
message_header.user_metadata = meta;
return message_header;
}
asapo::Error Send(asapo::Producer* producer, const asapo::MessageHeader& message_header, const Args& args) {
auto buffer = CreateMemoryBuffer(args.data_size);
return producer->Send(message_header,
std::move(buffer),
args.write_files ? asapo::kDefaultIngestMode : asapo::kTransferData,
"default",
&ProcessAfterSend);
}
bool SendSingleMesssage(asapo::Producer* producer, uint64_t i, const Args& args) {
asapo::MessageHeader message_header = PrepareMessageHeader(i, args);
auto err = Send(producer, message_header, args);
if (err) {
std::cerr << "Cannot send file: " << err << std::endl;
return false;
}
return true;
}
bool SendDataset(asapo::Producer* producer, uint64_t i, uint64_t id, const Args& args) {
asapo::MessageHeader message_header = PrepareMessageHeader(i, args);
message_header.dataset_substream = id + 1;
message_header.dataset_size = args.messages_in_set;
message_header.file_name += "_" + std::to_string(id + 1);
auto err = Send(producer, message_header, args);
if (err) {
std::cerr << "Cannot send file: " << err << std::endl;
return false;
}
return true;
}
bool SendDummyData(asapo::Producer* producer, size_t number_of_byte, uint64_t iterations, uint64_t messages_in_set,
const std::string& data_source, bool write_files, asapo::SourceType type) {
bool SendDummyData(asapo::Producer* producer, const Args& args) {
asapo::Error err;
if (iterations == 0) {
if (args.iterations == 0) {
auto mode = asapo::MetaIngestMode{asapo::MetaIngestOp::kReplace, true};
err = producer->SendBeamtimeMetadata("{\"dummy_meta\":\"test\"}", mode, &ProcessAfterMetaDataSend);
if (err) {
......@@ -141,65 +188,30 @@ bool SendDummyData(asapo::Producer* producer, size_t number_of_byte, uint64_t it
}
}
std::string message_folder = GetStringFromSourceType(type) + asapo::kPathSeparator;
for (uint64_t i = 0; i < iterations; i++) {
asapo::MessageHeader message_header{i + 1, number_of_byte, std::to_string(i + 1)};
std::string meta = "{\"user_meta\":\"test" + std::to_string(i + 1) + "\"}";
if (messages_in_set == 1) {
auto buffer = CreateMemoryBuffer(number_of_byte);
if (!data_source.empty()) {
message_header.file_name = data_source + "/" + message_header.file_name;
}
message_header.file_name = message_folder + message_header.file_name;
message_header.user_metadata = meta;
err = producer->Send(message_header,
std::move(buffer),
write_files ? asapo::kDefaultIngestMode :
asapo::kTransferData,
"default",
&ProcessAfterSend);
if (err) {
std::cerr << "Cannot send file: " << err << std::endl;
return false;
}
for (uint64_t i = 0; i < args.iterations; i++) {
bool res = true;
if (args.messages_in_set == 1) {
res = SendSingleMesssage(producer, i, args);
} else {
for (uint64_t id = 0; id < messages_in_set; id++) {
auto buffer = CreateMemoryBuffer(number_of_byte);
message_header.dataset_substream = id + 1;
message_header.dataset_size = messages_in_set;
message_header.message_id = i + 1;
message_header.file_name = std::to_string(i + 1) + "_" + std::to_string(id + 1);
if (!data_source.empty()) {
message_header.file_name = data_source + "/" + message_header.file_name;
}
message_header.file_name = message_folder + message_header.file_name;
message_header.user_metadata = meta;
err =
producer->Send(message_header,
std::move(buffer),
write_files ? asapo::kDefaultIngestMode :
asapo::kTransferData,
"default",
&ProcessAfterSend);
if (err) {
std::cerr << "Cannot send file: " << err << std::endl;
return false;
}
for (uint64_t id = 0; id < args.messages_in_set; id++) {
res &= SendDataset(producer, i, id, args);
}
}
if (!res) {
return false;
}
}
return producer->SendStreamFinishedFlag("default", iterations, "", nullptr) == nullptr;
return producer->SendStreamFinishedFlag("default", args.iterations, "", nullptr) == nullptr;
}
std::unique_ptr<asapo::Producer> CreateProducer(const Args& args) {
asapo::Error err;
auto producer = asapo::Producer::Create(args.discovery_service_endpoint, args.nthreads,
args.mode % 10 == 0 ? asapo::RequestHandlerType::kTcp : asapo::RequestHandlerType::kFilesystem,
asapo::SourceCredentials{args.mode / 100 == 0 ? asapo::SourceType::kProcessed : asapo::SourceType::kRaw, args.beamtime_id, "", args.data_source, args.token },
3600000, &err);
if(err) {
auto producer = asapo::Producer::Create(args.discovery_service_endpoint, args.nthreads, args.handler,
asapo::SourceCredentials{
args.type, args.beamtime_id,
"", args.data_source, args.token},
3600000, &err);
if (err) {
std::cerr << "Cannot start producer. ProducerError: " << err << std::endl;
exit(EXIT_FAILURE);
}
......@@ -211,16 +223,17 @@ std::unique_ptr<asapo::Producer> CreateProducer(const Args& args) {
void PrintOutput(const Args& args, const system_clock::time_point& start) {
system_clock::time_point t2 = system_clock::now();
double duration_sec = static_cast<double>(std::chrono::duration_cast<std::chrono::milliseconds>(t2 - start).count())
/ 1000.0;
double size_gb = static_cast<double>(args.number_of_bytes * args.iterations) / 1000.0 / 1000.0 / 1000.0 * 8.0;
double duration_sec =
static_cast<double>(std::chrono::duration_cast<std::chrono::milliseconds>(t2 - start).count())
/ 1000.0;
double size_gb = static_cast<double>(args.data_size * args.iterations) / 1000.0 / 1000.0 / 1000.0 * 8.0;
double rate = static_cast<double>(args.iterations) / duration_sec;
std::cout << "Rate: " << rate << " Hz" << std::endl;
std::cout << "Bandwidth " << size_gb / duration_sec << " Gbit/s" << std::endl;
std::cout << "Bandwidth " << size_gb / duration_sec / 8 << " GBytes/s" << std::endl;
}
int main (int argc, char* argv[]) {
int main(int argc, char* argv[]) {
Args args;
ProcessCommandArguments(argc, argv, &args);
......@@ -234,8 +247,7 @@ int main (int argc, char* argv[]) {
system_clock::time_point start_time = system_clock::now();
if(!SendDummyData(producer.get(), args.number_of_bytes, args.iterations, args.messages_in_set, args.data_source,
(args.mode % 100) / 10 == 0, args.mode / 100 == 0 ? asapo::SourceType::kProcessed : asapo::SourceType::kRaw)) {
if (!SendDummyData(producer.get(), args)) {
return EXIT_FAILURE;
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment