Skip to content
Snippets Groups Projects
Commit 9eb85726 authored by Eric Cano's avatar Eric Cano
Browse files

Added usage of PEEK option to SocketPair::receive, allowing dynamic allocation...

Added usage of PEEK option to SocketPair::receive, allowing dynamic allocation of a receive buffer of the right length.
Tested receiption up to 100kB in unit tests.
parent d208cc75
Branches
Tags
No related merge requests found
......@@ -164,15 +164,26 @@ int SocketPair::getFdForAccess(Side sourceOrDestination) {
//------------------------------------------------------------------------------
std::string SocketPair::receive(Side source) {
int fd=getFdForAccess(source);
char buff[2048];
// First, get the message size (using peek option)
ssize_t sizePeek = recv(fd, nullptr, 0, MSG_DONTWAIT | MSG_PEEK | MSG_TRUNC);
if (!sizePeek) {
throw PeerDisconnected("In SocketPair::receive(): connection reset by peer.");
} else if (sizePeek < 0) {
if (errno == EAGAIN) {
throw NothingToReceive("In SocketPair::receive(): nothing to receive.");
} else {
throw cta::exception::Errnum("In SocketPair::receive(): failed to recv(): ");
}
}
std::unique_ptr<char[]> buff(new char[sizePeek]);
struct ::msghdr hdr;
struct ::iovec iov;
hdr.msg_name = nullptr;
hdr.msg_namelen = 0;
hdr.msg_iov = &iov;
hdr.msg_iovlen = 1;
hdr.msg_iov->iov_base = (void*)buff;
hdr.msg_iov->iov_len = sizeof(buff);
hdr.msg_iov->iov_base = (void*)buff.get();
hdr.msg_iov->iov_len = sizePeek;
hdr.msg_control = nullptr;
hdr.msg_controllen = 0;
hdr.msg_flags = 0;
......@@ -182,7 +193,7 @@ std::string SocketPair::receive(Side source) {
throw Overflow("In SocketPair::receive(): message was truncated.");
}
std::string ret;
ret.append(buff, size);
ret.append(buff.get(), size);
return ret;
} else if (!size) {
throw PeerDisconnected("In SocketPair::receive(): connection reset by peer.");
......@@ -190,7 +201,7 @@ std::string SocketPair::receive(Side source) {
if (errno == EAGAIN) {
throw NothingToReceive("In SocketPair::receive(): nothing to receive.");
} else {
throw cta::exception::Errnum("In SocketPair::receive(): failed to recv(): ");
throw cta::exception::Errnum("In SocketPair::receive(): failed to recvmsg(): ");
}
}
}
......
......@@ -77,27 +77,26 @@ TEST(cta_threading_SocketPair, Multimessages) {
}
TEST(cta_threading_SocketPair, MaxLength) {
// We should be able to read up to 2048 bytes (this is an internal limit that
// could be raised)
// Limit to send is higher
// 1) prepare messages.
// Try to send and receive messages up to 100kB
std::string smallMessage = "Hello!";
std::string maxMessage;
std::string bigMessage;
int i = 0;
maxMessage.resize(2048, '.');
std::for_each(maxMessage.begin(), maxMessage.end(), [&](char &c){ c='A' + (i++ % 26);});
std::string oversizeMessage;
oversizeMessage.resize(2049, '.');
bigMessage.resize(10*1024, '.');
std::for_each(bigMessage.begin(), bigMessage.end(), [&](char &c){ c='A' + (i++ % 26);});
std::string hugeMessage;
hugeMessage.resize(100*1024, '.');
std::for_each(hugeMessage.begin(), hugeMessage.end(), [&](char &c){ c='Z' - (i++ % 26);});
// 2) send/receive them
using cta::server::SocketPair;
cta::server::SocketPair sp;
sp.send(smallMessage, SocketPair::Side::parent);
sp.send(maxMessage, SocketPair::Side::parent);
sp.send(oversizeMessage, SocketPair::Side::parent);
sp.send(bigMessage, SocketPair::Side::parent);
sp.send(hugeMessage, SocketPair::Side::parent);
sp.send(smallMessage, SocketPair::Side::parent);
ASSERT_EQ(smallMessage, sp.receive(SocketPair::Side::child));
ASSERT_EQ(maxMessage, sp.receive(SocketPair::Side::child));
ASSERT_THROW(sp.receive(SocketPair::Side::child), SocketPair::Overflow);
ASSERT_EQ(bigMessage, sp.receive(SocketPair::Side::child));
ASSERT_EQ(hugeMessage, sp.receive(SocketPair::Side::child));
//ASSERT_THROW(sp.receive(SocketPair::Side::child), SocketPair::Overflow);
ASSERT_EQ(smallMessage, sp.receive(SocketPair::Side::child));
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment