Commit 943584b9 authored by Cedric CAFFY's avatar Cedric CAFFY
Browse files

Prepared the test for the reporting of succeeded ArchiveJobs from Repack ArchiveRequests

parent 8f2d02ae
...@@ -407,6 +407,7 @@ switchElementsOwnership(PoppedElementsBatch &poppedElementBatch, const Container ...@@ -407,6 +407,7 @@ switchElementsOwnership(PoppedElementsBatch &poppedElementBatch, const Container
e->archiveReportURL = u->get()->getArchiveReportURL(); e->archiveReportURL = u->get()->getArchiveReportURL();
e->errorReportURL = u->get()->getArchiveErrorReportURL(); e->errorReportURL = u->get()->getArchiveErrorReportURL();
e->srcURL = u->get()->getSrcURL(); e->srcURL = u->get()->getSrcURL();
e->repackInfo = u->get()->getRepackInfo();
switch(u->get()->getJobStatus()) { switch(u->get()->getJobStatus()) {
case serializers::ArchiveJobStatus::AJS_ToReportToUserForTransfer: case serializers::ArchiveJobStatus::AJS_ToReportToUserForTransfer:
e->reportType = SchedulerDatabase::ArchiveJob::ReportType::CompletionReport; e->reportType = SchedulerDatabase::ArchiveJob::ReportType::CompletionReport;
......
...@@ -560,6 +560,10 @@ ArchiveRequest::AsyncJobOwnerUpdater* ArchiveRequest::asyncUpdateJobOwner(uint32 ...@@ -560,6 +560,10 @@ ArchiveRequest::AsyncJobOwnerUpdater* ArchiveRequest::asyncUpdateJobOwner(uint32
retRef.m_archiveReportURL = payload.archivereporturl(); retRef.m_archiveReportURL = payload.archivereporturl();
retRef.m_archiveErrorReportURL = payload.archiveerrorreporturl(); retRef.m_archiveErrorReportURL = payload.archiveerrorreporturl();
retRef.m_srcURL = payload.srcurl(); retRef.m_srcURL = payload.srcurl();
retRef.m_repackInfo.fSeq = payload.repack_info().fseq();
retRef.m_repackInfo.fileBufferURL = payload.repack_info().file_buffer_url();
retRef.m_repackInfo.isRepack = payload.isrepack();
retRef.m_repackInfo.repackRequestAddress = payload.repack_info().repack_request_address();
if (j->failurelogs_size()) { if (j->failurelogs_size()) {
retRef.m_latestError = j->failurelogs(j->failurelogs_size()-1); retRef.m_latestError = j->failurelogs(j->failurelogs_size()-1);
} }
...@@ -626,6 +630,13 @@ objectstore::serializers::ArchiveJobStatus ArchiveRequest::AsyncJobOwnerUpdater: ...@@ -626,6 +630,13 @@ objectstore::serializers::ArchiveJobStatus ArchiveRequest::AsyncJobOwnerUpdater:
return m_jobStatus; return m_jobStatus;
} }
//------------------------------------------------------------------------------
// ArchiveRequest::AsyncJobOwnerUpdater::getRepackInfo()
//------------------------------------------------------------------------------
// Returns (by value) the repack information that was copied out of the
// request's payload (fseq, file buffer URL, isrepack flag, repack request
// address) while the asynchronous job-owner update was being prepared.
ArchiveRequest::RepackInfo ArchiveRequest::AsyncJobOwnerUpdater::getRepackInfo(){
  return m_repackInfo;
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
// ArchiveRequest::asyncUpdateTransferSuccessful() // ArchiveRequest::asyncUpdateTransferSuccessful()
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
......
...@@ -93,6 +93,15 @@ private: ...@@ -93,6 +93,15 @@ private:
*/ */
EnqueueingNextStep determineNextStep(uint32_t copyNumberToUpdate, JobEvent jobEvent, log::LogContext & lc); EnqueueingNextStep determineNextStep(uint32_t copyNumberToUpdate, JobEvent jobEvent, log::LogContext & lc);
public: public:
  // Repack information attached to this archive request.
  // Set via setRepackInfo(); isRepack stays false for archive requests that do
  // not originate from a repack workflow.
  struct RepackInfo {
    bool isRepack = false;
    // Tape file sequence number of the repacked file — presumably on the
    // source tape being repacked; TODO confirm against the repack expansion code.
    uint64_t fSeq = 0;
    // NOTE(review): looks like the objectstore address of the owning
    // RepackRequest — confirm.
    std::string repackRequestAddress;
    // URL of the copy of the file staged in the disk buffer during repack
    // (assumption from the name — verify against the caller that sets it).
    std::string fileBufferURL;
  };
  void setRepackInfo(const RepackInfo & repackInfo);
  RepackInfo getRepackInfo();
EnqueueingNextStep addTransferFailure(uint32_t copyNumber, uint64_t sessionId, const std::string & failureReason, EnqueueingNextStep addTransferFailure(uint32_t copyNumber, uint64_t sessionId, const std::string & failureReason,
log::LogContext &lc); //< returns next step to take with the job log::LogContext &lc); //< returns next step to take with the job
EnqueueingNextStep addReportFailure(uint32_t copyNumber, uint64_t sessionId, const std::string & failureReason, EnqueueingNextStep addReportFailure(uint32_t copyNumber, uint64_t sessionId, const std::string & failureReason,
...@@ -113,6 +122,7 @@ public: ...@@ -113,6 +122,7 @@ public:
const std::string & getArchiveErrorReportURL(); const std::string & getArchiveErrorReportURL();
const std::string & getLastestError(); const std::string & getLastestError();
serializers::ArchiveJobStatus getJobStatus(); serializers::ArchiveJobStatus getJobStatus();
RepackInfo getRepackInfo();
// TODO: use the more general structure from utils. // TODO: use the more general structure from utils.
struct TimingsReport { struct TimingsReport {
double lockFetchTime = 0; double lockFetchTime = 0;
...@@ -128,6 +138,7 @@ public: ...@@ -128,6 +138,7 @@ public:
std::string m_archiveReportURL; std::string m_archiveReportURL;
std::string m_archiveErrorReportURL; std::string m_archiveErrorReportURL;
serializers::ArchiveJobStatus m_jobStatus; serializers::ArchiveJobStatus m_jobStatus;
RepackInfo m_repackInfo;
std::string m_latestError; std::string m_latestError;
utils::Timer m_timer; utils::Timer m_timer;
TimingsReport m_timingReport; TimingsReport m_timingReport;
...@@ -136,16 +147,6 @@ public: ...@@ -136,16 +147,6 @@ public:
// one, the request will do nothing and not fail. // one, the request will do nothing and not fail.
AsyncJobOwnerUpdater * asyncUpdateJobOwner(uint32_t copyNumber, const std::string & owner, const std::string &previousOwner, AsyncJobOwnerUpdater * asyncUpdateJobOwner(uint32_t copyNumber, const std::string & owner, const std::string &previousOwner,
const cta::optional<serializers::ArchiveJobStatus>& newStatus); const cta::optional<serializers::ArchiveJobStatus>& newStatus);
// Repack information
struct RepackInfo {
bool isRepack = false;
uint64_t fSeq = 0;
std::string repackRequestAddress;
std::string fileBufferURL;
};
void setRepackInfo(const RepackInfo & repackInfo);
RepackInfo getRepackInfo();
struct RepackInfoSerDeser: public RepackInfo { struct RepackInfoSerDeser: public RepackInfo {
operator RepackInfo() { return RepackInfo(*this); } operator RepackInfo() { return RepackInfo(*this); }
......
...@@ -3860,6 +3860,7 @@ objectstore::ArchiveRequest::RepackInfo OStoreDB::ArchiveJob::getRepackInfoAfter ...@@ -3860,6 +3860,7 @@ objectstore::ArchiveRequest::RepackInfo OStoreDB::ArchiveJob::getRepackInfoAfter
// OStoreDB::RepackArchiveSuccessesReportBatch::report() // OStoreDB::RepackArchiveSuccessesReportBatch::report()
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
void OStoreDB::RepackArchiveSuccessesReportBatch::report(log::LogContext& lc) { void OStoreDB::RepackArchiveSuccessesReportBatch::report(log::LogContext& lc) {
//TODO : Do the reporting of RepackArchiveSuccessesReportBatch
throw 1; throw 1;
} }
......
...@@ -855,7 +855,7 @@ void Scheduler::sortAndGetTapesForMountInfo(std::unique_ptr<SchedulerDatabase::T ...@@ -855,7 +855,7 @@ void Scheduler::sortAndGetTapesForMountInfo(std::unique_ptr<SchedulerDatabase::T
// list of tapes from the catalogue. // list of tapes from the catalogue.
if (std::count_if( if (std::count_if(
mountInfo->potentialMounts.cbegin(), mountInfo->potentialMounts.cend(), mountInfo->potentialMounts.cbegin(), mountInfo->potentialMounts.cend(),
[](decltype(*mountInfo->potentialMounts.cbegin())& m){ return m.type == common::dataStructures::MountType::ArchiveForUser; } )) { [](decltype(*mountInfo->potentialMounts.cbegin())& m){ return common::dataStructures::getMountBasicType(m.type) == common::dataStructures::MountType::ArchiveAllTypes; } )) {
tapeList = m_catalogue.getTapesForWriting(logicalLibraryName); tapeList = m_catalogue.getTapesForWriting(logicalLibraryName);
getTapeForWriteTime = timer.secs(utils::Timer::resetCounter); getTapeForWriteTime = timer.secs(utils::Timer::resetCounter);
} }
......
...@@ -42,6 +42,7 @@ ...@@ -42,6 +42,7 @@
#include "tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.hpp" #include "tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.hpp"
#include "objectstore/Algorithms.hpp" #include "objectstore/Algorithms.hpp"
#include "common/range.hpp" #include "common/range.hpp"
#include "tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.hpp"
#ifdef STDOUT_LOGGING #ifdef STDOUT_LOGGING
#include "common/log/StdoutLogger.hpp" #include "common/log/StdoutLogger.hpp"
...@@ -1920,6 +1921,244 @@ TEST_P(SchedulerTest, expandRepackRequestFailedRetrieve) { ...@@ -1920,6 +1921,244 @@ TEST_P(SchedulerTest, expandRepackRequestFailedRetrieve) {
} }
} }
// End-to-end test of the repack archive-success path:
//  1. write 10 files on a tape in the catalogue,
//  2. queue and expand a repack request for that tape,
//  3. run the retrieve mount and report all retrieve jobs as succeeded
//     (this transforms the repack retrieve requests into archive requests),
//  4. run the archive mount and report all archive jobs as succeeded,
//  5. verify the archive requests land in the ArchiveQueueToReportToRepackForSuccess.
TEST_P(SchedulerTest, expandRepackRequestArchiveSuccess) {
  using namespace cta;
  using namespace cta::objectstore;
  auto &catalogue = getCatalogue();
  auto &scheduler = getScheduler();
  auto &schedulerDB = getSchedulerDB();
  cta::objectstore::Backend& backend = schedulerDB.getBackend();
  setupDefaultCatalogue();
#ifdef STDOUT_LOGGING
  log::StdoutLogger dl("dummy", "unitTest");
#else
  log::DummyLogger dl("", "");
#endif
  log::LogContext lc(dl);

  //Create an agent to represent this test process in the object store
  cta::objectstore::AgentReference agentReference("expandRepackRequestTest", dl);
  cta::objectstore::Agent agent(agentReference.getAgentAddress(), backend);
  agent.initialize();
  agent.setTimeout_us(0);
  agent.insertAndRegisterSelf(lc);

  const uint64_t capacityInBytes = (uint64_t)10 * 1000 * 1000 * 1000 * 1000;
  const bool disabledValue = false;
  const bool fullValue = false;
  const std::string comment = "Create tape";
  cta::common::dataStructures::SecurityIdentity admin;
  admin.username = "admin_user_name";
  admin.host = "admin_host";
  const std::string diskFileUser = "public_disk_user";
  const std::string diskFileGroup = "public_disk_group";
  const std::string diskFileRecoveryBlob = "opaque_disk_file_recovery_contents";

  //Create a logical library in the catalogue
  catalogue.createLogicalLibrary(admin, s_libraryName, "Create logical library");

  //Create the tape that will be repacked
  std::ostringstream ossVid;
  ossVid << s_vid << "_" << 1;
  std::string vid = ossVid.str();
  catalogue.createTape(s_adminOnAdminHost,vid, s_mediaType, s_vendor, s_libraryName, s_tapePoolName, capacityInBytes,
    disabledValue, fullValue, comment);

  //Create a storage class in the catalogue
  common::dataStructures::StorageClass storageClass;
  storageClass.diskInstance = s_diskInstance;
  storageClass.name = s_storageClassName;
  storageClass.nbCopies = 2;
  storageClass.comment = "Create storage class";

  const std::string checksumType = "checksum_type";
  const std::string checksumValue = "checksum_value";
  const std::string tapeDrive = "tape_drive";
  const uint64_t nbArchiveFilesPerTape = 10;
  const uint64_t archiveFileSize = 2 * 1000 * 1000 * 1000;
  const uint64_t compressedFileSize = archiveFileSize;

  //Simulate the writing of nbArchiveFilesPerTape files on the tape in the catalogue
  std::set<catalogue::TapeItemWrittenPointer> tapeFilesWrittenCopy1;
  {
    uint64_t archiveFileId = 1;
    std::string currentVid = vid;
    for(uint64_t j = 1; j <= nbArchiveFilesPerTape; ++j) {
      std::ostringstream diskFileId;
      diskFileId << (12345677 + archiveFileId);
      std::ostringstream diskFilePath;
      diskFilePath << "/public_dir/public_file_"<<1<<"_"<< j;
      auto fileWrittenUP=cta::make_unique<cta::catalogue::TapeFileWritten>();
      auto & fileWritten = *fileWrittenUP;
      fileWritten.archiveFileId = archiveFileId++;
      fileWritten.diskInstance = storageClass.diskInstance;
      fileWritten.diskFileId = diskFileId.str();
      fileWritten.diskFilePath = diskFilePath.str();
      fileWritten.diskFileUser = diskFileUser;
      fileWritten.diskFileGroup = diskFileGroup;
      fileWritten.diskFileRecoveryBlob = diskFileRecoveryBlob;
      fileWritten.size = archiveFileSize;
      fileWritten.checksumType = checksumType;
      fileWritten.checksumValue = checksumValue;
      fileWritten.storageClassName = s_storageClassName;
      fileWritten.vid = currentVid;
      fileWritten.fSeq = j;
      fileWritten.blockId = j * 100;
      fileWritten.compressedSize = compressedFileSize;
      fileWritten.copyNb = 1;
      fileWritten.tapeDrive = tapeDrive;
      tapeFilesWrittenCopy1.emplace(fileWrittenUP.release());
    }
    //update the DB tape
    catalogue.filesWrittenToTape(tapeFilesWrittenCopy1);
    tapeFilesWrittenCopy1.clear();
  }

  //Queue the repack request and expand it into retrieve sub-requests
  scheduler.waitSchedulerDbSubthreadsComplete();
  {
    scheduler.queueRepack(admin,vid,"root://repackData/buffer",common::dataStructures::RepackInfo::Type::MoveOnly,lc);
    scheduler.waitSchedulerDbSubthreadsComplete();
    log::TimingList tl;
    utils::Timer t;
    //The promoteRepackRequestsToToExpand will only promote 2 RepackRequests to ToExpand status at a time.
    scheduler.promoteRepackRequestsToToExpand(lc);
    scheduler.waitSchedulerDbSubthreadsComplete();
    auto repackRequestToExpand = scheduler.getNextRepackRequestToExpand();
    scheduler.expandRepackRequest(repackRequestToExpand,tl,t,lc);
    scheduler.waitSchedulerDbSubthreadsComplete();
  }

  //Run the retrieve mount and report every retrieve job as succeeded
  {
    std::unique_ptr<cta::TapeMount> mount;
    mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release());
    ASSERT_NE(nullptr, mount.get());
    ASSERT_EQ(cta::common::dataStructures::MountType::Retrieve, mount.get()->getMountType());
    std::unique_ptr<cta::RetrieveMount> retrieveMount;
    retrieveMount.reset(dynamic_cast<cta::RetrieveMount*>(mount.release()));
    ASSERT_NE(nullptr, retrieveMount.get());
    std::unique_ptr<cta::RetrieveJob> retrieveJob;
    std::list<std::unique_ptr<cta::RetrieveJob>> executedJobs;
    //For each file on the tape, check the retrieve job is not null and collect it
    for(uint64_t j = 1; j<=nbArchiveFilesPerTape; ++j)
    {
      auto jobBatch = retrieveMount->getNextJobBatch(1,archiveFileSize,lc);
      retrieveJob.reset(jobBatch.front().release());
      ASSERT_NE(nullptr, retrieveJob.get());
      executedJobs.push_back(std::move(retrieveJob));
    }
    //Now, report the retrieve jobs to be completed
    castor::tape::tapeserver::daemon::RecallReportPacker rrp(retrieveMount.get(),lc);
    rrp.startThreads();
    //Report all jobs as succeeded
    for(auto it = executedJobs.begin(); it != executedJobs.end(); ++it)
    {
      rrp.reportCompletedJob(std::move(*it));
    }
    rrp.setDiskDone();
    rrp.setTapeDone();
    rrp.reportDriveStatus(cta::common::dataStructures::DriveStatus::Unmounting);
    rrp.reportEndOfSession();
    rrp.waitThread();
    ASSERT_TRUE(rrp.allThreadsDone());
  }

  {
    //Do the reporting of RetrieveJobs, will transform the Retrieve requests into Archive requests
    while (true) {
      auto rep = schedulerDB.getNextRepackReportBatch(lc);
      if (nullptr == rep) break;
      rep->report(lc);
    }
  }

  //All retrieves have been successfully executed, let's get all the ArchiveJobs
  //generated from the succeeded RetrieveJobs of Repack
  {
    scheduler.waitSchedulerDbSubthreadsComplete();
    std::unique_ptr<cta::TapeMount> mount;
    mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release());
    ASSERT_NE(nullptr, mount.get());
    ASSERT_EQ(cta::common::dataStructures::MountType::ArchiveForUser, mount.get()->getMountType());
    std::unique_ptr<cta::ArchiveMount> archiveMount;
    archiveMount.reset(dynamic_cast<cta::ArchiveMount*>(mount.release()));
    ASSERT_NE(nullptr, archiveMount.get());
    std::unique_ptr<cta::ArchiveJob> archiveJob;
    //Get all Archive jobs
    std::list<std::unique_ptr<cta::ArchiveJob>> executedJobs;
    for(uint64_t j = 1;j<=nbArchiveFilesPerTape;++j){
      auto jobBatch = archiveMount->getNextJobBatch(1,archiveFileSize,lc);
      archiveJob.reset(jobBatch.front().release());
      //Check the job pointer BEFORE dereferencing it below
      ASSERT_NE(nullptr,archiveJob.get());
      //TODO : how to generate properly the blockId, checksumType, checksumValue and compressedSize ?? Should it be done im OStoreDB::ArchiveMount::getNextJobBatch() ?
      archiveJob->tapeFile.blockId = j * 101;
      archiveJob->tapeFile.checksumType = checksumType;
      archiveJob->tapeFile.checksumValue = checksumValue;
      archiveJob->tapeFile.compressedSize = compressedFileSize;
      executedJobs.push_back(std::move(archiveJob));
    }
    castor::tape::tapeserver::daemon::MigrationReportPacker mrp(archiveMount.get(),lc);
    mrp.startThreads();
    //Report all archive jobs as succeeded
    for(auto it = executedJobs.begin();it != executedJobs.end(); ++it){
      mrp.reportCompletedJob(std::move(*it),lc);
    }
    castor::tape::tapeserver::drive::compressionStats compressStats;
    mrp.reportFlush(compressStats,lc);
    mrp.reportEndOfSession(lc);
    mrp.reportTestGoingToEnd(lc);
    mrp.waitThread();

    //Check that the ArchiveRequests are in the ArchiveQueueToReportToRepackForSuccess
    {
      //Verify that the jobs are in the ArchiveQueueToReportToRepackForSuccess
      cta::objectstore::RootEntry re(backend);
      cta::objectstore::ScopedExclusiveLock sel(re);
      re.fetch();
      // Get the ArchiveQueueToReportToRepackForSuccess
      // The queue is named after the repack request: we need to query the repack index
      objectstore::RepackIndex ri(re.getRepackIndexAddress(), schedulerDB.getBackend());
      ri.fetchNoLock();
      std::string archiveQueueToReportToRepackForSuccessAddress = re.getArchiveQueueAddress(ri.getRepackRequestAddress(vid),cta::objectstore::JobQueueType::JobsToReportToRepackForSuccess);
      cta::objectstore::ArchiveQueue aq(archiveQueueToReportToRepackForSuccessAddress,backend);
      //Fetch the queue so that we can dump its jobs
      cta::objectstore::ScopedExclusiveLock aql(aq);
      aq.fetch();
      //Dump the jobs once and reuse the result (expected value first, per gtest convention)
      auto jobDump = aq.dumpJobs();
      ASSERT_EQ(nbArchiveFilesPerTape, jobDump.size());
      for(auto& job: jobDump){
        ASSERT_EQ(1,job.copyNb);
        ASSERT_EQ(archiveFileSize,job.size);
      }
    }
    {
      //Do the reporting of the Archive Jobs succeeded
      Scheduler::RepackReportBatch reports = scheduler.getNextRepackReportBatch(lc);
      //TODO : uncomment and do the reporting
      //reports.report(lc);
    }
  }
}
#undef TEST_MOCK_DB #undef TEST_MOCK_DB
#ifdef TEST_MOCK_DB #ifdef TEST_MOCK_DB
static cta::MockSchedulerDatabaseFactory mockDbFactory; static cta::MockSchedulerDatabaseFactory mockDbFactory;
......
...@@ -24,8 +24,10 @@ ...@@ -24,8 +24,10 @@
#pragma once #pragma once
#include "common/threading/BlockingQueue.hpp" #include "common/threading/BlockingQueue.hpp"
#include "castor/tape/tapeserver/daemon/ReportPackerInterface.hpp" /*#include "castor/tape/tapeserver/daemon/ReportPackerInterface.hpp"
#include "castor/tape/tapeserver/drive/DriveInterface.hpp" #include "castor/tape/tapeserver/drive/DriveInterface.hpp"*/
#include "tapeserver/castor/tape/tapeserver/daemon/ReportPackerInterface.hpp"
#include "tapeserver/castor/tape/tapeserver/drive/DriveInterface.hpp"
#include "scheduler/ArchiveMount.hpp" #include "scheduler/ArchiveMount.hpp"
#include "scheduler/ArchiveJob.hpp" #include "scheduler/ArchiveJob.hpp"
#include <list> #include <list>
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment