Commit fd278a18 authored by Tigran Mkrtchyan's avatar Tigran Mkrtchyan
Browse files

driver: remove requests from pending queue on success/failure

parent 84615ce9
Pipeline #12497 passed with stages
in 2 minutes and 32 seconds
......@@ -2,6 +2,7 @@ package org.dcache.nearline.cta;
import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.net.HostAndPort;
import com.google.protobuf.Empty;
......@@ -12,9 +13,11 @@ import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
......@@ -30,6 +33,7 @@ import org.dcache.pool.nearline.spi.NearlineRequest;
import org.dcache.pool.nearline.spi.NearlineStorage;
import org.dcache.pool.nearline.spi.RemoveRequest;
import org.dcache.pool.nearline.spi.StageRequest;
import org.dcache.util.Checksum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -115,7 +119,34 @@ public class CtaNearlineStorage implements NearlineStorage {
*/
@Override
public void flush(Iterable<FlushRequest> requests) {
for (var r : requests) {
for (var fr : requests) {
var id = fr.getFileAttributes().getPnfsId().toString();
var r = new ForwardingFlushRequest() {
@Override
protected FlushRequest delegate() {
return fr;
}
@Override
public void failed(Exception e) {
pendingRequests.remove(id);
super.failed(e);
}
@Override
public void failed(int i, String s) {
pendingRequests.remove(id);
super.failed(i, s);
}
@Override
public void completed(Set<URI> uris) {
pendingRequests.remove(id);
super.completed(uris);
}
};
try {
r.activate().get();
......@@ -126,8 +157,6 @@ public class CtaNearlineStorage implements NearlineStorage {
}
var ar = ctaRequestFactory.valueOf(r);
var id = r.getFileAttributes().getPnfsId().toString();
cta.archive(ar, new StreamObserver<>() {
@Override
......@@ -163,7 +192,33 @@ public class CtaNearlineStorage implements NearlineStorage {
*/
@Override
public void stage(Iterable<StageRequest> requests) {
for (var r : requests) {
for (var sr : requests) {
var id = sr.getFileAttributes().getPnfsId().toString();
var r = new ForwardingStageRequest() {
@Override
protected StageRequest delegate() {
return sr;
}
@Override
public void failed(Exception e) {
pendingRequests.remove(id);
super.failed(e);
}
@Override
public void failed(int i, String s) {
pendingRequests.remove(id);
super.failed(i, s);
}
@Override
public void completed(Set<Checksum> checksums) {
pendingRequests.remove(id);
super.completed(checksums);
}
};
try {
r.activate().get();
......@@ -176,7 +231,6 @@ public class CtaNearlineStorage implements NearlineStorage {
}
var rr = ctaRequestFactory.valueOf(r);
var id = r.getFileAttributes().getPnfsId().toString();
cta.retrieve(rr, new StreamObserver<>() {
@Override
......@@ -309,23 +363,23 @@ public class CtaNearlineStorage implements NearlineStorage {
cta = CtaRpcGrpc.newStub(channel);
channel.notifyWhenStateChanged(ConnectivityState.CONNECTING, () ->
cta.version(Empty.newBuilder().build(), new StreamObserver<>() {
@Override
public void onNext(Version version) {
LOGGER.info("Connected to CTA version {} : {}", version.getCtaVersion(),
version.getXrootdSsiProtobufInterfaceVersion());
}
@Override
public void onError(Throwable t) {
LOGGER.error("Failed to get CTA version {}", t.getMessage());
}
@Override
public void onCompleted() {
}
})
cta.version(Empty.newBuilder().build(), new StreamObserver<>() {
@Override
public void onNext(Version version) {
LOGGER.info("Connected to CTA version {} : {}", version.getCtaVersion(),
version.getXrootdSsiProtobufInterfaceVersion());
}
@Override
public void onError(Throwable t) {
LOGGER.error("Failed to get CTA version {}", t.getMessage());
}
@Override
public void onCompleted() {
}
})
);
ctaRequestFactory = new RequestsFactory(instanceName, ctaUser, ctaGroup, dataMover);
......@@ -347,4 +401,14 @@ public class CtaNearlineStorage implements NearlineStorage {
channel.shutdown();
}
}
@VisibleForTesting
int getPendingRequestsCount() {
return pendingRequests.size();
}
@VisibleForTesting
NearlineRequest getRequest(String id) {
return pendingRequests.get(id);
}
}
package org.dcache.nearline.cta;
import static org.dcache.nearline.cta.CtaNearlineStorage.*;
import static org.dcache.nearline.cta.CtaNearlineStorage.CTA_ENDPOINT;
import static org.dcache.nearline.cta.CtaNearlineStorage.CTA_GROUP;
import static org.dcache.nearline.cta.CtaNearlineStorage.CTA_INSTANCE;
import static org.dcache.nearline.cta.CtaNearlineStorage.CTA_USER;
import static org.dcache.nearline.cta.CtaNearlineStorage.IO_PORT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
......@@ -291,6 +296,111 @@ public class CtaNearlineStorageTest {
verify(request).failed(any());
}
@Test
public void testPendingRequestIncrementOnStageSubmit() {
var request = mockedStageRequest();
driver = new CtaNearlineStorage("foo", "bar");
driver.configure(drvConfig);
driver.start();
driver.stage(Set.of(request));
cta.waitToReply();
assertEquals("unexpected pending request queue size", 1,
driver.getPendingRequestsCount());
}
@Test
public void testPendingRequestDecOnStageComplete() {
var request = mockedStageRequest();
driver = new CtaNearlineStorage("foo", "bar");
driver.configure(drvConfig);
driver.start();
driver.stage(Set.of(request));
cta.waitToReply();
driver.getRequest("0000C9B4E3768770452E8B1B8E0232584872").completed(Set.of());
assertEquals("pending request count not zero", 0, driver.getPendingRequestsCount());
}
@Test
public void testPendingRequestDecOnStageFailedV1() {
var request = mockedStageRequest();
driver = new CtaNearlineStorage("foo", "bar");
driver.configure(drvConfig);
driver.start();
driver.stage(Set.of(request));
cta.waitToReply();
driver.getRequest("0000C9B4E3768770452E8B1B8E0232584872").failed(new Exception());
assertEquals("pending request count not zero", 0, driver.getPendingRequestsCount());
}
@Test
public void testPendingRequestDecOnStageFailedV2() {
var request = mockedStageRequest();
driver = new CtaNearlineStorage("foo", "bar");
driver.configure(drvConfig);
driver.start();
driver.stage(Set.of(request));
cta.waitToReply();
driver.getRequest("0000C9B4E3768770452E8B1B8E0232584872").failed(1, "foo");
assertEquals("pending request count not zero", 0, driver.getPendingRequestsCount());
}
@Test
public void testPendingRequestIncrementOnFlushSubmit() {
var request = mockedFlushRequest();
driver = new CtaNearlineStorage("foo", "bar");
driver.configure(drvConfig);
driver.start();
driver.flush(Set.of(request));
cta.waitToReply();
assertEquals("unexpected pending request queue size", 1,
driver.getPendingRequestsCount());
}
@Test
public void testPendingRequestDecOnFlushFailedV1() {
var request = mockedFlushRequest();
driver = new CtaNearlineStorage("foo", "bar");
driver.configure(drvConfig);
driver.start();
driver.flush(Set.of(request));
cta.waitToReply();
driver.getRequest("0000C9B4E3768770452E8B1B8E0232584872").failed(new Exception());
assertEquals("pending request count not zero", 0, driver.getPendingRequestsCount());
}
@Test
public void testPendingRequestDecOnFlushFailedV2() {
var request = mockedFlushRequest();
driver = new CtaNearlineStorage("foo", "bar");
driver.configure(drvConfig);
driver.start();
driver.flush(Set.of(request));
cta.waitToReply();
driver.getRequest("0000C9B4E3768770452E8B1B8E0232584872").failed(1, "foo");
assertEquals("pending request count not zero", 0, driver.getPendingRequestsCount());
}
void waitToComplete() {
try {
waitForComplete.get(1, TimeUnit.SECONDS);
......
......@@ -9,6 +9,7 @@ import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.dcache.cta.rpc.ArchiveRequest;
import org.dcache.cta.rpc.ArchiveResponse;
import org.dcache.cta.rpc.CtaRpcGrpc;
......@@ -105,4 +106,12 @@ public class DummyCta {
}
}
}
public void waitToReply() throws AssertionError {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new AssertionError("Should neve happen", e);
}
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment