Commit 7e4179dd authored by Tigran Mkrtchyan's avatar Tigran Mkrtchyan
Browse files

rpc: refactor client reply handling

use RpcReply class by clients and servers
parent c92edf31
/*
* This library is free software; you can redistribute it and/or modify
* it under the terms of the GNU Library General Public License as
* published by the Free Software Foundation; either version 2 of the
* License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this program (see the file COPYING.LIB for more
* details); if not, write to the Free Software Foundation, Inc.,
* 675 Mass Ave, Cambridge, MA 02139, USA.
*/
package org.dcache.xdr;
import com.sun.grizzly.ConnectorHandler;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
public class ClientTransport implements XdrTransport {
private final ConnectorHandler _connectorHandler;
private final ReplyQueue<Integer, RpcReply> _replyQueue;
public ClientTransport(ConnectorHandler connectorHandler ,
ReplyQueue<Integer, RpcReply> replyQueue ) {
_replyQueue = replyQueue;
_connectorHandler = connectorHandler;
}
public void send(ByteBuffer data) throws IOException {
_connectorHandler.write(data, true);
}
public InetSocketAddress getLocalSocketAddress() {
throw new UnsupportedOperationException("Not supported yet.");
}
public InetSocketAddress getRemoteSocketAddress() {
throw new UnsupportedOperationException("Not supported yet.");
}
public ReplyQueue<Integer, RpcReply> getReplyQueue() {
return _replyQueue;
}
}
......@@ -61,4 +61,8 @@ public class GrizzlyXdrTransport implements XdrTransport {
public InetSocketAddress getRemoteSocketAddress() {
return _remote;
}
public ReplyQueue getReplyQueue() {
return null;
}
}
/*
* This library is free software; you can redistribute it and/or modify
* it under the terms of the GNU Library General Public License as
* published by the Free Software Foundation; either version 2 of the
* License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this program (see the file COPYING.LIB for more
* details); if not, write to the Free Software Foundation, Inc.,
* 675 Mass Ave, Cambridge, MA 02139, USA.
*/
package org.dcache.xdr;
public final class IpProtocolType {
private IpProtocolType() {}
public final static int TCP = 6;
public final static int UDP = 17;
public static String toString(int i) {
switch(i) {
case TCP: return "tcp";
case UDP: return "udp";
}
return "Unknown";
}
}
......@@ -27,6 +27,8 @@ public class MismatchInfo implements XdrAble {
_max = max;
}
public MismatchInfo() {}
@Override
public void xdrEncode(XdrEncodingStream xdr) {
xdr.xdrEncodeInt(_min);
......@@ -39,4 +41,9 @@ public class MismatchInfo implements XdrAble {
_max = xdr.xdrDecodeInt();
}
@Override
public String toString() {
return String.format("mismatch info: [%d, %d]", _min, _max);
}
}
......@@ -17,11 +17,11 @@
package org.dcache.xdr;
/**
* A reply to a call message can take on two forms:
* The message was either accepted or rejected.
*/
public interface RpcReplyStats {
public static final int MSG_ACCEPTED = 0;
public static final int MSG_DENIED = 1;
public class OncRpcAcceptedException extends OncRpcException {
private final int _status;
public OncRpcAcceptedException(int status) {
super(RpcAccepsStatus.toString(status));
_status = status;
}
}
/*
* This library is free software; you can redistribute it and/or modify
* it under the terms of the GNU Library General Public License as
* published by the Free Software Foundation; either version 2 of the
* License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this program (see the file COPYING.LIB for more
* details); if not, write to the Free Software Foundation, Inc.,
* 675 Mass Ave, Cambridge, MA 02139, USA.
*/
package org.dcache.xdr;
import com.sun.grizzly.BaseSelectionKeyHandler;
import com.sun.grizzly.CallbackHandler;
import com.sun.grizzly.Context;
import com.sun.grizzly.Controller;
import com.sun.grizzly.ControllerStateListenerAdapter;
import com.sun.grizzly.DefaultProtocolChain;
import com.sun.grizzly.DefaultProtocolChainInstanceHandler;
import com.sun.grizzly.ProtocolChain;
import com.sun.grizzly.ProtocolChainInstanceHandler;
import com.sun.grizzly.ProtocolFilter;
import com.sun.grizzly.TCPConnectorHandler;
import com.sun.grizzly.TCPSelectorHandler;
import com.sun.grizzly.util.ConnectionCloseHandler;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.util.concurrent.CountDownLatch;
import java.util.logging.Level;
import java.util.logging.Logger;
public class OncRpcClient {
private final static Logger _log = Logger.getLogger(OncRpcClient.class.getName());
private final CountDownLatch clientReady = new CountDownLatch(1);
private final Controller controller = new Controller();
private int _port;
private final InetAddress _address;
private final ReplyQueue<Integer, RpcReply> _replyQueue =
new ReplyQueue<Integer, RpcReply>();
public OncRpcClient(InetAddress address, int protocol) {
_address = address;
init(address, protocol);
}
public OncRpcClient(InetAddress address, int protocol, int port) {
_address = address;
_port = port;
init(address, protocol);
}
private void init(InetAddress address, int protocol) {
TCPSelectorHandler tcp_handler = new TCPSelectorHandler(true);
BaseSelectionKeyHandler selectionKeyHandler = new BaseSelectionKeyHandler();
selectionKeyHandler.setConnectionCloseHandler(new ConnectionCloseHandler() {
public void locallyClosed(SelectionKey sk) {
System.out.println("Connection closed (locally)");
}
public void remotlyClosed(SelectionKey sk) {
System.out.println("Remote peer closed connection");
}
});
tcp_handler.setSelectionKeyHandler(selectionKeyHandler);
controller.addSelectorHandler(tcp_handler);
controller.addStateListener(
new ControllerStateListenerAdapter() {
@Override
public void onReady() {
clientReady.countDown();
_log.log(Level.INFO, "Client ready");
}
@Override
public void onException(Throwable e) {
_log.log(Level.SEVERE, "Grizzly controller exception:" + e.getMessage());
}
});
final ProtocolFilter rpcFilter = new RpcParserProtocolFilter();
final ProtocolFilter rpcProcessor = new RpcProtocolFilter(_replyQueue);
final ProtocolChain protocolChain = new DefaultProtocolChain();
protocolChain.addFilter(rpcFilter);
protocolChain.addFilter(rpcProcessor);
ProtocolChainInstanceHandler pciHandler = new DefaultProtocolChainInstanceHandler() {
@Override
public ProtocolChain poll() {
return protocolChain;
}
@Override
public boolean offer(ProtocolChain pc) {
return false;
}
};
controller.setProtocolChainInstanceHandler(pciHandler);
}
public XdrTransport connect() throws IOException {
new Thread(controller).start();
try{
clientReady.await();
}catch(InterruptedException e) {
_log.log(Level.SEVERE, "client initialization interrupted");
throw new IOException(e.getMessage());
}
final TCPConnectorHandler connector_handler;
connector_handler = (TCPConnectorHandler) controller.acquireConnectorHandler(Controller.Protocol.TCP);
connector_handler.connect(new InetSocketAddress(_address, _port), (CallbackHandler<Context>) null);
return new ClientTransport(connector_handler, _replyQueue);
}
public void close() throws IOException {
controller.stop();
}
}
......@@ -17,14 +17,11 @@
package org.dcache.xdr;
public abstract class RpcAcceptedReply extends RpcReply {
public class OncRpcRejectedException extends OncRpcException {
private final int _status;
/* (non-Javadoc)
* @see org.dcache.xdr.RpcReply#xdrEncode(org.dcache.xdr.Xdr)
*/
@Override
public void xdrEncode(XdrEncodingStream xdr) {
super.xdrEncode(xdr);
public OncRpcRejectedException(int status) {
super(RpcRejectStatus.toString(status));
_status = status;
}
}
\ No newline at end of file
}
/*
* This library is free software; you can redistribute it and/or modify
* it under the terms of the GNU Library General Public License as
* published by the Free Software Foundation; either version 2 of the
* License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this program (see the file COPYING.LIB for more
* details); if not, write to the Free Software Foundation, Inc.,
* 675 Mass Ave, Cambridge, MA 02139, USA.
*/
package org.dcache.xdr;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
public class ReplyQueue<K, V> {
private final static Logger _log = Logger.getLogger(ReplyQueue.class.getName());
private final Map<K, V> _queue = new HashMap<K, V>();
/**
* Create a placeholder for specified key.
* @param key
*/
public synchronized void registerKey(K key) {
_log.log(Level.FINEST, "Registering key {0}", key);
_queue.put(key, null);
}
/**
* Put the value into Queue only and only if key is registered.
* @param key
* @param value
*/
public synchronized void put(K key, V value) {
_log.log(Level.FINEST, "updating key {0}", key);
if (_queue.containsKey(key)) {
_queue.put(key, value);
notifyAll();
}
}
/**
* Get value for defined key. The call will block if value is not available yet.
* On completion key will be unregistered.
*
* @param key
* @return value
* @throws InterruptedException
* @throws IllegalArgumentException if key is not registered.
*/
public synchronized V get(K key) throws InterruptedException {
_log.log(Level.FINEST, "query key {0}", key);
if (!_queue.containsKey(key)) {
throw new IllegalArgumentException("defined key does not exist: " + key);
}
while (_queue.get(key) == null) {
wait();
}
return _queue.remove(key);
}
/**
* Get value for defined key. The call will block up to defined timeout
* if value is not available yet. On completion key will be unregistered.
*
* @param key
* @param timeout in milliseconds
* @return value or null if timeout expired.
* @throwns IllegalArgumentException if key is not registered.
*/
public synchronized V get(K key, int timeout) throws InterruptedException {
_log.log(Level.FINEST, "query key {0} with timeout", key);
if (!_queue.containsKey(key)) {
throw new IllegalArgumentException("defined key does not exist: " + key);
}
long timeToWait = timeout;
long deadline = System.currentTimeMillis() + timeout;
while (timeToWait > 0 && _queue.get(key) == null) {
wait(timeToWait);
timeToWait = deadline - System.currentTimeMillis();
}
return _queue.remove(key);
}
}
......@@ -21,7 +21,9 @@ package org.dcache.xdr;
* Given that a call message was accepted, the following is the
* status of an attempt to call a remote procedure.
*/
public interface RpcAccepsStatus {
public final class RpcAccepsStatus {
private RpcAccepsStatus(){}
/**
* RPC executed successfully
......@@ -43,4 +45,20 @@ public interface RpcAccepsStatus {
* Procedure can't decode params.
*/
public static final int GARBAGE_ARGS = 4;
/**
* Undefined system error
*/
public static final int SYSTEM = 5;
public static String toString(int status) {
switch(status) {
case SUCCESS: return "SUCCESS";
case PROG_UNAVAIL: return "PROG_UNAVIAL";
case PROG_MISMATCH: return "PROG_MISMATCH";
case PROC_UNAVAIL: return "PROC_UNAVAIL";
case GARBAGE_ARGS: return "GARBAGE_ARGS";
case SYSTEM: return "SYSTEM";
}
return "UNKNOWN";
}
}
......@@ -17,18 +17,16 @@
package org.dcache.xdr;
public class RpcAuthMissmatch extends RpcRejectedReply {
public class RpcAuthMissmatch extends OncRpcException {
private final int _status;
public RpcAuthMissmatch(int status) {
super(RpcAuthStat.toString(status));
_status = status;
}
@Override
public void xdrEncode(XdrEncodingStream xdr) {
xdr.xdrEncodeInt(RpcRejectStatus.AUTH_ERROR);
xdr.xdrEncodeInt(_status);
public int getAuthError() {
return _status;
}
}
......@@ -21,20 +21,60 @@ package org.dcache.xdr;
*
* Why authentication failed.
*/
public interface RpcAuthStat {
public final class RpcAuthStat {
private RpcAuthStat() {}
/*
* failed on remote end
*/
public static final int AUTH_OK = 0; /* success */
public static final int AUTH_BADCRED = 1; /* bad credential (seal broken) */
public static final int AUTH_REJECTEDCRED = 2; /* client must begin new session */
public static final int AUTH_BADVERF = 3; /* bad verifier (seal broken) */
public static final int AUTH_REJECTEDVERF = 4; /* verifier expired or replayed */
public static final int AUTH_TOOWEAK = 5; /* rejected for security reasons */
public static final int AUTH_OK = 0; /* success */
public static final int AUTH_BADCRED = 1; /* bad credential (seal broken) */
public static final int AUTH_REJECTEDCRED = 2; /* client must begin new session */
public static final int AUTH_BADVERF = 3; /* bad verifier (seal broken) */
public static final int AUTH_REJECTEDVERF = 4; /* verifier expired or replayed */
public static final int AUTH_TOOWEAK = 5; /* rejected for security reasons */
/*
* failed locally
*/
public static final int AUTH_INVALIDRESP = 6; /* bogus response verifier */
public static final int AUTH_FAILED = 7; /* reason unknown */
public static final int AUTH_INVALIDRESP = 6; /* bogus response verifier */
public static final int AUTH_FAILED = 7; /* reason unknown */
/*
* AUTH_KERB errors; deprecated. See [RFC2695]
*/
public static final int AUTH_KERB_GENERIC = 8; /* kerberos generic error */
public static final int AUTH_TIMEEXPIRE = 9; /* time of credential expired */
public static final int AUTH_TKT_FILE = 10; /* problem with ticket file */
public static final int AUTH_DECODE = 11; /* can't decode authenticator */
public static final int AUTH_NET_ADDR = 12; /* wrong net address in ticket */
/*
* RPCSEC_GSS GSS related errors
*/
public static final int RPCSEC_GSS_CREDPROBLEM = 13; /* no credentials for user */
public static final int RPCSEC_GSS_CTXPROBLEM = 14; /* problem with context */
public static String toString(int i) {
switch(i) {
case AUTH_OK: return "OK";
case AUTH_BADCRED: return "AUTH_BADCRED";
case AUTH_REJECTEDCRED: return "AUTH_REJECTEDCRED";
case AUTH_BADVERF: return "AUTH_BADVERF";
case AUTH_REJECTEDVERF: return "AUTH_REJECTEDVERF";
case AUTH_TOOWEAK: return "AUTH_TOOWEAK";
case AUTH_INVALIDRESP: return "AUTH_INVALIDRESP";
case AUTH_FAILED: return "AUTH_FAILED";
case AUTH_KERB_GENERIC: return "AUTH_KERB_GENERIC";
case AUTH_TIMEEXPIRE: return "AUTH_TIMEEXPIRE";
case AUTH_TKT_FILE: return "AUTH_TKT_FILE";
case AUTH_DECODE: return "AUTH_DECODE";
case AUTH_NET_ADDR: return "AUTH_NET_ADDR";
case RPCSEC_GSS_CREDPROBLEM: return "RPCSEC_GSS_CREDPROBLEM";
case RPCSEC_GSS_CTXPROBLEM: return "RPCSEC_GSS_CTXPROBLEM";
}
return "Unknow state " + i;
}
}
......@@ -33,6 +33,16 @@ public class RpcAuthTypeUnix implements RpcAuth, XdrAble {
private final static Logger _log = Logger.getLogger(RpcAuthTypeUnix.class.getName());
public RpcAuthTypeUnix() {}
public RpcAuthTypeUnix(int uid, int gid, int[] gids, int stamp, String machine) {
_uid = uid;
_gid = gid;
_gids = gids;
_stamp = stamp;
_machine = machine;
}
public void xdrDecode(XdrDecodingStream xdr) {
_len = xdr.xdrDecodeInt();
......
......@@ -19,6 +19,7 @@ package org.dcache.xdr;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
......@@ -26,6 +27,13 @@ public class RpcCall {
private final static Logger _log = Logger.getLogger(RpcCall.class.getName());
/**
* XID number generator
*/
private final static AtomicInteger NEXT_XID = new AtomicInteger(0);
private int _xid;
/**
* Supported RPC protocol version
*/
......@@ -51,83 +59,75 @@ public class RpcCall {
*/
private int _rpcvers;
private RpcAuth _authVerf;
private RpcAuth _auth;
/**
* Authentication credential.
*/
private RpcAuth _cred;
/**
* transport to reply messages back
* Authentication verifier.
*/
private final XdrTransport _transport;
private RpcAuth _verf;
/**
* RPC request id
* RPC call transport.
*/
private int _xid;
private final XdrTransport _transport;
/**
* xdr engine
* Call body.
*/
private Xdr _xdr;
private final Xdr _xdr;
public RpcCall(int xid, XdrTransport transport) {
public RpcCall(int prog, int ver, RpcAuth auth, RpcAuth verif, XdrTransport transport) {
_prog = prog;
_version = ver;
_cred = auth;
_verf = verif;
_transport = transport;
_xid = xid;
_xdr = new Xdr(Xdr.MAX_XDR_SIZE);