Commit 55078e33 authored by Tigran Mkrtchyan's avatar Tigran Mkrtchyan
Browse files

rpc: added support for UDP

parent c9f18815
......@@ -26,7 +26,10 @@ import java.util.logging.Level;
import java.util.logging.Logger;
import com.sun.grizzly.Context;
import com.sun.grizzly.filter.ReadFilter;
import com.sun.grizzly.util.OutputWriter;
import java.net.SocketAddress;
import java.nio.channels.DatagramChannel;
public class GrizzlyXdrTransport implements XdrTransport {
......@@ -38,9 +41,22 @@ public class GrizzlyXdrTransport implements XdrTransport {
public GrizzlyXdrTransport(Context context) {
_context = context;
SocketChannel socketChannel = ((SocketChannel)context.getSelectionKey().channel());
_local =(InetSocketAddress) socketChannel.socket().getLocalSocketAddress();
_remote =(InetSocketAddress)socketChannel.socket().getRemoteSocketAddress();
switch(_context.getProtocol()) {
case TCP:
SocketChannel socketChannel = ((SocketChannel)context.getSelectionKey().channel());
_local = (InetSocketAddress) socketChannel.socket().getLocalSocketAddress();
_remote =(InetSocketAddress) socketChannel.socket().getRemoteSocketAddress();
break;
case UDP:
_remote = (InetSocketAddress) _context.getAttribute(ReadFilter.UDP_SOCKETADDRESS);
_local = null;
break;
default:
_local = null;
_remote = null;
_log.log(Level.SEVERE, "Unsupported ptotocol: {0}", _context.getProtocol());
}
_log.log(Level.FINE, "RPC call: remote/local: {0}/{1}", new Object[] { _remote, _local } );
}
......@@ -50,7 +66,18 @@ public class GrizzlyXdrTransport implements XdrTransport {
_log.log(Level.FINE, "reply sent: {0}", data);
SelectableChannel channel = _context.getSelectionKey().channel();
OutputWriter.flushChannel(channel, data);
switch(_context.getProtocol()) {
case TCP:
OutputWriter.flushChannel(channel, data);
break;
case UDP:
DatagramChannel datagramChannel = (DatagramChannel) channel;
SocketAddress address = (SocketAddress) _context.getAttribute(ReadFilter.UDP_SOCKETADDRESS);
OutputWriter.flushChannel(datagramChannel, address, data);
break;
default:
_log.log(Level.SEVERE, "Unsupported ptotocol: {0}", _context.getProtocol());
}
}
......
......@@ -68,11 +68,11 @@ public class OncRpcClient {
selectionKeyHandler.setConnectionCloseHandler(new ConnectionCloseHandler() {
public void locallyClosed(SelectionKey sk) {
System.out.println("Connection closed (locally)");
_log.log(Level.FINE, "Connection closed (locally)");
}
public void remotlyClosed(SelectionKey sk) {
System.out.println("Remote peer closed connection");
_log.log(Level.FINE, "Remote peer closed connection");
}
});
tcp_handler.setSelectionKeyHandler(selectionKeyHandler);
......
......@@ -16,7 +16,6 @@
*/
package org.dcache.xdr;
import com.sun.grizzly.BaseSelectionKeyHandler;
import com.sun.grizzly.Controller;
import com.sun.grizzly.ControllerStateListenerAdapter;
import com.sun.grizzly.DefaultProtocolChain;
......@@ -25,6 +24,7 @@ import com.sun.grizzly.ProtocolChain;
import com.sun.grizzly.ProtocolChainInstanceHandler;
import com.sun.grizzly.ProtocolFilter;
import com.sun.grizzly.TCPSelectorHandler;
import com.sun.grizzly.UDPSelectorHandler;
import com.sun.grizzly.util.DefaultThreadPool;
import java.io.IOException;
import java.util.Map;
......@@ -54,15 +54,19 @@ public class OncRpcSvc {
*/
public OncRpcSvc(int port) {
final ProtocolFilter protocolKeeper = new ProtocolKeeperFilter();
final ProtocolFilter rpcFilter = new RpcParserProtocolFilter();
final ProtocolFilter rpcProcessor = new RpcProtocolFilter();
final ProtocolFilter rpcDispatcher = new RpcDispatcher(_programs);
final TCPSelectorHandler tcp_handler = new TCPSelectorHandler();
final TCPSelectorHandler tcp_handler = new TCPSelectorHandler();
tcp_handler.setPort(port);
tcp_handler.setSelectionKeyHandler(new BaseSelectionKeyHandler());
_controller.addSelectorHandler(tcp_handler);
final UDPSelectorHandler udp_handler = new UDPSelectorHandler();
udp_handler.setPort(port);
_controller.addSelectorHandler(udp_handler);
_controller.addStateListener(
new ControllerStateListenerAdapter() {
......@@ -86,6 +90,7 @@ public class OncRpcSvc {
5);
final ProtocolChain protocolChain = new DefaultProtocolChain();
protocolChain.addFilter(protocolKeeper);
protocolChain.addFilter(rpcFilter);
protocolChain.addFilter(rpcProcessor);
protocolChain.addFilter(rpcDispatcher);
......
/*
* This library is free software; you can redistribute it and/or modify
* it under the terms of the GNU Library General Public License as
* published by the Free Software Foundation; either version 2 of the
* License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this program (see the file COPYING.LIB for more
* details); if not, write to the Free Software Foundation, Inc.,
* 675 Mass Ave, Cambridge, MA 02139, USA.
*/
package org.dcache.xdr;
import com.sun.grizzly.Context;
import com.sun.grizzly.Controller;
import com.sun.grizzly.ProtocolFilter;
import com.sun.grizzly.util.WorkerThread;
import java.io.IOException;
/**
* This is a helper {@link ProtocolFilter} to store
* used protocol (TCP/UDP/SSL) in thread context.
*
* As a workaround of Grizzly limitation.
*/
public class ProtocolKeeperFilter implements ProtocolFilter {
public static final String CONNECTION_PROTOCOL = "ConnectionProtocol";
public boolean execute(Context cntxt) throws IOException {
Controller.Protocol protocol = cntxt.getProtocol();
((WorkerThread)Thread.currentThread()).getAttachment().setAttribute(CONNECTION_PROTOCOL, protocol);
return true;
}
public boolean postExecute(Context cntxt) throws IOException {
((WorkerThread)Thread.currentThread()).getAttachment().removeAttribute(CONNECTION_PROTOCOL);
return true;
}
}
......@@ -17,7 +17,9 @@
package org.dcache.xdr;
import com.sun.grizzly.Controller;
import com.sun.grizzly.ProtocolParser;
import com.sun.grizzly.util.WorkerThread;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
......@@ -101,7 +103,7 @@ public class RpcProtocolPaser implements ProtocolParser<Xdr> {
!_expectingMoreData &&
_buffer.position() > _nextMessageStartPosition;
_log.log(Level.FINEST, "hasMoreBytesToParse {0}, buffer : {1}, next read at: {2}",
_log.log(Level.ALL, "hasMoreBytesToParse {0}, buffer : {1}, next read at: {2}",
new Object[]{rc, _buffer, _nextMessageStartPosition});
return rc;
}
......@@ -135,6 +137,26 @@ public class RpcProtocolPaser implements ProtocolParser<Xdr> {
return false;
}
if( Thread.currentThread() instanceof WorkerThread ) {
/*
* we are runnig inside grizzly
*/
Controller.Protocol protocol = (Controller.Protocol)((WorkerThread)Thread.currentThread()).getAttachment().getAttribute(ProtocolKeeperFilter.CONNECTION_PROTOCOL);
if( protocol != null && protocol == Controller.Protocol.UDP ) {
_log.log(Level.FINEST, "UDP XDR packet");
/*
* UDP packets arriving in one go.
*/
ByteBuffer b = _buffer.duplicate();
b.limit(_buffer.position());
b.position(0);
_nextMessageStartPosition = b.remaining();
_xdr = new XdrBuffer(Xdr.MAX_XDR_SIZE);
_xdr.fill(b);
_expectingMoreData = false;
return true;
}
}
_expectingMoreData = true;
ByteBuffer bytes = _buffer.duplicate();
bytes.position(_nextMessageStartPosition);
......
......@@ -35,7 +35,7 @@ public class Xdr implements XdrDecodingStream, XdrEncodingStream {
/**
* Byte buffer used by XDR record.
*/
private final ByteBuffer _body;
protected final ByteBuffer _body;
/**
* First position in <code>_body</code> which is used by this
......
......@@ -42,6 +42,7 @@ public class XdrBuffer extends Xdr {
@Override
public void beginDecoding() {
_body.rewind();
}
@Override
......@@ -51,11 +52,12 @@ public class XdrBuffer extends Xdr {
@Override
public void beginEncoding() {
_body.clear();
}
@Override
public void endEncoding() {
super.body().flip();
_body.flip();
}
......
......@@ -64,4 +64,14 @@ public class netid {
return (p1 << 8) + p2;
}
public static int idOf(String id) {
if("tcp".equals(id)) {
return IpProtocolType.TCP;
}else if ("udp".equals(id)) {
return IpProtocolType.UDP;
}else{
return -1;
}
}
}
......@@ -20,6 +20,7 @@ package org.dcache.xdr.portmap;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.dcache.xdr.OncRpcClient;
import org.dcache.xdr.OncRpcException;
......@@ -42,6 +43,7 @@ public class GenericPortmapClient implements OncPortmapClient {
// FIXME: return correct exception
throw new IllegalStateException("portmap service not available");
}
_log.log(Level.INFO, "Using portmap V2");
}
_portmapClient = portmapClient;
}
......
......@@ -20,6 +20,7 @@ package org.dcache.xdr.portmap;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.dcache.xdr.OncRpcException;
import org.dcache.xdr.RpcCall;
......@@ -40,6 +41,7 @@ public class OncRpcbindServer implements RpcDispatchable {
public OncRpcbindServer() {
_services.add(new rpcb(100000, 2, "tcp", "0.0.0.0.0.111", "superuser"));
_services.add(new rpcb(100000, 2, "udp", "0.0.0.0.0.111", "superuser"));
//_services.add(new rpcb(100000, 4, "tcp", "0.0.0.0.0.111", "superuser"));
}
public void dispatchOncRpcCall(RpcCall call) throws OncRpcException, IOException {
......@@ -63,6 +65,7 @@ public class OncRpcbindServer implements RpcDispatchable {
call.reply(XdrVoid.XDR_VOID);
break;
case OncRpcPortmap.PMAPPROC_SET:
_log.log(Level.ALL, "Got PMAPPROC_SET");
mapping newMapping = new mapping();
call.retrieveCall(newMapping);
// we sore every thing in v4 format
......
......@@ -99,7 +99,8 @@ public class rpcb implements XdrAble {
}
public mapping toMapping() {
return new mapping(_prog, _vers, 6 , netid.getPort(_addr) );
return new mapping(_prog, _vers, netid.idOf(_netid) , netid.getPort(_addr) );
}
boolean match(rpcb query) {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment