Commit b434aae6 authored by Tigran Mkrtchyan's avatar Tigran Mkrtchyan
Browse files

rpc: added udp support for clients

parent 55078e33
......@@ -18,12 +18,17 @@
package org.dcache.xdr;
import com.sun.grizzly.ConnectorHandler;
import com.sun.grizzly.Controller;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.logging.Level;
import java.util.logging.Logger;
public class ClientTransport implements XdrTransport {
private final static Logger _log = Logger.getLogger(ClientTransport.class.getName());
private final ConnectorHandler _connectorHandler;
private final ReplyQueue<Integer, RpcReply> _replyQueue;
......@@ -34,7 +39,12 @@ public class ClientTransport implements XdrTransport {
}
public void send(ByteBuffer data) throws IOException {
_connectorHandler.write(data, true);
if( _connectorHandler.protocol() == Controller.Protocol.UDP ) {
// skip fragment marker
data.getInt();
}
long n = _connectorHandler.write(data, true);
_log.log(Level.FINEST, "Send {0} bytes", n);
}
public InetSocketAddress getLocalSocketAddress() {
......
......@@ -18,8 +18,7 @@
package org.dcache.xdr;
import com.sun.grizzly.BaseSelectionKeyHandler;
import com.sun.grizzly.CallbackHandler;
import com.sun.grizzly.Context;
import com.sun.grizzly.ConnectorHandler;
import com.sun.grizzly.Controller;
import com.sun.grizzly.ControllerStateListenerAdapter;
import com.sun.grizzly.DefaultProtocolChain;
......@@ -27,8 +26,8 @@ import com.sun.grizzly.DefaultProtocolChainInstanceHandler;
import com.sun.grizzly.ProtocolChain;
import com.sun.grizzly.ProtocolChainInstanceHandler;
import com.sun.grizzly.ProtocolFilter;
import com.sun.grizzly.TCPConnectorHandler;
import com.sun.grizzly.TCPSelectorHandler;
import com.sun.grizzly.UDPSelectorHandler;
import com.sun.grizzly.util.ConnectionCloseHandler;
import java.io.IOException;
import java.net.InetAddress;
......@@ -44,25 +43,23 @@ public class OncRpcClient {
private final CountDownLatch clientReady = new CountDownLatch(1);
private final Controller controller = new Controller();
private int _port;
private final int _port;
private final InetAddress _address;
private Controller.Protocol _prorocol;
private final ReplyQueue<Integer, RpcReply> _replyQueue =
new ReplyQueue<Integer, RpcReply>();
public OncRpcClient(InetAddress address, int protocol) {
_address = address;
init(address, protocol);
}
public OncRpcClient(InetAddress address, int protocol, int port) {
_address = address;
_port = port;
init(address, protocol);
}
private void init(InetAddress address, int protocol) {
TCPSelectorHandler tcp_handler = new TCPSelectorHandler(true);
if( protocol == IpProtocolType.TCP ) {
_prorocol = Controller.Protocol.TCP;
} else if ( protocol == IpProtocolType.UDP ) {
_prorocol = Controller.Protocol.UDP;
}else {
throw new IllegalArgumentException("Unsupported protocol type: " + protocol);
}
BaseSelectionKeyHandler selectionKeyHandler = new BaseSelectionKeyHandler();
selectionKeyHandler.setConnectionCloseHandler(new ConnectionCloseHandler() {
......@@ -75,10 +72,16 @@ public class OncRpcClient {
_log.log(Level.FINE, "Remote peer closed connection");
}
});
tcp_handler.setSelectionKeyHandler(selectionKeyHandler);
final TCPSelectorHandler tcp_handler = new TCPSelectorHandler(true);
tcp_handler.setSelectionKeyHandler(selectionKeyHandler);
controller.addSelectorHandler(tcp_handler);
final UDPSelectorHandler udp_handler = new UDPSelectorHandler(true);
udp_handler.setSelectionKeyHandler(selectionKeyHandler);
controller.addSelectorHandler(udp_handler);
controller.addStateListener(
new ControllerStateListenerAdapter() {
......@@ -93,11 +96,12 @@ public class OncRpcClient {
_log.log(Level.SEVERE, "Grizzly controller exception:" + e.getMessage());
}
});
final ProtocolFilter protocolKeeper = new ProtocolKeeperFilter();
final ProtocolFilter rpcFilter = new RpcParserProtocolFilter();
final ProtocolFilter rpcProcessor = new RpcProtocolFilter(_replyQueue);
final ProtocolChain protocolChain = new DefaultProtocolChain();
protocolChain.addFilter(protocolKeeper);
protocolChain.addFilter(rpcFilter);
protocolChain.addFilter(rpcProcessor);
......@@ -129,10 +133,10 @@ public class OncRpcClient {
throw new IOException(e.getMessage());
}
final TCPConnectorHandler connector_handler;
connector_handler = (TCPConnectorHandler) controller.acquireConnectorHandler(Controller.Protocol.TCP);
connector_handler.connect(new InetSocketAddress(_address, _port), (CallbackHandler<Context>) null);
final ConnectorHandler connector_handler;
connector_handler = controller.acquireConnectorHandler(_prorocol);
connector_handler.connect(new InetSocketAddress(_address, _port));
if( !connector_handler.isConnected() ) {
throw new IOException("Failed to connect");
......
......@@ -293,6 +293,10 @@ public class RpcCall {
RpcReply reply;
try {
reply = _transport.getReplyQueue().get(xid, timeout);
if( reply == null ) {
_log.log(Level.INFO, "Did not get reply in time");
throw new IOException("Did not get reply in time");
}
} catch (InterruptedException e) {
_log.log(Level.SEVERE, "call processing interrupted");
throw new IOException(e.getMessage());
......
......@@ -22,6 +22,7 @@ import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.dcache.xdr.IpProtocolType;
import org.dcache.xdr.OncRpcClient;
import org.dcache.xdr.OncRpcException;
import org.dcache.xdr.RpcAuthTypeNone;
......@@ -62,7 +63,7 @@ public class GenericPortmapClient implements OncPortmapClient {
public static void main(String[] args) throws UnknownHostException, InterruptedException, IOException, OncRpcException {
OncRpcClient rpcClient = new OncRpcClient(InetAddress.getByName("127.0.0.1"), 0, 111);
OncRpcClient rpcClient = new OncRpcClient(InetAddress.getByName("127.0.0.1"), IpProtocolType.UDP, 111);
XdrTransport transport = rpcClient.connect();
OncPortmapClient portmapClient = new GenericPortmapClient(transport);
......
......@@ -20,6 +20,7 @@ package org.dcache.xdr.portmap;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import org.dcache.xdr.IpProtocolType;
import org.dcache.xdr.OncRpcClient;
import org.dcache.xdr.OncRpcException;
import org.dcache.xdr.OncRpcSvc;
......@@ -45,7 +46,7 @@ public class OncRpcEmbeddedPortmap {
OncRpcClient rpcClient = null;
boolean localPortmapperRunning = false;
try {
rpcClient = new OncRpcClient(InetAddress.getLocalHost(), 0, OncRpcPortmap.PORTMAP_PORT);
rpcClient = new OncRpcClient(InetAddress.getLocalHost(), IpProtocolType.UDP, OncRpcPortmap.PORTMAP_PORT);
XdrTransport transport = rpcClient.connect();
/* check for version 2, 3 and 4 */
for (int i = 2; i < 5; i++) {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment