Commit 9fe56e88 authored by Cedric CAFFY's avatar Cedric CAFFY
Browse files

Repack RetrieveRequest reported as success after mount and retrieve successful (expandRepackRequest)

Repack RetrieveRequest reported as success after mount and retrieve successful (expandRepackRequest)
parent e1a217f9
......@@ -34,6 +34,10 @@ MountPolicy::MountPolicy():
retrieveMinRequestAge(0),
maxDrivesAllowed(0) {}
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
// Fully-initializing constructor: sets the policy name plus the archive and
// retrieve scheduling parameters (priority and minimum request age) and the
// per-policy drive cap. Used below to build s_defaultMountPolicyForRepack.
// NOTE(review): parameters are taken by const value (including the
// std::string) to match the declaration in the header — confirm before
// changing to pass-by-reference.
MountPolicy::MountPolicy(const std::string name, const uint64_t archivePriority,const uint64_t archiveMinRequestAge, const uint64_t retrievePriority,
const uint64_t retrieveMinRequestAge, const uint64_t maxDrivesAllowed):
name(name), archivePriority(archivePriority), archiveMinRequestAge(archiveMinRequestAge), retrievePriority(retrievePriority),
retrieveMinRequestAge(retrieveMinRequestAge), maxDrivesAllowed(maxDrivesAllowed) {}
//------------------------------------------------------------------------------
// operator==
//------------------------------------------------------------------------------
......@@ -72,6 +76,8 @@ std::ostream &operator<<(std::ostream &os, const MountPolicy &obj) {
return os;
}
MountPolicy MountPolicy::s_defaultMountPolicyForRepack("default_mount_policy_repack",1,1,1,1,1);
} // namespace dataStructures
} // namespace common
} // namespace cta
......@@ -50,6 +50,12 @@ struct MountPolicy {
EntryLog lastModificationLog;
std::string comment;
//As repack request does not have mount policy yet, we need to create a fake one to be able to do a Retrieve mount or Archive mount
static struct MountPolicy s_defaultMountPolicyForRepack;
private:
MountPolicy(const std::string name,const uint64_t archivePriority,const uint64_t archiveMinRequestAge, const uint64_t retrievePriority, const uint64_t retrieveMinRequestAge, const uint64_t maxDrivesAllowed);
}; // struct MountPolicy
std::ostream &operator<<(std::ostream &os, const MountPolicy &obj);
......
......@@ -3044,13 +3044,13 @@ void OStoreDB::RetrieveJob::failReport(const std::string &failureReason, log::Lo
switch (enQueueingNextStep.nextStep) {
// We have a reduced set of supported next steps as some are not compatible with this event
case NextStep::EnqueueForReport: {
typedef objectstore::ContainerAlgorithms<RetrieveQueue,RetrieveQueueToReport> CaAqtr;
CaAqtr caAqtr(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference);
CaAqtr::InsertedElement::list insertedElements;
insertedElements.push_back(CaAqtr::InsertedElement{
typedef objectstore::ContainerAlgorithms<RetrieveQueue,RetrieveQueueToReport> CaRqtr;
CaRqtr caRqtr(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference);
CaRqtr::InsertedElement::list insertedElements;
insertedElements.push_back(CaRqtr::InsertedElement{
&m_retrieveRequest, tf.copyNb, tf.fSeq, af.fileSize, rfqc.mountPolicy, serializers::RetrieveJobStatus::RJS_ToReportForFailure
});
caAqtr.referenceAndSwitchOwnership(tf.vid, insertedElements, lc);
caRqtr.referenceAndSwitchOwnership(tf.vid, insertedElements, lc);
log::ScopedParamContainer params(lc);
params.add("fileId", archiveFile.archiveFileID)
.add("copyNb", tf.copyNb)
......
......@@ -18,6 +18,7 @@
#include "scheduler/RetrieveMount.hpp"
#include "common/Timer.hpp"
#include "common/log/TimingList.hpp"
//------------------------------------------------------------------------------
// constructor
......@@ -144,8 +145,8 @@ std::list<std::unique_ptr<cta::RetrieveJob> > cta::RetrieveMount::getNextJobBatc
// waitAndFinishSettingJobsBatchRetrieved()
//------------------------------------------------------------------------------
void cta::RetrieveMount::waitAndFinishSettingJobsBatchRetrieved(std::queue<std::unique_ptr<cta::RetrieveJob> >& successfulRetrieveJobs, cta::log::LogContext& logContext) {
std::list<std::unique_ptr<cta::RetrieveJob> > validatedSuccessfulArchiveJobs;
std::list<cta::SchedulerDatabase::RetrieveJob *> validatedSuccessfulDBArchiveJobs;
std::list<std::unique_ptr<cta::RetrieveJob> > validatedSuccessfulRetrieveJobs;
std::list<cta::SchedulerDatabase::RetrieveJob *> validatedSuccessfulDBRetrieveJobs;
std::unique_ptr<cta::RetrieveJob> job;
double waitUpdateCompletionTime=0;
double jobBatchFinishingTime=0;
......@@ -153,6 +154,7 @@ void cta::RetrieveMount::waitAndFinishSettingJobsBatchRetrieved(std::queue<std::
uint64_t files=0;
uint64_t bytes=0;
utils::Timer t;
log::TimingList tl;
try {
while (!successfulRetrieveJobs.empty()) {
job = std::move(successfulRetrieveJobs.front());
......@@ -161,23 +163,24 @@ void cta::RetrieveMount::waitAndFinishSettingJobsBatchRetrieved(std::queue<std::
files++;
bytes+=job->archiveFile.fileSize;
job->checkComplete();
validatedSuccessfulDBArchiveJobs.emplace_back(job->m_dbJob.get());
validatedSuccessfulArchiveJobs.emplace_back(std::move(job));
validatedSuccessfulDBRetrieveJobs.emplace_back(job->m_dbJob.get());
validatedSuccessfulRetrieveJobs.emplace_back(std::move(job));
job.reset();
}
waitUpdateCompletionTime=t.secs(utils::Timer::resetCounter);
tl.insertAndReset("waitUpdateCompletionTime",t);
// Complete the cleaning up of the jobs in the mount
m_dbMount->finishSettingJobsBatchSuccessful(validatedSuccessfulDBArchiveJobs, logContext);
m_dbMount->finishSettingJobsBatchSuccessful(validatedSuccessfulDBRetrieveJobs, logContext);
jobBatchFinishingTime=t.secs();
tl.insertOrIncrement("jobBatchFinishingTime",jobBatchFinishingTime);
schedulerDbTime=jobBatchFinishingTime + waitUpdateCompletionTime;
tl.insertOrIncrement("schedulerDbTime",schedulerDbTime);
{
cta::log::ScopedParamContainer params(logContext);
params.add("successfulRetrieveJobs", successfulRetrieveJobs.size())
.add("files", files)
.add("bytes", bytes)
.add("waitUpdateCompletionTime", waitUpdateCompletionTime)
.add("jobBatchFinishingTime", jobBatchFinishingTime)
.add("schedulerDbTime", schedulerDbTime);
.add("bytes", bytes);
tl.addToLog(params);
logContext.log(cta::log::DEBUG,"In RetrieveMout::waitAndFinishSettingJobsBatchRetrieved(): deleted complete retrieve jobs.");
}
} catch(const cta::exception::Exception& e){
......
......@@ -189,6 +189,7 @@ void Scheduler::queueRetrieve(
} else {
//Repack does not need policy
queueCriteria.archiveFile = m_catalogue.getArchiveFileById(request.archiveFileID);
queueCriteria.mountPolicy = common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack;
}
auto catalogueTime = t.secs(cta::utils::Timer::resetCounter);
std::string selectedVid = m_db.queueRetrieve(request, queueCriteria, lc);
......@@ -410,7 +411,6 @@ const std::string Scheduler::generateRetrieveDstURL(const cta::common::dataStruc
void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackRequest, log::TimingList&, utils::Timer&, log::LogContext& lc) {
uint64_t fseq = c_defaultFseqForRepack;
std::list<common::dataStructures::ArchiveFile> files;
std::list<uint64_t> copyNbs;
auto vid = repackRequest->getRepackInfo().vid;
while(true) {
files = m_catalogue.getFilesForRepack(vid,fseq,c_defaultMaxNbFilesForRepack);
......@@ -421,7 +421,6 @@ void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackReques
retrieveRequest.diskFileInfo = archiveFile.diskFileInfo;
retrieveRequest.dstURL = generateRetrieveDstURL(archiveFile.diskFileInfo);
retrieveRequest.isRepack = true;
//retrieveRequest.requester = repackRequest->
queueRetrieve(archiveFile.diskInstance,retrieveRequest,lc);
}
if (files.size()) {
......
......@@ -35,6 +35,7 @@
#include "objectstore/BackendRadosTestSwitch.hpp"
#include "tests/TestsCompileTimeSwitches.hpp"
#include "common/Timer.hpp"
#include "tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.hpp"
#ifdef STDOUT_LOGGING
#include "common/log/StdoutLogger.hpp"
......@@ -1082,20 +1083,14 @@ TEST_P(SchedulerTest, getNextRepackRequestToExpand) {
TEST_P(SchedulerTest, expandRepackRequest) {
using namespace cta;
setupDefaultCatalogue();
auto &catalogue = getCatalogue();
auto &scheduler = getScheduler();
setupDefaultCatalogue();
cta::log::DummyLogger dummyLogger("dummy","dummy");
log::LogContext lc(dummyLogger);
const std::string vid1 = "VID123";
const std::string vid2 = "VID456";
const std::string mediaType = "media_type";
const std::string vendor = "vendor";
const std::string logicalLibraryName = "logical_library_name";
const std::string tapePoolName1 = "tape_pool_name_1";
const std::string vo = "vo";
const uint64_t capacityInBytes = (uint64_t)10 * 1000 * 1000 * 1000 * 1000;
const bool disabledValue = false;
const bool fullValue = false;
......@@ -1104,10 +1099,14 @@ TEST_P(SchedulerTest, expandRepackRequest) {
admin.username = "admin_user_name";
admin.host = "admin_host";
catalogue.createLogicalLibrary(admin, logicalLibraryName, "Create logical library");
catalogue.createTapePool(admin, tapePoolName1, vo, 1, true, "Create tape pool");
catalogue.createTape(admin, vid1, mediaType, vendor, logicalLibraryName, tapePoolName1, capacityInBytes,
//Create a logical library in the catalogue
catalogue.createLogicalLibrary(admin, s_libraryName, "Create logical library");
//Create the tape from which we will retrieve
catalogue.createTape(s_adminOnAdminHost, s_vid, s_mediaType, s_vendor, s_libraryName, s_tapePoolName, capacityInBytes,
disabledValue, fullValue, comment);
//Create a storage class in the catalogue
common::dataStructures::StorageClass storageClass;
storageClass.diskInstance = "disk_instance";
storageClass.name = "storage_class";
......@@ -1118,19 +1117,19 @@ TEST_P(SchedulerTest, expandRepackRequest) {
const std::string checksumType = "checksum_type";
const std::string checksumValue = "checksum_value";
const std::string tapeDrive = "tape_drive";
const uint64_t nbArchiveFiles = 10; // Must be a multiple of 2 fo rthis test
const uint64_t nbArchiveFiles = 10;
const uint64_t archiveFileSize = 2 * 1000 * 1000 * 1000;
const uint64_t compressedFileSize = archiveFileSize;
//Simulate the writing of 10 files in a tape (just in the catalogue)
std::set<catalogue::TapeItemWrittenPointer> tapeFilesWrittenCopy1;
{
for(uint64_t i = 1; i <= nbArchiveFiles; i++) {
std::ostringstream diskFileId;
diskFileId << (12345677 + i);
std::ostringstream diskFilePath;
diskFilePath << "/public_dir/public_file_" << i;
// Tape copy 1 written to tape
auto fileWrittenUP=cta::make_unique<cta::catalogue::TapeFileWritten>();
auto & fileWritten = *fileWrittenUP;
fileWritten.archiveFileId = i;
......@@ -1144,7 +1143,7 @@ TEST_P(SchedulerTest, expandRepackRequest) {
fileWritten.checksumType = checksumType;
fileWritten.checksumValue = checksumValue;
fileWritten.storageClassName = storageClass.name;
fileWritten.vid = vid1;
fileWritten.vid = s_vid;
fileWritten.fSeq = i;
fileWritten.blockId = i * 100;
fileWritten.compressedSize = compressedFileSize;
......@@ -1154,34 +1153,85 @@ TEST_P(SchedulerTest, expandRepackRequest) {
}
//update the DB tape
catalogue.filesWrittenToTape(tapeFilesWrittenCopy1);
scheduler.queueRepack(admin,vid1,"bufferURL",common::dataStructures::RepackInfo::Type::ExpandOnly,lc);
}
//Test the expandRepackRequest method
scheduler.waitSchedulerDbSubthreadsComplete();
{
scheduler.queueRepack(admin,s_vid,"bufferURL",common::dataStructures::RepackInfo::Type::ExpandOnly,lc);
scheduler.promoteRepackRequestsToToExpand(lc);
auto repackRequestToExpand = scheduler.getNextRepackRequestToExpand();
log::TimingList tl;
utils::Timer t;
scheduler.expandRepackRequest(repackRequestToExpand,tl,t,lc);
scheduler.waitSchedulerDbSubthreadsComplete();
std::list<common::dataStructures::RetrieveJob> retrieveJobs = scheduler.getPendingRetrieveJobs(vid1,lc);
}
{
//The expandRepackRequest method should have queued 10 retrieve request corresponding to the 10 previous file inserted in the catalogue
std::list<common::dataStructures::RetrieveJob> retrieveJobs = scheduler.getPendingRetrieveJobs(s_vid,lc);
ASSERT_EQ(retrieveJobs.size(),10);
int i = 1;
for(auto retrieveJob : retrieveJobs){
//Test that the informations are correct for each file
ASSERT_EQ(retrieveJob.request.archiveFileID,i);
ASSERT_EQ(retrieveJob.fileSize,compressedFileSize);
std::stringstream ss;
ss<<"repack://public_dir/public_file_"<<i;
ASSERT_EQ(retrieveJob.request.dstURL, ss.str());
ASSERT_EQ(retrieveJob.tapeCopies[vid1].second.copyNb,1);
ASSERT_EQ(retrieveJob.tapeCopies[vid1].second.checksumType,checksumType);
ASSERT_EQ(retrieveJob.tapeCopies[vid1].second.checksumValue,checksumValue);
ASSERT_EQ(retrieveJob.tapeCopies[vid1].second.blockId,i*100);
ASSERT_EQ(retrieveJob.tapeCopies[vid1].second.compressedSize,compressedFileSize);
ASSERT_EQ(retrieveJob.tapeCopies[vid1].second.fSeq,i);
ASSERT_EQ(retrieveJob.tapeCopies[vid1].second.vid,vid1);
ASSERT_EQ(retrieveJob.tapeCopies[s_vid].second.copyNb,1);
ASSERT_EQ(retrieveJob.tapeCopies[s_vid].second.checksumType,checksumType);
ASSERT_EQ(retrieveJob.tapeCopies[s_vid].second.checksumValue,checksumValue);
ASSERT_EQ(retrieveJob.tapeCopies[s_vid].second.blockId,i*100);
ASSERT_EQ(retrieveJob.tapeCopies[s_vid].second.compressedSize,compressedFileSize);
ASSERT_EQ(retrieveJob.tapeCopies[s_vid].second.fSeq,i);
ASSERT_EQ(retrieveJob.tapeCopies[s_vid].second.vid,s_vid);
++i;
}
}
//Now, we need to simulate a retrieve for each file
scheduler.waitSchedulerDbSubthreadsComplete();
{
// Emulate a tape server by asking for a mount and then a file
std::unique_ptr<cta::TapeMount> mount;
mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release());
ASSERT_NE(nullptr, mount.get());
ASSERT_EQ(cta::common::dataStructures::MountType::Retrieve, mount.get()->getMountType());
std::unique_ptr<cta::RetrieveMount> retrieveMount;
retrieveMount.reset(dynamic_cast<cta::RetrieveMount*>(mount.release()));
ASSERT_NE(nullptr, retrieveMount.get());
std::unique_ptr<cta::RetrieveJob> retrieveJob;
std::list<std::unique_ptr<cta::RetrieveJob>> executedJobs;
for(int i = 1;i<=10;++i)
{
auto jobBatch = retrieveMount->getNextJobBatch(1,archiveFileSize,lc);
retrieveJob.reset(jobBatch.front().release());
ASSERT_NE(nullptr, retrieveJob.get());
ASSERT_EQ(retrieveJob->archiveFile.archiveFileID,i);
//Set the retrieve job to succeed
retrieveJob->asyncComplete();
retrieveJob->checkComplete();
executedJobs.push_back(std::move(retrieveJob));
}
//Now, report the retrieve jobs to be completed
castor::tape::tapeserver::daemon::RecallReportPacker rrp(retrieveMount.get(),lc);
rrp.startThreads();
for(auto it = executedJobs.begin(); it != executedJobs.end(); ++it)
{
rrp.reportCompletedJob(std::move(*it));
}
rrp.setDiskDone();
rrp.setTapeDone();
rrp.reportDriveStatus(cta::common::dataStructures::DriveStatus::Unmounting);
rrp.reportEndOfSession();
rrp.waitThread();
ASSERT_EQ(rrp.allThreadsDone(),true);
}
}
#undef TEST_MOCK_DB
......
......@@ -23,7 +23,7 @@
#pragma once
#include "castor/tape/tapeserver/daemon/ReportPackerInterface.hpp"
#include "tapeserver/castor/tape/tapeserver/daemon/ReportPackerInterface.hpp"
#include "common/log/LogContext.hpp"
#include "common/threading/Thread.hpp"
#include "common/threading/BlockingQueue.hpp"
......
......@@ -24,7 +24,7 @@
#pragma once
#include "common/log/LogContext.hpp"
#include "castor/tape/tapeserver/utils/suppressUnusedVariable.hpp"
#include "tapeserver/castor/tape/tapeserver/utils/suppressUnusedVariable.hpp"
#include <memory>
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment