Commit dc319385 authored by Tigran Mkrtchyan's avatar Tigran Mkrtchyan
Browse files

driver: implement cancelation of state and flush requests

the flush request implemented on CTA side as delete. Thus we send
a delete requests that included scheduler job id. On stage, cancelRetrieves
 gRPC is made.

In both cases the cancelation is bound to PendingRequest::cancel method.
parent 635b207e
......@@ -103,6 +103,33 @@ public class CtaNearlineStorage implements NearlineStorage {
*/
private final ConcurrentMap<String, PendingRequest> pendingRequests = new ConcurrentHashMap<>();
/**
* {@link StreamObserver} that the given runnable when complete.
*/
private static class OnSuccessStreamObserver implements StreamObserver<Empty> {
private final Runnable r;
public OnSuccessStreamObserver(Runnable r) {
this.r = r;
}
@Override
public void onNext(Empty o) {
}
@Override
public void onError(Throwable t) {
LOGGER.error("Failed to submit request {}", t.getMessage());
}
@Override
public void onCompleted() {
r.run();
}
}
public CtaNearlineStorage(String type, String name) {
Objects.requireNonNull(type, "HSM type is not provided");
......@@ -167,7 +194,17 @@ public class CtaNearlineStorage implements NearlineStorage {
response.getFid(),
response.getReqId()
);
pendingRequests.put(id, new PendingRequest(Instant.now(), r));
var cancelRequest = ctaRequestFactory.cancelValueOf(ar, response);
pendingRequests.put(id, new PendingRequest(Instant.now(), r) {
@Override
public void cancel() {
// on cancel send the request to CTA; on success cancel the requests
Runnable r = super::cancel;
cta.delete(cancelRequest, new OnSuccessStreamObserver(r));
}
}
);
}
@Override
......@@ -240,7 +277,16 @@ public class CtaNearlineStorage implements NearlineStorage {
r.getFileAttributes().getPnfsId(),
response.getReqId()
);
pendingRequests.put(id, new PendingRequest(Instant.now(), r));
var cancelRequest = ctaRequestFactory.cancelValueOf(rr, response);
pendingRequests.put(id, new PendingRequest(Instant.now(), r) {
@Override
public void cancel() {
// on cancel send the request to CTA; on success cancel the requests
Runnable r = super::cancel;
cta.cancelRetrieve(cancelRequest, new OnSuccessStreamObserver(r));
}
}
);
}
@Override
......
......@@ -53,7 +53,6 @@ public class PendingRequest {
}
public void cancel() {
// FIXME: we need to cancel the requests in CTA.
request.failed(new CancellationException("Canceled by dCache"));
}
}
......@@ -7,9 +7,12 @@ import cta.eos.CtaEos;
import cta.eos.CtaEos.Transport;
import java.io.File;
import java.util.Objects;
import org.dcache.cta.rpc.ArchiveResponse;
import org.dcache.cta.rpc.CancelRetrieveRequest;
import org.dcache.cta.rpc.DeleteRequest;
import org.dcache.cta.rpc.FileInfo;
import org.dcache.cta.rpc.RetrieveRequest;
import org.dcache.cta.rpc.RetrieveResponse;
import org.dcache.namespace.FileAttribute;
import org.dcache.pool.nearline.spi.FlushRequest;
import org.dcache.cta.rpc.ArchiveRequest;
......@@ -159,4 +162,25 @@ public class RequestsFactory {
.build();
}
public CancelRetrieveRequest cancelValueOf(RetrieveRequest request, RetrieveResponse response) {
return CancelRetrieveRequest.newBuilder()
.setInstance(instance)
.setCli(client)
.setFid(request.getArchiveId())
.setReqId(response.getReqId())
.build();
}
public DeleteRequest cancelValueOf(ArchiveRequest request, ArchiveResponse response) {
return DeleteRequest.newBuilder()
.setInstance(instance)
.setCli(client)
.setFile(request.getFile())
.setArchiveId(response.getFid())
.setReqId(response.getReqId())
.build();
}
}
......@@ -11,6 +11,7 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
......@@ -22,6 +23,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
......@@ -425,10 +427,10 @@ public class CtaNearlineStorageTest {
driver.start();
driver.stage(Set.of(request));
cta.waitToReply();
driver.cancel(request.getId());
cta.waitToReply();
assertEquals("unexpected pending request queue size", 0, driver.getPendingRequestsCount());
}
......@@ -450,6 +452,81 @@ public class CtaNearlineStorageTest {
}
@Test
public void testCancelOfPendingStageRequest() {
var request = mockedStageRequest();
driver = new CtaNearlineStorage("foo", "bar");
driver.configure(drvConfig);
driver.start();
driver.stage(Set.of(request));
cta.waitToReply();
driver.cancel(request.getId());
cta.waitToReply();
verify(request).failed(any(CancellationException.class));
}
@Test
public void testCancelOfPendingStageRequestOnError() {
var request = mockedStageRequest();
driver = new CtaNearlineStorage("foo", "bar");
driver.configure(drvConfig);
driver.start();
driver.stage(Set.of(request));
cta.waitToReply();
cta.fail();
driver.cancel(request.getId());
verify(request, times(0)).failed(any(CancellationException.class));
verify(request, times(0)).failed(anyInt(), any());
verify(request, times(0)).completed(any());
}
@Test
public void testCancelOfPendingFlushRequest() {
var request = mockedFlushRequest();
driver = new CtaNearlineStorage("foo", "bar");
driver.configure(drvConfig);
driver.start();
driver.flush(Set.of(request));
cta.waitToReply();
driver.cancel(request.getId());
cta.waitToReply();
verify(request).failed(any(CancellationException.class));
}
@Test
public void testCancelOfPendingFlushRequestOnError() {
var request = mockedFlushRequest();
driver = new CtaNearlineStorage("foo", "bar");
driver.configure(drvConfig);
driver.start();
driver.flush(Set.of(request));
cta.waitToReply();
cta.fail();
driver.cancel(request.getId());
verify(request, times(0)).failed(any(CancellationException.class));
verify(request, times(0)).failed(anyInt(), any());
verify(request, times(0)).completed(any());
}
void waitToComplete() {
try {
waitForComplete.get(1, TimeUnit.SECONDS);
......
......@@ -12,6 +12,7 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.dcache.cta.rpc.ArchiveRequest;
import org.dcache.cta.rpc.ArchiveResponse;
import org.dcache.cta.rpc.CancelRetrieveRequest;
import org.dcache.cta.rpc.CtaRpcGrpc;
import org.dcache.cta.rpc.DeleteRequest;
import org.dcache.cta.rpc.RetrieveRequest;
......@@ -94,6 +95,19 @@ public class DummyCta {
}
}
@Override
public void cancelRetrieve(CancelRetrieveRequest request,
StreamObserver<Empty> responseObserver) {
if (!fail) {
var response = Empty.newBuilder()
.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
} else {
responseObserver.onError(new StatusException(Status.INTERNAL));
}
}
@Override
public void delete(DeleteRequest request, StreamObserver<Empty> responseObserver) {
if (!fail) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment