Commit b69f8225 authored by Tigran Mkrtchyan's avatar Tigran Mkrtchyan
Browse files

driver: introduce PendingRequest class to trace CTA scheduling time

parent 1167e2a0
Pipeline #13647 passed with stages
in 3 minutes and 21 seconds
......@@ -15,6 +15,7 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.time.Instant;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
......@@ -28,6 +29,7 @@ import org.dcache.cta.rpc.CtaRpcGrpc;
import org.dcache.cta.rpc.CtaRpcGrpc.CtaRpcStub;
import org.dcache.cta.rpc.RetrieveResponse;
import org.dcache.nearline.cta.xrootd.DataMover;
import org.dcache.nearline.cta.xrootd.PendingRequest;
import org.dcache.pool.nearline.spi.FlushRequest;
import org.dcache.pool.nearline.spi.NearlineRequest;
import org.dcache.pool.nearline.spi.NearlineStorage;
......@@ -101,7 +103,7 @@ public class CtaNearlineStorage implements NearlineStorage {
/**
* Requests submitted to CTA.
*/
private final ConcurrentMap<String, NearlineRequest> pendingRequests = new ConcurrentHashMap<>();
private final ConcurrentMap<String, PendingRequest> pendingRequests = new ConcurrentHashMap<>();
public CtaNearlineStorage(String type, String name) {
......@@ -179,7 +181,7 @@ public class CtaNearlineStorage implements NearlineStorage {
@Override
public void onCompleted() {
pendingRequests.put(id, r);
pendingRequests.put(id, new PendingRequest(Instant.now(), r));
}
});
}
......@@ -252,7 +254,7 @@ public class CtaNearlineStorage implements NearlineStorage {
@Override
public void onCompleted() {
pendingRequests.put(id, r);
pendingRequests.put(id, new PendingRequest(Instant.now(), r));
}
});
}
......@@ -408,7 +410,7 @@ public class CtaNearlineStorage implements NearlineStorage {
}
@VisibleForTesting
NearlineRequest getRequest(String id) {
PendingRequest getRequest(String id) {
return pendingRequests.get(id);
}
}
......@@ -48,7 +48,7 @@ public class DataMover extends AbstractIdleService implements CtaTransportProvid
private final EventLoopGroup bossGroup;
private final EventLoopGroup workerGroup;
private final ConcurrentMap<String, ? extends NearlineRequest> pendingRequests;
private final ConcurrentMap<String, PendingRequest> pendingRequests;
/**
* Driver configured hsm name.
......@@ -63,7 +63,7 @@ public class DataMover extends AbstractIdleService implements CtaTransportProvid
private volatile String url;
public DataMover(String type, String name, InetSocketAddress sa,
ConcurrentMap<String, ? extends NearlineRequest> pendingRequests) {
ConcurrentMap<String, PendingRequest> pendingRequests) {
hsmType = type;
hsmName = name;
......
......@@ -133,7 +133,7 @@ public class DataServerHandler extends XrootdRequestHandler {
*/
private final ReadWriteLock openFileLock = new ReentrantReadWriteLock();
private final ConcurrentMap<String, ? extends NearlineRequest> pendingRequests;
private final ConcurrentMap<String, PendingRequest> pendingRequests;
/**
* Driver configured hsm name.
......@@ -146,7 +146,7 @@ public class DataServerHandler extends XrootdRequestHandler {
private final String hsmType;
public DataServerHandler(String type, String name,
ConcurrentMap<String, ? extends NearlineRequest> pendingRequests) {
ConcurrentMap<String, PendingRequest> pendingRequests) {
hsmType = type;
hsmName = name;
......@@ -205,9 +205,12 @@ public class DataServerHandler extends XrootdRequestHandler {
OpenRequest msg)
throws XrootdException {
try {
NearlineRequest r = getIORequest(msg.getPath());
var pr = getIORequest(msg.getPath());
var r = pr.getRequest();
var file = getFile(r);
LOGGER.info("Request {} scheduling time: {}", file, TimeUtils.describe(Duration.between(Instant.now(), pr.getSubmitionTime()).abs()).orElse("-"));
RandomAccessFile raf;
if (msg.isReadWrite() || msg.isNew() || msg.isDelete()) {
if (!(r instanceof StageRequest)) {
......@@ -374,10 +377,11 @@ public class DataServerHandler extends XrootdRequestHandler {
var uriQuery = url.getQuery();
var requestId = new File(url.getPath()).getName();
var r = pendingRequests.remove(requestId);
if (r == null) {
var pr = pendingRequests.remove(requestId);
if (pr == null) {
throw new XrootdException(kXR_ArgInvalid, "Invalid request id");
}
var r = pr.getRequest();
if (query.startsWith("/error/")) {
if (!uriQuery.startsWith(errorPrefix)) {
......@@ -462,7 +466,7 @@ public class DataServerHandler extends XrootdRequestHandler {
throw new XrootdException(kXR_FileNotOpen, "Invalid file descriptor");
}
private NearlineRequest getIORequest(String path)
private PendingRequest getIORequest(String path)
throws XrootdException {
var r = pendingRequests.get(path);
......@@ -526,7 +530,7 @@ public class DataServerHandler extends XrootdRequestHandler {
}
private FileStatus statusByPath(String path) throws XrootdException {
File file = getFile(getIORequest(path));
File file = getFile(getIORequest(path).getRequest());
return statusByFile(file);
}
......
package org.dcache.nearline.cta.xrootd;
import java.time.Instant;
import org.dcache.pool.nearline.spi.NearlineRequest;
/**
* Represents Nearline request in pending queue.
*/
public class PendingRequest {
/**
* Point on the time-line when request was submitted into pending queue.
*/
private final Instant submitionTime;
/**
* The nearline request.
*/
private final NearlineRequest request;
public PendingRequest(Instant submitionTime, NearlineRequest request) {
this.submitionTime = submitionTime;
this.request = request;
}
public Instant getSubmitionTime() {
return submitionTime;
}
public NearlineRequest getRequest() {
return request;
}
}
......@@ -322,7 +322,7 @@ public class CtaNearlineStorageTest {
driver.stage(Set.of(request));
cta.waitToReply();
driver.getRequest("0000C9B4E3768770452E8B1B8E0232584872").completed(Set.of());
driver.getRequest("0000C9B4E3768770452E8B1B8E0232584872").getRequest().completed(Set.of());
assertEquals("pending request count not zero", 0, driver.getPendingRequestsCount());
}
......@@ -337,7 +337,7 @@ public class CtaNearlineStorageTest {
driver.flush(Set.of(request));
cta.waitToReply();
driver.getRequest("0000C9B4E3768770452E8B1B8E0232584872").completed(Set.of());
driver.getRequest("0000C9B4E3768770452E8B1B8E0232584872").getRequest().completed(Set.of());
assertEquals("pending request count not zero", 0, driver.getPendingRequestsCount());
}
......@@ -352,7 +352,7 @@ public class CtaNearlineStorageTest {
driver.stage(Set.of(request));
cta.waitToReply();
driver.getRequest("0000C9B4E3768770452E8B1B8E0232584872").failed(new Exception());
driver.getRequest("0000C9B4E3768770452E8B1B8E0232584872").getRequest().failed(new Exception());
assertEquals("pending request count not zero", 0, driver.getPendingRequestsCount());
}
......@@ -367,7 +367,7 @@ public class CtaNearlineStorageTest {
driver.stage(Set.of(request));
cta.waitToReply();
driver.getRequest("0000C9B4E3768770452E8B1B8E0232584872").failed(1, "foo");
driver.getRequest("0000C9B4E3768770452E8B1B8E0232584872").getRequest().failed(1, "foo");
assertEquals("pending request count not zero", 0, driver.getPendingRequestsCount());
}
......@@ -397,7 +397,7 @@ public class CtaNearlineStorageTest {
driver.flush(Set.of(request));
cta.waitToReply();
driver.getRequest("0000C9B4E3768770452E8B1B8E0232584872").failed(new Exception());
driver.getRequest("0000C9B4E3768770452E8B1B8E0232584872").getRequest().failed(new Exception());
assertEquals("pending request count not zero", 0, driver.getPendingRequestsCount());
}
......@@ -412,7 +412,7 @@ public class CtaNearlineStorageTest {
driver.flush(Set.of(request));
cta.waitToReply();
driver.getRequest("0000C9B4E3768770452E8B1B8E0232584872").failed(1, "foo");
driver.getRequest("0000C9B4E3768770452E8B1B8E0232584872").getRequest().failed(1, "foo");
assertEquals("pending request count not zero", 0, driver.getPendingRequestsCount());
}
......
......@@ -18,7 +18,7 @@ import org.junit.Test;
public class DataMoverTest {
private DataMover dataMover;
private ConcurrentMap<String, NearlineRequest> requests;
private ConcurrentMap<String, PendingRequest> requests;
@Before
public void setUp() throws UnknownHostException {
......
......@@ -18,6 +18,7 @@ import io.netty.channel.ChannelHandlerContext;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.time.Instant;
import java.util.Base64;
import java.util.Set;
import java.util.UUID;
......@@ -44,7 +45,7 @@ public class DataServerHandlerTest {
private DataServerHandler handler;
private ConcurrentMap<String, NearlineRequest> requests;
private ConcurrentMap<String, PendingRequest> requests;
private ChannelHandlerContext ctx;
private CompletableFuture<Void> waitForComplete;
......@@ -482,7 +483,7 @@ public class DataServerHandlerTest {
when(request.getReplicaUri()).thenReturn(f.toURI());
requests.put("0000C9B4E3768770452E8B1B8E0232584872", request);
requests.put("0000C9B4E3768770452E8B1B8E0232584872", new PendingRequest(Instant.now(), request));
waitForComplete = new CompletableFuture<>();
......@@ -524,7 +525,7 @@ public class DataServerHandlerTest {
when(request.getReplicaUri()).thenReturn(f.toURI());
requests.put("0000C9B4E3768770452E8B1B8E0232584872", request);
requests.put("0000C9B4E3768770452E8B1B8E0232584872", new PendingRequest(Instant.now(), request));
return request;
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment