Commit e977e574 authored by Tigran Mkrtchyan's avatar Tigran Mkrtchyan
Browse files

nfs4: pull-out client session slot handling into a dedicated class

Motivation:
client session slot handling is not only used by call-back but as well
by a regular client and proxy-io client inside dCache. Thus, this functionality
should be shared to reduce code duplication.

Modification:
introduce ClientSession class that does the session slot management.

Result:
client side session slot management can be shared with other classes.

Acked-by: Paul Millar
Acked-by: Lea Morschel
Target: master
parent cd7aa48f
...@@ -20,45 +20,40 @@ ...@@ -20,45 +20,40 @@
package org.dcache.nfs.v4; package org.dcache.nfs.v4;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import org.dcache.nfs.nfsstat;
import org.dcache.nfs.v4.xdr.uint64_t; import org.dcache.nfs.v4.xdr.CB_COMPOUND4args;
import org.dcache.nfs.v4.xdr.nfs_cb_opnum4;
import org.dcache.nfs.v4.xdr.layoutrecall_type4;
import org.dcache.nfs.v4.xdr.callback_sec_parms4;
import org.dcache.nfs.v4.xdr.fsid4;
import org.dcache.nfs.v4.xdr.nfs4_prot;
import org.dcache.nfs.v4.xdr.CB_COMPOUND4res; import org.dcache.nfs.v4.xdr.CB_COMPOUND4res;
import org.dcache.nfs.v4.xdr.sequenceid4;
import org.dcache.nfs.v4.xdr.slotid4;
import org.dcache.nfs.v4.xdr.CB_SEQUENCE4args;
import org.dcache.nfs.v4.xdr.utf8str_cs;
import org.dcache.nfs.v4.xdr.layoutiomode4;
import org.dcache.nfs.v4.xdr.uint32_t;
import org.dcache.nfs.v4.xdr.layouttype4;
import org.dcache.nfs.v4.xdr.CB_LAYOUTRECALL4args; import org.dcache.nfs.v4.xdr.CB_LAYOUTRECALL4args;
import org.dcache.nfs.v4.xdr.CB_COMPOUND4args;
import org.dcache.nfs.v4.xdr.sessionid4;
import org.dcache.nfs.v4.xdr.layoutrecall4;
import org.dcache.nfs.v4.xdr.referring_call_list4;
import org.dcache.nfs.v4.xdr.nfs_cb_argop4;
import org.dcache.nfs.nfsstat;
import org.dcache.nfs.v4.xdr.CB_NOTIFY_DEVICEID4args; import org.dcache.nfs.v4.xdr.CB_NOTIFY_DEVICEID4args;
import org.dcache.nfs.v4.xdr.CB_SEQUENCE4args;
import org.dcache.nfs.v4.xdr.bitmap4; import org.dcache.nfs.v4.xdr.bitmap4;
import org.dcache.nfs.v4.xdr.callback_sec_parms4;
import org.dcache.nfs.v4.xdr.deviceid4; import org.dcache.nfs.v4.xdr.deviceid4;
import org.dcache.nfs.v4.xdr.fsid4;
import org.dcache.nfs.v4.xdr.layoutiomode4;
import org.dcache.nfs.v4.xdr.layoutrecall4;
import org.dcache.nfs.v4.xdr.layoutrecall_file4; import org.dcache.nfs.v4.xdr.layoutrecall_file4;
import org.dcache.nfs.v4.xdr.layoutrecall_type4;
import org.dcache.nfs.v4.xdr.layouttype4;
import org.dcache.nfs.v4.xdr.length4; import org.dcache.nfs.v4.xdr.length4;
import org.dcache.nfs.v4.xdr.nfs4_prot;
import org.dcache.nfs.v4.xdr.nfs_cb_argop4;
import org.dcache.nfs.v4.xdr.nfs_cb_opnum4;
import org.dcache.nfs.v4.xdr.nfs_fh4; import org.dcache.nfs.v4.xdr.nfs_fh4;
import org.dcache.nfs.v4.xdr.notify4; import org.dcache.nfs.v4.xdr.notify4;
import org.dcache.nfs.v4.xdr.notify_deviceid_delete4; import org.dcache.nfs.v4.xdr.notify_deviceid_delete4;
import org.dcache.nfs.v4.xdr.notify_deviceid_type4; import org.dcache.nfs.v4.xdr.notify_deviceid_type4;
import org.dcache.nfs.v4.xdr.notifylist4; import org.dcache.nfs.v4.xdr.notifylist4;
import org.dcache.nfs.v4.xdr.offset4; import org.dcache.nfs.v4.xdr.offset4;
import org.dcache.nfs.v4.xdr.referring_call_list4;
import org.dcache.nfs.v4.xdr.sessionid4;
import org.dcache.nfs.v4.xdr.slotid4;
import org.dcache.nfs.v4.xdr.stateid4; import org.dcache.nfs.v4.xdr.stateid4;
import org.dcache.nfs.v4.xdr.uint32_t;
import org.dcache.nfs.v4.xdr.uint64_t;
import org.dcache.nfs.v4.xdr.utf8str_cs;
import org.dcache.oncrpc4j.rpc.OncRpcException; import org.dcache.oncrpc4j.rpc.OncRpcException;
import org.dcache.oncrpc4j.rpc.RpcAuth; import org.dcache.oncrpc4j.rpc.RpcAuth;
import org.dcache.oncrpc4j.rpc.RpcAuthType; import org.dcache.oncrpc4j.rpc.RpcAuthType;
...@@ -101,49 +96,9 @@ public class ClientCB { ...@@ -101,49 +96,9 @@ public class ClientCB {
*/ */
private final RpcCall _rpc; private final RpcCall _rpc;
/** Queue that maintains the available/unused sessions slots. */
private final BlockingQueue<SessionSlot> _unusedSessionSlots = new LinkedBlockingQueue<>();
/** session slot with associated id and sequence. */
private static class SessionSlot {
/** slot id */
private final slotid4 id;
/** requests sequence id */
private int sequenceid = 0;
private SessionSlot(int id) {
this.id = new slotid4(id);
}
slotid4 getId() {
return id;
}
sequenceid4 nextSequenceId() {
return new sequenceid4(++sequenceid);
}
}
/** Session associated with this callback channel */
/** private final ClientSession _clientSession;
* Get available session slot, waiting if necessary until a sot becomes available.
*/
private SessionSlot acquireSlot() throws IOException {
try {
return _unusedSessionSlots.take();
} catch (InterruptedException e) {
InterruptedIOException eio = new InterruptedIOException(e.getMessage());
// preserve the original stacktrace
eio.setStackTrace(e.getStackTrace());
throw eio;
}
}
/** Return session slot into pool of available slots. */
private void releaseSlot(SessionSlot slot) {
_unusedSessionSlots.offer(slot);
}
/** /**
* @param transport for call-back communication * @param transport for call-back communication
...@@ -173,10 +128,9 @@ public class ClientCB { ...@@ -173,10 +128,9 @@ public class ClientCB {
default: default:
throw new IllegalArgumentException("Unsuppotred security flavor"); throw new IllegalArgumentException("Unsuppotred security flavor");
} }
_highestSlotId = maxrequests -1;
for(int i = 0; i < maxrequests; i++) { _highestSlotId = maxrequests - 1;
_unusedSessionSlots.add(new SessionSlot(i)); _clientSession = new ClientSession(session, maxrequests);
}
_rpc = new RpcCall(program, CB_VERSION, _auth, transport); _rpc = new RpcCall(program, CB_VERSION, _auth, transport);
} }
...@@ -189,7 +143,7 @@ public class ClientCB { ...@@ -189,7 +143,7 @@ public class ClientCB {
_rpc.call(nfs4_prot.CB_NULL_1, XdrVoid.XDR_VOID, XdrVoid.XDR_VOID, 1, TimeUnit.SECONDS); _rpc.call(nfs4_prot.CB_NULL_1, XdrVoid.XDR_VOID, XdrVoid.XDR_VOID, 1, TimeUnit.SECONDS);
} }
private XdrAble generateCompound(SessionSlot sessionSlot, String tag, nfs_cb_argop4...cbOperations) { private XdrAble generateCompound(ClientSession.SessionSlot sessionSlot, String tag, nfs_cb_argop4...cbOperations) {
CB_SEQUENCE4args cbSequence = new CB_SEQUENCE4args(); CB_SEQUENCE4args cbSequence = new CB_SEQUENCE4args();
cbSequence.csa_cachethis = false; cbSequence.csa_cachethis = false;
...@@ -231,12 +185,12 @@ public class ClientCB { ...@@ -231,12 +185,12 @@ public class ClientCB {
opArgs.argop = nfs_cb_opnum4.OP_CB_LAYOUTRECALL; opArgs.argop = nfs_cb_opnum4.OP_CB_LAYOUTRECALL;
opArgs.opcblayoutrecall = cbLayoutrecall; opArgs.opcblayoutrecall = cbLayoutrecall;
SessionSlot slot = acquireSlot(); var slot = _clientSession.acquireSlot();
try{ try{
XdrAble args = generateCompound(slot,"cb_layout_recall_fs", opArgs); XdrAble args = generateCompound(slot,"cb_layout_recall_fs", opArgs);
_rpc.call(nfs4_prot.CB_COMPOUND_1, args, new CB_COMPOUND4res()); _rpc.call(nfs4_prot.CB_COMPOUND_1, args, new CB_COMPOUND4res());
} finally { } finally {
releaseSlot(slot); _clientSession.releaseSlot(slot);
} }
} }
...@@ -258,7 +212,7 @@ public class ClientCB { ...@@ -258,7 +212,7 @@ public class ClientCB {
opArgs.argop = nfs_cb_opnum4.OP_CB_LAYOUTRECALL; opArgs.argop = nfs_cb_opnum4.OP_CB_LAYOUTRECALL;
opArgs.opcblayoutrecall = cbLayoutrecall; opArgs.opcblayoutrecall = cbLayoutrecall;
SessionSlot slot = acquireSlot(); var slot = _clientSession.acquireSlot();
try{ try{
XdrAble args = generateCompound(slot,"cb_layout_recall_file", opArgs); XdrAble args = generateCompound(slot,"cb_layout_recall_file", opArgs);
...@@ -266,7 +220,7 @@ public class ClientCB { ...@@ -266,7 +220,7 @@ public class ClientCB {
_rpc.call(nfs4_prot.CB_COMPOUND_1, args, res); _rpc.call(nfs4_prot.CB_COMPOUND_1, args, res);
nfsstat.throwIfNeeded(res.status); nfsstat.throwIfNeeded(res.status);
} finally { } finally {
releaseSlot(slot); _clientSession.releaseSlot(slot);
} }
} }
...@@ -296,7 +250,7 @@ public class ClientCB { ...@@ -296,7 +250,7 @@ public class ClientCB {
opArgs.argop = nfs_cb_opnum4.OP_CB_NOTIFY_DEVICEID; opArgs.argop = nfs_cb_opnum4.OP_CB_NOTIFY_DEVICEID;
opArgs.opcbnotify_deviceid = cbDeleteDeciveId; opArgs.opcbnotify_deviceid = cbDeleteDeciveId;
SessionSlot slot = acquireSlot(); var slot = _clientSession.acquireSlot();
try{ try{
XdrAble args = generateCompound(slot,"cb_delete_device", opArgs); XdrAble args = generateCompound(slot,"cb_delete_device", opArgs);
...@@ -304,7 +258,7 @@ public class ClientCB { ...@@ -304,7 +258,7 @@ public class ClientCB {
_rpc.call(nfs4_prot.CB_COMPOUND_1, args, res); _rpc.call(nfs4_prot.CB_COMPOUND_1, args, res);
nfsstat.throwIfNeeded(res.status); nfsstat.throwIfNeeded(res.status);
} finally { } finally {
releaseSlot(slot); _clientSession.releaseSlot(slot);
} }
} }
......
/*
* Copyright (c) 2020 Deutsches Elektronen-Synchroton,
* Member of the Helmholtz Association, (DESY), HAMBURG, GERMANY
*
* This library is free software; you can redistribute it and/or modify
* it under the terms of the GNU Library General Public License as
* published by the Free Software Foundation; either version 2 of the
* License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this program (see the file COPYING.LIB for more
* details); if not, write to the Free Software Foundation, Inc.,
* 675 Mass Ave, Cambridge, MA 02139, USA.
*/
package org.dcache.nfs.v4;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.dcache.nfs.v4.xdr.sequenceid4;
import org.dcache.nfs.v4.xdr.sessionid4;
import org.dcache.nfs.v4.xdr.slotid4;
/**
* NFSv4.1 session on the client side.
*/
public class ClientSession {
/**
* session it provided by the server
*/
private final sessionid4 id;
/**
* maximal number of concurrent requests the client can sent to on this
* session
*/
private final int maxRequests;
/**
* Queue that maintains the available/unused sessions slots.
*/
private final BlockingQueue<SessionSlot> unusedSessionSlots = new LinkedBlockingQueue<>();
/**
* session slot with associated id and sequence.
*/
public static class SessionSlot {
/**
* slot id
*/
private final slotid4 id;
/**
* requests sequence id
*/
private int sequenceid = 0;
public SessionSlot(int id) {
this.id = new slotid4(id);
}
public slotid4 getId() {
return id;
}
public sequenceid4 nextSequenceId() {
return new sequenceid4(++sequenceid);
}
}
/**
* Create a new client side session with given is and maximal number of concurrent requests.
* @param sessionid session id generated by server.
* @param maxRequests maximal number of concurrent requests.
*/
public ClientSession(sessionid4 sessionid, int maxRequests) {
this.id = sessionid;
this.maxRequests = maxRequests;
for (int i = 0; i < maxRequests; i++) {
unusedSessionSlots.add(new SessionSlot(i));
}
}
/**
* Get available session slot, waiting if necessary until a sot becomes
* available.
*/
public SessionSlot acquireSlot() throws IOException {
try {
return unusedSessionSlots.take();
} catch (InterruptedException e) {
InterruptedIOException eio = new InterruptedIOException(e.getMessage());
// preserve the original stacktrace
eio.setStackTrace(e.getStackTrace());
throw eio;
}
}
/**
* Return session slot into pool of available slots.
*/
public void releaseSlot(SessionSlot slot) {
unusedSessionSlots.offer(slot);
}
/**
* Get the session id.
* @return session id.
*/
public sessionid4 sessionId() {
return id;
}
/**
* Get the maximal number of concurrent requests that can be sent to the server.
* @return maximal number of concurrent requests.
*/
public int maxRequests() {
return maxRequests;
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment