/*
 * @project        The CERN Tape Archive (CTA)
 * @copyright      Copyright(C) 2003-2021 CERN
 * @license        This program is free software: you can redistribute it and/or modify
 *                 it under the terms of the GNU General Public License as published by
 *                 the Free Software Foundation, either version 3 of the License, or
 *                 (at your option) any later version.
 *
 *                 This program is distributed in the hope that it will be useful,
 *                 but WITHOUT ANY WARRANTY; without even the implied warranty of
 *                 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 *                 GNU General Public License for more details.
 *
 *                 You should have received a copy of the GNU General Public License
 *                 along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
// Unit tests for castor::tape::tapeserver::daemon::RecallTaskInjector.
//
// Helpers defined in namespace unitTests:
//  - castor_tape_tapeserver_daemonTest: googletest fixture with empty SetUp() and
//    TearDown().
//  - MockRecallReportPacker: RecallReportPacker subclass whose reporting overrides
//    only increment a counter (completeJobs, failedJobs, endSessions,
//    endSessionsWithError) while holding m_mutex; disableBulk() does nothing.
//  - FakeDiskWriteThreadPool: DiskWriteThreadPool constructed with the fixed
//    arguments (1, rrp, rwd, lc, /dev/null, 0). It re-exports m_tasks through a
//    using-declaration, which is how the tests read the queue size.
//  - FakeSingleTapeReadThread: TapeSingleThreadInterface subclass that likewise
//    re-exports m_tasks. Its destructor and whatever followed it are truncated in
//    this copy.
//  - A retrieve-mount stub (the tests instantiate it as TestingDatabaseRetrieveMount)
//    whose visible overrides all throw std::runtime_error with the message
//    Not implemented.
//
// Tests:
//  - RecallTaskInjectorNominal: 15 jobs are created and at most 6 are injected at
//    once. After synchronousFetch() both task queues must hold 6 entries; once the
//    injector threads have run, nbJobs+1 entries are expected in each queue and
//    trm.getJobs must equal nbJobs+1.
//  - RecallTaskInjectorNoFiles: 0 jobs are created. synchronousFetch() must return
//    false and set noFilesToRecall, both queues must stay empty and trm.getJobs
//    must equal 1.
//
// NOTE(review): this copy of the file appears to have lost its line breaks and all
// text that was enclosed in angle brackets (template arguments, the operand of the
// last #include, parts of the loops, and the class definitions around
// FakeSingleTapeReadThread). As laid out here, every double-slash comment also
// swallows the code that follows it on the same physical line. The code below is
// deliberately left untouched; restore the file from version control rather than
// repairing it by hand.
#include "castor/messages/TapeserverProxyDummy.hpp" #include "castor/tape/tapeserver/daemon/DiskWriteThreadPool.hpp" #include "castor/tape/tapeserver/daemon/RecallTaskInjector.hpp" #include "castor/tape/tapeserver/daemon/TapeServerReporter.hpp" #include "castor/tape/tapeserver/daemon/TapeReadSingleThread.hpp" #include "castor/tape/tapeserver/daemon/TaskWatchDog.hpp" #include "castor/tape/tapeserver/drive/FakeDrive.hpp" #include "common/log/DummyLogger.hpp" #include "common/log/StringLogger.hpp" #include "common/processCap/ProcessCapDummy.hpp" #include "mediachanger/MediaChangerFacade.hpp" #include "scheduler/SchedulerDatabase.hpp" #include "scheduler/testingMocks/MockRetrieveMount.hpp" #include "scheduler/TapeMountDummy.hpp" #include using namespace castor::tape::tapeserver::daemon; using namespace castor::tape; namespace unitTests { const int blockSize=4096; class castor_tape_tapeserver_daemonTest: public ::testing::Test { protected: void SetUp() { } void TearDown() { } }; // class castor_tape_tapeserver_daemonTest struct MockRecallReportPacker : public RecallReportPacker { void reportCompletedJob(std::unique_ptr successfulRetrieveJob) override { cta::threading::MutexLocker ml(m_mutex); completeJobs++; } void reportFailedJob(std::unique_ptr failedRetrieveJob, const 
cta::exception::Exception & ex) override { cta::threading::MutexLocker ml(m_mutex); failedJobs++; } void disableBulk() override {} void reportEndOfSession() override { cta::threading::MutexLocker ml(m_mutex); endSessions++; } void reportEndOfSessionWithErrors(const std::string msg, int error_code) override { cta::threading::MutexLocker ml(m_mutex); endSessionsWithError++; } MockRecallReportPacker(cta::RetrieveMount *rm,cta::log::LogContext lc): RecallReportPacker(rm,lc), completeJobs(0), failedJobs(0), endSessions(0), endSessionsWithError(0) {} cta::threading::Mutex m_mutex; int completeJobs; int failedJobs; int endSessions; int endSessionsWithError; }; class FakeDiskWriteThreadPool: public DiskWriteThreadPool { public: using DiskWriteThreadPool::m_tasks; FakeDiskWriteThreadPool(RecallReportPacker &rrp, RecallWatchDog &rwd, cta::log::LogContext & lc): DiskWriteThreadPool(1,rrp, rwd,lc,"/dev/null", 0){} virtual ~FakeDiskWriteThreadPool() {}; }; class FakeSingleTapeReadThread : public TapeSingleThreadInterface { public: using TapeSingleThreadInterface::m_tasks; FakeSingleTapeReadThread(tapeserver::drive::DriveInterface& drive, cta::mediachanger::MediaChangerFacade & mc, tapeserver::daemon::TapeServerReporter & tsr, const tapeserver::daemon::VolumeInfo& volInfo, cta::server::ProcessCap& cap, const uint32_t tapeLoadTimeout, cta::log::LogContext & lc): TapeSingleThreadInterface(drive, mc, tsr, volInfo,cap, lc, "", tapeLoadTimeout){} ~FakeSingleTapeReadThread(){ const unsigned int size= m_tasks.size(); for(unsigned int i=0;i > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, cta::disk::DiskSystemFreeSpaceList & diskSystemFreeSpace, cta::log::LogContext& logContext) override { throw std::runtime_error("Not implemented");} void complete(time_t completionTime) override { throw std::runtime_error("Not implemented"); } void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime,const cta::optional & reason) override { throw 
std::runtime_error("Not implemented"); } void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) override { throw std::runtime_error("Not implemented"); } void flushAsyncSuccessReports(std::list& jobsBatch, cta::log::LogContext& lc) override { throw std::runtime_error("Not implemented"); } }; TEST_F(castor_tape_tapeserver_daemonTest, RecallTaskInjectorNominal) { const int nbJobs=15; const int maxNbJobsInjectedAtOnce = 6; cta::log::StringLogger log("dummy","castor_tape_tapeserver_daemon_RecallTaskInjectorTest",cta::log::DEBUG); cta::log::LogContext lc(log); RecallMemoryManager mm(50U, 50U, lc); castor::tape::tapeserver::drive::FakeDrive drive; auto catalogue = cta::catalogue::DummyCatalogue(); cta::MockRetrieveMount trm(catalogue); trm.createRetrieveJobs(nbJobs); //EXPECT_CALL(trm, internalGetNextJob()).Times(nbJobs+1); castor::messages::TapeserverProxyDummy tspd; cta::TapeMountDummy tmd; RecallWatchDog rwd(1,1,tspd,tmd,"",lc); std::unique_ptr dbrm(new TestingDatabaseRetrieveMount()); MockRecallReportPacker mrrp(&trm,lc); FakeDiskWriteThreadPool diskWrite(mrrp,rwd,lc); cta::log::DummyLogger dummyLog("dummy","dummy"); cta::mediachanger::MediaChangerFacade mc(dummyLog); castor::messages::TapeserverProxyDummy initialProcess; castor::tape::tapeserver::daemon::VolumeInfo volume; volume.vid="V12345"; volume.mountType=cta::common::dataStructures::MountType::Retrieve; castor::tape::tapeserver::daemon::TapeServerReporter gsr(initialProcess, cta::tape::daemon::TpconfigLine(), "0.0.0.0", volume, lc); cta::server::ProcessCapDummy cap; FakeSingleTapeReadThread tapeRead(drive, mc, gsr, volume, cap, 60, lc); tapeserver::daemon::RecallTaskInjector rti(mm, tapeRead, diskWrite, trm, maxNbJobsInjectedAtOnce, blockSize, lc); bool noFilesToRecall; ASSERT_EQ(true, rti.synchronousFetch(noFilesToRecall)); ASSERT_FALSE(noFilesToRecall); ASSERT_EQ(maxNbJobsInjectedAtOnce, diskWrite.m_tasks.size()); ASSERT_EQ(maxNbJobsInjectedAtOnce, tapeRead.m_tasks.size()); 
// Second phase of RecallTaskInjectorNominal: start the injector threads, request two
// more injections (argument false, then true), call finish() and wait for the threads.
rti.startThreads(); rti.requestInjection(false); rti.requestInjection(true); rti.finish(); ASSERT_NO_THROW(rti.waitThreads()); ASSERT_EQ(nbJobs+1, trm.getJobs); //pushed nbFile*2 files + 1 end of work ASSERT_EQ(nbJobs+1, diskWrite.m_tasks.size()); ASSERT_EQ(nbJobs+1, tapeRead.m_tasks.size()); for(int i=0; i(NULL), diskWriteTask); ASSERT_EQ(static_cast(NULL), tapeReadTask); delete diskWriteTask; delete tapeReadTask; } } TEST_F(castor_tape_tapeserver_daemonTest, RecallTaskInjectorNoFiles) { cta::log::StringLogger log("dummy","castor_tape_tapeserver_daemon_RecallTaskInjectorTest",cta::log::DEBUG); cta::log::LogContext lc(log); RecallMemoryManager mm(50U, 50U, lc); castor::tape::tapeserver::drive::FakeDrive drive; auto catalogue = cta::catalogue::DummyCatalogue(); cta::MockRetrieveMount trm(catalogue); trm.createRetrieveJobs(0); //EXPECT_CALL(trm, internalGetNextJob()).Times(1); //no work: single call to getnextjob castor::messages::TapeserverProxyDummy tspd; cta::TapeMountDummy tmd; RecallWatchDog rwd(1,1,tspd,tmd,"",lc); std::unique_ptr dbrm(new TestingDatabaseRetrieveMount()); MockRecallReportPacker mrrp(&trm,lc); FakeDiskWriteThreadPool diskWrite(mrrp,rwd,lc); cta::log::DummyLogger dummyLog("dummy","dummy"); cta::mediachanger::MediaChangerFacade mc(dummyLog); castor::messages::TapeserverProxyDummy initialProcess; castor::tape::tapeserver::daemon::VolumeInfo volume; volume.vid="V12345"; volume.mountType=cta::common::dataStructures::MountType::Retrieve; cta::server::ProcessCapDummy cap; castor::tape::tapeserver::daemon::TapeServerReporter tsr(initialProcess, cta::tape::daemon::TpconfigLine(), "0.0.0.0", volume, lc); FakeSingleTapeReadThread tapeRead(drive, mc, tsr, volume, cap, 60, lc); tapeserver::daemon::RecallTaskInjector rti(mm, tapeRead, diskWrite, trm, 6, blockSize, lc); bool noFilesToRecall; ASSERT_FALSE(rti.synchronousFetch(noFilesToRecall)); ASSERT_EQ(0U, diskWrite.m_tasks.size()); ASSERT_EQ(0U, tapeRead.m_tasks.size()); ASSERT_EQ(1, trm.getJobs); 
// With zero jobs created, synchronousFetch() must have flagged that nothing is left to recall.
ASSERT_TRUE(noFilesToRecall); } }