Skip to content
Snippets Groups Projects
Commit fc9052d0 authored by Carsten Patzke's avatar Carsten Patzke
Browse files

[RDS] Refactored NetServer to SendResponse/SendResponseAndSlotData

parent 25f03db0
No related branches found
No related tags found
No related merge requests found
......@@ -4,13 +4,16 @@
#include "common/error.h"
#include "request/request.h"
#include "../data_cache.h"
namespace asapo {
class NetServer {
public:
virtual GenericRequests GetNewRequests(Error* err) const noexcept = 0;
virtual Error SendData(uint64_t source_id, void* buf, uint64_t size) const noexcept = 0;
virtual Error SendResponse(uint64_t source_id, GenericNetworkResponse* response) const noexcept = 0;
virtual Error SendResponseAndSlotData(uint64_t source_id, GenericNetworkResponse* response,
GenericRequestHeader* request, CacheMeta* cache_slot) const noexcept = 0;
virtual void HandleAfterError(uint64_t source_id) const noexcept = 0;
virtual ~NetServer() = default;
};
......
......@@ -15,34 +15,30 @@ bool ReceiverDataServerRequestHandler::CheckRequest(const ReceiverDataServerRequ
return request->header.op_code == kOpcodeGetBufferData;
}
Error ReceiverDataServerRequestHandler::SendData(const ReceiverDataServerRequest* request,
void* data,
CacheMeta* meta) {
auto err = SendResponce(request, kNetErrorNoError);
if (err) {
data_cache_->UnlockSlot(meta);
return err;
}
err = server_->SendData(request->source_id, data, request->header.data_size);
log__->Debug("sending data from memory cache, id:" + std::to_string(request->header.data_id));
data_cache_->UnlockSlot(meta);
return err;
Error ReceiverDataServerRequestHandler::SendResponse(const ReceiverDataServerRequest* request, NetworkErrorCode code) {
GenericNetworkResponse response{};
response.op_code = kOpcodeGetBufferData;
response.error_code = code;
return server_->SendResponse(request->source_id, &response);
}
Error ReceiverDataServerRequestHandler::SendResponseAndSlotData(const ReceiverDataServerRequest* request,
CacheMeta* meta) {
GenericNetworkResponse response{};
response.op_code = kOpcodeGetBufferData;
response.error_code = kNetErrorNoError;
return server_->SendResponseAndSlotData(request->source_id, &response, (GenericRequestHeader*) &request->header, meta);
}
void* ReceiverDataServerRequestHandler::GetSlot(const ReceiverDataServerRequest* request, CacheMeta** meta) {
void* buf = nullptr;
CacheMeta* ReceiverDataServerRequestHandler::GetSlotAndLock(const ReceiverDataServerRequest* request) {
CacheMeta* meta = nullptr;
if (data_cache_) {
buf = data_cache_->GetSlotToReadAndLock(request->header.data_id, request->header.data_size,
meta);
if (!buf) {
data_cache_->GetSlotToReadAndLock(request->header.data_id, request->header.data_size, &meta);
if (!meta) {
log__->Debug("data not found in memory cache, id:" + std::to_string(request->header.data_id));
}
}
if (buf == nullptr) {
SendResponce(request, kNetErrorNoData);
}
return buf;
return meta;
}
......@@ -50,19 +46,26 @@ bool ReceiverDataServerRequestHandler::ProcessRequestUnlocked(GenericRequest* re
*retry = false;
auto receiver_request = dynamic_cast<ReceiverDataServerRequest*>(request);
if (!CheckRequest(receiver_request)) {
SendResponce(receiver_request, kNetErrorWrongRequest);
SendResponse(receiver_request, kNetErrorWrongRequest);
server_->HandleAfterError(receiver_request->source_id);
log__->Error("wrong request, code:" + std::to_string(receiver_request->header.op_code));
return true;
}
CacheMeta* meta;
auto buf = GetSlot(receiver_request, &meta);
if (buf == nullptr) {
CacheMeta* meta = GetSlotAndLock(receiver_request);
if (!meta) {
SendResponse(receiver_request, kNetErrorNoData);
return true;
}
auto err = SendResponseAndSlotData(receiver_request, meta);
data_cache_->UnlockSlot(meta);
if (err) {
log__->Error("failed to send slot:" + err->Explain());
server_->HandleAfterError(receiver_request->source_id);
return true;
}
SendData(receiver_request, buf, meta);
statistics__->IncreaseRequestCounter();
statistics__->IncreaseRequestDataVolume(receiver_request->header.data_size);
return true;
......@@ -76,19 +79,12 @@ void ReceiverDataServerRequestHandler::PrepareProcessingRequestLocked() {
// do nothing
}
void ReceiverDataServerRequestHandler::TearDownProcessingRequestLocked(bool processing_succeeded) {
void ReceiverDataServerRequestHandler::TearDownProcessingRequestLocked(bool /*processing_succeeded*/) {
// do nothing
}
Error ReceiverDataServerRequestHandler::SendResponce(const ReceiverDataServerRequest* request, NetworkErrorCode code) {
GenericNetworkResponse responce;
responce.op_code = kOpcodeGetBufferData;
responce.error_code = code;
return server_->SendData(request->source_id, &responce, sizeof(GenericNetworkResponse));
}
void ReceiverDataServerRequestHandler::ProcessRequestTimeout(GenericRequest* request) {
void ReceiverDataServerRequestHandler::ProcessRequestTimeout(GenericRequest* /*request*/) {
// do nothing
}
}
\ No newline at end of file
}
......@@ -25,9 +25,9 @@ class ReceiverDataServerRequestHandler: public RequestHandler {
const NetServer* server_;
DataCache* data_cache_;
bool CheckRequest(const ReceiverDataServerRequest* request);
Error SendResponce(const ReceiverDataServerRequest* request, NetworkErrorCode code);
Error SendData(const ReceiverDataServerRequest* request, void* data, CacheMeta* meta);
void* GetSlot(const ReceiverDataServerRequest* request, CacheMeta** meta);
Error SendResponse(const ReceiverDataServerRequest* request, NetworkErrorCode code);
Error SendResponseAndSlotData(const ReceiverDataServerRequest* request, CacheMeta* meta);
CacheMeta* GetSlotAndLock(const ReceiverDataServerRequest* request);
};
}
......
......@@ -51,7 +51,7 @@ ReceiverDataServerRequestPtr TcpServer::ReadRequest(SocketDescriptor socket, Err
);
return nullptr;
}
return ReceiverDataServerRequestPtr{new ReceiverDataServerRequest{std::move(header), (uint64_t) socket}};
return ReceiverDataServerRequestPtr{new ReceiverDataServerRequest{header, (uint64_t) socket}};
}
GenericRequests TcpServer::ReadRequests(const ListSocketDescriptors& sockets) const noexcept {
......@@ -90,18 +90,33 @@ TcpServer::~TcpServer() {
io__->CloseSocket(master_socket_, nullptr);
}
void TcpServer::HandleAfterError(uint64_t source_id) const noexcept {
CloseSocket(source_id);
}
Error TcpServer::SendData(uint64_t source_id, void* buf, uint64_t size) const noexcept {
Error TcpServer::SendResponse(uint64_t source_id, GenericNetworkResponse* response) const noexcept {
Error err;
io__->Send(source_id, buf, size, &err);
io__->Send(source_id, response, sizeof(*response), &err);
if (err) {
log__->Error("cannot send to consumer" + err->Explain());
}
return err;
}
void TcpServer::HandleAfterError(uint64_t source_id) const noexcept {
CloseSocket(source_id);
Error TcpServer::SendResponseAndSlotData(uint64_t source_id, GenericNetworkResponse* response,
GenericRequestHeader* /*request*/, CacheMeta* cache_slot) const noexcept {
Error err;
err = SendResponse(source_id, response);
if (err) {
return err;
}
io__->Send(source_id, cache_slot->addr, cache_slot->size, &err);
if (err) {
log__->Error("cannot send slot to worker" + err->Explain());
}
return err;
}
}
\ No newline at end of file
}
#ifndef ASAPO_TCP_SERVER_H
#define ASAPO_TCP_SERVER_H
#ifndef ASAPO_RDS_TCP_SERVER_H
#define ASAPO_RDS_TCP_SERVER_H
#include "net_server.h"
#include "io/io.h"
......@@ -11,10 +11,12 @@ const int kMaxPendingConnections = 5;
class TcpServer : public NetServer {
public:
TcpServer(std::string address);
~TcpServer();
explicit TcpServer(std::string address);
~TcpServer() override;
GenericRequests GetNewRequests(Error* err) const noexcept override ;
Error SendData(uint64_t source_id, void* buf, uint64_t size) const noexcept override;
Error SendResponse(uint64_t source_id, GenericNetworkResponse* response) const noexcept override;
Error SendResponseAndSlotData(uint64_t source_id, GenericNetworkResponse* response,
GenericRequestHeader* request, CacheMeta* cache_slot) const noexcept override;
void HandleAfterError(uint64_t source_id) const noexcept override;
std::unique_ptr<IO> io__;
const AbstractLogger* log__;
......@@ -31,4 +33,4 @@ class TcpServer : public NetServer {
}
#endif //ASAPO_TCP_SERVER_H
#endif //ASAPO_RDS_TCP_SERVER_H
......@@ -27,12 +27,17 @@ class MockNetServer : public NetServer {
MOCK_CONST_METHOD1(GetNewRequests_t, std::vector<ReceiverDataServerRequest> (ErrorInterface**
error));
Error SendData(uint64_t source_id, void* buf, uint64_t size) const noexcept override {
return Error{SendData_t(source_id, buf, size)};
Error SendResponse(uint64_t source_id, GenericNetworkResponse* response) const noexcept override {
return Error{SendResponse_t(source_id, response)};
};
MOCK_CONST_METHOD2(SendResponse_t, ErrorInterface * (uint64_t source_id, GenericNetworkResponse* response));
MOCK_CONST_METHOD3(SendData_t, ErrorInterface * (uint64_t source_id, void* buf, uint64_t size));
Error SendResponseAndSlotData(uint64_t source_id, GenericNetworkResponse* response, GenericRequestHeader* request,
CacheMeta* cache_slot) const noexcept override {
return Error{SendResponseAndSlotData_t(source_id, response, request, cache_slot)};
};
MOCK_CONST_METHOD4(SendResponseAndSlotData_t, ErrorInterface * (uint64_t source_id, GenericNetworkResponse* response,
GenericRequestHeader* request, CacheMeta* cache_slot));
void HandleAfterError(uint64_t source_id) const noexcept override {
HandleAfterError_t(source_id);
......@@ -47,7 +52,7 @@ class MockPool : public RequestPool {
Error AddRequests(GenericRequests requests) noexcept override {
std::vector<GenericRequest> reqs;
for (const auto& preq : requests) {
reqs.push_back(GenericRequest{preq->header, 0});
reqs.emplace_back(preq->header, 0);
}
return Error(AddRequests_t(std::move(reqs)));
......
......@@ -31,10 +31,10 @@ using asapo::ReceiverDataServerRequestHandler;
namespace {
MATCHER_P3(M_CheckResponce, op_code, error_code, message,
"Checks if a valid GenericNetworkResponse was used") {
MATCHER_P3(M_CheckResponse, op_code, error_code, message,
"Checks if a valid GenericNetworkResponse was used") {
return ((asapo::GenericNetworkResponse*)arg)->op_code == op_code
&& ((asapo::GenericNetworkResponse*)arg)->error_code == uint64_t(error_code);
&& ((asapo::GenericNetworkResponse*)arg)->error_code == uint64_t(error_code);
}
TEST(RequestHandlerTest, Constructor) {
......@@ -57,6 +57,7 @@ class RequestHandlerTests : public Test {
uint64_t expected_meta_size = 100;
uint64_t expected_buf_id = 12345;
uint64_t expected_source_id = 11;
asapo::CacheMeta expected_meta;
bool retry;
asapo::GenericRequestHeader header{asapo::kOpcodeGetBufferData, expected_buf_id, expected_data_size,
expected_meta_size, ""};
......@@ -67,22 +68,42 @@ class RequestHandlerTests : public Test {
}
void TearDown() override {
}
void MockGetSlot(bool ok = true);
void MockSendResponce(asapo::NetworkErrorCode err_code, bool ok = true);
void MockGetSlotAndUnlockIt(bool return_without_error = true);
void MockSendResponse(asapo::NetworkErrorCode expected_response_code, bool return_without_error);
void MockSendResponseAndSlotData(asapo::NetworkErrorCode expected_response_code, bool return_without_error);
};
void RequestHandlerTests::MockGetSlot(bool ok) {
EXPECT_CALL(mock_cache, GetSlotToReadAndLock(expected_buf_id, expected_data_size, _)).WillOnce(
Return(ok ? &tmp : nullptr)
void RequestHandlerTests::MockGetSlotAndUnlockIt(bool return_without_error) {
EXPECT_CALL(mock_cache, GetSlotToReadAndLock(expected_buf_id, expected_data_size, _)).WillOnce(DoAll(
SetArgPointee<2>(return_without_error ? &expected_meta : nullptr),
Return(return_without_error ? &tmp : nullptr)
));
if (return_without_error) {
EXPECT_CALL(mock_cache, UnlockSlot(_));
}
}
void RequestHandlerTests::MockSendResponse(asapo::NetworkErrorCode expected_response_code, bool return_without_error) {
EXPECT_CALL(mock_net, SendResponse_t(
expected_source_id,
M_CheckResponse(asapo::kOpcodeGetBufferData, expected_response_code, "")
)).WillOnce(
Return(return_without_error ? nullptr : asapo::IOErrorTemplates::kUnknownIOError.Generate().release())
);
}
void RequestHandlerTests::MockSendResponce(asapo::NetworkErrorCode err_code, bool ok) {
EXPECT_CALL(mock_net, SendData_t(expected_source_id,
M_CheckResponce(asapo::kOpcodeGetBufferData, err_code, ""), sizeof(asapo::GenericNetworkResponse))).WillOnce(
Return(ok ? nullptr : asapo::IOErrorTemplates::kUnknownIOError.Generate().release())
);
void RequestHandlerTests::MockSendResponseAndSlotData(asapo::NetworkErrorCode expected_response_code,
bool return_without_error) {
EXPECT_CALL(mock_net, SendResponseAndSlotData_t(
expected_source_id,
M_CheckResponse(asapo::kOpcodeGetBufferData, expected_response_code, ""),
&request.header,
&expected_meta
)).WillOnce(
Return(return_without_error ? nullptr : asapo::IOErrorTemplates::kUnknownIOError.Generate().release())
);
}
TEST_F(RequestHandlerTests, RequestAlwaysReady) {
......@@ -91,9 +112,9 @@ TEST_F(RequestHandlerTests, RequestAlwaysReady) {
ASSERT_THAT(res, Eq(true));
}
TEST_F(RequestHandlerTests, ProcessRequest_WronOpCode) {
TEST_F(RequestHandlerTests, ProcessRequest_WrongOpCode) {
request.header.op_code = asapo::kOpcodeUnknownOp;
MockSendResponce(asapo::kNetErrorWrongRequest, false);
MockSendResponse(asapo::kNetErrorWrongRequest, false);
EXPECT_CALL(mock_net, HandleAfterError_t(expected_source_id));
EXPECT_CALL(mock_logger, Error(HasSubstr("wrong request")));
......@@ -103,8 +124,8 @@ TEST_F(RequestHandlerTests, ProcessRequest_WronOpCode) {
ASSERT_THAT(success, Eq(true));
}
TEST_F(RequestHandlerTests, ProcessRequestReturnsNoDataWhenCacheNotUsed) {
MockSendResponce(asapo::kNetErrorNoData, true);
TEST_F(RequestHandlerTests, ProcessRequest_ReturnsNoDataWhenCacheNotUsed) {
MockSendResponse(asapo::kNetErrorNoData, true);
auto success = handler_no_cache.ProcessRequestUnlocked(&request, &retry);
EXPECT_CALL(mock_logger, Debug(_)).Times(0);
......@@ -112,9 +133,9 @@ TEST_F(RequestHandlerTests, ProcessRequestReturnsNoDataWhenCacheNotUsed) {
ASSERT_THAT(success, Eq(true));
}
TEST_F(RequestHandlerTests, ProcessRequestReadSlotReturnsNull) {
MockGetSlot(false);
MockSendResponce(asapo::kNetErrorNoData, true);
TEST_F(RequestHandlerTests, ProcessRequest_ReadSlotReturnsNull) {
MockGetSlotAndUnlockIt(false);
MockSendResponse(asapo::kNetErrorNoData, true);
EXPECT_CALL(mock_logger, Debug(HasSubstr("not found")));
auto success = handler.ProcessRequestUnlocked(&request, &retry);
......@@ -122,28 +143,19 @@ TEST_F(RequestHandlerTests, ProcessRequestReadSlotReturnsNull) {
ASSERT_THAT(success, Eq(true));
}
TEST_F(RequestHandlerTests, ProcessRequestReadSlotErrorSendingResponce) {
MockGetSlot(true);
MockSendResponce(asapo::kNetErrorNoError, false);
EXPECT_CALL(mock_net, SendData_t(expected_source_id, &tmp, expected_data_size)).Times(0);
EXPECT_CALL(mock_cache, UnlockSlot(_));
TEST_F(RequestHandlerTests, ProcessRequest_ReadSlotErrorSendingResponse) {
MockGetSlotAndUnlockIt(true);
MockSendResponseAndSlotData(asapo::kNetErrorNoError, false);
EXPECT_CALL(mock_net, HandleAfterError_t(_));
auto success = handler.ProcessRequestUnlocked(&request, &retry);
ASSERT_THAT(success, Eq(true));
}
TEST_F(RequestHandlerTests, ProcessRequestOk) {
MockGetSlot(true);
MockSendResponce(asapo::kNetErrorNoError, true);
EXPECT_CALL(mock_net, SendData_t(expected_source_id, &tmp, expected_data_size)).WillOnce(
Return(nullptr)
);
EXPECT_CALL(mock_cache, UnlockSlot(_));
EXPECT_CALL(mock_logger, Debug(HasSubstr("sending")));
TEST_F(RequestHandlerTests, ProcessRequest_Ok) {
MockGetSlotAndUnlockIt(true);
MockSendResponseAndSlotData(asapo::kNetErrorNoError, true);
EXPECT_CALL(mock_stat, IncreaseRequestCounter_t());
EXPECT_CALL(mock_stat, IncreaseRequestDataVolume_t(expected_data_size));
auto success = handler.ProcessRequestUnlocked(&request, &retry);
......
......@@ -234,23 +234,94 @@ TEST_F(TCPServerTests, GetNewRequestsReadOk) {
}
TEST_F(TCPServerTests, SendData) {
uint8_t tmp;
TEST_F(TCPServerTests, SendResponse) {
asapo::GenericNetworkResponse tmp {};
EXPECT_CALL(mock_io, Send_t(1, &tmp, 10, _))
.WillOnce(
DoAll(
testing::SetArgPointee<3>(asapo::IOErrorTemplates::kUnknownIOError.Generate().release()),
Return(1)
));
EXPECT_CALL(mock_io, Send_t(1, &tmp, sizeof(asapo::GenericNetworkResponse), _))
.WillOnce(
DoAll(
testing::SetArgPointee<3>(asapo::IOErrorTemplates::kUnknownIOError.Generate().release()),
Return(1)
));
EXPECT_CALL(mock_logger, Error(HasSubstr("cannot send")));
auto err = tcp_server.SendData(1, &tmp, 10);
auto err = tcp_server.SendResponse(1, &tmp);
ASSERT_THAT(err, Ne(nullptr));
}
TEST_F(TCPServerTests, SendResponseAndSlotData_SendResponseError) {
asapo::GenericNetworkResponse tmp {};
asapo::GenericRequestHeader expectedRequest {};
asapo::CacheMeta expectedMeta {};
expectedMeta.id = 20;
expectedMeta.addr = (void*)0x9234;
expectedMeta.size = 50;
expectedMeta.lock = 123;
EXPECT_CALL(mock_io, Send_t(1, &tmp, sizeof(asapo::GenericNetworkResponse), _))
.WillOnce(DoAll(
testing::SetArgPointee<3>(asapo::IOErrorTemplates::kUnknownIOError.Generate().release()),
Return(0)
));
EXPECT_CALL(mock_logger, Error(HasSubstr("cannot send")));
auto err = tcp_server.SendResponseAndSlotData(1, &tmp, &expectedRequest, &expectedMeta);
ASSERT_THAT(err, Ne(nullptr));
}
TEST_F(TCPServerTests, SendResponseAndSlotData_SendDataError) {
asapo::GenericNetworkResponse tmp {};
asapo::GenericRequestHeader expectedRequest {};
asapo::CacheMeta expectedMeta {};
expectedMeta.id = 20;
expectedMeta.addr = (void*)0x9234;
expectedMeta.size = 50;
expectedMeta.lock = 123;
EXPECT_CALL(mock_io, Send_t(1, &tmp, sizeof(asapo::GenericNetworkResponse), _))
.WillOnce(Return(1));
EXPECT_CALL(mock_io, Send_t(1, expectedMeta.addr, expectedMeta.size, _))
.WillOnce(
DoAll(
testing::SetArgPointee<3>(asapo::IOErrorTemplates::kUnknownIOError.Generate().release()),
Return(0)
));
EXPECT_CALL(mock_logger, Error(HasSubstr("cannot send")));
auto err = tcp_server.SendResponseAndSlotData(1, &tmp, &expectedRequest, &expectedMeta);
ASSERT_THAT(err, Ne(nullptr));
}
TEST_F(TCPServerTests, SendResponseAndSlotData_Ok) {
asapo::GenericNetworkResponse tmp {};
asapo::GenericRequestHeader expectedRequest {};
asapo::CacheMeta expectedMeta {};
expectedMeta.id = 20;
expectedMeta.addr = (void*)0x9234;
expectedMeta.size = 50;
expectedMeta.lock = 123;
EXPECT_CALL(mock_io, Send_t(1, &tmp, sizeof(asapo::GenericNetworkResponse), _))
.WillOnce(Return(1));
EXPECT_CALL(mock_io, Send_t(1, expectedMeta.addr, expectedMeta.size, _))
.WillOnce(Return(expectedMeta.size));
auto err = tcp_server.SendResponseAndSlotData(1, &tmp, &expectedRequest, &expectedMeta);
ASSERT_THAT(err, Eq(nullptr));
}
TEST_F(TCPServerTests, HandleAfterError) {
EXPECT_CALL(mock_io, CloseSocket_t(expected_client_sockets[0], _));
tcp_server.HandleAfterError(expected_client_sockets[0]);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment