Skip to content
Snippets Groups Projects
Commit eebd7a53 authored by Carsten Patzke's avatar Carsten Patzke
Browse files

[RDS] Removed source_id exposure to the client

parent 4d4caf8e
Branches
Tags
No related merge requests found
......@@ -10,11 +10,10 @@ namespace asapo {
class RdsNetServer {
public:
virtual GenericRequests GetNewRequests(Error* err) const noexcept = 0;
virtual Error SendResponse(const ReceiverDataServerRequest* request, uint64_t source_id,
virtual Error SendResponse(const ReceiverDataServerRequest* request,
const GenericNetworkResponse* response) const noexcept = 0;
virtual Error
SendResponseAndSlotData(const ReceiverDataServerRequest* request, uint64_t source_id,
const GenericNetworkResponse* response,
SendResponseAndSlotData(const ReceiverDataServerRequest* request, const GenericNetworkResponse* response,
const CacheMeta* cache_slot) const noexcept = 0;
virtual void HandleAfterError(uint64_t source_id) const noexcept = 0;
virtual ~RdsNetServer() = default;
......
......@@ -19,7 +19,7 @@ Error ReceiverDataServerRequestHandler::SendResponse(const ReceiverDataServerReq
GenericNetworkResponse response{};
response.op_code = kOpcodeGetBufferData;
response.error_code = code;
return server_->SendResponse(request, request->source_id, &response);
return server_->SendResponse(request, &response);
}
Error ReceiverDataServerRequestHandler::SendResponseAndSlotData(const ReceiverDataServerRequest* request,
......@@ -27,7 +27,7 @@ Error ReceiverDataServerRequestHandler::SendResponseAndSlotData(const ReceiverDa
GenericNetworkResponse response{};
response.op_code = kOpcodeGetBufferData;
response.error_code = kNetErrorNoError;
return server_->SendResponseAndSlotData(request, request->source_id, &response,
return server_->SendResponseAndSlotData(request, &response,
meta);
}
......
......@@ -94,10 +94,10 @@ void TcpServer::HandleAfterError(uint64_t source_id) const noexcept {
CloseSocket(source_id);
}
Error TcpServer::SendResponse(const ReceiverDataServerRequest* request, uint64_t source_id,
Error TcpServer::SendResponse(const ReceiverDataServerRequest* request,
const GenericNetworkResponse* response) const noexcept {
Error err;
io__->Send(source_id, response, sizeof(*response), &err);
io__->Send(request->source_id, response, sizeof(*response), &err);
if (err) {
log__->Error("cannot send to consumer" + err->Explain());
}
......@@ -105,17 +105,16 @@ Error TcpServer::SendResponse(const ReceiverDataServerRequest* request, uint64_t
}
Error
TcpServer::SendResponseAndSlotData(const ReceiverDataServerRequest* request, uint64_t source_id,
const GenericNetworkResponse* response,
TcpServer::SendResponseAndSlotData(const ReceiverDataServerRequest* request, const GenericNetworkResponse* response,
const CacheMeta* cache_slot) const noexcept {
Error err;
err = SendResponse(nullptr, source_id, response);
err = SendResponse(request, response);
if (err) {
return err;
}
io__->Send(source_id, cache_slot->addr, cache_slot->size, &err);
io__->Send(request->source_id, cache_slot->addr, cache_slot->size, &err);
if (err) {
log__->Error("cannot send slot to worker" + err->Explain());
}
......
......@@ -14,10 +14,9 @@ class TcpServer : public RdsNetServer {
explicit TcpServer(std::string address);
~TcpServer() override;
GenericRequests GetNewRequests(Error* err) const noexcept override ;
Error SendResponse(const ReceiverDataServerRequest* request, uint64_t source_id,
Error SendResponse(const ReceiverDataServerRequest* request,
const GenericNetworkResponse* response) const noexcept override;
Error SendResponseAndSlotData(const ReceiverDataServerRequest* request, uint64_t source_id,
const GenericNetworkResponse* response,
Error SendResponseAndSlotData(const ReceiverDataServerRequest* request, const GenericNetworkResponse* response,
const CacheMeta* cache_slot) const noexcept override;
void HandleAfterError(uint64_t source_id) const noexcept override;
std::unique_ptr<IO> io__;
......
......@@ -27,20 +27,19 @@ class MockNetServer : public RdsNetServer {
MOCK_CONST_METHOD1(GetNewRequests_t, std::vector<ReceiverDataServerRequest> (ErrorInterface**
error));
Error SendResponse(const ReceiverDataServerRequest* request, uint64_t source_id,
Error SendResponse(const ReceiverDataServerRequest* request,
const GenericNetworkResponse* response) const noexcept override {
return Error{SendResponse_t(request, source_id, response)};
return Error{SendResponse_t(request, response)};
};
MOCK_CONST_METHOD3(SendResponse_t, ErrorInterface * (const ReceiverDataServerRequest* request, uint64_t source_id,
MOCK_CONST_METHOD2(SendResponse_t, ErrorInterface * (const ReceiverDataServerRequest* request,
const GenericNetworkResponse* response));
Error SendResponseAndSlotData(const ReceiverDataServerRequest* request, uint64_t source_id,
const GenericNetworkResponse* response,
Error SendResponseAndSlotData(const ReceiverDataServerRequest* request, const GenericNetworkResponse* response,
const CacheMeta* cache_slot) const noexcept override {
return Error{SendResponseAndSlotData_t(request, source_id, response, cache_slot)};
return Error{SendResponseAndSlotData_t(request, response, cache_slot)};
};
MOCK_CONST_METHOD4(SendResponseAndSlotData_t, ErrorInterface * (const ReceiverDataServerRequest* request,
uint64_t source_id, const GenericNetworkResponse* response,
MOCK_CONST_METHOD3(SendResponseAndSlotData_t, ErrorInterface * (const ReceiverDataServerRequest* request,
const GenericNetworkResponse* response,
const CacheMeta* cache_slot));
void HandleAfterError(uint64_t source_id) const noexcept override {
......
......@@ -88,7 +88,6 @@ void RequestHandlerTests::MockGetSlotAndUnlockIt(bool return_without_error) {
void RequestHandlerTests::MockSendResponse(asapo::NetworkErrorCode expected_response_code, bool return_without_error) {
EXPECT_CALL(mock_net, SendResponse_t(
&request,
expected_source_id,
M_CheckResponse(asapo::kOpcodeGetBufferData, expected_response_code, "")
)).WillOnce(
Return(return_without_error ? nullptr : asapo::IOErrorTemplates::kUnknownIOError.Generate().release())
......@@ -99,7 +98,6 @@ void RequestHandlerTests::MockSendResponseAndSlotData(asapo::NetworkErrorCode ex
bool return_without_error) {
EXPECT_CALL(mock_net, SendResponseAndSlotData_t(
&request,
expected_source_id,
M_CheckResponse(asapo::kOpcodeGetBufferData, expected_response_code, ""),
&expected_meta
)).WillOnce(
......
......@@ -247,7 +247,7 @@ TEST_F(TCPServerTests, SendResponse) {
EXPECT_CALL(mock_logger, Error(HasSubstr("cannot send")));
auto err = tcp_server.SendResponse(&expectedRequest, 30, &tmp);
auto err = tcp_server.SendResponse(&expectedRequest, &tmp);
ASSERT_THAT(err, Ne(nullptr));
}
......@@ -270,7 +270,7 @@ TEST_F(TCPServerTests, SendResponseAndSlotData_SendResponseError) {
));
EXPECT_CALL(mock_logger, Error(HasSubstr("cannot send")));
auto err = tcp_server.SendResponseAndSlotData(&expectedRequest, 30, &tmp, &expectedMeta);
auto err = tcp_server.SendResponseAndSlotData(&expectedRequest, &tmp, &expectedMeta);
ASSERT_THAT(err, Ne(nullptr));
}
......@@ -296,7 +296,7 @@ TEST_F(TCPServerTests, SendResponseAndSlotData_SendDataError) {
EXPECT_CALL(mock_logger, Error(HasSubstr("cannot send")));
auto err = tcp_server.SendResponseAndSlotData(&expectedRequest, 30, &tmp, &expectedMeta);
auto err = tcp_server.SendResponseAndSlotData(&expectedRequest, &tmp, &expectedMeta);
ASSERT_THAT(err, Ne(nullptr));
}
......@@ -316,7 +316,7 @@ TEST_F(TCPServerTests, SendResponseAndSlotData_Ok) {
EXPECT_CALL(mock_io, Send_t(30, expectedMeta.addr, expectedMeta.size, _))
.WillOnce(Return(expectedMeta.size));
auto err = tcp_server.SendResponseAndSlotData(&expectedRequest, 30, &tmp, &expectedMeta);
auto err = tcp_server.SendResponseAndSlotData(&expectedRequest, &tmp, &expectedMeta);
ASSERT_THAT(err, Eq(nullptr));
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment