From dbb374c1a3efd76de765c3d12b97bd3533cec896 Mon Sep 17 00:00:00 2001 From: Sergey Yakubov <sergey.yakubov@desy.de> Date: Mon, 18 Jun 2018 20:38:40 +0200 Subject: [PATCH] set beamtime_id only once at producer create --- .../dummy_data_producer.cpp | 18 +++++++++++------- producer/api/include/producer/producer.h | 2 +- .../api/include/producer/producer_error.h | 4 ++++ producer/api/src/producer.cpp | 12 +++++++++--- producer/api/src/producer_impl.cpp | 6 ++++++ producer/api/unittests/test_producer.cpp | 19 +++++++++++++++---- producer/api/unittests/test_producer_impl.cpp | 12 +++++++++++- 7 files changed, 57 insertions(+), 16 deletions(-) diff --git a/examples/producer/dummy-data-producer/dummy_data_producer.cpp b/examples/producer/dummy-data-producer/dummy_data_producer.cpp index 34cfd18b7..531344235 100644 --- a/examples/producer/dummy-data-producer/dummy_data_producer.cpp +++ b/examples/producer/dummy-data-producer/dummy_data_producer.cpp @@ -14,6 +14,7 @@ int iterations_remained; struct Args { std::string receiver_address; + std::string beamtime_id; size_t number_of_bytes; uint64_t iterations; uint64_t nthreads; @@ -22,6 +23,7 @@ struct Args { void PrintCommandArguments(const Args& args) { std::cout << "receiver_address: " << args.receiver_address << std::endl + << "beamtime_id: " << args.beamtime_id << std::endl << "Package size: " << args.number_of_bytes / 1024 << "k" << std::endl << "iterations: " << args.iterations << std::endl << "nthreads: " << args.nthreads << std::endl @@ -31,18 +33,19 @@ void PrintCommandArguments(const Args& args) { void ProcessCommandArguments(int argc, char* argv[], Args* args) { - if (argc != 6) { + if (argc != 7) { std::cout << - "Usage: " << argv[0] << " <destination> <number_of_byte> <iterations> <nthreads> <mode 0 -t tcp, 1 - filesystem>" + "Usage: " << argv[0] << " <destination> <beamtime_id> <number_of_byte> <iterations> <nthreads> <mode 0 -t tcp, 1 - filesystem>" << std::endl; exit(EXIT_FAILURE); } try { args->receiver_address = argv[1]; - args->number_of_bytes = std::stoull(argv[2]) * 1024; - args->iterations = std::stoull(argv[3]); - args->nthreads = std::stoull(argv[4]); - args->mode = std::stoull(argv[5]); + args->beamtime_id = argv[2]; + args->number_of_bytes = std::stoull(argv[3]) * 1024; + args->iterations = std::stoull(argv[4]); + args->nthreads = std::stoull(argv[5]); + args->mode = std::stoull(argv[6]); PrintCommandArguments(*args); return; } catch(std::exception& e) { @@ -78,7 +81,8 @@ bool SendDummyData(asapo::Producer* producer, uint8_t* data, size_t number_of_by std::unique_ptr<asapo::Producer> CreateProducer(const Args& args) { asapo::Error err; auto producer = asapo::Producer::Create(args.receiver_address, args.nthreads, - args.mode == 0 ? asapo::RequestHandlerType::kTcp : asapo::RequestHandlerType::kFilesystem, &err); + args.mode == 0 ? asapo::RequestHandlerType::kTcp : asapo::RequestHandlerType::kFilesystem, + args.beamtime_id,&err); if(err) { std::cerr << "Cannot start producer. ProducerError: " << err << std::endl; exit(EXIT_FAILURE); diff --git a/producer/api/include/producer/producer.h b/producer/api/include/producer/producer.h index a8c808704..e16adf391 100644 --- a/producer/api/include/producer/producer.h +++ b/producer/api/include/producer/producer.h @@ -17,7 +17,7 @@ class Producer { * @return A unique_ptr to a new producer instance */ static std::unique_ptr<Producer> Create(const std::string& endpoint, uint8_t n_processing_threads, - asapo::RequestHandlerType type, + asapo::RequestHandlerType type,std::string beamtime_id, Error* err); virtual ~Producer() = default; diff --git a/producer/api/include/producer/producer_error.h b/producer/api/include/producer/producer_error.h index 6f40336e8..5ed0cdc1e 100644 --- a/producer/api/include/producer/producer_error.h +++ b/producer/api/include/producer/producer_error.h @@ -11,6 +11,7 @@ enum class ProducerErrorType { kFileTooLarge, kFileNameTooLong, kBeamtimeIdTooLong, + kBeamtimeAlreadySet, kFileIdAlreadyInUse, kAuthorizationFailed, kInternalServerError, @@ -82,6 +83,9 @@ auto const kBeamtimeIdTooLong = ProducerErrorTemplate { }; +auto const kBeamtimeAlreadySet = ProducerErrorTemplate { + "beamtime id already set", ProducerErrorType::kBeamtimeAlreadySet +}; auto const kFileIdAlreadyInUse = ProducerErrorTemplate { diff --git a/producer/api/src/producer.cpp b/producer/api/src/producer.cpp index a8e3e38b4..ac8187646 100644 --- a/producer/api/src/producer.cpp +++ b/producer/api/src/producer.cpp @@ -2,15 +2,15 @@ #include "producer_impl.h" std::unique_ptr<asapo::Producer> asapo::Producer::Create(const std::string& endpoint, uint8_t n_processing_threads, - asapo::RequestHandlerType type, Error* err) { + asapo::RequestHandlerType type, std::string beamtime_id, Error* err) { if (n_processing_threads > kMaxProcessingThreads) { *err = TextError("Too many processing threads: " + std::to_string(n_processing_threads)); return nullptr; } + std::unique_ptr<asapo::Producer> producer; try { - *err = nullptr; - return std::unique_ptr<asapo::Producer>(new ProducerImpl(endpoint, n_processing_threads, type)); + producer.reset(new ProducerImpl(endpoint, n_processing_threads, type)); } catch (const std::exception& ex) { *err = TextError(ex.what()); return nullptr; @@ -18,4 +18,10 @@ std::unique_ptr<asapo::Producer> asapo::Producer::Create(const std::string& endp *err = TextError("Unknown exception in producer_api "); return nullptr; } + + *err = producer->SetBeamtimeId(beamtime_id); + if (*err) { + return nullptr; + } + return producer; } diff --git a/producer/api/src/producer_impl.cpp b/producer/api/src/producer_impl.cpp index fd845c385..21f588990 100644 --- a/producer/api/src/producer_impl.cpp +++ b/producer/api/src/producer_impl.cpp @@ -74,6 +74,12 @@ void ProducerImpl::EnableRemoteLog(bool enable) { } Error ProducerImpl::SetBeamtimeId(std::string beamtime_id) { + + if (!beamtime_id_.empty()) { + log__->Error("beamtime_id already set"); + return ProducerErrorTemplates::kBeamtimeAlreadySet.Generate(); + } + if (beamtime_id.size() > kMaxMessageSize) { log__->Error("beamtime_id is too long - "+beamtime_id); return ProducerErrorTemplates::kBeamtimeIdTooLong.Generate(); diff --git a/producer/api/unittests/test_producer.cpp b/producer/api/unittests/test_producer.cpp index 941456e79..ed22fe6e4 100644 --- a/producer/api/unittests/test_producer.cpp +++ b/producer/api/unittests/test_producer.cpp @@ -3,6 +3,7 @@ #include "producer/producer.h" #include "../src/producer_impl.h" +#include "producer/producer_error.h" using ::testing::Ne; using ::testing::Eq; @@ -12,15 +13,25 @@ namespace { TEST(CreateProducer, TcpProducer) { asapo::Error err; std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("endpoint", 4, asapo::RequestHandlerType::kTcp, - &err); + "bt",&err); ASSERT_THAT(dynamic_cast<asapo::ProducerImpl*>(producer.get()), Ne(nullptr)); ASSERT_THAT(err, Eq(nullptr)); } +TEST(CreateProducer, ErrorBeamtime) { + asapo::Error err; + std::string expected_beamtimeid(asapo::kMaxMessageSize*10,'a'); + std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("endpoint", 4, asapo::RequestHandlerType::kTcp, + expected_beamtimeid,&err); + ASSERT_THAT(producer, Eq(nullptr)); + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kBeamtimeIdTooLong)); +} + + TEST(CreateProducer, FileSystemProducer) { asapo::Error err; std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("endpoint", 4, - asapo::RequestHandlerType::kFilesystem, &err); + asapo::RequestHandlerType::kFilesystem,"bt", &err); ASSERT_THAT(dynamic_cast<asapo::ProducerImpl*>(producer.get()), Ne(nullptr)); ASSERT_THAT(err, Eq(nullptr)); } @@ -29,14 +40,14 @@ TEST(CreateProducer, FileSystemProducer) { TEST(CreateProducer, TooManyThreads) { asapo::Error err; std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("", asapo::kMaxProcessingThreads + 1, - asapo::RequestHandlerType::kTcp, &err); + asapo::RequestHandlerType::kTcp,"bt", &err); ASSERT_THAT(producer, Eq(nullptr)); ASSERT_THAT(err, Ne(nullptr)); } TEST(Producer, SimpleWorkflowWihoutConnection) { asapo::Error err; - std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("hello", 5, asapo::RequestHandlerType::kTcp, &err); + std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("hello", 5, asapo::RequestHandlerType::kTcp,"bt", &err); auto err_send = producer->Send(1, nullptr, 1, "", nullptr); std::this_thread::sleep_for(std::chrono::milliseconds(100)); ASSERT_THAT(producer, Ne(nullptr)); diff --git a/producer/api/unittests/test_producer_impl.cpp b/producer/api/unittests/test_producer_impl.cpp index 1af9d2fb2..106ab92a6 100644 --- a/producer/api/unittests/test_producer_impl.cpp +++ b/producer/api/unittests/test_producer_impl.cpp @@ -110,8 +110,18 @@ TEST_F(ProducerImplTests, ErrorSettingBeamtime) { auto err = producer.SetBeamtimeId(expected_beamtimeid); - ASSERT_THAT(err, Ne(nullptr)); + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kBeamtimeIdTooLong)); } +TEST_F(ProducerImplTests, ErrorSettingSecondTime) { + EXPECT_CALL(mock_logger, Error(testing::HasSubstr("already"))); + + producer.SetBeamtimeId("1"); + auto err = producer.SetBeamtimeId("2"); + + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kBeamtimeAlreadySet)); +} + + } -- GitLab