/*
* @project The CERN Tape Archive (CTA)
* @copyright Copyright(C) 2003-2021 CERN
* @license This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see .
*/
#include "castor/messages/TapeserverProxyDummy.hpp"
#include "castor/tape/tapeserver/daemon/DiskWriteThreadPool.hpp"
#include "castor/tape/tapeserver/daemon/RecallTaskInjector.hpp"
#include "castor/tape/tapeserver/daemon/TapeServerReporter.hpp"
#include "castor/tape/tapeserver/daemon/TapeReadSingleThread.hpp"
#include "castor/tape/tapeserver/daemon/TaskWatchDog.hpp"
#include "castor/tape/tapeserver/drive/FakeDrive.hpp"
#include "common/log/DummyLogger.hpp"
#include "common/log/StringLogger.hpp"
#include "common/processCap/ProcessCapDummy.hpp"
#include "mediachanger/MediaChangerFacade.hpp"
#include "scheduler/SchedulerDatabase.hpp"
#include "scheduler/testingMocks/MockRetrieveMount.hpp"
#include "scheduler/TapeMountDummy.hpp"
#include
using namespace castor::tape::tapeserver::daemon;
using namespace castor::tape;
namespace unitTests
{
const int blockSize=4096;
class castor_tape_tapeserver_daemonTest: public ::testing::Test {
protected:
void SetUp() {
}
void TearDown() {
}
}; // class castor_tape_tapeserver_daemonTest
struct MockRecallReportPacker : public RecallReportPacker {
void reportCompletedJob(std::unique_ptr successfulRetrieveJob) override {
cta::threading::MutexLocker ml(m_mutex);
completeJobs++;
}
void reportFailedJob(std::unique_ptr failedRetrieveJob, const cta::exception::Exception & ex) override {
cta::threading::MutexLocker ml(m_mutex);
failedJobs++;
}
void disableBulk() override {}
void reportEndOfSession() override {
cta::threading::MutexLocker ml(m_mutex);
endSessions++;
}
void reportEndOfSessionWithErrors(const std::string msg, int error_code) override {
cta::threading::MutexLocker ml(m_mutex);
endSessionsWithError++;
}
MockRecallReportPacker(cta::RetrieveMount *rm,cta::log::LogContext lc):
RecallReportPacker(rm,lc), completeJobs(0), failedJobs(0),
endSessions(0), endSessionsWithError(0) {}
cta::threading::Mutex m_mutex;
int completeJobs;
int failedJobs;
int endSessions;
int endSessionsWithError;
};
class FakeDiskWriteThreadPool: public DiskWriteThreadPool
{
public:
using DiskWriteThreadPool::m_tasks;
FakeDiskWriteThreadPool(RecallReportPacker &rrp, RecallWatchDog &rwd,
cta::log::LogContext & lc):
DiskWriteThreadPool(1,rrp,
rwd,lc,"/dev/null", 0){}
virtual ~FakeDiskWriteThreadPool() {};
};
class FakeSingleTapeReadThread : public TapeSingleThreadInterface
{
public:
using TapeSingleThreadInterface::m_tasks;
FakeSingleTapeReadThread(tapeserver::drive::DriveInterface& drive,
cta::mediachanger::MediaChangerFacade & mc,
tapeserver::daemon::TapeServerReporter & tsr,
const tapeserver::daemon::VolumeInfo& volInfo,
cta::server::ProcessCap& cap,
const uint32_t tapeLoadTimeout,
cta::log::LogContext & lc):
TapeSingleThreadInterface(drive, mc, tsr, volInfo,cap, lc, "", tapeLoadTimeout){}
~FakeSingleTapeReadThread(){
const unsigned int size= m_tasks.size();
for(unsigned int i=0;i > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested,
cta::disk::DiskSystemFreeSpaceList & diskSystemFreeSpace, cta::log::LogContext& logContext) override { throw std::runtime_error("Not implemented");}
void complete(time_t completionTime) override { throw std::runtime_error("Not implemented"); }
void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime,const cta::optional & reason) override { throw std::runtime_error("Not implemented"); }
void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) override { throw std::runtime_error("Not implemented"); }
void flushAsyncSuccessReports(std::list& jobsBatch, cta::log::LogContext& lc) override { throw std::runtime_error("Not implemented"); }
};
TEST_F(castor_tape_tapeserver_daemonTest, RecallTaskInjectorNominal) {
const int nbJobs=15;
const int maxNbJobsInjectedAtOnce = 6;
cta::log::StringLogger log("dummy","castor_tape_tapeserver_daemon_RecallTaskInjectorTest",cta::log::DEBUG);
cta::log::LogContext lc(log);
RecallMemoryManager mm(50U, 50U, lc);
castor::tape::tapeserver::drive::FakeDrive drive;
auto catalogue = cta::catalogue::DummyCatalogue();
cta::MockRetrieveMount trm(catalogue);
trm.createRetrieveJobs(nbJobs);
//EXPECT_CALL(trm, internalGetNextJob()).Times(nbJobs+1);
castor::messages::TapeserverProxyDummy tspd;
cta::TapeMountDummy tmd;
RecallWatchDog rwd(1,1,tspd,tmd,"",lc);
std::unique_ptr dbrm(new TestingDatabaseRetrieveMount());
MockRecallReportPacker mrrp(&trm,lc);
FakeDiskWriteThreadPool diskWrite(mrrp,rwd,lc);
cta::log::DummyLogger dummyLog("dummy","dummy");
cta::mediachanger::MediaChangerFacade mc(dummyLog);
castor::messages::TapeserverProxyDummy initialProcess;
castor::tape::tapeserver::daemon::VolumeInfo volume;
volume.vid="V12345";
volume.mountType=cta::common::dataStructures::MountType::Retrieve;
castor::tape::tapeserver::daemon::TapeServerReporter gsr(initialProcess, cta::tape::daemon::TpconfigLine(), "0.0.0.0", volume, lc);
cta::server::ProcessCapDummy cap;
FakeSingleTapeReadThread tapeRead(drive, mc, gsr, volume, cap, 60, lc);
tapeserver::daemon::RecallTaskInjector rti(mm, tapeRead, diskWrite, trm, maxNbJobsInjectedAtOnce, blockSize, lc);
bool noFilesToRecall;
ASSERT_EQ(true, rti.synchronousFetch(noFilesToRecall));
ASSERT_FALSE(noFilesToRecall);
ASSERT_EQ(maxNbJobsInjectedAtOnce, diskWrite.m_tasks.size());
ASSERT_EQ(maxNbJobsInjectedAtOnce, tapeRead.m_tasks.size());
rti.startThreads();
rti.requestInjection(false);
rti.requestInjection(true);
rti.finish();
ASSERT_NO_THROW(rti.waitThreads());
ASSERT_EQ(nbJobs+1, trm.getJobs);
//pushed nbFile*2 files + 1 end of work
ASSERT_EQ(nbJobs+1, diskWrite.m_tasks.size());
ASSERT_EQ(nbJobs+1, tapeRead.m_tasks.size());
for(int i=0; i(NULL), diskWriteTask);
ASSERT_EQ(static_cast(NULL), tapeReadTask);
delete diskWriteTask;
delete tapeReadTask;
}
}
TEST_F(castor_tape_tapeserver_daemonTest, RecallTaskInjectorNoFiles) {
cta::log::StringLogger log("dummy","castor_tape_tapeserver_daemon_RecallTaskInjectorTest",cta::log::DEBUG);
cta::log::LogContext lc(log);
RecallMemoryManager mm(50U, 50U, lc);
castor::tape::tapeserver::drive::FakeDrive drive;
auto catalogue = cta::catalogue::DummyCatalogue();
cta::MockRetrieveMount trm(catalogue);
trm.createRetrieveJobs(0);
//EXPECT_CALL(trm, internalGetNextJob()).Times(1); //no work: single call to getnextjob
castor::messages::TapeserverProxyDummy tspd;
cta::TapeMountDummy tmd;
RecallWatchDog rwd(1,1,tspd,tmd,"",lc);
std::unique_ptr dbrm(new TestingDatabaseRetrieveMount());
MockRecallReportPacker mrrp(&trm,lc);
FakeDiskWriteThreadPool diskWrite(mrrp,rwd,lc);
cta::log::DummyLogger dummyLog("dummy","dummy");
cta::mediachanger::MediaChangerFacade mc(dummyLog);
castor::messages::TapeserverProxyDummy initialProcess;
castor::tape::tapeserver::daemon::VolumeInfo volume;
volume.vid="V12345";
volume.mountType=cta::common::dataStructures::MountType::Retrieve;
cta::server::ProcessCapDummy cap;
castor::tape::tapeserver::daemon::TapeServerReporter tsr(initialProcess, cta::tape::daemon::TpconfigLine(), "0.0.0.0", volume, lc);
FakeSingleTapeReadThread tapeRead(drive, mc, tsr, volume, cap, 60, lc);
tapeserver::daemon::RecallTaskInjector rti(mm, tapeRead, diskWrite, trm, 6, blockSize, lc);
bool noFilesToRecall;
ASSERT_FALSE(rti.synchronousFetch(noFilesToRecall));
ASSERT_EQ(0U, diskWrite.m_tasks.size());
ASSERT_EQ(0U, tapeRead.m_tasks.size());
ASSERT_EQ(1, trm.getJobs);
ASSERT_TRUE(noFilesToRecall);
}
}