Commit 84193728 authored by Eric Cano

Fixed OStoreDB::ArchiveMount::getNextJobBatch() not fetching job contents.

Adapted unit tests to validate this fetching.
parent c6bd96e1
......@@ -399,8 +399,10 @@ void ArchiveRequest::setJobOwner(
ArchiveRequest::AsyncJobOwnerUpdater* ArchiveRequest::asyncUpdateJobOwner(uint16_t copyNumber,
const std::string& owner, const std::string& previousOwner) {
std::unique_ptr<AsyncJobOwnerUpdater> ret(new AsyncJobOwnerUpdater);
// Passing a reference to the unique pointer led to strange behaviors.
auto & retRef = *ret;
ret->m_updaterCallback=
[this, copyNumber, owner, previousOwner](const std::string &in)->std::string {
[this, copyNumber, owner, previousOwner, &retRef](const std::string &in)->std::string {
// We have a locked and fetched object, so we just need to work on its representation.
serializers::ObjectHeader oh;
oh.ParseFromString(in);
......@@ -419,6 +421,25 @@ ArchiveRequest::AsyncJobOwnerUpdater* ArchiveRequest::asyncUpdateJobOwner(uint16
throw WrongPreviousOwner("In ArchiveRequest::asyncUpdateJobOwner()::lambda(): Job not owned.");
}
j->set_owner(owner);
// We also need to gather all the job content for the user to get an in-memory
// representation.
// TODO this is an unfortunate duplication of the getXXX() members of ArchiveRequest.
// We could try and refactor this.
retRef.m_archiveFile.archiveFileID = payload.archivefileid();
retRef.m_archiveFile.checksumType = payload.checksumtype();
retRef.m_archiveFile.checksumValue = payload.checksumvalue();
retRef.m_archiveFile.creationTime = payload.creationtime();
retRef.m_archiveFile.diskFileId = payload.diskfileid();
retRef.m_archiveFile.diskFileInfo.group = payload.diskfileinfo().group();
retRef.m_archiveFile.diskFileInfo.owner = payload.diskfileinfo().owner();
retRef.m_archiveFile.diskFileInfo.path = payload.diskfileinfo().path();
retRef.m_archiveFile.diskFileInfo.recoveryBlob = payload.diskfileinfo().recoveryblob();
retRef.m_archiveFile.diskInstance = payload.diskinstance();
retRef.m_archiveFile.fileSize = payload.filesize();
retRef.m_archiveFile.reconciliationTime = payload.reconcilationtime();
retRef.m_archiveFile.storageClass = payload.storageclass();
retRef.m_archiveReportURL = payload.archivereporturl();
retRef.m_srcURL = payload.srcurl();
oh.set_payload(payload.SerializePartialAsString());
return oh.SerializeAsString();
}
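
Note on the lambda above: it captures retRef, a plain reference to the AsyncJobOwnerUpdater that is still owned by the unique_ptr, rather than capturing the unique_ptr itself (which, per the comment, led to strange behaviors). A minimal, self-contained sketch of the same capture pattern, using hypothetical Updater/makeUpdater names in place of the CTA classes:

#include <functional>
#include <iostream>
#include <memory>
#include <string>

// Hypothetical stand-in for AsyncJobOwnerUpdater: the callback fills the
// members so the caller can read them back once the update has completed.
struct Updater {
  std::function<std::string(const std::string&)> callback;
  std::string fetchedValue;
};

std::unique_ptr<Updater> makeUpdater() {
  std::unique_ptr<Updater> ret(new Updater);
  // Capture a reference to the object, not the unique_ptr: the pointer is
  // returned (moved) to the caller, but the object it owns keeps the same
  // address, so the reference stays valid for the callback's lifetime.
  auto& retRef = *ret;
  ret->callback = [&retRef](const std::string& in) -> std::string {
    retRef.fetchedValue = in;  // populate the in-memory representation
    return in;                 // hand back the (possibly updated) serialized form
  };
  return ret;
}

int main() {
  auto updater = makeUpdater();
  updater->callback("serialized payload");
  std::cout << updater->fetchedValue << std::endl;  // prints "serialized payload"
}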
......@@ -434,6 +455,17 @@ void ArchiveRequest::AsyncJobOwnerUpdater::wait() {
m_backendUpdater->wait();
}
const common::dataStructures::ArchiveFile& ArchiveRequest::AsyncJobOwnerUpdater::getArchiveFile() {
return m_archiveFile;
}
const std::string& ArchiveRequest::AsyncJobOwnerUpdater::getArchiveReportURL() {
return m_archiveReportURL;
}
const std::string& ArchiveRequest::AsyncJobOwnerUpdater::getSrcURL() {
return m_srcURL;
}
std::string ArchiveRequest::getJobOwner(uint16_t copyNumber) {
checkPayloadReadable();
......
......@@ -67,9 +67,15 @@ public:
friend class ArchiveRequest;
public:
void wait();
const common::dataStructures::ArchiveFile & getArchiveFile();
const std::string & getSrcURL();
const std::string & getArchiveReportURL();
private:
std::function<std::string(const std::string &)> m_updaterCallback;
std::unique_ptr<Backend::AsyncUpdater> m_backendUpdater;
common::dataStructures::ArchiveFile m_archiveFile;
std::string m_srcURL;
std::string m_archiveReportURL;
};
// A job owner updater factory. The owner MUST be previousOwner for the update to be executed.
CTA_GENERATE_EXCEPTION_CLASS(WrongPreviousOwner);
......
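
With the getters added above, a caller can run the owner update asynchronously and then read the fetched file metadata without a second fetch; this is what OStoreDB::ArchiveMount::getNextJobBatch() does below. A hedged usage fragment (request, copyNb, newOwner and previousOwner are assumed to exist in the calling context):

// Sketch only: error handling and ownership bookkeeping omitted.
std::unique_ptr<ArchiveRequest::AsyncJobOwnerUpdater> updater(
    request.asyncUpdateJobOwner(copyNb, newOwner, previousOwner));
updater->wait();  // rethrows the callback's exceptions, e.g. WrongPreviousOwner
// The callback populated the in-memory representation as a side effect:
const auto& archiveFile = updater->getArchiveFile();
const std::string& srcURL = updater->getSrcURL();
const std::string& reportURL = updater->getArchiveReportURL();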
......@@ -37,6 +37,7 @@
#ifdef LOW_LEVEL_TRACING
#include <iostream>
#endif
#include <valgrind/helgrind.h>
namespace cta { namespace objectstore {
......@@ -312,14 +313,17 @@ BackendVFS::AsyncUpdater::AsyncUpdater(BackendVFS & be, const std::string& name,
try { // locking already throws proper exceptions for no such file.
sl.reset(m_backend.lockExclusive(m_name));
} catch (Backend::NoSuchObject &) {
ANNOTATE_HAPPENS_BEFORE(&m_job);
throw;
} catch (cta::exception::Exception & ex) {
ANNOTATE_HAPPENS_BEFORE(&m_job);
throw Backend::CouldNotLock(ex.getMessageValue());
}
std::string preUpdateData;
try {
preUpdateData=m_backend.read(m_name);
} catch (cta::exception::Exception & ex) {
ANNOTATE_HAPPENS_BEFORE(&m_job);
throw Backend::CouldNotFetch(ex.getMessageValue());
}
// Let user's exceptions go through.
......@@ -327,13 +331,16 @@ BackendVFS::AsyncUpdater::AsyncUpdater(BackendVFS & be, const std::string& name,
try {
m_backend.atomicOverwrite(m_name, postUpdateData);
} catch (cta::exception::Exception & ex) {
ANNOTATE_HAPPENS_BEFORE(&m_job);
throw Backend::CouldNotCommit(ex.getMessageValue());
}
try {
sl->release();
} catch (cta::exception::Exception & ex) {
ANNOTATE_HAPPENS_BEFORE(&m_job);
throw Backend::CouldNotUnlock(ex.getMessageValue());
}
ANNOTATE_HAPPENS_BEFORE(&m_job);
}))
{}
......@@ -344,6 +351,8 @@ Backend::AsyncUpdater* BackendVFS::asyncUpdate(const std::string & name, std::fu
void BackendVFS::AsyncUpdater::wait() {
m_job.get();
ANNOTATE_HAPPENS_AFTER(&m_job);
ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&m_job);
}
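
The ANNOTATE_HAPPENS_BEFORE / ANNOTATE_HAPPENS_AFTER macros from <valgrind/helgrind.h> declare to Helgrind the ordering that the std::future handoff already provides, so the writes done inside the updater lambda (and its exception paths) are not flagged as racing with the reader in wait(). A minimal, self-contained sketch of the same annotation pattern (assuming the valgrind headers are installed; a std::promise/std::thread pair stands in for the member future and std::async used here):

#include <future>
#include <iostream>
#include <thread>
#include <valgrind/helgrind.h>

int main() {
  int sharedValue = 0;
  std::promise<void> done;
  std::future<void> job = done.get_future();

  std::thread worker([&sharedValue, &done, &job]() {
    sharedValue = 42;               // write Helgrind would otherwise report
    ANNOTATE_HAPPENS_BEFORE(&job);  // everything up to here is ordered before...
    done.set_value();               // ...whoever synchronizes on the future
  });

  job.get();                                 // the real synchronization point
  ANNOTATE_HAPPENS_AFTER(&job);              // ...namely here, after get()
  ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&job);  // reset annotation state, as wait() does
  std::cout << sharedValue << std::endl;     // safely reads 42

  worker.join();
}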
......
......@@ -1705,7 +1705,8 @@ auto OStoreDB::ArchiveMount::getNextJob(log::LogContext &logContext) -> std::uni
//------------------------------------------------------------------------------
// OStoreDB::ArchiveMount::getNextJobBatch()
//------------------------------------------------------------------------------
std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::ArchiveMount::getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, log::LogContext& logContext) {
std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::ArchiveMount::getNextJobBatch(uint64_t filesRequested,
uint64_t bytesRequested, log::LogContext& logContext) {
// Find the next files to archive
// Get the archive queue
objectstore::RootEntry re(m_objectStore);
......@@ -1776,7 +1777,7 @@ std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::ArchiveMoun
candidateJobs.emplace_back(new OStoreDB::ArchiveJob(job.address, m_objectStore, m_agentReference, *this));
candidateJobs.back()->tapeFile.copyNb = job.copyNb;
}
// We now have a patch of jobs to try and dequeue. Should not be empty.
// We now have a batch of jobs to try and dequeue. Should not be empty.
// First add the jobs to the owned list of the agent.
std::list<std::string> addedJobs;
for (const auto &j: candidateJobs) addedJobs.emplace_back(j->m_archiveRequest.getAddressIfSet());
......@@ -1797,11 +1798,24 @@ std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::ArchiveMoun
// Get the processing status of update
try {
(*ju)->wait();
// Getting here means the update went through. We can proceed with removing the
// job from the queue and populating the in-memory job to report.
jobsToDequeue.emplace_back((*j)->m_archiveRequest.getAddressIfSet());
(*j)->archiveFile = (*ju)->getArchiveFile();
(*j)->srcURL = (*ju)->getSrcURL();
(*j)->archiveReportURL = (*ju)->getArchiveReportURL();
(*j)->tapeFile.fSeq = ++nbFilesCurrentlyOnTape;
(*j)->tapeFile.vid = mountInfo.vid;
(*j)->tapeFile.blockId =
std::numeric_limits<decltype((*j)->tapeFile.blockId)>::max();
(*j)->m_jobOwned = true;
(*j)->m_mountId = mountInfo.mountId;
(*j)->m_tapePool = mountInfo.tapePool;
log::ScopedParamContainer params(logContext);
params.add("tapepool", mountInfo.tapePool)
.add("queueObject", aq.getAddressIfSet())
.add("archiveRequest", (*j)->m_archiveRequest.getAddressIfSet());
.add("archiveRequest", (*j)->m_archiveRequest.getAddressIfSet())
.add("fileId", (*j)->archiveFile.archiveFileID);
logContext.log(log::INFO, "In ArchiveMount::getNextJobBatch(): popped one job");
validatedJobs.emplace_back(std::move(*j));
jobsInThisRound++;
......@@ -1882,10 +1896,9 @@ std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::ArchiveMoun
}
}
// Log the outcome.
uint64_t nFiles=0;
uint64_t nFiles=privateRet.size();
uint64_t nBytes=0;
for (auto & j: privateRet) {
nFiles++;
nBytes+=j->archiveFile.fileSize;
}
{
......
......@@ -121,6 +121,7 @@ TEST_P(OStoreDBTest, getBatchArchiveJob) {
for (size_t i=0; i<10; i++) {
cta::common::dataStructures::ArchiveRequest ar;
cta::common::dataStructures::ArchiveFileQueueCriteria afqc;
ar.fileSize=123*(i+1);
afqc.copyToPoolMap[1] = "Tapepool1";
afqc.fileId = i;
afqc.mountPolicy.name = "policy";
......@@ -131,7 +132,8 @@ TEST_P(OStoreDBTest, getBatchArchiveJob) {
afqc.mountPolicy.maxDrivesAllowed = 1;
osdbi.queueArchive("testInstance", ar, afqc, lc);
}
// Delete the first job from the queue, change
// Delete the first job from the queue, change owner of second.
// They will be automatically skipped when getting jobs
std::string aqAddr;
{
// Get hold of the queue
......@@ -167,6 +169,11 @@ TEST_P(OStoreDBTest, getBatchArchiveJob) {
auto giveAll = std::numeric_limits<uint64_t>::max();
auto jobs = mount->getNextJobBatch(giveAll, giveAll, lc);
ASSERT_EQ(8, jobs.size());
// With the first two jobs skipped (one deleted, one with a changed owner), we get the 3rd and following jobs (i=2...9).
size_t i=2;
for (auto & j:jobs) {
ASSERT_EQ(123*(i++ + 1), j->archiveFile.fileSize);
}
// Check the queue has been emptied, and hence removed.
ASSERT_EQ(false, osdbi.getBackend().exists(aqAddr));
}
......
......@@ -37,6 +37,7 @@
#include "common/exception/Exception.hpp"
#include "common/log/DummyLogger.hpp"
#include "common/log/StringLogger.hpp"
#include "common/log/StdoutLogger.hpp"
#include "common/make_unique.hpp"
#include "common/processCap/ProcessCapDummy.hpp"
#include "common/threading/Thread.hpp"
......@@ -1277,6 +1278,8 @@ TEST_P(DataTransferSessionTest, DataTransferSessionGooddayMigration) {
castorConf.nbBufs = 10;
castorConf.bulkRequestRecallMaxBytes = UINT64_C(100)*1000*1000*1000;
castorConf.bulkRequestRecallMaxFiles = 1000;
castorConf.bulkRequestMigrationMaxBytes = UINT64_C(100)*1000*1000*1000;
castorConf.bulkRequestMigrationMaxFiles = 1000;
castorConf.nbDiskThreads = 1;
cta::log::DummyLogger dummyLog("dummy");
cta::mediachanger::MediaChangerFacade mc(dummyLog);
......@@ -1417,6 +1420,8 @@ TEST_P(DataTransferSessionTest, DataTransferSessionMissingFilesMigration) {
castorConf.nbBufs = 10;
castorConf.bulkRequestRecallMaxBytes = UINT64_C(100)*1000*1000*1000;
castorConf.bulkRequestRecallMaxFiles = 1000;
castorConf.bulkRequestMigrationMaxBytes = UINT64_C(100)*1000*1000*1000;
castorConf.bulkRequestMigrationMaxFiles = 1000;
castorConf.nbDiskThreads = 1;
cta::log::DummyLogger dummyLog("dummy");
cta::mediachanger::MediaChangerFacade mc(dummyLog);
......@@ -1551,6 +1556,8 @@ TEST_P(DataTransferSessionTest, DataTransferSessionTapeFullMigration) {
castorConf.nbBufs = 10;
castorConf.bulkRequestRecallMaxBytes = UINT64_C(100)*1000*1000*1000;
castorConf.bulkRequestRecallMaxFiles = 1000;
castorConf.bulkRequestMigrationMaxBytes = UINT64_C(100)*1000*1000*1000;
castorConf.bulkRequestMigrationMaxFiles = 1000;
castorConf.nbDiskThreads = 1;
cta::log::DummyLogger dummyLog("dummy");
cta::mediachanger::MediaChangerFacade mc(dummyLog);
......@@ -1699,6 +1706,8 @@ TEST_P(DataTransferSessionTest, DataTransferSessionTapeFullOnFlushMigration) {
castorConf.nbBufs = 10;
castorConf.bulkRequestRecallMaxBytes = UINT64_C(100)*1000*1000*1000;
castorConf.bulkRequestRecallMaxFiles = 1000;
castorConf.bulkRequestMigrationMaxBytes = UINT64_C(100)*1000*1000*1000;
castorConf.bulkRequestMigrationMaxFiles = 1000;
castorConf.nbDiskThreads = 1;
cta::log::DummyLogger dummyLog("dummy");
cta::mediachanger::MediaChangerFacade mc(dummyLog);
......