Commit c131ddde authored by Tigran Mkrtchyan's avatar Tigran Mkrtchyan
Browse files

nfsv41: fix race condition during multiple callbacks

Motivation:
The NFSv4.1 server might decide to send multiple callback to a client.
The parallelism is defined by number of callback slots specified by the
client. As each slot maintain it's own sequence, a new request can be
sent only when the slot is not in use.

Modification:
Introduce a ClientCB$SesionSlot class that represents session slot and
associated sequence. Introduce a blocking queue, that represents
available session slots. When slot is used, then it removed from the
queue and putted back, after callback is complete.

Result:
Multiple callbacks can be issued to the client in parallel without
misordering the requests.

Acked-by: Paul Millar
Target: master
parent 5f38d46b
......@@ -19,6 +19,13 @@
*/
package org.dcache.nfs.v4;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.dcache.nfs.v4.xdr.uint64_t;
import org.dcache.nfs.v4.xdr.nfs_cb_opnum4;
import org.dcache.nfs.v4.xdr.layoutrecall_type4;
......@@ -39,9 +46,6 @@ import org.dcache.nfs.v4.xdr.sessionid4;
import org.dcache.nfs.v4.xdr.layoutrecall4;
import org.dcache.nfs.v4.xdr.referring_call_list4;
import org.dcache.nfs.v4.xdr.nfs_cb_argop4;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.dcache.nfs.nfsstat;
import org.dcache.nfs.v4.xdr.CB_NOTIFY_DEVICEID4args;
import org.dcache.nfs.v4.xdr.bitmap4;
......@@ -88,29 +92,68 @@ public class ClientCB {
private final RpcAuth _auth;
/**
* highest slot id to use
* The highest slot id that can be used.
*/
private final int _highestSlotId;
/**
* requests sequence id
* rpc call to use to communicate with client
*/
private int _sequenceid = 0;
private final RpcCall _rpc;
/** Queue that maintains the available/unused sessions slots. */
private final BlockingQueue<SessionSlot> _unusedSessionSlots = new LinkedBlockingQueue<>();
/** session slot with associated id and sequence. */
private static class SessionSlot {
/** slot id */
private final slotid4 id;
/** requests sequence id */
private int sequenceid = 0;
private SessionSlot(int id) {
this.id = new slotid4(id);
}
slotid4 getId() {
return id;
}
sequenceid4 nextSequenceId() {
return new sequenceid4(++sequenceid);
}
}
/**
* rpc call to use to communicate with client
* Get available session slot, waiting if necessary until a sot becomes available.
*/
private final RpcCall _rpc;
private SessionSlot acquireSlot() throws IOException {
try {
return _unusedSessionSlots.take();
} catch (InterruptedException e) {
InterruptedIOException eio = new InterruptedIOException(e.getMessage());
// preserve the original stacktrace
eio.setStackTrace(e.getStackTrace());
throw eio;
}
}
/** Return session slot into pool of available slots. */
private void releaseSlot(SessionSlot slot) {
_unusedSessionSlots.offer(slot);
}
/**
* @param transport for call-back communication
* @param minorVersion nfs4 protocol minor version used by client.
* @param session associated with the client
* @param highestSlotId highest slot id to use
* @param maxrequests the maximum number of concurrent requests.
* @param program RPC program number to use
* @param sec_parms supported security flavors
*/
ClientCB(RpcTransport transport, int program, int minorVersion, sessionid4 session, int highestSlotId,
ClientCB(RpcTransport transport, int program, int minorVersion, sessionid4 session, int maxrequests,
callback_sec_parms4[] sec_parms) {
_minorVersion = minorVersion;
_session = session;
......@@ -130,7 +173,10 @@ public class ClientCB {
default:
throw new IllegalArgumentException("Unsuppotred security flavor");
}
_highestSlotId = highestSlotId -1;
_highestSlotId = maxrequests -1;
for(int i = 0; i < maxrequests; i++) {
_unusedSessionSlots.add(new SessionSlot(i));
}
_rpc = new RpcCall(program, CB_VERSION, _auth, transport);
}
......@@ -143,14 +189,13 @@ public class ClientCB {
_rpc.call(nfs4_prot.CB_NULL_1, XdrVoid.XDR_VOID, XdrVoid.XDR_VOID, 1, TimeUnit.SECONDS);
}
private XdrAble generateCompound(String tag, nfs_cb_argop4...cbOperations) {
private XdrAble generateCompound(SessionSlot sessionSlot, String tag, nfs_cb_argop4...cbOperations) {
_sequenceid++;
CB_SEQUENCE4args cbSequence = new CB_SEQUENCE4args();
cbSequence.csa_cachethis = false;
cbSequence.csa_highest_slotid = new slotid4(_highestSlotId);
cbSequence.csa_sequenceid = new sequenceid4(_sequenceid);
cbSequence.csa_slotid = new slotid4(0);
cbSequence.csa_sequenceid = sessionSlot.nextSequenceId();
cbSequence.csa_slotid = sessionSlot.getId();
cbSequence.csa_sessionid = _session;
cbSequence.csa_referring_call_lists = new referring_call_list4[0];
......@@ -186,8 +231,13 @@ public class ClientCB {
opArgs.argop = nfs_cb_opnum4.OP_CB_LAYOUTRECALL;
opArgs.opcblayoutrecall = cbLayoutrecall;
XdrAble args = generateCompound("cb_layout_recall_fs", opArgs);
SessionSlot slot = acquireSlot();
try{
XdrAble args = generateCompound(slot,"cb_layout_recall_fs", opArgs);
_rpc.call(nfs4_prot.CB_COMPOUND_1, args, new CB_COMPOUND4res());
} finally {
releaseSlot(slot);
}
}
public void cbLayoutRecallFile(nfs_fh4 fh, stateid4 stateid) throws OncRpcException, IOException {
......@@ -208,11 +258,16 @@ public class ClientCB {
opArgs.argop = nfs_cb_opnum4.OP_CB_LAYOUTRECALL;
opArgs.opcblayoutrecall = cbLayoutrecall;
XdrAble args = generateCompound("cb_layout_recall_file", opArgs);
SessionSlot slot = acquireSlot();
try{
XdrAble args = generateCompound(slot,"cb_layout_recall_file", opArgs);
CB_COMPOUND4res res = new CB_COMPOUND4res();
_rpc.call(nfs4_prot.CB_COMPOUND_1, args, res);
nfsstat.throwIfNeeded(res.status);
} finally {
releaseSlot(slot);
}
}
public void cbDeleteDevice(deviceid4 id) throws OncRpcException, IOException {
......@@ -241,11 +296,16 @@ public class ClientCB {
opArgs.argop = nfs_cb_opnum4.OP_CB_NOTIFY_DEVICEID;
opArgs.opcbnotify_deviceid = cbDeleteDeciveId;
XdrAble args = generateCompound("cb_delete_device", opArgs);
SessionSlot slot = acquireSlot();
try{
XdrAble args = generateCompound(slot,"cb_delete_device", opArgs);
CB_COMPOUND4res res = new CB_COMPOUND4res();
_rpc.call(nfs4_prot.CB_COMPOUND_1, args, res);
nfsstat.throwIfNeeded(res.status);
} finally {
releaseSlot(slot);
}
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment