From b78ce272d801b296e188d75232d9e1901ae64934 Mon Sep 17 00:00:00 2001
From: Sergey Yakubov <sergey.yakubov@desy.de>
Date: Thu, 31 May 2018 09:46:29 +0200
Subject: [PATCH] refactor dummy producer

---
 .../dummy_data_producer.cpp                   | 127 ++++++++++--------
 1 file changed, 72 insertions(+), 55 deletions(-)

diff --git a/examples/producer/dummy-data-producer/dummy_data_producer.cpp b/examples/producer/dummy-data-producer/dummy_data_producer.cpp
index 3917685a1..5add8c660 100644
--- a/examples/producer/dummy-data-producer/dummy_data_producer.cpp
+++ b/examples/producer/dummy-data-producer/dummy_data_producer.cpp
@@ -11,10 +11,27 @@
 using std::chrono::high_resolution_clock;
 
 std::mutex mutex;
-int nfiles;
+int iterations_remained;
 
-typedef std::tuple<std::string, size_t, uint64_t, uint64_t, uint64_t> ArgumentTuple;
-ArgumentTuple ProcessCommandArguments(int argc, char* argv[]) {
+struct Args {
+    std::string receiver_address;
+    size_t number_of_bytes;
+    uint64_t iterations;
+    uint64_t nthreads;
+    uint64_t mode;
+};
+
+void PrintCommandArguments(const Args& args) {
+    std::cout << "receiver_address: " << args.receiver_address << std::endl
+              << "Package size: " << args.number_of_bytes / 1024 << "k" << std::endl
+              << "iterations: " << args.iterations << std::endl
+              << "nthreads: " << args.nthreads << std::endl
+              << "mode: " << args.mode << std::endl
+              << std::endl;
+}
+
+
+void ProcessCommandArguments(int argc, char* argv[], Args* args) {
     if (argc != 6) {
         std::cout <<
                   "Usage: " << argv[0] << " <destination> <number_of_byte> <iterations> <nthreads> <mode 0 -t tcp, 1 - filesystem>"
@@ -22,7 +39,13 @@ ArgumentTuple ProcessCommandArguments(int argc, char* argv[]) {
         exit(EXIT_FAILURE);
     }
     try {
-        return ArgumentTuple(argv[1], std::stoull(argv[2]), std::stoull(argv[3]), std::stoull(argv[4]), std::stoull(argv[5]));
+        args->receiver_address = argv[1];
+        args->number_of_bytes = std::stoull(argv[2]) * 1024;
+        args->iterations = std::stoull(argv[3]);
+        args->nthreads = std::stoull(argv[4]);
+        args->mode = std::stoull(argv[5]);
+        PrintCommandArguments(*args);
+        return;
     } catch(std::exception& e) {
         std::cerr << "Fail to parse arguments" << std::endl;
         std::cerr << e.what() << std::endl;
@@ -30,83 +53,49 @@ ArgumentTuple ProcessCommandArguments(int argc, char* argv[]) {
     }
 }
 
-void work(asapo::GenericRequestHeader header, asapo::Error err) {
+void ProcessAfterSend(asapo::GenericRequestHeader header, asapo::Error err) {
     mutex.lock();
-    nfiles--;
+    iterations_remained--;
     if (err) {
         std::cerr << "File was not successfully send: " << err << std::endl;
-        //nfiles = 0;
         mutex.unlock();
         return;
     }
-    // std::cerr << "File was successfully send." << header.data_id << std::endl;
     mutex.unlock();
 }
 
 bool SendDummyData(asapo::Producer* producer, uint8_t* data, size_t number_of_byte, uint64_t iterations) {
 
     for(uint64_t i = 0; i < iterations; i++) {
-//        std::cerr << "Send file " << i + 1 << "/" << iterations << std::endl;
-        auto err = producer->Send(i + 1, data, number_of_byte, std::to_string(i), &work);
+        auto err = producer->Send(i + 1, data, number_of_byte, std::to_string(i), &ProcessAfterSend);
         if (err) {
             std::cerr << "Cannot send file: " << err << std::endl;
             return false;
         }
     }
-
     return true;
 }
 
-int main (int argc, char* argv[]) {
-    std::string receiver_address;
-    size_t number_of_kbytes;
-    uint64_t iterations;
-    uint64_t nthreads;
-    uint64_t mode;
-
-    std::tie(receiver_address, number_of_kbytes, iterations, nthreads, mode) = ProcessCommandArguments(argc, argv);
-
-    std::cout << "receiver_address: " << receiver_address << std::endl
-              << "Package size: " << number_of_kbytes << "k" << std::endl
-              << "iterations: " << iterations << std::endl
-              << "nthreads: " << nthreads << std::endl
-              << "mode: " << mode << std::endl
-              << std::endl;
-
-    nfiles = iterations;
-
+std::unique_ptr<asapo::Producer> CreateProducer(const Args& args) {
     asapo::Error err;
-    std::unique_ptr<asapo::Producer> producer;
-    if (mode == 0) {
-        producer = asapo::Producer::Create(receiver_address, nthreads, asapo::RequestHandlerType::kTcp, &err);
-    } else {
-        producer = asapo::Producer::Create(receiver_address, nthreads, asapo::RequestHandlerType::kFilesystem, &err);
-    }
-    producer->EnableLocalLog(true);
-    producer->SetLogLevel(asapo::LogLevel::Debug);
-    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
-
-
-    size_t number_of_byte = number_of_kbytes * 1024;
-    auto buffer = std::unique_ptr<uint8_t>(new uint8_t[number_of_byte]);
-
-
+    auto producer = asapo::Producer::Create(args.receiver_address, args.nthreads,
+                                            args.mode == 0 ? asapo::RequestHandlerType::kTcp : asapo::RequestHandlerType::kFilesystem, &err);
     if(err) {
         std::cerr << "Cannot start producer. ProducerError: " << err << std::endl;
-        return EXIT_FAILURE;
+        exit(EXIT_FAILURE);
     }
 
-    high_resolution_clock::time_point t1 = high_resolution_clock::now();
-    if(!SendDummyData(producer.get(), buffer.get(), number_of_byte, iterations)) {
-        return EXIT_FAILURE;
-    }
+    producer->EnableLocalLog(true);
+    producer->SetLogLevel(asapo::LogLevel::Debug);
+    return producer;
+}
 
+void WaitThreadsFinished(const Args& args) {
     uint64_t elapsed_ms = 0;
     uint64_t timeout_sec = 30;
-
     while (true) {
         mutex.lock();
-        if (nfiles <= 0) {
+        if (iterations_remained <= 0) {
             mutex.unlock();
             break;
         }
@@ -115,16 +104,44 @@ int main (int argc, char* argv[]) {
         elapsed_ms += 100;
         if (elapsed_ms > timeout_sec * 1000) {
             std::cerr << "Exit on timeout " << std::endl;
-            return EXIT_FAILURE;
+            exit(EXIT_FAILURE);
         }
     }
 
+}
+
+void PrintOutput(const Args& args, const high_resolution_clock::time_point& start) {
     high_resolution_clock::time_point t2 = high_resolution_clock::now();
-    double duration_sec = std::chrono::duration_cast<std::chrono::milliseconds>( t2 - t1 ).count() / 1000.0;
-    double size_gb = double(number_of_kbytes) * iterations / 1024.0 / 1024.0 * 8.0;
-    double rate = iterations / duration_sec;
+    double duration_sec = std::chrono::duration_cast<std::chrono::milliseconds>( t2 - start ).count() / 1000.0;
+    double size_gb = double(args.number_of_bytes) * args.iterations / 1024.0  / 1024.0 / 1024.0 * 8.0;
+    double rate = args.iterations / duration_sec;
     std::cout << "Rate: " << rate << " Hz" << std::endl;
     std::cout << "Bandwidth " << size_gb / duration_sec << " Gbit/s" << std::endl;
+}
+
+
+std::unique_ptr<uint8_t> CreateMemoryBuffer(const Args& args) {
+    return std::unique_ptr<uint8_t>(new uint8_t[args.number_of_bytes]);
+}
+
+int main (int argc, char* argv[]) {
+    Args args;
+    ProcessCommandArguments(argc, argv, &args);
+
+    auto producer = CreateProducer(args);
+
+    iterations_remained = args.iterations;
+
+    auto buffer = CreateMemoryBuffer(args);
+
+    high_resolution_clock::time_point start_time = high_resolution_clock::now();
+
+    if(!SendDummyData(producer.get(), buffer.get(), args.number_of_bytes, args.iterations)) {
+        return EXIT_FAILURE;
+    }
+
+    WaitThreadsFinished(args);
+    PrintOutput(args, start_time);
 
     return EXIT_SUCCESS;
 }
-- 
GitLab