diff --git a/common/cpp/include/common/networking.h b/common/cpp/include/common/networking.h index 7f59eb7cfd74afba30a696ec53e78da272a61df5..c1911c2521b7c13c6bd59eec633ae006169f8107 100644 --- a/common/cpp/include/common/networking.h +++ b/common/cpp/include/common/networking.h @@ -18,6 +18,7 @@ enum Opcode : uint8_t { enum NetworkErrorCode : uint16_t { kNetErrorNoError, + kNetAuthorizationError, kNetErrorFileIdAlreadyInUse, kNetErrorAllocateStorageFailed, kNetErrorInternalServerError = 65535, @@ -32,6 +33,8 @@ enum NetworkErrorCode : uint16_t { */ const std::size_t kMaxFileNameSize = 1024; +const std::size_t kMaxFileMessageSize = 1024; + struct GenericRequestHeader { GenericRequestHeader(Opcode i_op_code = kOpcodeUnknownOp, uint64_t i_data_id = 0, uint64_t i_data_size = 0, const std::string& i_file_name = ""): @@ -49,6 +52,7 @@ struct GenericNetworkResponse { Opcode op_code; NetworkRequestId request_id; NetworkErrorCode error_code; + char message[kMaxFileMessageSize]; }; /** diff --git a/producer/api/include/producer/producer_error.h b/producer/api/include/producer/producer_error.h index 5a0641b7dd8c7ad1ee1159fee85fee954ebbfa5e..b847d055ba27c9461f5bed4587e6869438d9827b 100644 --- a/producer/api/include/producer/producer_error.h +++ b/producer/api/include/producer/producer_error.h @@ -10,6 +10,7 @@ enum class ProducerErrorType { kConnectionNotReady, kFileTooLarge, kFileIdAlreadyInUse, + kAuthorizationFailed, kInternalServerError, kCannotSendDataToReceivers, kRequestPoolIsFull @@ -74,6 +75,10 @@ auto const kFileIdAlreadyInUse = ProducerErrorTemplate { "File already in use", ProducerErrorType::kFileIdAlreadyInUse }; +auto const kAuthorizationFailed = ProducerErrorTemplate { + "Authorization failed:", ProducerErrorType::kAuthorizationFailed +}; + auto const kInternalServerError = ProducerErrorTemplate { "Internal server error", ProducerErrorType::kInternalServerError }; diff --git a/producer/api/src/request_handler_tcp.cpp b/producer/api/src/request_handler_tcp.cpp index e85d51645444fa3b9e81912e554853a0d356800d..297a7d45f462ff4709b7a6f36422be10d5f388a8 100644 --- a/producer/api/src/request_handler_tcp.cpp +++ b/producer/api/src/request_handler_tcp.cpp @@ -22,6 +22,16 @@ Error RequestHandlerTcp::ConnectToReceiver(const std::string& receiver_address) return err; } log__->Info("connected to receiver at " + receiver_address); + + err = ReceiveResponse(); + if (err != nullptr) { + log__->Error("authorization failed at " + receiver_address + " - " + err->Explain()); + Disconnect(); + return err; + } + + log__->Debug("authorized at " + receiver_address); + connected_receiver_uri_ = receiver_address; return nullptr; } @@ -48,14 +58,19 @@ Error RequestHandlerTcp::ReceiveResponse() { if(err != nullptr) { return err; } - - if(sendDataResponse.error_code) { - if(sendDataResponse.error_code == kNetErrorFileIdAlreadyInUse) { - return ProducerErrorTemplates::kFileIdAlreadyInUse.Generate(); - } + switch (sendDataResponse.error_code) { + case kNetErrorFileIdAlreadyInUse : + return ProducerErrorTemplates::kFileIdAlreadyInUse.Generate(); + case kNetAuthorizationError : { + auto res_err = ProducerErrorTemplates::kAuthorizationFailed.Generate(); + res_err->Append(sendDataResponse.message); + return res_err; + } + case kNetErrorNoError : + return nullptr; + default: return ProducerErrorTemplates::kInternalServerError.Generate(); } - return nullptr; } Error RequestHandlerTcp::TrySendToReceiver(const Request* request) { diff --git a/producer/api/unittests/test_request_handler_tcp.cpp b/producer/api/unittests/test_request_handler_tcp.cpp index 7e8b3acb693525d21ee6b80724f1606218ec2bca..62457388f27b37020a12026496c6c1b51767f205 100644 --- a/producer/api/unittests/test_request_handler_tcp.cpp +++ b/producer/api/unittests/test_request_handler_tcp.cpp @@ -30,6 +30,7 @@ using testing::NiceMock; using ::testing::InSequence; using ::testing::HasSubstr; +using ::testing::Sequence; TEST(RequestHandlerTcp, Constructor) { @@ -41,6 +42,7 @@ TEST(RequestHandlerTcp, Constructor) { ASSERT_THAT(request.discovery_service__, Eq(&ds)); } +std::string expected_auth_message = {"12345"}; class RequestHandlerTcpTests : public testing::Test { public: @@ -78,7 +80,10 @@ class RequestHandlerTcpTests : public testing::Test { std::vector<asapo::SocketDescriptor> expected_sds{83942, 83943}; + Sequence s; void ExpectFailConnect(bool only_once = false); + void ExpectFailAuthorize(bool only_once = false); + void ExpectOKAuthorize(bool only_once = false); void ExpectFailSendHeader(bool only_once = false); void ExpectFailSendData(bool only_once = false); void ExpectOKConnect(bool only_once = false); @@ -103,6 +108,7 @@ class RequestHandlerTcpTests : public testing::Test { ACTION_P(A_WriteSendDataResponse, error_code) { ((asapo::SendDataResponse*)arg1)->op_code = asapo::kOpcodeTransferData; ((asapo::SendDataResponse*)arg1)->error_code = error_code; + strcpy(((asapo::SendDataResponse*)arg1)->message, expected_auth_message.c_str()); } MATCHER_P2(M_CheckSendDataRequest, file_id, file_size, @@ -132,6 +138,54 @@ void RequestHandlerTcpTests::ExpectFailConnect(bool only_once) { } + +void RequestHandlerTcpTests::ExpectFailAuthorize(bool only_once) { + int i = 0; + for (auto expected_sd : expected_sds) { + EXPECT_CALL(mock_io, Receive_t(expected_sd, _, sizeof(asapo::SendDataResponse), _)) + .InSequence(s) + .WillOnce( + DoAll( + testing::SetArgPointee<3>(nullptr), + A_WriteSendDataResponse(asapo::kNetAuthorizationError), + testing::ReturnArg<2>() + )); + EXPECT_CALL(mock_io, CloseSocket_t(expected_sd, _)); + EXPECT_CALL(mock_logger, Error(AllOf( + HasSubstr("authorization"), + HasSubstr(expected_auth_message), + HasSubstr(receivers_list[i]) + ) + )); + if (only_once) break; + i++; + } +} + +void RequestHandlerTcpTests::ExpectOKAuthorize(bool only_once) { + int i = 0; + for (auto expected_sd : expected_sds) { + EXPECT_CALL(mock_io, Receive_t(expected_sd, _, sizeof(asapo::SendDataResponse), _)) + .InSequence(s) + .WillOnce( + DoAll( + testing::SetArgPointee<3>(nullptr), + A_WriteSendDataResponse(asapo::kNetErrorNoError), + testing::ReturnArg<2>() + )); + EXPECT_CALL(mock_logger, Debug(AllOf( + HasSubstr("authorized"), + HasSubstr(receivers_list[i]) + ) + )); + if (only_once) break; + i++; + } + +} + + + void RequestHandlerTcpTests::ExpectFailSendHeader(bool only_once) { int i = 0; for (auto expected_sd : expected_sds) { @@ -194,7 +248,7 @@ void RequestHandlerTcpTests::ExpectFailReceive(bool only_once) { int i = 0; for (auto expected_sd : expected_sds) { EXPECT_CALL(mock_io, Receive_t(expected_sd, _, sizeof(asapo::SendDataResponse), _)) - .Times(1) + .InSequence(s) .WillOnce( DoAll( testing::SetArgPointee<3>(asapo::IOErrorTemplates::kBadFileNumber.Generate().release()), @@ -276,6 +330,7 @@ void RequestHandlerTcpTests::ExpectOKReceive(bool only_once) { int i = 0; for (auto expected_sd : expected_sds) { EXPECT_CALL(mock_io, Receive_t(expected_sd, _, sizeof(asapo::SendDataResponse), _)) + .InSequence(s) .WillOnce( DoAll( testing::SetArgPointee<3>(nullptr), @@ -293,7 +348,11 @@ void RequestHandlerTcpTests::ExpectOKReceive(bool only_once) { } void RequestHandlerTcpTests::DoSingleSend(bool connect, bool success) { - if (connect) ExpectOKConnect(true); + if (connect) { + ExpectOKConnect(true); + ExpectOKAuthorize(true); + } + ExpectOKSendHeader(true); ExpectOKSendData(true); if (success) { @@ -370,6 +429,19 @@ TEST_F(RequestHandlerTcpTests, TriesConnectWhenNotConnected) { ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers)); } +TEST_F(RequestHandlerTcpTests, FailsWhenCannotAuthorize) { + ExpectOKConnect(); + ExpectFailAuthorize(); + + request_handler.PrepareProcessingRequestLocked(); + auto err = request_handler.ProcessRequestUnlocked(&request); + request_handler.TearDownProcessingRequestLocked(err); + + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers)); + ASSERT_THAT(n_connections, Eq(0)); +} + + TEST_F(RequestHandlerTcpTests, DoesNotTryConnectWhenConnected) { DoSingleSend(); @@ -392,6 +464,7 @@ TEST_F(RequestHandlerTcpTests, DoesNotTryConnectWhenConnected) { TEST_F(RequestHandlerTcpTests, DoNotCloseWhenNotConnected) { EXPECT_CALL(mock_io, CloseSocket_t(_, _)).Times(0); ExpectOKConnect(); + ExpectOKAuthorize(); ExpectFailSendHeader(); request_handler.PrepareProcessingRequestLocked(); @@ -419,6 +492,7 @@ TEST_F(RequestHandlerTcpTests, CloseConnectionWhenRebalance) { TEST_F(RequestHandlerTcpTests, ErrorWhenCannotSendHeader) { ExpectOKConnect(); + ExpectOKAuthorize(); ExpectFailSendHeader(); request_handler.PrepareProcessingRequestLocked(); @@ -430,6 +504,7 @@ TEST_F(RequestHandlerTcpTests, ErrorWhenCannotSendHeader) { TEST_F(RequestHandlerTcpTests, ErrorWhenCannotSendData) { ExpectOKConnect(); + ExpectOKAuthorize(); ExpectOKSendHeader(); ExpectFailSendData(); @@ -440,10 +515,14 @@ TEST_F(RequestHandlerTcpTests, ErrorWhenCannotSendData) { } TEST_F(RequestHandlerTcpTests, ErrorWhenCannotReceiveData) { - ExpectOKConnect(); - ExpectOKSendHeader(); - ExpectOKSendData(); - ExpectFailReceive(); + EXPECT_CALL(mock_discovery_service, RotatedUriList(_)). + WillOnce(Return(receivers_list_single)); + + ExpectOKConnect(true); + ExpectOKAuthorize(true); + ExpectOKSendHeader(true); + ExpectOKSendData(true); + ExpectFailReceive(true); request_handler.PrepareProcessingRequestLocked(); auto err = request_handler.ProcessRequestUnlocked(&request); @@ -453,10 +532,12 @@ TEST_F(RequestHandlerTcpTests, ErrorWhenCannotReceiveData) { TEST_F(RequestHandlerTcpTests, ImmediatelyCallBackErrorIfFileAlreadyInUse) { ExpectOKConnect(true); + ExpectOKAuthorize(true); ExpectOKSendHeader(true); ExpectOKSendData(true); EXPECT_CALL(mock_io, Receive_t(expected_sds[0], _, sizeof(asapo::SendDataResponse), _)) + .InSequence(s) .WillOnce( DoAll( testing::SetArgPointee<3>(nullptr), @@ -476,6 +557,7 @@ TEST_F(RequestHandlerTcpTests, ImmediatelyCallBackErrorIfFileAlreadyInUse) { TEST_F(RequestHandlerTcpTests, SendEmptyCallBack) { ExpectOKConnect(true); + ExpectOKAuthorize(true); ExpectOKSendHeader(true); ExpectOKSendData(true); ExpectOKReceive(); @@ -489,6 +571,7 @@ TEST_F(RequestHandlerTcpTests, SendEmptyCallBack) { TEST_F(RequestHandlerTcpTests, SendOK) { ExpectOKConnect(true); + ExpectOKAuthorize(true); ExpectOKSendHeader(true); ExpectOKSendData(true); ExpectOKReceive();