Commit 86c05375 authored by David COME's avatar David COME
Browse files

ZMQReactor is working

parent fff72d00
......@@ -80,6 +80,7 @@ set (CLIENT_LIB_SRC_FILES
io/PollEventHandler.cpp
io/PollReactor.cpp
io/PollReactorImpl.cpp
io/ZMQReactor.cpp
io/ServerSocket.cpp
io/StreamAddress.cpp
io/StreamBaseCnv.cpp
......@@ -280,7 +281,7 @@ add_library (castorclient SHARED ${CLIENT_LIB_SRC_FILES})
# the compiler only complains in -O2 mode (RelWithDebInfo in cmake).
set_property(SOURCE ../client/src/stager/stager_errmsg.c APPEND PROPERTY COMPILE_FLAGS -fno-strict-aliasing)
CastorSetLibraryVersions (castorclient)
target_link_libraries (castorclient castordlf)
target_link_libraries (castorclient castordlf zmq)
install (TARGETS castorclient DESTINATION ${CASTOR_DEST_LIB_DIR})
if (${COMPILE_SERVER} STREQUAL "1")
......
......@@ -25,7 +25,7 @@
#include "castor/utils/SmartArrayPtr.hpp"
#include <unistd.h>
#include <poll.h>
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
......
......@@ -26,6 +26,7 @@
#include "castor/log/Logger.hpp"
#include <map>
#include <poll.h>
namespace castor {
namespace io {
......
/******************************************************************************
* castor/io/PollEventHandler.hpp
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
* @author Steven.Murray@cern.ch
*****************************************************************************/
#pragma once
#include "castor/exception/Exception.hpp"
#include "zmq/zmqcastor.hpp"
#include <poll.h>
namespace castor {
namespace io {
/**
* Handles the events that occur on a poll() file descriptor.
*
* This class is part of an implementation of the Reactor architecture pattern
* described in the following book:
*
* Pattern-Oriented Software Architecture Volume 2
* Patterns for Concurrent and Networked Objects
* Authors: Schmidt, Stal, Rohnert and Buschmann
* Publication date: 2000
* ISBN 0-471-60695-2
*/
class ZMQPollEventHandler {
public:
/**
* Fills the specified poll file-descriptor ready to be used in a call to
* poll().
*/
virtual void fillPollFd(zmq::pollitem_t &pollitem) =0;
/**
* Handles the specified event.
*
* @param fd The poll file-descriptor describing the event.
* @return true if the event handler should be removed from and deleted by
* the reactor.
*/
virtual bool handleEvent(const zmq::pollitem_t &fd)=0;
}; // class PollEventHandler
} // namespace io
} // namespace castor
#include "castor/io/ZMQReactor.hpp"
#include "zmq/zmqcastor.hpp"
#include <algorithm>
namespace{
bool operator==(const zmq::pollitem_t& a,const zmq::pollitem_t& b){
if( (a.fd==b.fd && a.fd!= -1 && b.fd != -1) ||
(a.socket==b.socket && a.socket!=NULL && b.socket != NULL) ){
return true;
}
return false;
}
}
namespace castor {
namespace io {
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
ZMQReactor::ZMQReactor(log::Logger& log,zmq::context_t& ctx):
m_context(ctx),m_log(log){
}
//------------------------------------------------------------------------------
// destructor
//------------------------------------------------------------------------------
ZMQReactor::~ZMQReactor(){
clear();
}
//------------------------------------------------------------------------------
// clear
//------------------------------------------------------------------------------
void ZMQReactor::clear(){
for(HandlerMap::iterator it=m_handlers.begin();it!=m_handlers.end();++it){
delete it->second;
}
m_handlers.clear();
}
//------------------------------------------------------------------------------
// registerHandler
//------------------------------------------------------------------------------
void ZMQReactor::registerHandler(ZMQPollEventHandler *const handler){
zmq::pollitem_t item;
handler->fillPollFd(item);
item.events = ZMQ_POLLIN;
//TODO, handle double registration
m_handlers.push_back(std::make_pair(item,handler));
}
//------------------------------------------------------------------------------
// handleEvents
//------------------------------------------------------------------------------
void ZMQReactor::handleEvents(const int timeout){
//it should not bring any copy, thanks to NRVO
std::vector<zmq::pollitem_t> pollFD=buildPollFds();
const int pollrc = zmq::poll(&pollFD[0], pollFD.size(), timeout);
if(pollrc !=0){
dispatchEventHandlers(pollFD);
}
}
//------------------------------------------------------------------------------
// dispatchEventHandlers
//------------------------------------------------------------------------------
void ZMQReactor::dispatchEventHandlers(const std::vector<zmq::pollitem_t>& pollFD){
for(std::vector<zmq::pollitem_t>::const_iterator it=pollFD.begin();
it!=pollFD.end();
++it) {
// Find and dispatch the appropriate handler if there is a pending event
if(it->revents & ZMQ_POLLIN) {
ZMQPollEventHandler *handler = findHandler(*it);
if(handler) {
const bool removeAndDeleteHandler = handler->handleEvent(*it);
if(removeAndDeleteHandler) {
removeHandler(handler);
delete(handler);
}
}else {
std::list<log::Param> params;
params.push_back(log::Param("fd",it->fd));
params.push_back(log::Param("socket",it->socket));
m_log(LOG_ERR, "Event on some poll, but no handler to match it", params);
}
}
}
}
ZMQPollEventHandler *
ZMQReactor::findHandler(const zmq::pollitem_t& pollfd) const{
for(HandlerMap::const_iterator it=m_handlers.begin();it!=m_handlers.end();++it){
if(pollfd==it->first){
return it->second;
}
}
return NULL;
}
//------------------------------------------------------------------------------
// removeHandler
//------------------------------------------------------------------------------
void ZMQReactor::removeHandler(ZMQPollEventHandler *const handler){
zmq::pollitem_t pollitem;
for(HandlerMap::iterator it=m_handlers.begin();it!=m_handlers.end();++it){
if(it->second==handler){
pollitem=it->first;
m_handlers.erase(it);
break;
}
}
}
//------------------------------------------------------------------------------
// buildPollFds
//------------------------------------------------------------------------------
std::vector<zmq::pollitem_t> ZMQReactor::buildPollFds() const{
std::vector<zmq::pollitem_t> vec;
vec.reserve(m_handlers.size());
for(HandlerMap::const_iterator it=m_handlers.begin();it!=m_handlers.end();++it){
vec.push_back(it->first);
}
return vec;
}
}}
/******************************************************************************
* ZMQReactor.hpp
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
* @author Steven.Murray@cern.ch
*****************************************************************************/
#pragma once
#include "castor/log/Logger.hpp"
#include "castor/io/ZMQPollEventHandler.hpp"
#include "zmq/zmqcastor.hpp"
#include <vector>
#include <utility>
namespace castor {
namespace io {
/**
* This reactor wraps the poll() system call.
*
* This class is part of an implementation of the Reactor architecture pattern
o * described in the following book:
*
* Pattern-Oriented Software Architecture Volume 2
* Patterns for Concurrent and Networked Objects
* Authors: Schmidt, Stal, Rohnert and Buschmann
* Publication date: 2000
* ISBN 0-471-60695-2
*/
class ZMQReactor {
public:
/**
* Constructor.
*/
ZMQReactor(log::Logger& log,zmq::context_t& ctx);
~ZMQReactor();
/**
* Removes and deletes all of the event handlers registered with the reactor.
*/
void clear();
/**
* Registers the specified handler.
*
* Please note that the reactor takes ownership of the handler and will
* delete it as appropriate.
*
* @param handler The handler to be registered. Please note that the handler
* MUST be allocated on the heap because the reactor will own the handler
* and therefore delete it as needed.
*/
void registerHandler(ZMQPollEventHandler *const handler);
/**
* Handles any pending events.
*
* @param timeout Timeout in milliseconds.
*/
void handleEvents(const int timeout);
private:
/**
* Allocates and builds the array of file descriptors to be passed to poll().
*
* @return The array of file descriptors. Please note that is the
* responsibility of the caller to delete the array.
*/
std::vector<zmq::pollitem_t> buildPollFds() const ;
/**
* Returns the event handler associated with the specified integer
* file-descriptor (null if there no handler associated, if ti happens = bug)
*/
ZMQPollEventHandler* findHandler(const zmq::pollitem_t&) const;
/**
* Dispatches the appropriate event handlers based on the specified result
* from poll().
*/
void dispatchEventHandlers(const std::vector<zmq::pollitem_t>& pollFD);
/**
* Removes the specified handler from the reactor. This method effectively
* does the opposite of registerHandler().
*
* @param handler The handler to be removed.
*/
void removeHandler(ZMQPollEventHandler *const handler);
/**
* Type used to map file descriptor to event handler.
*/
typedef std::vector<std::pair<zmq::pollitem_t, ZMQPollEventHandler*> > HandlerMap;
/**
* Map of file descriptor to registered event handler.
*/
HandlerMap m_handlers;
zmq::context_t& m_context;
/**
* Object representing the API of the CASTOR logging system.
*/
log::Logger& m_log;
}; // class ZMQReactor
} // namespace io
} // namespace castor
......@@ -23,7 +23,7 @@
#pragma once
#include "castor/legacymsg/RmcProxyFactory.hpp"
#include "castor/log/Logger.hpp"
namespace castor {
namespace legacymsg {
......
......@@ -35,7 +35,7 @@
// constructor
//------------------------------------------------------------------------------
castor::tape::rmc::AcceptHandler::AcceptHandler(
const int fd, io::PollReactor &reactor, log::Logger &log)
const int fd, io::ZMQReactor &reactor, log::Logger &log)
throw(): m_fd(fd), m_reactor(reactor), m_log(log) {
}
......@@ -62,18 +62,17 @@ int castor::tape::rmc::AcceptHandler::getFd() throw() {
//------------------------------------------------------------------------------
// fillPollFd
//------------------------------------------------------------------------------
void castor::tape::rmc::AcceptHandler::fillPollFd(
struct pollfd &fd) throw() {
void castor::tape::rmc::AcceptHandler::fillPollFd(zmq::pollitem_t &fd) throw() {
fd.fd = m_fd;
fd.events = POLLRDNORM;
fd.revents = 0;
fd.socket = NULL;
}
//------------------------------------------------------------------------------
// handleEvent
//------------------------------------------------------------------------------
bool castor::tape::rmc::AcceptHandler::handleEvent(
const struct pollfd &fd) {
const zmq::pollitem_t &fd) {
{
log::Param params[] = {
log::Param("fd" , fd.fd ),
......@@ -92,16 +91,6 @@ bool castor::tape::rmc::AcceptHandler::handleEvent(
checkHandleEventFd(fd.fd);
// Do nothing if there is no data to read
//
// POLLIN is unfortuntaley not the logical or of POLLRDNORM and POLLRDBAND
// on SLC 5. I therefore replaced POLLIN with the logical or. I also
// added POLLPRI into the mix to cover all possible types of read event.
if(0 == (fd.revents & POLLRDNORM) && 0 == (fd.revents & POLLRDBAND) &&
0 == (fd.revents & POLLPRI)) {
return false; // Stay registeed with the reactor
}
// Accept the connection
castor::utils::SmartFd connection;
try {
......
......@@ -23,7 +23,7 @@
#pragma once
#include "castor/io/PollEventHandler.hpp"
#include "castor/io/PollReactor.hpp"
#include "castor/io/ZMQReactor.hpp"
#include "castor/log/Logger.hpp"
#include <poll.h>
......@@ -36,7 +36,7 @@ namespace rmc {
* Handles the events of the socket listening for connections from clients of
* the rmcd daemon.
*/
class AcceptHandler: public io::PollEventHandler {
class AcceptHandler: public io::ZMQPollEventHandler {
public:
/**
......@@ -48,7 +48,7 @@ public:
* registered.
* @param log The object representing the API of the CASTOR logging system.
*/
AcceptHandler(const int fd, io::PollReactor &reactor, log::Logger &log) throw();
AcceptHandler(const int fd, io::ZMQReactor &reactor, log::Logger &log) throw();
/**
* Returns the integer file descriptor of this event handler.
......@@ -59,7 +59,7 @@ public:
* Fills the specified poll file-descriptor ready to be used in a call to
* poll().
*/
void fillPollFd(struct pollfd &fd) throw();
void fillPollFd(zmq::pollitem_t &fd) throw();
/**
* Handles the specified event.
......@@ -68,7 +68,7 @@ public:
* @return true if the event handler should be removed from and deleted by
* the reactor.
*/
bool handleEvent(const struct pollfd &fd)
bool handleEvent(const zmq::pollitem_t &fd)
;
/**
......@@ -94,7 +94,7 @@ private:
/**
* The reactor to which new connection handlers are to be registered.
*/
io::PollReactor &m_reactor;
io::ZMQReactor &m_reactor;
/**
* The object representing the API of the CASTOR logging system.
......
......@@ -28,7 +28,7 @@
// constructor
//------------------------------------------------------------------------------
castor::tape::rmc::ConnectionHandler::ConnectionHandler(
const int fd, io::PollReactor &reactor, log::Logger &log) throw():
const int fd, io::ZMQReactor &reactor, log::Logger &log) throw():
m_fd(fd),
m_reactor(reactor),
m_log(log),
......@@ -55,16 +55,16 @@ int castor::tape::rmc::ConnectionHandler::getFd() throw() {
//------------------------------------------------------------------------------
// fillPollFd
//------------------------------------------------------------------------------
void castor::tape::rmc::ConnectionHandler::fillPollFd(struct pollfd &fd) throw() {
void castor::tape::rmc::ConnectionHandler::fillPollFd(zmq::pollitem_t &fd) throw() {
fd.fd = m_fd;
fd.events = POLLRDNORM;
fd.revents = 0;
fd.socket = NULL;
}
//------------------------------------------------------------------------------
// handleEvent
//------------------------------------------------------------------------------
bool castor::tape::rmc::ConnectionHandler::handleEvent(const struct pollfd &fd) {
bool castor::tape::rmc::ConnectionHandler::handleEvent(const zmq::pollitem_t &fd) {
std::list<log::Param> params;
params.push_back(log::Param("fd" , fd.fd ));
params.push_back(log::Param("POLLIN" , fd.revents & POLLIN ? "true" : "false"));
......@@ -81,15 +81,6 @@ bool castor::tape::rmc::ConnectionHandler::handleEvent(const struct pollfd &fd)
checkHandleEventFd(fd.fd);
// Do nothing if there is no data to read
//
// POLLIN is unfortuntaley not the logical or of POLLRDNORM and POLLRDBAND
// on SLC 5. I therefore replaced POLLIN with the logical or. I also
// added POLLPRI into the mix to cover all possible types of read event.
if(0 == (fd.revents & POLLRDNORM) && 0 == (fd.revents & POLLRDBAND) &&
0 == (fd.revents & POLLPRI)) {
return false; // Stay registered with the reactor
}
if(!connectionIsAuthorized()) {
return true; // Ask reactor to remove and delete this handler
......
......@@ -24,7 +24,7 @@
#include "castor/io/io.hpp"
#include "castor/io/PollEventHandler.hpp"
#include "castor/io/PollReactor.hpp"
#include "castor/io/ZMQReactor.hpp"
#include "castor/log/Logger.hpp"
#include "castor/legacymsg/CommonMarshal.hpp"
#include "castor/legacymsg/MessageHeader.hpp"
......@@ -38,7 +38,7 @@ namespace rmc {
/**
* Handles the events of a client connection.
*/
class ConnectionHandler: public io::PollEventHandler {
class ConnectionHandler: public io::ZMQPollEventHandler {
public:
/**
......@@ -50,7 +50,7 @@ public:
*/
ConnectionHandler(
const int fd,
io::PollReactor &reactor,
io::ZMQReactor &reactor,
log::Logger &log) throw();
/**
......@@ -62,14 +62,14 @@ public:
* Fills the specified poll file-descriptor ready to be used in a call to
* poll().
*/
void fillPollFd(struct pollfd &fd) throw();
void fillPollFd(zmq::pollitem_t &fd) throw();
/**
* Handles the specified event.
*
* @param fd The poll file-descriptor describing the event.
*/
bool handleEvent(const struct pollfd &fd) ;
bool handleEvent(const zmq::pollitem_t &fd) ;
/**
* Destructor.
......@@ -88,7 +88,7 @@ private:
/**
* The reactor with which this event handler is registered.
*/
io::PollReactor &m_reactor;
io::ZMQReactor &m_reactor;
/**
* The object representing the API of the CASTOR logging system.
......
......@@ -49,7 +49,7 @@ castor::tape::rmc::RmcDaemon::RmcDaemon::RmcDaemon(
std::ostream &stdOut,
std::ostream &stdErr,
log::Logger &log,
io::PollReactor &reactor,
io::ZMQReactor &reactor,
legacymsg::CupvProxy &cupv) :
castor::server::Daemon(stdOut, stdErr, log),
m_reactor(reactor),
......
......@@ -26,7 +26,7 @@
#include "castor/exception/Exception.hpp"
#include "castor/legacymsg/CupvProxy.hpp"
#include "castor/io/PollReactor.hpp"
#include "castor/io/ZMQReactor.hpp"
#include "castor/server/Daemon.hpp"
#include <string>
......@@ -56,7 +56,7 @@ public:
std::ostream &stdOut,
std::ostream &stdErr,
log::Logger &log,
io::PollReactor &reactor,
io::ZMQReactor &reactor,
legacymsg::CupvProxy &cupv) ;
/**
......@@ -190,7 +190,7 @@ protected:
* The reactor responsible for dispatching the file-descriptor event-handlers
* of the rmcd daemon.
*/
io::PollReactor &m_reactor;
io::ZMQReactor &m_reactor;
/**
* Proxy object representing the cupvd daemon.
......
......@@ -23,7 +23,7 @@
*****************************************************************************/
#include "castor/common/CastorConfiguration.hpp"
#include "castor/io/PollReactorImpl.hpp"
#include "castor/io/ZMQReactor.hpp"
#include "castor/legacymsg/CupvProxyTcpIp.hpp"
#include "castor/log/SyslogLogger.hpp"
#include "castor/tape/rmc/RmcDaemon.hpp"
......@@ -91,8 +91,8 @@ int main(const int argc, char **const argv) {
//------------------------------------------------------------------------------
static int exceptionThrowingMain(const int argc, char **const argv, castor::log::Logger &log) {
const std::string cupvHostName = getConfigParam("UPV", "HOST");
castor::io::PollReactorImpl reactor(log);
zmq::context_t ctx;
castor::io::ZMQReactor reactor(log,ctx);
const int netTimeout = 10; // Timeout in seconds
castor::legacymsg::CupvProxyTcpIp cupv(log, cupvHostName, CUPV_PORT, netTimeout);
castor::tape::rmc::RmcDaemon daemon(std::cout, std::cerr, log, reactor, cupv);
......
......@@ -49,7 +49,8 @@ TEST_F(castor_tape_rmc_RmcDaemonTest, constructor) {
std::ostringstream stdOut;
std::ostringstream stdErr;
castor