From 8d4c34e1a753298c06b30cb733bf474ffdc482ef Mon Sep 17 00:00:00 2001
From: Sergey Yakubov <sergey.yakubov@desy.de>
Date: Fri, 29 Oct 2021 11:06:35 +0200
Subject: [PATCH] refactor examples

---
 examples/pipeline/in_to_out/in_to_out.cpp     |  41 +++--
 .../dummy_data_producer.cpp                   | 152 ++++++++++--------
 2 files changed, 107 insertions(+), 86 deletions(-)

diff --git a/examples/pipeline/in_to_out/in_to_out.cpp b/examples/pipeline/in_to_out/in_to_out.cpp
index f69f41656..ac47de215 100644
--- a/examples/pipeline/in_to_out/in_to_out.cpp
+++ b/examples/pipeline/in_to_out/in_to_out.cpp
@@ -205,7 +205,23 @@ std::unique_ptr<asapo::Producer> CreateProducer(const Args& args) {
     return producer;
 }
 
-int main(int argc, char* argv[]) {
+void OutputResults(const Args& args, int nfiles, int nerrors, uint64_t duration_ms, uint64_t duration_streamout ) {
+    std::cout << "Data source in " << std::endl;
+    std::cout << "  Processed " << nfiles << " file(s)" << std::endl;
+    std::cout << "  Successfully: " << nfiles - nerrors << std::endl;
+    std::cout << "  Errors : " << nerrors << std::endl;
+    std::cout << "  Elapsed : " << duration_ms - static_cast<unsigned long long int>(args.timeout_ms) << "ms" << std::endl;
+    std::cout << "  Rate : " << 1000.0f * static_cast<float>(nfiles) / (static_cast<float>(duration_ms
+              - static_cast<unsigned long long int>(args.timeout_ms))) << std::endl;
+
+    std::cout << "Data source out " << std::endl;
+    std::cout << "  Sent " << files_sent << " file(s)" << std::endl;
+    std::cout << "  Elapsed : " << duration_streamout << "ms" << std::endl;
+    std::cout << "  Rate : " << 1000.0f * static_cast<float>(static_cast<uint64_t>(files_sent) / duration_streamout) <<
+              std::endl;
+}
+
+Args GetCommandArgs(int argc, char* argv[]) {
     Args args;
     if (argc != 11) {
         std::cout << "Usage: " + std::string{argv[0]}
@@ -224,7 +240,11 @@ int main(int argc, char* argv[]) {
     args.timeout_ms = atoi(argv[8]);
     args.timeout_ms_producer = atoi(argv[9]);
     args.transfer_data = atoi(argv[10]) == 1;
+    return args;
+}
 
+int main(int argc, char* argv[]) {
+    auto args = GetCommandArgs(argc, argv);
     auto producer = CreateProducer(args);
     files_sent = 0;
     streamout_timer_started = false;
@@ -236,22 +256,11 @@ int main(int argc, char* argv[]) {
     if (producer->WaitRequestsFinished(static_cast<uint64_t>(args.timeout_ms_producer)) != nullptr) {
         std::cerr << "Data source out exit on timeout " << std::endl;
     }
-    auto duration_streamout = std::chrono::duration_cast<std::chrono::milliseconds>(streamout_finish - streamout_start);
 
-    std::cout << "Data source in " << std::endl;
-    std::cout << "  Processed " << nfiles << " file(s)" << std::endl;
-    std::cout << "  Successfully: " << nfiles - nerrors << std::endl;
-    std::cout << "  Errors : " << nerrors << std::endl;
-    std::cout << "  Elapsed : " << duration_ms - static_cast<unsigned long long int>(args.timeout_ms) << "ms" << std::endl;
-    std::cout << "  Rate : " << 1000.0f * static_cast<float>(nfiles) / (static_cast<float>(duration_ms
-              - static_cast<unsigned long long int>(args.timeout_ms))) << std::endl;
-
-    std::cout << "Data source out " << std::endl;
-    std::cout << "  Sent " << files_sent << " file(s)" << std::endl;
-    std::cout << "  Elapsed : " << duration_streamout.count() << "ms" << std::endl;
-    std::cout << "  Rate : " << 1000.0f * static_cast<float>(files_sent / duration_streamout.count()) << std::endl;
-
-    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+    auto duration_streamout = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::milliseconds>
+                                                    (streamout_finish - streamout_start).count());
+    OutputResults(args, nfiles, nerrors, duration_ms, duration_streamout);
 
     return (nerrors == 0) && (files_sent == nfiles) ? 0 : 1;
 }
+
diff --git a/examples/producer/dummy-data-producer/dummy_data_producer.cpp b/examples/producer/dummy-data-producer/dummy_data_producer.cpp
index 585d47375..9792fec45 100644
--- a/examples/producer/dummy-data-producer/dummy_data_producer.cpp
+++ b/examples/producer/dummy-data-producer/dummy_data_producer.cpp
@@ -20,23 +20,26 @@ struct Args {
     std::string beamtime_id;
     std::string data_source;
     std::string token;
-    size_t number_of_bytes;
+    size_t data_size;
     uint64_t iterations;
     uint8_t nthreads;
     uint64_t mode;
     uint64_t timeout_ms;
     uint64_t messages_in_set;
+    bool write_files;
+    asapo::SourceType type;
+    asapo::RequestHandlerType handler;
 };
 
 void PrintCommandArguments(const Args& args) {
     std::cout << "discovery_service_endpoint: " << args.discovery_service_endpoint << std::endl
               << "beamtime_id: " << args.beamtime_id << std::endl
-              << "Package size: " << args.number_of_bytes / 1000 << "k" << std::endl
+              << "Package size: " << args.data_size / 1000 << "k" << std::endl
               << "iterations: " << args.iterations << std::endl
               << "nthreads: " << args.nthreads << std::endl
               << "mode: " << args.mode << std::endl
-              << "Write files: " << ((args.mode % 100) / 10 == 1) << std::endl
-              << "Tcp mode: " << ((args.mode % 10) == 0 ) << std::endl
+              << "Write files: " << args.write_files << std::endl
+              << "Tcp mode: " << ((args.mode % 10) == 0) << std::endl
               << "Raw: " << (args.mode / 100 == 1) << std::endl
               << "timeout: " << args.timeout_ms << std::endl
               << "messages in set: " << args.messages_in_set << std::endl
@@ -48,7 +51,7 @@ void TryGetDataSourceAndToken(Args* args) {
     std::string segment;
     std::vector<std::string> seglist;
 
-    while(std::getline(test, segment, '%')) {
+    while (std::getline(test, segment, '%')) {
         seglist.push_back(segment);
     }
     if (seglist.size() == 1) {
@@ -65,9 +68,6 @@ void TryGetDataSourceAndToken(Args* args) {
 
 }
 
-
-
-
 void ProcessCommandArguments(int argc, char* argv[], Args* args) {
     if (argc != 8 && argc != 9) {
         std::cout <<
@@ -81,7 +81,7 @@ void ProcessCommandArguments(int argc, char* argv[], Args* args) {
         args->discovery_service_endpoint = argv[1];
         args->beamtime_id = argv[2];
         TryGetDataSourceAndToken(args);
-        args->number_of_bytes = std::stoull(argv[3]) * 1000;
+        args->data_size = std::stoull(argv[3]) * 1000;
         args->iterations = std::stoull(argv[4]);
         args->nthreads = static_cast<uint8_t>(std::stoi(argv[5]));
         args->mode = std::stoull(argv[6]);
@@ -91,9 +91,13 @@ void ProcessCommandArguments(int argc, char* argv[], Args* args) {
         } else {
             args->messages_in_set = 1;
         }
+        args->write_files = (args->mode % 100) / 10 == 0;
+        args->type = args->mode / 100 == 0 ? asapo::SourceType::kProcessed : asapo::SourceType::kRaw;
+        args->handler = args->mode % 10 == 0 ? asapo::RequestHandlerType::kTcp
+                        : asapo::RequestHandlerType::kFilesystem;
         PrintCommandArguments(*args);
         return;
-    } catch(std::exception& e) {
+    } catch (std::exception& e) {
         std::cerr << "Fail to parse arguments" << std::endl;
         std::cerr << e.what() << std::endl;
         exit(EXIT_FAILURE);
@@ -127,12 +131,55 @@ asapo::MessageData CreateMemoryBuffer(size_t size) {
     return asapo::MessageData(new uint8_t[size]);
 }
 
+asapo::MessageHeader PrepareMessageHeader(uint64_t i, const Args& args) {
+    std::string message_folder = GetStringFromSourceType(args.type) + asapo::kPathSeparator;
+    asapo::MessageHeader message_header{i + 1, args.data_size, std::to_string(i + 1)};
+    std::string meta = "{\"user_meta\":\"test" + std::to_string(i + 1) + "\"}";
+    if (!args.data_source.empty()) {
+        message_header.file_name = args.data_source + "/" + message_header.file_name;
+    }
+    message_header.file_name = message_folder + message_header.file_name;
+    message_header.user_metadata = meta;
+    return message_header;
+}
+
+asapo::Error Send(asapo::Producer* producer, const asapo::MessageHeader& message_header, const Args& args) {
+    auto buffer = CreateMemoryBuffer(args.data_size);
+    return producer->Send(message_header,
+                          std::move(buffer),
+                          args.write_files ? asapo::kDefaultIngestMode : asapo::kTransferData,
+                          "default",
+                          &ProcessAfterSend);
+}
+
+bool SendSingleMesssage(asapo::Producer* producer, uint64_t i, const Args& args) {
+    asapo::MessageHeader message_header = PrepareMessageHeader(i, args);
+    auto err = Send(producer, message_header, args);
+    if (err) {
+        std::cerr << "Cannot send file: " << err << std::endl;
+        return false;
+    }
+    return true;
+}
+
+bool SendDataset(asapo::Producer* producer, uint64_t i, uint64_t id, const Args& args) {
+    asapo::MessageHeader message_header = PrepareMessageHeader(i, args);
+    message_header.dataset_substream = id + 1;
+    message_header.dataset_size = args.messages_in_set;
+    message_header.file_name += "_" + std::to_string(id + 1);
+    auto err = Send(producer, message_header, args);
+    if (err) {
+        std::cerr << "Cannot send file: " << err << std::endl;
+        return false;
+    }
+    return true;
+}
 
-bool SendDummyData(asapo::Producer* producer, size_t number_of_byte, uint64_t iterations, uint64_t messages_in_set,
-                   const std::string& data_source, bool write_files, asapo::SourceType type) {
+
+bool SendDummyData(asapo::Producer* producer, const Args& args) {
 
     asapo::Error err;
-    if (iterations == 0) {
+    if (args.iterations == 0) {
         auto mode = asapo::MetaIngestMode{asapo::MetaIngestOp::kReplace, true};
         err = producer->SendBeamtimeMetadata("{\"dummy_meta\":\"test\"}", mode, &ProcessAfterMetaDataSend);
         if (err) {
@@ -141,65 +188,30 @@ bool SendDummyData(asapo::Producer* producer, size_t number_of_byte, uint64_t it
         }
     }
 
-    std::string message_folder = GetStringFromSourceType(type) + asapo::kPathSeparator;
-
-
-    for (uint64_t i = 0; i < iterations; i++) {
-        asapo::MessageHeader message_header{i + 1, number_of_byte, std::to_string(i + 1)};
-        std::string meta = "{\"user_meta\":\"test" + std::to_string(i + 1) + "\"}";
-        if (messages_in_set == 1) {
-            auto buffer = CreateMemoryBuffer(number_of_byte);
-            if (!data_source.empty()) {
-                message_header.file_name = data_source + "/" + message_header.file_name;
-            }
-            message_header.file_name = message_folder + message_header.file_name;
-            message_header.user_metadata = meta;
-            err = producer->Send(message_header,
-                                 std::move(buffer),
-                                 write_files ? asapo::kDefaultIngestMode :
-                                 asapo::kTransferData,
-                                 "default",
-                                 &ProcessAfterSend);
-            if (err) {
-                std::cerr << "Cannot send file: " << err << std::endl;
-                return false;
-            }
+    for (uint64_t i = 0; i < args.iterations; i++) {
+        bool res = true;
+        if (args.messages_in_set == 1) {
+            res = SendSingleMesssage(producer, i, args);
         } else {
-            for (uint64_t id = 0; id < messages_in_set; id++) {
-                auto buffer = CreateMemoryBuffer(number_of_byte);
-                message_header.dataset_substream = id + 1;
-                message_header.dataset_size = messages_in_set;
-                message_header.message_id = i + 1;
-                message_header.file_name = std::to_string(i + 1) + "_" + std::to_string(id + 1);
-                if (!data_source.empty()) {
-                    message_header.file_name = data_source + "/" + message_header.file_name;
-                }
-                message_header.file_name = message_folder + message_header.file_name;
-                message_header.user_metadata = meta;
-                err =
-                    producer->Send(message_header,
-                                   std::move(buffer),
-                                   write_files ? asapo::kDefaultIngestMode :
-                                   asapo::kTransferData,
-                                   "default",
-                                   &ProcessAfterSend);
-                if (err) {
-                    std::cerr << "Cannot send file: " << err << std::endl;
-                    return false;
-                }
+            for (uint64_t id = 0; id < args.messages_in_set; id++) {
+                res &= SendDataset(producer, i, id, args);
             }
         }
+        if (!res) {
+            return false;
+        }
     }
-    return producer->SendStreamFinishedFlag("default", iterations, "", nullptr) == nullptr;
+    return producer->SendStreamFinishedFlag("default", args.iterations, "", nullptr) == nullptr;
 }
 
 std::unique_ptr<asapo::Producer> CreateProducer(const Args& args) {
     asapo::Error err;
-    auto producer = asapo::Producer::Create(args.discovery_service_endpoint, args.nthreads,
-                                            args.mode % 10 == 0 ? asapo::RequestHandlerType::kTcp : asapo::RequestHandlerType::kFilesystem,
-                                            asapo::SourceCredentials{args.mode / 100 == 0 ? asapo::SourceType::kProcessed : asapo::SourceType::kRaw, args.beamtime_id, "", args.data_source, args.token },
-                                            3600000, &err);
-    if(err) {
+    auto producer = asapo::Producer::Create(args.discovery_service_endpoint, args.nthreads, args.handler,
+    asapo::SourceCredentials{
+        args.type, args.beamtime_id,
+        "", args.data_source, args.token},
+    3600000, &err);
+    if (err) {
         std::cerr << "Cannot start producer. ProducerError: " << err << std::endl;
         exit(EXIT_FAILURE);
     }
@@ -211,16 +223,17 @@ std::unique_ptr<asapo::Producer> CreateProducer(const Args& args) {
 
 void PrintOutput(const Args& args, const system_clock::time_point& start) {
     system_clock::time_point t2 = system_clock::now();
-    double duration_sec = static_cast<double>(std::chrono::duration_cast<std::chrono::milliseconds>(t2 - start).count())
-                          / 1000.0;
-    double size_gb = static_cast<double>(args.number_of_bytes * args.iterations) / 1000.0 / 1000.0 / 1000.0 * 8.0;
+    double duration_sec =
+        static_cast<double>(std::chrono::duration_cast<std::chrono::milliseconds>(t2 - start).count())
+        / 1000.0;
+    double size_gb = static_cast<double>(args.data_size * args.iterations) / 1000.0 / 1000.0 / 1000.0 * 8.0;
     double rate = static_cast<double>(args.iterations) / duration_sec;
     std::cout << "Rate: " << rate << " Hz" << std::endl;
     std::cout << "Bandwidth " << size_gb / duration_sec << " Gbit/s" << std::endl;
     std::cout << "Bandwidth " << size_gb / duration_sec / 8 << " GBytes/s" << std::endl;
 }
 
-int main (int argc, char* argv[]) {
+int main(int argc, char* argv[]) {
     Args args;
     ProcessCommandArguments(argc, argv, &args);
 
@@ -234,8 +247,7 @@ int main (int argc, char* argv[]) {
 
     system_clock::time_point start_time = system_clock::now();
 
-    if(!SendDummyData(producer.get(), args.number_of_bytes, args.iterations, args.messages_in_set, args.data_source,
-                      (args.mode % 100) / 10 == 0, args.mode / 100 == 0 ? asapo::SourceType::kProcessed : asapo::SourceType::kRaw)) {
+    if (!SendDummyData(producer.get(), args)) {
         return EXIT_FAILURE;
     }
 
-- 
GitLab