From da347a52ae2ce579a148ccc49130defea625dce6 Mon Sep 17 00:00:00 2001 From: Carsten Patzke <carsten.patzke@desy.de> Date: Thu, 1 Feb 2018 10:04:54 +0100 Subject: [PATCH] Refactored dummy data producer --- common/cpp/include/common/networking.h | 2 +- .../dummy_data_producer.cpp | 50 ++++++++++++------- producer/api/include/producer/producer.h | 5 +- producer/api/src/producer_impl.cpp | 10 +++- producer/api/unittests/test_producer_impl.cpp | 39 +++++++++++++++ receiver/src/network_producer_peer.cpp | 8 --- .../src/network_producer_peer_handlers.cpp | 2 +- 7 files changed, 83 insertions(+), 33 deletions(-) diff --git a/common/cpp/include/common/networking.h b/common/cpp/include/common/networking.h index f477ac9d3..4aa126303 100644 --- a/common/cpp/include/common/networking.h +++ b/common/cpp/include/common/networking.h @@ -16,7 +16,7 @@ enum OpCode : uint8_t { enum NetworkErrorCode : uint16_t { NET_ERR__NO_ERROR, - NET_ERR__FILENAME_ALREADY_IN_USE, + NET_ERR__FILEID_ALREADY_IN_USE, NET_ERR__ALLOCATE_STORAGE_FAILED, NET_ERR__INTERNAL_SERVER_ERROR = 65535, }; diff --git a/examples/producer/dummy-data-producer/dummy_data_producer.cpp b/examples/producer/dummy-data-producer/dummy_data_producer.cpp index 820df34ad..6878b06fd 100644 --- a/examples/producer/dummy-data-producer/dummy_data_producer.cpp +++ b/examples/producer/dummy-data-producer/dummy_data_producer.cpp @@ -1,10 +1,12 @@ #include <producer/producer.h> #include <iostream> -#include <tuple> +#include <chrono> +#include <vector> -typedef std::tuple<std::string, size_t, uint64_t> ArgumentTuple; +using std::chrono::high_resolution_clock; +typedef std::tuple<std::string, size_t, uint64_t> ArgumentTuple; ArgumentTuple ProcessCommandArguments(int argc, char* argv[]) { if (argc != 4) { std::cout << @@ -21,32 +23,27 @@ ArgumentTuple ProcessCommandArguments(int argc, char* argv[]) { } } -int SendDummyData(const std::string& receiver_address, size_t number_of_byte, uint64_t iterations) { - auto producer = hidra2::Producer::create(); - - hidra2::ProducerError err = producer->ConnectToReceiver(receiver_address); - if(err != hidra2::ProducerError::kNoError) { - std::cerr << "Fail to connect to receiver. ProducerError: " /*<< err*/ << std::endl;//TODO - return 1; - } - std::cout << "Successfully connected" << std::endl; - +bool SendDummyData(hidra2::Producer* producer, size_t number_of_byte, uint64_t iterations) { auto buffer = std::unique_ptr<uint8_t>(new uint8_t[number_of_byte]); for(uint64_t i = 0; i < iterations; i++) { std::cout << "Send file " << i + 1 << "/" << iterations << std::endl; - hidra2::ProducerError error; - error = producer->Send(i, buffer.get(), number_of_byte); - if (error != hidra2::ProducerError::kNoError) { - std::cerr << "File was not successfully send, ErrorCode: " /*<< error*/ << std::endl; - break; + hidra2::ProducerError err = producer->Send(i, buffer.get(), number_of_byte); + + if (err != hidra2::ProducerError::kNoError) { + if(err == hidra2::ProducerError::kFileIdAlreadyInUse) { + std::cerr << "File id is already in use." << std::endl; + } else { + std::cerr << "File was not successfully send, ErrorCode: " /*<< error*/ << std::endl; + } + return false; } else { std::cout << "File was successfully send." << std::endl; } } - return 0; + return true; } int main (int argc, char* argv[]) { @@ -60,7 +57,22 @@ int main (int argc, char* argv[]) { << "iterations: " << iterations << std::endl << std::endl; - SendDummyData(receiver_address, number_of_byte, iterations); + auto producer = hidra2::Producer::create(); + hidra2::ProducerError err = producer->ConnectToReceiver(receiver_address); + if(err != hidra2::ProducerError::kNoError) { + if (err == hidra2::ProducerError::kConnectionRefused) { + std::cerr << "Failed to connect to receiver - connection refused. Is the receiver running?" << std::endl; + } else { + std::cerr << "Failed to connect to receiver. ProducerError: " /*<< err*/ << std::endl;//TODO + } + return EXIT_FAILURE; + } + std::cout << "Successfully connected" << std::endl; + + if(!SendDummyData(producer.get(), number_of_byte, iterations)) { + return EXIT_FAILURE; + } + return EXIT_SUCCESS; } diff --git a/producer/api/include/producer/producer.h b/producer/api/include/producer/producer.h index 456ced8d1..e04f92782 100644 --- a/producer/api/include/producer/producer.h +++ b/producer/api/include/producer/producer.h @@ -6,15 +6,16 @@ namespace hidra2 { enum class ProducerError { + kUnknownError, kNoError, + kUnknownServerError, kAlreadyConnected, kConnectionNotReady, kInvalidAddressFormat, kUnexpectedIOError, kFileIdAlreadyInUse, kFileTooLarge, - kUnknownServerError, - kUnknownError, + kConnectionRefused, }; enum class ProducerStatus { diff --git a/producer/api/src/producer_impl.cpp b/producer/api/src/producer_impl.cpp index 9001457e1..da208b21e 100644 --- a/producer/api/src/producer_impl.cpp +++ b/producer/api/src/producer_impl.cpp @@ -9,7 +9,7 @@ hidra2::ProducerError hidra2::ProducerImpl::NetworkErrorToProducerError(hidra2:: switch(networkError) { case NET_ERR__NO_ERROR: return ProducerError::kNoError; - case NET_ERR__FILENAME_ALREADY_IN_USE: + case NET_ERR__FILEID_ALREADY_IN_USE: return ProducerError::kFileIdAlreadyInUse; default: return ProducerError::kUnknownServerError; @@ -36,6 +36,9 @@ hidra2::ProducerError hidra2::ProducerImpl::initialize_socket_to_receiver_(const if(err == IOErrors::kInvalidAddressFormat) { return ProducerError::kInvalidAddressFormat; } + if(err == IOErrors::kConnectionRefused) { + return ProducerError::kConnectionRefused; + } return ProducerError::kUnknownError; } @@ -102,8 +105,11 @@ hidra2::ProducerError hidra2::ProducerImpl::Send(uint64_t file_id, void* data, s } if(sendDataResponse.error_code) { - std::cerr << "Server reported an error. NetErrorCode: " << sendDataResponse.error_code << std::endl; status_ = ProducerStatus::kConnected; + if(sendDataResponse.error_code == NET_ERR__FILEID_ALREADY_IN_USE) { + return hidra2::ProducerError::kFileIdAlreadyInUse; + } + std::cerr << "Server reported an error. NetErrorCode: " << sendDataResponse.error_code << std::endl; return ProducerError::kUnknownServerError; } diff --git a/producer/api/unittests/test_producer_impl.cpp b/producer/api/unittests/test_producer_impl.cpp index b7377370b..118667004 100644 --- a/producer/api/unittests/test_producer_impl.cpp +++ b/producer/api/unittests/test_producer_impl.cpp @@ -327,6 +327,45 @@ TEST(ProducerImpl, Send__Receive_server_error) { ASSERT_THAT(status, Eq(hidra2::ProducerStatus::kConnected)); } +TEST(ProducerImpl, Send__Receive_server_error_id_already_in_use) { + InSequence sequence; + + hidra2::ProducerImpl producer; + hidra2::FileDescriptor expected_fd = 83942; + uint64_t expected_request_id = 0; + uint64_t expected_file_id = 1; + uint64_t expected_file_size = 1337; + void* expected_file_pointer = (void*)0xC00FE; + + ConnectToReceiver_DONE(producer, expected_fd); + expected_request_id++; + + hidra2::MockIO mockIO; + producer.__set_io(&mockIO); + + EXPECT_CALL(mockIO, Send(_, _, _, _)) + .Times(2) + .WillRepeatedly( + DoAll( + testing::SetArgPointee<3>(hidra2::IOErrors::kNoError), + testing::ReturnArg<2>() + )); + + EXPECT_CALL(mockIO, Receive(_, _, sizeof(hidra2::SendDataResponse), _)) + .Times(1) + .WillOnce( + DoAll( + testing::SetArgPointee<3>(hidra2::IOErrors::kNoError), + A_WriteSendDataResponse(hidra2::NET_ERR__FILEID_ALREADY_IN_USE, expected_request_id), + testing::ReturnArg<2>() + )); + + hidra2::ProducerError error = producer.Send(expected_file_id, expected_file_pointer, expected_file_size); + hidra2::ProducerStatus status = producer.GetStatus(); + + ASSERT_THAT(error, Eq(hidra2::ProducerError::kFileIdAlreadyInUse)); + ASSERT_THAT(status, Eq(hidra2::ProducerStatus::kConnected)); +} TEST(ProducerImpl, Send) { InSequence sequence; diff --git a/receiver/src/network_producer_peer.cpp b/receiver/src/network_producer_peer.cpp index 00726c5dc..b78927b75 100644 --- a/receiver/src/network_producer_peer.cpp +++ b/receiver/src/network_producer_peer.cpp @@ -20,14 +20,6 @@ uint32_t NetworkProducerPeer::connection_id() const { return connection_id_; } -const std::string& NetworkProducerPeer::address() const { - return address_; -} - -bool NetworkProducerPeer::is_listening() const { - return is_listening_; -} - void NetworkProducerPeer::start_peer_listener() { if(listener_thread_ || is_listening_) return; diff --git a/receiver/src/network_producer_peer_handlers.cpp b/receiver/src/network_producer_peer_handlers.cpp index 212f72c1b..b4d5373aa 100644 --- a/receiver/src/network_producer_peer_handlers.cpp +++ b/receiver/src/network_producer_peer_handlers.cpp @@ -35,7 +35,7 @@ void NetworkProducerPeer::handle_send_data_request_(NetworkProducerPeer* self, c FileDescriptor fd = self->CreateAndOpenFileByFileId(request->file_id, &io_err); if(io_err != IOErrors::kNoError) { - response->error_code = NET_ERR__FILENAME_ALREADY_IN_USE; + response->error_code = NET_ERR__FILEID_ALREADY_IN_USE; std::cerr << "[" << self->connection_id() << "] file_id: " << request->file_id << " does already exists" << std::endl; self->io->Skip(self->socket_fd_, request->file_size, &io_err); if(io_err != IOErrors::kNoError) { -- GitLab