Commit 5435941d authored by Eric Cano's avatar Eric Cano
Browse files

#60 (partial) added logging for archive and retrieve queueing.

parent 27a91641
......@@ -23,6 +23,7 @@
#include "ArchiveMount.hpp"
#include "RetrieveMount.hpp"
#include "common/utils/utils.hpp"
#include "common/Timer.hpp"
#include "common/exception/NonRetryableError.hpp"
#include "common/exception/UserError.hpp"
......@@ -73,9 +74,46 @@ void Scheduler::authorizeAdmin(const common::dataStructures::SecurityIdentity &c
//------------------------------------------------------------------------------
// queueArchive
//------------------------------------------------------------------------------
uint64_t Scheduler::queueArchive(const std::string &instanceName, const common::dataStructures::ArchiveRequest &request) {
uint64_t Scheduler::queueArchive(const std::string &instanceName, const common::dataStructures::ArchiveRequest &request,
log::LogContext & lc) {
  // Queue an archive request: reserve the file id and routing information in the
  // catalogue, hand the request to the scheduler database, then log a summary of
  // the queued request together with the time spent in each backend call.
  cta::utils::Timer timer;
  // Catalogue step: allocates the CTA file id and returns the copy-to-tape-pool
  // map plus the mount policy applicable to this requester.
  auto catalogueInfo = m_catalogue.prepareForNewFile(instanceName, request.storageClass, request.requester);
  auto catalogueTime = timer.secs(cta::utils::Timer::resetCounter);
  // Scheduler database step: persists the request in the archive queues.
  m_db.queueArchive(instanceName, request, catalogueInfo);
  auto schedulerDbTime = timer.secs();
  // Build the log record. The parameter order is significant for log readability,
  // so it is kept stable.
  log::ScopedParamContainer params(lc);
  params.add("instanceName", instanceName);
  params.add("storageClass", request.storageClass);
  params.add("diskFileID", request.diskFileID);
  params.add("fileSize", request.fileSize);
  params.add("fileId", catalogueInfo.fileId);
  // One parameter per tape copy, named "tapePool<copyNb>".
  for (const auto & copyToPool : catalogueInfo.copyToPoolMap) {
    std::stringstream poolParamName;
    poolParamName << "tapePool" << copyToPool.first;
    params.add(poolParamName.str(), copyToPool.second);
  }
  params.add("policyName", catalogueInfo.mountPolicy.name);
  params.add("policyArchiveMinAge", catalogueInfo.mountPolicy.archiveMinRequestAge);
  params.add("policyArchivePriority", catalogueInfo.mountPolicy.archivePriority);
  params.add("policyMaxDrives", catalogueInfo.mountPolicy.maxDrivesAllowed);
  params.add("diskFilePath", request.diskFileInfo.path);
  params.add("diskFileOwner", request.diskFileInfo.owner);
  params.add("diskFileGroup", request.diskFileInfo.group);
  // Long/binary fields are ellipsized so the log line stays readable.
  params.add("diskFileRecoveryBlob", utils::postEllipsis(request.diskFileInfo.recoveryBlob, 20));
  params.add("checksumValue", request.checksumValue);
  params.add("checksumType", request.checksumType);
  params.add("archiveReportURL", utils::midEllipsis(request.archiveReportURL, 50, 15));
  params.add("creationHost", request.creationLog.host);
  params.add("creationTime", request.creationLog.time);
  params.add("creationUser", request.creationLog.username);
  params.add("requesterName", request.requester.name);
  params.add("requesterGroup", request.requester.group);
  params.add("srcURL", utils::midEllipsis(request.srcURL, 50, 15));
  params.add("catalogueTime", catalogueTime);
  params.add("schedulerDbTime", schedulerDbTime);
  lc.log(log::INFO, "Queued archive request");
  return catalogueInfo.fileId;
}
......@@ -84,7 +122,11 @@ uint64_t Scheduler::queueArchive(const std::string &instanceName, const common::
//------------------------------------------------------------------------------
void Scheduler::queueRetrieve(
const std::string &instanceName,
const common::dataStructures::RetrieveRequest &request) {
const common::dataStructures::RetrieveRequest &request,
log::LogContext & lc) {
using utils::postEllipsis;
using utils::midEllipsis;
utils::Timer t;
// Get the retrieve queueing criteria (file location and mount policy) from the catalogue.
const common::dataStructures::RetrieveFileQueueCriteria queueCriteria =
m_catalogue.prepareToRetrieveFile(instanceName, request.archiveFileID, request.requester);
......@@ -101,6 +143,7 @@ void Scheduler::queueRetrieve(
}
if (vids.empty())
throw exception::NonRetryableError("In Scheduler::queueRetrieve(): all copies are on disabled tapes");
auto catalogueTime = t.secs(utils::Timer::resetCounter);
// Get the statistics for the potential tapes on which we will retrieve.
auto stats=m_db.getRetrieveQueueStatistics(queueCriteria, vids);
// Sort the potential queues.
......@@ -122,6 +165,45 @@ void Scheduler::queueRetrieve(
std::advance(it, index);
std::string selectedVid=*it;
m_db.queueRetrieve(request, queueCriteria, selectedVid);
auto schedulerDbTime = t.secs();
log::ScopedParamContainer spc(lc);
spc.add("archiveFileID", request.archiveFileID)
.add("instanceName", instanceName)
.add("diskFilePath", request.diskFileInfo.path)
.add("diskFileOwner", request.diskFileInfo.owner)
.add("diskFileGroup", request.diskFileInfo.group)
.add("diskFileRecoveryBlob", postEllipsis(request.diskFileInfo.recoveryBlob, 20))
.add("dstURL", request.dstURL)
.add("creationHost", request.entryLog.host)
.add("creationTime", request.entryLog.time)
.add("creationUser", request.entryLog.username)
.add("requesterName", request.requester.name)
.add("requesterGroup", request.requester.group)
.add("criteriaArchiveFileId", queueCriteria.archiveFile.archiveFileID)
.add("criteriaChecksumType", queueCriteria.archiveFile.checksumType)
.add("criteriaChecksumValue", queueCriteria.archiveFile.checksumValue)
.add("criteriaCreationTime", queueCriteria.archiveFile.creationTime)
.add("criteriaDiskFileId", queueCriteria.archiveFile.diskFileId)
.add("criteriaDiskFilePath", queueCriteria.archiveFile.diskFileInfo.path)
.add("criteriaDiskFileOwner", queueCriteria.archiveFile.diskFileInfo.owner)
.add("criteriaDiskRecoveryBlob", postEllipsis(queueCriteria.archiveFile.diskFileInfo.recoveryBlob, 20))
.add("criteriaDiskInstance", queueCriteria.archiveFile.diskInstance)
.add("criteriaFileSize", queueCriteria.archiveFile.fileSize)
.add("reconciliationTime", queueCriteria.archiveFile.reconciliationTime)
.add("storageClass", queueCriteria.archiveFile.storageClass);
for (auto & tf:queueCriteria.archiveFile.tapeFiles) {
std::stringstream tc;
tc << "tapeCopy" << tf.first;
spc.add(tc.str(), tf.second);
}
spc.add("selectedVid", selectedVid)
.add("policyName", queueCriteria.mountPolicy.name)
.add("policyMaxDrives", queueCriteria.mountPolicy.maxDrivesAllowed)
.add("policyMinAge", queueCriteria.mountPolicy.retrieveMinRequestAge)
.add("policyPriority", queueCriteria.mountPolicy.retrievePriority)
.add("catalogueTime", catalogueTime)
.add("schedulerDbTime", schedulerDbTime);
lc.log(log::INFO, "Queued retrieve request");
}
//------------------------------------------------------------------------------
......
......@@ -42,6 +42,7 @@
#include "common/dataStructures/WriteTestResult.hpp"
#include "common/exception/Exception.hpp"
#include "common/log/LogContext.hpp"
#include "scheduler/TapeMount.hpp"
#include "scheduler/SchedulerDatabase.hpp"
......@@ -88,15 +89,21 @@ public:
* Queue an archive request and return the CTA file ID.
* Throws a UserError exception in case of wrong request parameters (ex. no route to tape)
* Throws a (Non)RetryableError exception in case something else goes wrong with the request
* @param instanceName name of the EOS instance
* @param request the archive request
* @param lc a log context allowing logging from within the scheduler routine.
* @return
*/
uint64_t queueArchive(const std::string &instanceName, const cta::common::dataStructures::ArchiveRequest &request);
uint64_t queueArchive(const std::string &instanceName, const cta::common::dataStructures::ArchiveRequest &request,
log::LogContext &lc);
/**
* Queue a retrieve request.
* Throws a UserError exception in case of wrong request parameters (ex. unknown file id)
* Throws a (Non)RetryableError exception in case something else goes wrong with the request
*/
void queueRetrieve(const std::string &instanceName, const cta::common::dataStructures::RetrieveRequest &request);
void queueRetrieve(const std::string &instanceName, const cta::common::dataStructures::RetrieveRequest &request,
log::LogContext &lc);
/**
* Delete an archived file or a file which is in the process of being archived. Returns the information
......
......@@ -32,6 +32,7 @@
#include "scheduler/SchedulerDatabaseFactory.hpp"
#include "scheduler/TapeMount.hpp"
#include "tests/TempFile.hpp"
#include "common/log/DummyLogger.hpp"
#include <exception>
#include <gtest/gtest.h>
......@@ -236,7 +237,9 @@ TEST_P(SchedulerTest, archive_to_new_file) {
request.srcURL="srcURL";
request.storageClass=s_storageClassName;
scheduler.queueArchive(s_diskInstance, request);
log::DummyLogger dl("");
log::LogContext lc(dl);
scheduler.queueArchive(s_diskInstance, request, lc);
{
auto rqsts = scheduler.getPendingArchiveJobs();
......@@ -288,7 +291,9 @@ TEST_P(SchedulerTest, delete_archive_request) {
request.srcURL="srcURL";
request.storageClass=s_storageClassName;
auto archiveFileId = scheduler.queueArchive(s_diskInstance, request);
log::DummyLogger dl("");
log::LogContext lc(dl);
auto archiveFileId = scheduler.queueArchive(s_diskInstance, request, lc);
// Check that we have the file in the queues
// TODO: for this to work all the time, we need an index of all requests
......@@ -329,6 +334,9 @@ TEST_P(SchedulerTest, archive_and_retrieve_new_file) {
setupDefaultCatalogue();
log::DummyLogger dl("");
log::LogContext lc(dl);
uint64_t archiveFileId;
{
// Queue an archive request.
......@@ -354,7 +362,7 @@ TEST_P(SchedulerTest, archive_and_retrieve_new_file) {
request.requester = requester;
request.srcURL="srcURL";
request.storageClass=s_storageClassName;
archiveFileId = scheduler.queueArchive(s_diskInstance, request);
archiveFileId = scheduler.queueArchive(s_diskInstance, request, lc);
}
// Check that we have the file in the queues
......@@ -436,7 +444,7 @@ TEST_P(SchedulerTest, archive_and_retrieve_new_file) {
request.dstURL = "dstURL";
request.requester.name = s_userName;
request.requester.group = "userGroup";
scheduler.queueRetrieve("disk_instance", request);
scheduler.queueRetrieve("disk_instance", request, lc);
}
// Check that the retrieve request is queued
......@@ -484,6 +492,9 @@ TEST_P(SchedulerTest, retry_archive_until_max_reached) {
auto &scheduler = getScheduler();
auto &catalogue = getCatalogue();
log::DummyLogger dl("");
log::LogContext lc(dl);
uint64_t archiveFileId;
{
// Queue an archive request.
......@@ -509,7 +520,7 @@ TEST_P(SchedulerTest, retry_archive_until_max_reached) {
request.requester = requester;
request.srcURL="srcURL";
request.storageClass=s_storageClassName;
archiveFileId = scheduler.queueArchive(s_diskInstance, request);
archiveFileId = scheduler.queueArchive(s_diskInstance, request, lc);
}
// Create the environment for the migration to happen (library + tape)
......@@ -565,6 +576,9 @@ TEST_P(SchedulerTest, retrieve_non_existing_file) {
setupDefaultCatalogue();
Scheduler &scheduler = getScheduler();
log::DummyLogger dl("");
log::LogContext lc(dl);
{
cta::common::dataStructures::EntryLog creationLog;
......@@ -583,7 +597,7 @@ TEST_P(SchedulerTest, retrieve_non_existing_file) {
request.dstURL = "dstURL";
request.requester.name = s_userName;
request.requester.group = "userGroup";
ASSERT_THROW(scheduler.queueRetrieve("disk_instance", request), cta::exception::Exception);
ASSERT_THROW(scheduler.queueRetrieve("disk_instance", request, lc), cta::exception::Exception);
}
}
......
......@@ -305,6 +305,7 @@ protected:
TEST_P(DataTransferSessionTest, DataTransferSessionGooddayRecall) {
// 0) Prepare the logger for everyone
cta::log::StringLogger logger("tapeServerUnitTest",cta::log::DEBUG);
cta::log::LogContext logContext(logger);
setupDefaultCatalogue();
// 1) prepare the fake scheduler
......@@ -414,7 +415,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionGooddayRecall) {
rReq.requester.group = "someGroup";
rReq.dstURL = remoteFilePaths.back();
std::list<std::string> archiveFilePaths;
scheduler.queueRetrieve(diskInstance, rReq);
scheduler.queueRetrieve(diskInstance, rReq, logContext);
}
}
......@@ -476,6 +477,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionWrongRecall) {
// 0) Prepare the logger for everyone
cta::log::StringLogger logger("tapeServerUnitTest",cta::log::DEBUG);
cta::log::LogContext logContext(logger);
setupDefaultCatalogue();
// 1) prepare the fake scheduler
......@@ -605,7 +607,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionWrongRecall) {
rReq.requester.group = "someGroup";
rReq.dstURL = remoteFilePaths.back();
std::list<std::string> archiveFilePaths;
scheduler.queueRetrieve(diskInstance, rReq);
scheduler.queueRetrieve(diskInstance, rReq, logContext);
}
}
......@@ -659,6 +661,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionNoSuchDrive) {
// 0) Prepare the logger for everyone
cta::log::StringLogger logger("tapeServerUnitTest",cta::log::DEBUG);
cta::log::LogContext logContext(logger);
setupDefaultCatalogue();
// 1) prepare the fake scheduler
......@@ -768,7 +771,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionNoSuchDrive) {
rReq.requester.group = "someGroup";
rReq.dstURL = remoteFilePaths.back();
std::list<std::string> archiveFilePaths;
scheduler.queueRetrieve(diskInstance, rReq);
scheduler.queueRetrieve(diskInstance, rReq, logContext);
}
}
......@@ -800,6 +803,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionFailtoMount) {
// 0) Prepare the logger for everyone
cta::log::StringLogger logger("tapeServerUnitTest",cta::log::DEBUG);
cta::log::LogContext logContext(logger);
setupDefaultCatalogue();
// 1) prepare the fake scheduler
......@@ -910,7 +914,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionFailtoMount) {
rReq.requester.group = "someGroup";
rReq.dstURL = remoteFilePaths.back();
std::list<std::string> archiveFilePaths;
scheduler.queueRetrieve(diskInstance, rReq);
scheduler.queueRetrieve(diskInstance, rReq, logContext);
}
}
......@@ -957,6 +961,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionGooddayMigration) {
// 0) Prepare the logger for everyone
cta::log::StringLogger logger("tapeServerUnitTest",cta::log::DEBUG);
cta::log::LogContext logContext(logger);
setupDefaultCatalogue();
// 1) prepare the fake scheduler
......@@ -1036,7 +1041,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionGooddayMigration) {
ar.diskFileInfo.owner = "z";
ar.diskFileInfo.group = "g";
ar.diskFileInfo.recoveryBlob = "b";
archiveFileIds.push_back(scheduler.queueArchive(s_diskInstance,ar));
archiveFileIds.push_back(scheduler.queueArchive(s_diskInstance,ar,logContext));
}
}
// Report the drive's existence and put it up in the drive register.
......@@ -1093,6 +1098,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionMissingFilesMigration) {
// 0) Prepare the logger for everyone
cta::log::StringLogger logger("tapeServerUnitTest",cta::log::DEBUG);
cta::log::LogContext logContext(logger);
setupDefaultCatalogue();
// 1) prepare the fake scheduler
......@@ -1172,7 +1178,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionMissingFilesMigration) {
ar.diskFileInfo.owner = "z";
ar.diskFileInfo.group = "g";
ar.diskFileInfo.recoveryBlob = "b";
archiveFileIds.push_back(scheduler.queueArchive(s_diskInstance,ar));
archiveFileIds.push_back(scheduler.queueArchive(s_diskInstance,ar,logContext));
// Delete the file: the migration will fail.
sourceFiles.clear();
}
......@@ -1226,6 +1232,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionMissingFilesMigration) {
TEST_P(DataTransferSessionTest, DataTransferSessionTapeFullMigration) {
// 0) Prepare the logger for everyone
cta::log::StringLogger logger("tapeServerUnitTest",cta::log::DEBUG);
cta::log::LogContext logContext(logger);
setupDefaultCatalogue();
// 1) prepare the fake scheduler
......@@ -1306,7 +1313,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionTapeFullMigration) {
ar.diskFileInfo.owner = "z";
ar.diskFileInfo.group = "g";
ar.diskFileInfo.recoveryBlob = "b";
archiveFileIds.push_back(scheduler.queueArchive(s_diskInstance,ar));
archiveFileIds.push_back(scheduler.queueArchive(s_diskInstance,ar,logContext));
}
}
// Report the drive's existence and put it up in the drive register.
......@@ -1371,6 +1378,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionTapeFullMigration) {
TEST_P(DataTransferSessionTest, DataTransferSessionTapeFullOnFlushMigration) {
// 0) Prepare the logger for everyone
cta::log::StringLogger logger("tapeServerUnitTest",cta::log::DEBUG);
cta::log::LogContext logContext(logger);
setupDefaultCatalogue();
// 1) prepare the fake scheduler
......@@ -1452,7 +1460,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionTapeFullOnFlushMigration) {
ar.diskFileInfo.owner = "z";
ar.diskFileInfo.group = "g";
ar.diskFileInfo.recoveryBlob = "b";
archiveFileIds.push_back(scheduler.queueArchive(s_diskInstance,ar));
archiveFileIds.push_back(scheduler.queueArchive(s_diskInstance,ar,logContext));
}
}
// Report the drive's existence and put it up in the drive register.
......
......@@ -185,7 +185,7 @@ void MigrationReportPacker::ReportFlush::execute(MigrationReportPacker& reportPa
.add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path);
}
const std::string msg_error="An exception was caught trying to call reportMigrationResults";
reportPacker.m_lc.log(cta::log::ERR,msg_error);
reportPacker.m_lc.log(cta::log::ERR, msg_error);
throw failedMigrationRecallResult(msg_error);
} catch(const std::exception& e){
cta::log::ScopedParamContainer params(reportPacker.m_lc);
......@@ -197,7 +197,7 @@ void MigrationReportPacker::ReportFlush::execute(MigrationReportPacker& reportPa
.add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path);
}
const std::string msg_error="An std::exception was caught trying to call reportMigrationResults";
reportPacker.m_lc.log(cta::log::ERR,msg_error);
reportPacker.m_lc.log(cta::log::ERR, msg_error);
throw failedMigrationRecallResult(msg_error);
}
} else {
......
......@@ -2026,7 +2026,8 @@ void XrdCtaFile::xCom_archive() {
request.srcURL=srcurl.value();
request.storageClass=storageclass.value();
request.archiveReportURL=archiveReportURL.value();
uint64_t archiveFileId = m_scheduler->queueArchive(m_cliIdentity.username, request);
log::LogContext lc(m_log);
uint64_t archiveFileId = m_scheduler->queueArchive(m_cliIdentity.username, request, lc);
cmdlineOutput << "<eos::wfe::path::fxattr:sys.archiveFileId>" << archiveFileId << std::endl;
logRequestAndSetCmdlineResult(cta::common::dataStructures::FrontendReturnCode::ok, cmdlineOutput.str());
}
......@@ -2062,7 +2063,8 @@ void XrdCtaFile::xCom_retrieve() {
request.archiveFileID=id.value();
request.requester=originator;
request.dstURL=dsturl.value();
m_scheduler->queueRetrieve(m_cliIdentity.username, request);
log::LogContext lc(m_log);
m_scheduler->queueRetrieve(m_cliIdentity.username, request, lc);
logRequestAndSetCmdlineResult(cta::common::dataStructures::FrontendReturnCode::ok, cmdlineOutput.str());
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment