Commit 75798f45 authored by Eric Cano

Added logging for archive request queuing/popping and archive queue deletion.

parent 606b4ddf
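
The commit threads a cta::log::LogContext through the archive queueing and job-popping code paths, so each object-store operation can be logged with structured parameters. Below is a minimal sketch of the pattern, using only the calls that appear in the hunks that follow; the function name and parameter values are illustrative, and ScopedParamContainer is assumed to withdraw its parameters from the context when it goes out of scope (hence "Scoped"):

#include "common/log/DummyLogger.hpp"
#include "common/log/LogContext.hpp"

// Hypothetical helper showing the structured-logging pattern used below.
void logQueueEvent(cta::log::LogContext &logContext) {
  cta::log::ScopedParamContainer params(logContext);
  params.add("tapepool", "pool1")                  // illustrative values
        .add("queueObject", "ArchiveQueue-0001");
  logContext.log(cta::log::INFO, "In logQueueEvent(): added job to queue");
}  // params are withdrawn from the context here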
......@@ -161,7 +161,7 @@ void RootEntry::removeArchiveQueueAndCommit(const std::string& tapePool) {
// Check the archive queue is empty
if (!aq.isEmpty()) {
throw ArchivelQueueNotEmpty ("In RootEntry::removeArchiveQueueAndCommit(): trying to "
"remove a non-empty tape pool");
"remove a non-empty archive queue");
}
// We can delete the queue
aq.remove();
......
......@@ -95,12 +95,12 @@ std::string cta::ArchiveMount::getMountTransactionId() const {
//------------------------------------------------------------------------------
// getNextJob
//------------------------------------------------------------------------------
std::unique_ptr<cta::ArchiveJob> cta::ArchiveMount::getNextJob() {
std::unique_ptr<cta::ArchiveJob> cta::ArchiveMount::getNextJob(log::LogContext &logContext) {
// Check we are still running the session
if (!m_sessionRunning)
throw SessionNotRunning("In ArchiveMount::getNextJob(): trying to get job from complete/not started session");
// try and get a new job from the DB side
std::unique_ptr<cta::SchedulerDatabase::ArchiveJob> dbJob(m_dbMount->getNextJob().release());
std::unique_ptr<cta::SchedulerDatabase::ArchiveJob> dbJob(m_dbMount->getNextJob(logContext).release());
if (!dbJob.get())
return std::unique_ptr<cta::ArchiveJob>();
// We have something to archive: prepare the response
......
......@@ -110,7 +110,7 @@ namespace cta {
* @return A unique_ptr to the next archive job or NULL if there are no more
* archive jobs left for this tape mount.
*/
virtual std::unique_ptr<ArchiveJob> getNextJob();
virtual std::unique_ptr<ArchiveJob> getNextJob(log::LogContext &logContext);
/**
* Returns the tape pool of the tape to be mounted.
......
......@@ -320,7 +320,7 @@ void OStoreDB::getLockedAndFetchedArchiveQueue(cta::objectstore::ArchiveQueue& a
// OStoreDB::queueArchive()
//------------------------------------------------------------------------------
void OStoreDB::queueArchive(const std::string &instanceName, const cta::common::dataStructures::ArchiveRequest &request,
const cta::common::dataStructures::ArchiveFileQueueCriteria &criteria) {
const cta::common::dataStructures::ArchiveFileQueueCriteria &criteria, log::LogContext &logContext) {
assertAgentAddressSet();
// Construct the return value immediately
cta::objectstore::ArchiveRequest aReq(m_agentReference->nextId("ArchiveRequest"), m_objectStore);
......@@ -363,11 +363,18 @@ void OStoreDB::queueArchive(const std::string &instanceName, const cta::common::
ScopedExclusiveLock arl(aReq);
// We can now enqueue the requests
std::list<std::string> linkedTapePools;
std::string currentTapepool;
try {
for (auto &j: aReq.dumpJobs()) {
currentTapepool = j.tapePool;
ostoredb::MemArchiveQueue::sharedAddToArchiveQueue(j, aReq, *this);
linkedTapePools.push_back(j.ArchiveQueueAddress);
aReq.setJobOwner(j.copyNb, j.ArchiveQueueAddress);
log::ScopedParamContainer params(logContext);
params.add("tapepool", j.tapePool)
.add("queueObject", j.ArchiveQueueAddress)
.add("jobObject", aReq.getAddressIfSet());
logContext.log(log::INFO, "In OStoreDB::queueArchive(): added job to queue");
}
} catch (NoSuchArchiveQueue &) {
// Unlink the request from already connected tape pools
......@@ -379,6 +386,10 @@ void OStoreDB::queueArchive(const std::string &instanceName, const cta::common::
aq.commit();
aReq.remove();
}
log::ScopedParamContainer params(logContext);
params.add("tapepool", currentTapepool)
.add("jobObject", aReq.getAddressIfSet());
logContext.log(log::INFO, "In OStoreDB::queueArchive(): failed to enqueue job");
throw;
}
// The request is now fully set. As it's multi-owned, we do not set the owner,
......@@ -1471,7 +1482,7 @@ const SchedulerDatabase::ArchiveMount::MountInfo& OStoreDB::ArchiveMount::getMou
//------------------------------------------------------------------------------
// OStoreDB::ArchiveMount::getNextJob()
//------------------------------------------------------------------------------
auto OStoreDB::ArchiveMount::getNextJob() -> std::unique_ptr<SchedulerDatabase::ArchiveJob> {
auto OStoreDB::ArchiveMount::getNextJob(log::LogContext &logContext) -> std::unique_ptr<SchedulerDatabase::ArchiveJob> {
// Find the next file to archive
// Get the archive queue
objectstore::RootEntry re(m_objectStore);
......@@ -1490,9 +1501,9 @@ auto OStoreDB::ArchiveMount::getNextJob() -> std::unique_ptr<SchedulerDatabase::
// Try and open the archive queue. It could be gone by now.
try {
objectstore::ArchiveQueue aq(aqAddress, m_objectStore);
objectstore::ScopedExclusiveLock aql;
objectstore::ScopedExclusiveLock aqlock;
try {
aql.lock(aq);
aqlock.lock(aq);
aq.fetch();
} catch (cta::exception::Exception & ex) {
// The queue is now absent. We can remove its reference in the root entry.
......@@ -1505,10 +1516,19 @@ auto OStoreDB::ArchiveMount::getNextJob() -> std::unique_ptr<SchedulerDatabase::
re.fetch();
try {
re.removeArchiveQueueAndCommit(mountInfo.tapePool);
log::ScopedParamContainer params(logContext);
params.add("tapepool", mountInfo.tapePool)
.add("queueObject", aq.getAddressIfSet());
logContext.log(log::INFO, "In ArchiveMount::getNextJob(): de-referenced missing queue from root entry");
} catch (RootEntry::ArchivelQueueNotEmpty & ex) {
// TODO: improve: if we fail here we could retry to fetch a job.
return nullptr;
log::ScopedParamContainer params(logContext);
params.add("tapepool", mountInfo.tapePool)
.add("queueObject", aq.getAddressIfSet())
.add("Message", ex.getMessageValue());
logContext.log(log::INFO, "In ArchiveMount::getNextJob(): could not de-referenced missing queue from root entry");
}
return nullptr;
}
// Pop jobs until we find one actually belonging to the queue.
// Any job not really belonging is an uncommitted pop, which we will
......@@ -1549,7 +1569,7 @@ auto OStoreDB::ArchiveMount::getNextJob() -> std::unique_ptr<SchedulerDatabase::
// We can commit and release the archive queue lock, we will only fill up
// memory structure from here on.
aq.commit();
aql.release();
aqlock.release();
privateRet->archiveFile = privateRet->m_archiveRequest.getArchiveFile();
privateRet->srcURL = privateRet->m_archiveRequest.getSrcURL();
privateRet->archiveReportURL = privateRet->m_archiveRequest.getArchiveReportURL();
......@@ -1561,16 +1581,34 @@ auto OStoreDB::ArchiveMount::getNextJob() -> std::unique_ptr<SchedulerDatabase::
privateRet->m_jobOwned = true;
privateRet->m_mountId = mountInfo.mountId;
privateRet->m_tapePool = mountInfo.tapePool;
log::ScopedParamContainer params(logContext);
params.add("tapepool", mountInfo.tapePool)
.add("queueObject", aq.getAddressIfSet())
.add("jobObject", privateRet->m_archiveRequest.getAddressIfSet());
logContext.log(log::INFO, "In ArchiveMount::getNextJob(): poped job from queue");
return std::unique_ptr<SchedulerDatabase::ArchiveJob> (privateRet.release());
}
// If we get here, we exhausted the queue. We can now remove it.
// removeArchiveQueueAndCommit is safe, as it checks whether the queue is empty
// before deleting it.
aq.remove();
objectstore::RootEntry re(m_objectStore);
objectstore::ScopedExclusiveLock rel (re);
re.fetch();
re.removeArchiveQueueAndCommit(mountInfo.tapePool);
// before deleting it, and throws an exception if the queue is in fact not
// empty (allowing us to log such an instance). We need to release the lock here first.
aqlock.release();
try {
objectstore::RootEntry re(m_objectStore);
objectstore::ScopedExclusiveLock rel (re);
re.fetch();
re.removeArchiveQueueAndCommit(mountInfo.tapePool);
log::ScopedParamContainer params(logContext);
params.add("tapepool", mountInfo.tapePool)
.add("queueObject", aq.getAddressIfSet());
logContext.log(log::INFO, "In ArchiveMount::getNextJob(): deleted empty queue");
} catch (cta::exception::Exception &ex) {
log::ScopedParamContainer params(logContext);
params.add("tapepool", mountInfo.tapePool)
.add("queueObject", aq.getAddressIfSet())
.add("Message", ex.getMessageValue());
logContext.log(log::INFO, "In ArchiveMount::getNextJob(): could not delete a presumably empty queue");
}
return nullptr;
} catch (cta::exception::Exception & ex){
return nullptr;
......
......@@ -94,7 +94,7 @@ public:
public:
CTA_GENERATE_EXCEPTION_CLASS(MaxFSeqNotGoingUp);
const MountInfo & getMountInfo() override;
std::unique_ptr<ArchiveJob> getNextJob() override;
std::unique_ptr<ArchiveJob> getNextJob(log::LogContext &logContext) override;
void complete(time_t completionTime) override;
void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override;
};
......@@ -161,7 +161,7 @@ public:
CTA_GENERATE_EXCEPTION_CLASS(NoSuchArchiveQueue);
void queueArchive(const std::string &instanceName, const cta::common::dataStructures::ArchiveRequest &request,
const cta::common::dataStructures::ArchiveFileQueueCriteria &criteria) override;
const cta::common::dataStructures::ArchiveFileQueueCriteria &criteria, log::LogContext &logContext) override;
private:
/**
......
......@@ -56,8 +56,8 @@ public:
m_OStoreDB.ping();
}
void queueArchive(const std::string &instanceName, const cta::common::dataStructures::ArchiveRequest& request, const cta::common::dataStructures::ArchiveFileQueueCriteria& criteria) override {
return m_OStoreDB.queueArchive(instanceName, request, criteria);
void queueArchive(const std::string &instanceName, const cta::common::dataStructures::ArchiveRequest& request, const cta::common::dataStructures::ArchiveFileQueueCriteria& criteria, log::LogContext &logContext) override {
return m_OStoreDB.queueArchive(instanceName, request, criteria, logContext);
}
......
......@@ -81,7 +81,7 @@ uint64_t Scheduler::queueArchive(const std::string &instanceName, const common::
using utils::midEllipsis;
auto catalogueInfo = m_catalogue.prepareForNewFile(instanceName, request.storageClass, request.requester);
auto catalogueTime = t.secs(cta::utils::Timer::resetCounter);
m_db.queueArchive(instanceName, request, catalogueInfo);
m_db.queueArchive(instanceName, request, catalogueInfo, lc);
auto schedulerDbTime = t.secs();
log::ScopedParamContainer spc(lc);
spc.add("instanceName", instanceName)
......
......@@ -35,6 +35,7 @@
#include "common/dataStructures/RetrieveRequest.hpp"
#include "common/dataStructures/SecurityIdentity.hpp"
#include "common/remoteFS/RemotePathAndStatus.hpp"
#include "common/log/LogContext.hpp"
#include "catalogue/TapeForWriting.hpp"
#include <list>
......@@ -99,9 +100,11 @@ public:
* @param rqst The request.
* @param criteria The criteria retrieved from the CTA catalogue to be used to
* decide how to queue the request.
* @param logContext The log context, allowing the DB operations to be logged.
*/
virtual void queueArchive(const std::string &instanceName, const cta::common::dataStructures::ArchiveRequest &request,
const cta::common::dataStructures::ArchiveFileQueueCriteria &criteria) = 0;
const cta::common::dataStructures::ArchiveFileQueueCriteria &criteria,
log::LogContext &logContext) = 0;
/**
* Returns all of the queued archive jobs. The returned jobs are
......@@ -173,7 +176,7 @@ public:
uint64_t mountId;
} mountInfo;
virtual const MountInfo & getMountInfo() = 0;
virtual std::unique_ptr<ArchiveJob> getNextJob() = 0;
virtual std::unique_ptr<ArchiveJob> getNextJob(log::LogContext &logContext) = 0;
virtual void complete(time_t completionTime) = 0;
virtual void setDriveStatus(common::dataStructures::DriveStatus status, time_t completionTime) = 0;
virtual ~ArchiveMount() {}
......
......@@ -25,6 +25,7 @@
#include "common/dataStructures/SecurityIdentity.hpp"
#include "OStoreDB/OStoreDBFactory.hpp"
#include "objectstore/BackendRados.hpp"
#include "common/log/DummyLogger.hpp"
#include <exception>
#include <gtest/gtest.h>
......@@ -128,6 +129,8 @@ const cta::common::dataStructures::SecurityIdentity SchedulerDatabaseTest::s_use
// unit test is disabled as it is pretty long to run.
TEST_P(SchedulerDatabaseTest, DISABLED_createManyArchiveJobs) {
using namespace cta;
log::DummyLogger dl("");
log::LogContext lc(dl);
SchedulerDatabase &db = getDb();
......@@ -165,7 +168,7 @@ TEST_P(SchedulerDatabaseTest, DISABLED_createManyArchiveJobs) {
ar.srcURL = std::string("root:/") + ar.diskFileInfo.path;
ar.storageClass = "storageClass";
afqc.fileId = i;
db.queueArchive("eosInstance", ar, afqc);
db.queueArchive("eosInstance", ar, afqc, lc);
}
// Then load all archive jobs into memory
......@@ -178,7 +181,7 @@ TEST_P(SchedulerDatabaseTest, DISABLED_createManyArchiveJobs) {
bool done = false;
size_t count = 0;
while (!done) {
auto aj = am->getNextJob();
auto aj = am->getNextJob(lc);
if (aj.get()) {
count++;
//std::cout << aj->archiveFile.diskFileInfo.path << std::endl;
......@@ -213,7 +216,7 @@ TEST_P(SchedulerDatabaseTest, DISABLED_createManyArchiveJobs) {
ar.srcURL = std::string("root:/") + ar.diskFileInfo.path;
ar.storageClass = "storageClass";
afqc.fileId = i;
db.queueArchive("eosInstance", ar, afqc);
db.queueArchive("eosInstance", ar, afqc, lc);
}
// Then load all archive jobs into memory (2nd pass)
......@@ -223,7 +226,7 @@ TEST_P(SchedulerDatabaseTest, DISABLED_createManyArchiveJobs) {
done = false;
count = 0;
while (!done) {
auto aj = am->getNextJob();
auto aj = am->getNextJob(lc);
if (aj.get()) {
count++;
//std::cout << aj->archiveFile.diskFileInfo.path << std::endl;
......
......@@ -415,14 +415,14 @@ TEST_P(SchedulerTest, archive_and_retrieve_new_file) {
archiveMount.reset(dynamic_cast<cta::ArchiveMount*>(mount.release()));
ASSERT_NE((cta::ArchiveMount*)NULL, archiveMount.get());
std::unique_ptr<cta::ArchiveJob> archiveJob;
archiveJob.reset(archiveMount->getNextJob().release());
archiveJob.reset(archiveMount->getNextJob(lc).release());
ASSERT_NE((cta::ArchiveJob*)NULL, archiveJob.get());
archiveJob->tapeFile.blockId = 1;
archiveJob->tapeFile.fSeq = 1;
archiveJob->tapeFile.checksumType = "ADLER32";
archiveJob->tapeFile.checksumValue = "1234abcd";
archiveJob->complete();
archiveJob.reset(archiveMount->getNextJob().release());
archiveJob.reset(archiveMount->getNextJob(lc).release());
ASSERT_EQ((cta::ArchiveJob*)NULL, archiveJob.get());
archiveMount->complete();
}
......@@ -554,7 +554,7 @@ TEST_P(SchedulerTest, retry_archive_until_max_reached) {
ASSERT_NE((cta::ArchiveMount*)NULL, archiveMount.get());
// The file should be retried 10 times
for (int i=0; i<=5; i++) {
std::unique_ptr<cta::ArchiveJob> archiveJob(archiveMount->getNextJob());
std::unique_ptr<cta::ArchiveJob> archiveJob(archiveMount->getNextJob(lc));
if (!archiveJob.get()) {
int __attribute__((__unused__)) debugI=i;
}
......@@ -565,7 +565,7 @@ TEST_P(SchedulerTest, retry_archive_until_max_reached) {
}
// Then the request should be gone
std::unique_ptr<cta::ArchiveJob> archiveJob;
archiveJob.reset(archiveMount->getNextJob().release());
archiveJob.reset(archiveMount->getNextJob(lc).release());
ASSERT_EQ((cta::ArchiveJob*)NULL, archiveJob.get());
}
}
......
......@@ -106,7 +106,7 @@ namespace daemon {
uint64_t files=0;
uint64_t bytes=0;
while(files<=m_maxFiles && bytes<=m_maxBytes) {
std::unique_ptr<cta::ArchiveJob> job=m_archiveMount.getNextJob();
std::unique_ptr<cta::ArchiveJob> job=m_archiveMount.getNextJob(m_lc);
if(!job.get()) break;
files++;
bytes+=job->archiveFile.fileSize;
......@@ -167,7 +167,7 @@ namespace daemon {
uint64_t files=0;
uint64_t bytes=0;
while(files<=req.filesRequested && bytes<=req.bytesRequested) {
std::unique_ptr<cta::ArchiveJob> job=m_parent.m_archiveMount.getNextJob();
std::unique_ptr<cta::ArchiveJob> job=m_parent.m_archiveMount.getNextJob(m_parent.m_lc);
if(!job.get()) break;
files++;
bytes+=job->archiveFile.fileSize;
......
......@@ -38,9 +38,9 @@ namespace tapeserver{
namespace daemon {
/**
* This classis responsible for creating the tasks in case of a recall job
* This class is responsible for creating the tasks in case of a migration job
*/
class MigrationTaskInjector /*: public TaskInjector*/ {
class MigrationTaskInjector {
public:
/**
......
......@@ -52,7 +52,7 @@ namespace daemon {
/**
* This class is responsible for creating the tasks in case of a recall job
*/
class RecallTaskInjector /*: public TaskInjector*/ {
class RecallTaskInjector {
public:
/**
* Constructor
......
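
For reference, callers now build a log context once and pass it on every call; below is a sketch mirroring the test changes above (DummyLogger discards all output; the variable names are taken from the tests, and archiveMount stands in for whatever ArchiveMount instance the caller holds):

cta::log::DummyLogger dl("");
cta::log::LogContext lc(dl);
bool done = false;
while (!done) {
  std::unique_ptr<cta::ArchiveJob> aj = archiveMount->getNextJob(lc);
  if (!aj.get()) { done = true; continue; }
  // ... transfer the file and fill in aj->tapeFile, then:
  aj->complete();
}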