Commit 34da1530 authored by Cedric CAFFY's avatar Cedric CAFFY
Browse files

Status of repack retrieve request changed to RJS_Succeeded, started to insert...

Status of repack retrieve request changed to RJS_Succeeded, started to insert the repack retrieve request into RetrieveQueueToReportToRepackForSuccess
parent 20dd06da
......@@ -208,7 +208,6 @@ trimContainerIfNeeded(Container& cont, ScopedExclusiveLock & contLock,
try {
// The queue should be removed as it is empty.
ContainerTraits<ArchiveQueue,C>::QueueType queueType;
//TODO : Voir si le queue type correspondra au bon type
RootEntry re(cont.m_objectStore);
ScopedExclusiveLock rexl(re);
re.fetch();
......
......@@ -2352,13 +2352,16 @@ std::set<cta::SchedulerDatabase::RetrieveJob *> OStoreDB::RetrieveMount::batchSu
std::list<cta::SchedulerDatabase::RetrieveJob *> & jobsBatch, cta::log::LogContext & lc)
{
std::set<cta::SchedulerDatabase::RetrieveJob *> ret;
//typedef objectstore::ContainerAlgorithms<RetrieveQueue,RetrieveQueueToReportToRepackForSuccess> RQTRTRAlgos;
typedef objectstore::ContainerAlgorithms<objectstore::RetrieveQueue,objectstore::RetrieveQueueToReportToRepackForSuccess> AqtrtrfsCa;
AqtrtrfsCa aqtrtrfsCa(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference);
std::map<std::string, AqtrtrfsCa::InsertedElement::list> insertedElementsLists;
for(auto & retrieveJob : jobsBatch){
auto osdbJob = castFromSchedDBJob(retrieveJob);
osdbJob->asyncSucceedForRepack();
ret.insert(retrieveJob);
//osdbJob->asyncSucceedForRepack();
auto update_callback = [this,&osdbJob](const std::string &in)->std::string{
// We have a locked and fetched object, so we just need to work on its representation.
// We have a locked and fetched object, so we just need to work on its representation.
cta::objectstore::serializers::ObjectHeader oh;
if (!oh.ParseFromString(in)) {
// Use a the tolerant parser to assess the situation.
......@@ -2384,6 +2387,7 @@ std::set<cta::SchedulerDatabase::RetrieveJob *> OStoreDB::RetrieveMount::batchSu
for(auto &job : *retrieveJobs){
if(job.copynb() == osdbJob->selectedCopyNb)
{
//Change the status to RJS_Succeed
job.set_status(serializers::RetrieveJobStatus::RJS_Succeeded);
oh.set_payload(payload.SerializePartialAsString());
return oh.SerializeAsString();
......@@ -2393,9 +2397,14 @@ std::set<cta::SchedulerDatabase::RetrieveJob *> OStoreDB::RetrieveMount::batchSu
};
std::function <std::string(const std::string &)> update = update_callback;
cta::objectstore::Backend::AsyncUpdater * updater = this->m_oStoreDB.m_objectStore.asyncUpdate(osdbJob->m_retrieveRequest.getAddressIfSet(), update);
//cta::objectstore::Backend::AsyncUpdater * updater = osdbJob->m_retrieveMount->m_oStoreDB.m_objectStore.asyncUpdate(osdbJob->m_retrieveRequest.getAddressIfSet(), update);
updater->wait();//osdbJob->m_retrieveRequest.getAddressIfSet())<<std::endl;
//TODO : Should the wait be removed ?
updater->wait();
auto & tapeFile = osdbJob->archiveFile.tapeFiles[osdbJob->selectedCopyNb];
std::string vid = osdbJob->m_retrieveMount->mountInfo.vid;
insertedElementsLists[vid].emplace_back(AqtrtrfsCa::InsertedElement{&osdbJob->m_retrieveRequest, (uint16_t)osdbJob->selectedCopyNb, tapeFile.fSeq,osdbJob->archiveFile.fileSize,cta::common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack,serializers::RetrieveJobStatus::RJS_Succeeded});
//TODO : Insert the retrieve request into the RetrieveQueueToReporttoRepackForSuccess
}
return ret;
}
//------------------------------------------------------------------------------
......@@ -3163,10 +3172,6 @@ void OStoreDB::RetrieveJob::asyncSucceed() {
m_jobDelete.reset(m_retrieveRequest.asyncDeleteJob());
}
void OStoreDB::RetrieveJob::asyncSucceedForRepack(){
//TODO : put the code to async change retrieve request status as RJS_Success
}
//------------------------------------------------------------------------------
// OStoreDB::RetrieveJob::checkSucceed()
//------------------------------------------------------------------------------
......
......@@ -231,7 +231,6 @@ public:
CTA_GENERATE_EXCEPTION_CLASS(NoSuchJob);
virtual void asyncSucceed() override;
virtual void checkSucceed() override;
virtual void asyncSucceedForRepack() override;
void failTransfer(const std::string& failureReason, log::LogContext& lc) override;
void failReport(const std::string& failureReason, log::LogContext& lc) override;
virtual ~RetrieveJob() override;
......
......@@ -188,6 +188,7 @@ void cta::RetrieveMount::waitAndFinishSettingJobsBatchRetrieved(std::queue<std::
.add("files", files)
.add("bytes", bytes);
tl.addToLog(params);
//TODO : if repack, add log to say that the jobs were marked as RJS_Succeeded
logContext.log(cta::log::DEBUG,"In RetrieveMout::waitAndFinishSettingJobsBatchRetrieved(): deleted complete retrieve jobs.");
}
} catch(const cta::exception::Exception& e){
......@@ -201,6 +202,7 @@ void cta::RetrieveMount::waitAndFinishSettingJobsBatchRetrieved(std::queue<std::
}
const std::string msg_error="In RetrieveMount::waitAndFinishSettingJobsBatchRetrieved(): got an exception";
logContext.log(cta::log::ERR, msg_error);
logContext.logBacktrace(cta::log::ERR, e.backtrace());
// Failing here does not really affect the session so we can carry on. Reported jobs are reported, non-reported ones
// will be retried.
} catch(const std::exception& e){
......
......@@ -360,7 +360,6 @@ public:
uint64_t selectedCopyNb;
virtual void asyncSucceed() = 0;
virtual void checkSucceed() = 0;
virtual void asyncSucceedForRepack() = 0;
virtual void failTransfer(const std::string &failureReason, log::LogContext &lc) = 0;
virtual void failReport(const std::string &failureReason, log::LogContext &lc) = 0;
virtual ~RetrieveJob() {}
......
......@@ -19,6 +19,7 @@
#include "catalogue/InMemoryCatalogue.hpp"
#include "catalogue/SchemaCreatingSqliteCatalogue.hpp"
#include "common/log/DummyLogger.hpp"
#include "common/log/StdoutLogger.hpp"
#include "common/make_unique.hpp"
#include "scheduler/ArchiveMount.hpp"
#include "scheduler/LogicalLibrary.hpp"
......@@ -33,6 +34,8 @@
#include "common/log/DummyLogger.hpp"
#include "objectstore/GarbageCollector.hpp"
#include "objectstore/BackendRadosTestSwitch.hpp"
#include "objectstore/RootEntry.hpp"
#include "objectstore/JobQueueType.hpp"
#include "tests/TestsCompileTimeSwitches.hpp"
#include "common/Timer.hpp"
#include "tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.hpp"
......@@ -1085,10 +1088,11 @@ TEST_P(SchedulerTest, expandRepackRequest) {
auto &catalogue = getCatalogue();
auto &scheduler = getScheduler();
//auto &schedulerDB = getSchedulerDB();
setupDefaultCatalogue();
cta::log::DummyLogger dummyLogger("dummy","dummy");
cta::log::StdoutLogger dummyLogger("dummy","dummy");
log::LogContext lc(dummyLogger);
const uint64_t capacityInBytes = (uint64_t)10 * 1000 * 1000 * 1000 * 1000;
......@@ -1117,7 +1121,7 @@ TEST_P(SchedulerTest, expandRepackRequest) {
const std::string checksumType = "checksum_type";
const std::string checksumValue = "checksum_value";
const std::string tapeDrive = "tape_drive";
const uint64_t nbArchiveFiles = 10;
const uint64_t nbArchiveFiles = 1;
const uint64_t archiveFileSize = 2 * 1000 * 1000 * 1000;
const uint64_t compressedFileSize = archiveFileSize;
......@@ -1166,9 +1170,9 @@ TEST_P(SchedulerTest, expandRepackRequest) {
scheduler.waitSchedulerDbSubthreadsComplete();
}
{
//The expandRepackRequest method should have queued 10 retrieve request corresponding to the 10 previous file inserted in the catalogue
//The expandRepackRequest method should have queued nbArchiveFiles retrieve request corresponding to the nbArchiveFiles previous file inserted in the catalogue
std::list<common::dataStructures::RetrieveJob> retrieveJobs = scheduler.getPendingRetrieveJobs(s_vid,lc);
ASSERT_EQ(retrieveJobs.size(),10);
ASSERT_EQ(retrieveJobs.size(),nbArchiveFiles);
int i = 1;
for(auto retrieveJob : retrieveJobs){
//Test that the informations are correct for each file
......@@ -1202,18 +1206,15 @@ TEST_P(SchedulerTest, expandRepackRequest) {
std::list<std::unique_ptr<cta::RetrieveJob>> executedJobs;
for(int i = 1;i<=10;++i)
for(uint64_t i = 1;i<=nbArchiveFiles;++i)
{
auto jobBatch = retrieveMount->getNextJobBatch(1,archiveFileSize,lc);
retrieveJob.reset(jobBatch.front().release());
ASSERT_NE(nullptr, retrieveJob.get());
ASSERT_EQ(retrieveJob->archiveFile.archiveFileID,i);
//Set the retrieve job to succeed
retrieveJob->asyncComplete();
retrieveJob->checkComplete();
executedJobs.push_back(std::move(retrieveJob));
}
//Now, report the retrieve jobs to be completed
castor::tape::tapeserver::daemon::RecallReportPacker rrp(retrieveMount.get(),lc);
......@@ -1229,7 +1230,14 @@ TEST_P(SchedulerTest, expandRepackRequest) {
rrp.reportEndOfSession();
rrp.waitThread();
ASSERT_EQ(rrp.allThreadsDone(),true);
/*cta::objectstore::RootEntry re(schedulerDB.getBackend());
cta::objectstore::ScopedExclusiveLock sel(re);
re.fetchNoLock();
std::string test = re.getRetrieveQueueAddress(s_vid,cta::objectstore::JobQueueType::JobsToTransfer);*/
}
}
......
......@@ -115,7 +115,9 @@ void RecallReportPacker::reportTestGoingToEnd(){
//ReportSuccessful::execute
//------------------------------------------------------------------------------
void RecallReportPacker::ReportSuccessful::execute(RecallReportPacker& parent){
m_successfulRetrieveJob->asyncComplete();
if(!m_successfulRetrieveJob->retrieveRequest.isRepack){
m_successfulRetrieveJob->asyncComplete();
}
parent.m_successfulRetrieveJobs.push(std::move(m_successfulRetrieveJob));
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment