Skip to content
Snippets Groups Projects
Commit f5324284 authored by Carsten Patzke's avatar Carsten Patzke
Browse files

Added PipelineId Arugment to getnext and dummy_data_producer

parent b3553526
No related branches found
No related tags found
No related merge requests found
...@@ -43,6 +43,7 @@ struct Args { ...@@ -43,6 +43,7 @@ struct Args {
bool read_data; bool read_data;
bool datasets; bool datasets;
bool need_beamtime_meta = false; bool need_beamtime_meta = false;
std::string pipeline_name = "GetNextPipelineStep";
}; };
class LatchedTimer { class LatchedTimer {
...@@ -104,7 +105,7 @@ StartThreads(const Args& params, std::vector<int>* nfiles, std::vector<int>* err ...@@ -104,7 +105,7 @@ StartThreads(const Args& params, std::vector<int>* nfiles, std::vector<int>* err
params.file_path, params.file_path,
true, true,
asapo::SourceCredentials{asapo::SourceType::kProcessed, asapo::SourceCredentials{asapo::SourceType::kProcessed,
"auto", "auto", "auto", params.pipeline_name,
params.beamtime_id, "", params.beamtime_id, "",
params.data_source, params.token}, params.data_source, params.token},
&err); &err);
...@@ -269,11 +270,10 @@ void TryGetStream(Args* args) { ...@@ -269,11 +270,10 @@ void TryGetStream(Args* args) {
int main(int argc, char* argv[]) { int main(int argc, char* argv[]) {
Args params; Args params;
params.datasets = false; params.datasets = false;
if (argc != 8 && argc != 9 && argc != 10) { if (argc != 8 && argc != 9 && argc != 10 && argc != 11) {
std::cout << "Usage: " + std::string{argv[0]} std::cout << "Usage: " + std::string{argv[0]}
+ " <server> <files_path> <run_name> <nthreads> <token> <timeout ms> <metaonly> [use datasets] [send metadata]" + " <server> <files_path> <beamtime_id[%<data_source>[%<token>]]> <nthreads> <token> <timeout ms> <metaonly> [use datasets] [send metadata] [pipelinename]"
<< << std::endl;
std::endl;
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
params.server = std::string{argv[1]}; params.server = std::string{argv[1]};
...@@ -290,6 +290,9 @@ int main(int argc, char* argv[]) { ...@@ -290,6 +290,9 @@ int main(int argc, char* argv[]) {
if (argc == 10) { if (argc == 10) {
params.need_beamtime_meta = atoi(argv[9]) == 1; params.need_beamtime_meta = atoi(argv[9]) == 1;
} }
if (argc == 11) {
params.pipeline_name = argv[10];
}
if (params.read_data) { if (params.read_data) {
std::cout << "Will read metadata+payload" << std::endl; std::cout << "Will read metadata+payload" << std::endl;
......
...@@ -26,20 +26,22 @@ struct Args { ...@@ -26,20 +26,22 @@ struct Args {
uint64_t mode; uint64_t mode;
uint64_t timeout_ms; uint64_t timeout_ms;
uint64_t messages_in_set; uint64_t messages_in_set;
std::string pipeline_name = "DummyDataProducerPipelineStep";
}; };
void PrintCommandArguments(const Args& args) { void PrintCommandArguments(const Args& args) {
std::cout << "discovery_service_endpoint: " << args.discovery_service_endpoint << std::endl std::cout << "discovery_service_endpoint: " << args.discovery_service_endpoint << std::endl
<< "beamtime_id: " << args.beamtime_id << std::endl << "beamtime_id: " << args.beamtime_id << std::endl
<< "Package size: " << args.number_of_bytes / 1000 << "k" << std::endl << "Package size: " << args.number_of_bytes / 1000 << "k" << std::endl
<< "iterations: " << args.iterations << std::endl << "iterations: " << args.iterations << std::endl
<< "nthreads: " << (int)args.nthreads << std::endl << "nthreads: " << (int)args.nthreads << std::endl
<< "mode: " << args.mode << std::endl << "mode: " << args.mode << std::endl
<< "Write files: " << ((args.mode % 100) / 10 == 1) << std::endl << "Write files: " << ((args.mode % 100) / 10 == 1) << std::endl
<< "Tcp mode: " << ((args.mode % 10) == 0 ) << std::endl << "Tcp mode: " << ((args.mode % 10) == 0 ) << std::endl
<< "Raw: " << (args.mode / 100 == 1) << std::endl << "Raw: " << (args.mode / 100 == 1) << std::endl
<< "timeout: " << args.timeout_ms << std::endl << "timeout: " << args.timeout_ms << std::endl
<< "messages in set: " << args.messages_in_set << std::endl << "messages in set: " << args.messages_in_set << std::endl
<< "pipeline: " << args.pipeline_name << std::endl
<< std::endl; << std::endl;
} }
...@@ -69,10 +71,10 @@ void TryGetDataSourceAndToken(Args* args) { ...@@ -69,10 +71,10 @@ void TryGetDataSourceAndToken(Args* args) {
void ProcessCommandArguments(int argc, char* argv[], Args* args) { void ProcessCommandArguments(int argc, char* argv[], Args* args) {
if (argc != 8 && argc != 9) { if (argc != 8 && argc != 9 && argc != 10) {
std::cout << std::cout <<
"Usage: " << argv[0] << "Usage: " << argv[0] <<
" <destination> <beamtime_id[%<data_source>%<token>]> <number_of_kbyte> <iterations> <nthreads>" " <destination> <beamtime_id[%<data_source>[%<token>]]> <number_of_kbyte> <iterations> <nthreads> [pipeline_name]"
" <mode 0xx - processed source type, 1xx - raw source type, xx0 -t tcp, xx1 - filesystem, x0x - write files, x1x - do not write files> <timeout (sec)> [n messages in set (default 1)]" " <mode 0xx - processed source type, 1xx - raw source type, xx0 -t tcp, xx1 - filesystem, x0x - write files, x1x - do not write files> <timeout (sec)> [n messages in set (default 1)]"
<< std::endl; << std::endl;
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
...@@ -86,11 +88,14 @@ void ProcessCommandArguments(int argc, char* argv[], Args* args) { ...@@ -86,11 +88,14 @@ void ProcessCommandArguments(int argc, char* argv[], Args* args) {
args->nthreads = static_cast<uint8_t>(std::stoi(argv[5])); args->nthreads = static_cast<uint8_t>(std::stoi(argv[5]));
args->mode = std::stoull(argv[6]); args->mode = std::stoull(argv[6]);
args->timeout_ms = std::stoull(argv[7]) * 1000; args->timeout_ms = std::stoull(argv[7]) * 1000;
if (argc == 9) { if (argc >= 9) {
args->messages_in_set = std::stoull(argv[8]); args->messages_in_set = std::stoull(argv[8]);
} else { } else {
args->messages_in_set = 1; args->messages_in_set = 1;
} }
if (argc >= 10) {
args->pipeline_name = argv[9];
}
PrintCommandArguments(*args); PrintCommandArguments(*args);
return; return;
} catch(std::exception& e) { } catch(std::exception& e) {
...@@ -198,7 +203,7 @@ std::unique_ptr<asapo::Producer> CreateProducer(const Args& args) { ...@@ -198,7 +203,7 @@ std::unique_ptr<asapo::Producer> CreateProducer(const Args& args) {
auto producer = asapo::Producer::Create(args.discovery_service_endpoint, args.nthreads, auto producer = asapo::Producer::Create(args.discovery_service_endpoint, args.nthreads,
args.mode % 10 == 0 ? asapo::RequestHandlerType::kTcp : asapo::RequestHandlerType::kFilesystem, args.mode % 10 == 0 ? asapo::RequestHandlerType::kTcp : asapo::RequestHandlerType::kFilesystem,
asapo::SourceCredentials{args.mode / 100 == 0 ? asapo::SourceType::kProcessed : asapo::SourceType::kRaw, asapo::SourceCredentials{args.mode / 100 == 0 ? asapo::SourceType::kProcessed : asapo::SourceType::kRaw,
"auto", "auto", args.beamtime_id, "", args.data_source, args.token }, "auto", args.pipeline_name, args.beamtime_id, "", args.data_source, args.token },
3600000, &err); 3600000, &err);
if(err) { if(err) {
std::cerr << "Cannot start producer. ProducerError: " << err << std::endl; std::cerr << "Cannot start producer. ProducerError: " << err << std::endl;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment