From f532428487a3e758f223bb330161331dfd092feb Mon Sep 17 00:00:00 2001 From: Carsten Patzke <carsten.patzke@desy.de> Date: Mon, 16 Aug 2021 15:37:13 +0200 Subject: [PATCH] Added PipelineId Arugment to getnext and dummy_data_producer --- examples/consumer/getnext/getnext.cpp | 13 +++++--- .../dummy_data_producer.cpp | 33 +++++++++++-------- 2 files changed, 27 insertions(+), 19 deletions(-) diff --git a/examples/consumer/getnext/getnext.cpp b/examples/consumer/getnext/getnext.cpp index 29746118a..e510e4e7e 100644 --- a/examples/consumer/getnext/getnext.cpp +++ b/examples/consumer/getnext/getnext.cpp @@ -43,6 +43,7 @@ struct Args { bool read_data; bool datasets; bool need_beamtime_meta = false; + std::string pipeline_name = "GetNextPipelineStep"; }; class LatchedTimer { @@ -104,7 +105,7 @@ StartThreads(const Args& params, std::vector<int>* nfiles, std::vector<int>* err params.file_path, true, asapo::SourceCredentials{asapo::SourceType::kProcessed, - "auto", "auto", + "auto", params.pipeline_name, params.beamtime_id, "", params.data_source, params.token}, &err); @@ -269,11 +270,10 @@ void TryGetStream(Args* args) { int main(int argc, char* argv[]) { Args params; params.datasets = false; - if (argc != 8 && argc != 9 && argc != 10) { + if (argc != 8 && argc != 9 && argc != 10 && argc != 11) { std::cout << "Usage: " + std::string{argv[0]} - + " <server> <files_path> <run_name> <nthreads> <token> <timeout ms> <metaonly> [use datasets] [send metadata]" - << - std::endl; + + " <server> <files_path> <beamtime_id[%<data_source>[%<token>]]> <nthreads> <token> <timeout ms> <metaonly> [use datasets] [send metadata] [pipelinename]" + << std::endl; exit(EXIT_FAILURE); } params.server = std::string{argv[1]}; @@ -290,6 +290,9 @@ int main(int argc, char* argv[]) { if (argc == 10) { params.need_beamtime_meta = atoi(argv[9]) == 1; } + if (argc == 11) { + params.pipeline_name = argv[10]; + } if (params.read_data) { std::cout << "Will read metadata+payload" << std::endl; diff --git a/examples/producer/dummy-data-producer/dummy_data_producer.cpp b/examples/producer/dummy-data-producer/dummy_data_producer.cpp index 1ebacbc77..ca2a723fb 100644 --- a/examples/producer/dummy-data-producer/dummy_data_producer.cpp +++ b/examples/producer/dummy-data-producer/dummy_data_producer.cpp @@ -26,20 +26,22 @@ struct Args { uint64_t mode; uint64_t timeout_ms; uint64_t messages_in_set; + std::string pipeline_name = "DummyDataProducerPipelineStep"; }; void PrintCommandArguments(const Args& args) { std::cout << "discovery_service_endpoint: " << args.discovery_service_endpoint << std::endl - << "beamtime_id: " << args.beamtime_id << std::endl - << "Package size: " << args.number_of_bytes / 1000 << "k" << std::endl - << "iterations: " << args.iterations << std::endl - << "nthreads: " << (int)args.nthreads << std::endl - << "mode: " << args.mode << std::endl - << "Write files: " << ((args.mode % 100) / 10 == 1) << std::endl - << "Tcp mode: " << ((args.mode % 10) == 0 ) << std::endl - << "Raw: " << (args.mode / 100 == 1) << std::endl - << "timeout: " << args.timeout_ms << std::endl - << "messages in set: " << args.messages_in_set << std::endl + << "beamtime_id: " << args.beamtime_id << std::endl + << "Package size: " << args.number_of_bytes / 1000 << "k" << std::endl + << "iterations: " << args.iterations << std::endl + << "nthreads: " << (int)args.nthreads << std::endl + << "mode: " << args.mode << std::endl + << "Write files: " << ((args.mode % 100) / 10 == 1) << std::endl + << "Tcp mode: " << ((args.mode % 10) == 0 ) << std::endl + << "Raw: " << (args.mode / 100 == 1) << std::endl + << "timeout: " << args.timeout_ms << std::endl + << "messages in set: " << args.messages_in_set << std::endl + << "pipeline: " << args.pipeline_name << std::endl << std::endl; } @@ -69,10 +71,10 @@ void TryGetDataSourceAndToken(Args* args) { void ProcessCommandArguments(int argc, char* argv[], Args* args) { - if (argc != 8 && argc != 9) { + if (argc != 8 && argc != 9 && argc != 10) { std::cout << "Usage: " << argv[0] << - " <destination> <beamtime_id[%<data_source>%<token>]> <number_of_kbyte> <iterations> <nthreads>" + " <destination> <beamtime_id[%<data_source>[%<token>]]> <number_of_kbyte> <iterations> <nthreads> [pipeline_name]" " <mode 0xx - processed source type, 1xx - raw source type, xx0 -t tcp, xx1 - filesystem, x0x - write files, x1x - do not write files> <timeout (sec)> [n messages in set (default 1)]" << std::endl; exit(EXIT_FAILURE); @@ -86,11 +88,14 @@ void ProcessCommandArguments(int argc, char* argv[], Args* args) { args->nthreads = static_cast<uint8_t>(std::stoi(argv[5])); args->mode = std::stoull(argv[6]); args->timeout_ms = std::stoull(argv[7]) * 1000; - if (argc == 9) { + if (argc >= 9) { args->messages_in_set = std::stoull(argv[8]); } else { args->messages_in_set = 1; } + if (argc >= 10) { + args->pipeline_name = argv[9]; + } PrintCommandArguments(*args); return; } catch(std::exception& e) { @@ -198,7 +203,7 @@ std::unique_ptr<asapo::Producer> CreateProducer(const Args& args) { auto producer = asapo::Producer::Create(args.discovery_service_endpoint, args.nthreads, args.mode % 10 == 0 ? asapo::RequestHandlerType::kTcp : asapo::RequestHandlerType::kFilesystem, asapo::SourceCredentials{args.mode / 100 == 0 ? asapo::SourceType::kProcessed : asapo::SourceType::kRaw, - "auto", "auto", args.beamtime_id, "", args.data_source, args.token }, + "auto", args.pipeline_name, args.beamtime_id, "", args.data_source, args.token }, 3600000, &err); if(err) { std::cerr << "Cannot start producer. ProducerError: " << err << std::endl; -- GitLab