Commit 55410441 authored by Tigran Mkrtchyan's avatar Tigran Mkrtchyan
Browse files

nfsv42: update OperationCOPY to support async copy

Motivation:
For a big files client might request an async copy. Moreover, the server
itself might decide to switch to async mode if requests takes too long.
Thus the server should be ready to notify the client when copy is complete.

Modification:
Update OperationCOPY to (a) use async copy if requests or switch to
async mode if copyFileRange syscall takes too long.

Result:
async behavior for server-side-copy

Acked-by: Paul Millar
Target: master
parent 20968d28
Pipeline #19298 passed with stage
in 1 minute and 47 seconds
......@@ -265,7 +265,7 @@ public class ClientCB {
}
}
public void cbOffload(nfs_fh4 fh, stateid4 stateid, write_response4 response) throws OncRpcException, IOException {
public void cbOffload(nfs_fh4 fh, stateid4 stateid, write_response4 response, int status) throws OncRpcException, IOException {
CB_OFFLOAD4args copyOffload = new CB_OFFLOAD4args();
......@@ -273,7 +273,7 @@ public class ClientCB {
copyOffload.coa_stateid = stateid;
copyOffload.coa_offload_info = new offload_info4();
copyOffload.coa_offload_info.coa_resok4 = response;
copyOffload.coa_offload_info.coa_status = nfsstat.NFS_OK;
copyOffload.coa_offload_info.coa_status = status;
nfs_cb_argop4 opArgs = new nfs_cb_argop4();
opArgs.argop = nfs_cb_opnum4.OP_CB_OFFLOAD;
......
/*
* Copyright (c) 2021 Deutsches Elektronen-Synchroton,
* Copyright (c) 2021 - 2022 Deutsches Elektronen-Synchroton,
* Member of the Helmholtz Association, (DESY), HAMBURG, GERMANY
*
* This library is free software; you can redistribute it and/or modify
......@@ -20,11 +20,13 @@
package org.dcache.nfs.v4;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.google.common.annotations.Beta;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.common.base.Throwables;
import org.dcache.nfs.ChimeraNFSException;
import org.dcache.nfs.nfsstat;
import org.dcache.nfs.status.NotSuppException;
......@@ -35,10 +37,12 @@ import org.dcache.nfs.v4.xdr.copy_requirements4;
import org.dcache.nfs.v4.xdr.length4;
import org.dcache.nfs.v4.xdr.nfs4_prot;
import org.dcache.nfs.v4.xdr.nfs_argop4;
import org.dcache.nfs.v4.xdr.nfs_fh4;
import org.dcache.nfs.v4.xdr.nfs_opnum4;
import org.dcache.nfs.v4.xdr.nfs_resop4;
import org.dcache.nfs.v4.xdr.stable_how4;
import org.dcache.nfs.v4.xdr.stateid4;
import org.dcache.nfs.v4.xdr.verifier4;
import org.dcache.nfs.v4.xdr.write_response4;
import org.dcache.nfs.vfs.Inode;
import org.slf4j.Logger;
......@@ -66,9 +70,13 @@ public class OperationCOPY extends AbstractNFSv4Operation {
throw new NotSuppException("Inter-server copy is not supported");
}
// only synchronous mode is supported (for now)
if (!_args.opcopy.ca_consecutive || !_args.opcopy.ca_synchronous) {
throw new NotSuppException();
// only consecutive copy is supported
if (!_args.opcopy.ca_consecutive) {
res.cr_requirements = new copy_requirements4();
res.cr_requirements.cr_consecutive = true;
res.cr_requirements.cr_synchronous = _args.opcopy.ca_synchronous;
res.cr_status = nfsstat.NFS4ERR_OFFLOAD_NO_REQS;
return;
}
Inode srcInode = context.savedInode();
......@@ -91,31 +99,73 @@ public class OperationCOPY extends AbstractNFSv4Operation {
throw new OpenModeException("Invalid destination inode open mode (required write)");
}
Future<Long> copyFuture = context.getFs().copyFileRange(srcInode, srcPos, dstInode, dstPos, len);
try {
long n = Uninterruptibles.getUninterruptibly(copyFuture);
res.cr_resok4 = new COPY4resok();
res.cr_resok4.cr_response = new write_response4();
res.cr_resok4.cr_response.wr_callback_id = new stateid4[0];
res.cr_resok4.cr_response.wr_committed = stable_how4.FILE_SYNC4;
res.cr_resok4.cr_response.wr_count = new length4(n);
res.cr_resok4.cr_response.wr_writeverf = context.getRebootVerifier();
res.cr_resok4.cr_requirements = new copy_requirements4();
res.cr_resok4.cr_requirements.cr_consecutive = true;
res.cr_resok4.cr_requirements.cr_synchronous = true;
res.cr_status = nfsstat.NFS_OK;
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof ChimeraNFSException) {
res.cr_status = ((ChimeraNFSException) cause).getStatus();
} else {
LOGGER.warn("Unexpected error during copyFileRange", e);
res.cr_resok4 = new COPY4resok();
res.cr_resok4.cr_response = new write_response4();
res.cr_resok4.cr_response.wr_writeverf = context.getRebootVerifier();
res.cr_resok4.cr_response.wr_callback_id = new stateid4[]{};
res.cr_resok4.cr_response.wr_committed = stable_how4.FILE_SYNC4;
res.cr_resok4.cr_response.wr_count = new length4(0);
res.cr_resok4.cr_requirements = new copy_requirements4();
res.cr_resok4.cr_requirements.cr_consecutive = true;
res.cr_status = nfsstat.NFS_OK;
CompletableFuture<Long> copyFuture = context.getFs().copyFileRange(srcInode, srcPos, dstInode, dstPos, len);
boolean isSync = _args.opcopy.ca_synchronous;
if (isSync) {
try {
// try sync copy and fall-back to async
long n = copyFuture.get(1, TimeUnit.SECONDS);
res.cr_resok4.cr_response.wr_count = new length4(n);
} catch (InterruptedException | ExecutionException e) {
LOGGER.error("Copy-offload interrupted: ", Throwables.getRootCause(e));
res.cr_status = nfsstat.NFSERR_IO;
} catch (TimeoutException e) {
// continue as async copy
isSync = false;
}
}
if (!isSync) {
// copy asynchronously
var copyState = notifyWhenComplete(client, dstInode, context.getRebootVerifier(), copyFuture);
res.cr_resok4.cr_response.wr_callback_id = new stateid4[]{copyState};
}
res.cr_resok4.cr_requirements.cr_synchronous = isSync;
}
private stateid4 notifyWhenComplete(NFS4Client client, Inode dstInode, verifier4 verifier, CompletableFuture<Long> copyFuture) throws ChimeraNFSException {
var openState = client.state(_args.opcopy.ca_src_stateid);
var copyState = client.createState(openState.getStateOwner(), openState).stateid();
copyFuture.handle((n, t) -> {
var cr_response = new write_response4();
cr_response.wr_callback_id = new stateid4[]{};
cr_response.wr_committed = stable_how4.FILE_SYNC4;
cr_response.wr_count = new length4(n);
cr_response.wr_writeverf = verifier;
try {
client.getCB().cbOffload(new nfs_fh4(dstInode.toNfsHandle()), copyState, cr_response, toNfsState(t));
} catch (IOException ex) {
LOGGER.warn("Failed to notify client about copy-offload completion: {}", ex.getMessage());
}
return null;
});
return copyState;
}
private int toNfsState(Throwable t) {
if (t == null) {
return nfsstat.NFS_OK;
}
// FIXME: we need some mapping between 'well known' exceptions and nfs error states.
LOGGER.warn("Copy-offload failed with exception: {}", Throwables.getRootCause(t).toString());
return nfsstat.NFSERR_IO;
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment