/* * @project The CERN Tape Archive (CTA) * @copyright Copyright(C) 2003-2021 CERN * @license This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ #pragma once #include "castor/tape/tapeserver/daemon/MigrationMemoryManager.hpp" #include "castor/tape/tapeserver/daemon/TapeWriteSingleThread.hpp" #include "castor/tape/tapeserver/daemon/TapeWriteTask.hpp" #include "castor/tape/tapeserver/daemon/DiskReadThreadPool.hpp" #include "castor/tape/tapeserver/daemon/DiskReadTask.hpp" #include "common/log/LogContext.hpp" #include "common/threading/AtomicCounter.hpp" #include "scheduler/ArchiveMount.hpp" namespace castor{ namespace tape{ namespace tapeserver{ namespace daemon { /** * This class is responsible for creating the tasks in case of a recall job */ class MigrationTaskInjector { public: /** * Constructor * @param mm The memory manager for accessing memory blocks. * The Newly created tapeWriter Tasks will register themselves * as a client to it. * @param diskReader the one object that will hold all the threads which will be executing * disk-reading tasks * @param tapeWriter the one object that will hold the thread which will be executing * tape-writing tasks * @param client The one that will give us files to migrate * @param maxFiles maximal number of files we may request to the client at once * @param byteSizeThreshold maximal number of cumulated byte * we may request to the client. at once * @param lc log context, copied because of the threading mechanism */ MigrationTaskInjector(MigrationMemoryManager & mm, DiskReadThreadPool & diskReader, TapeSingleThreadInterface & tapeWriter,cta::ArchiveMount &archiveMount, uint64_t maxFiles, uint64_t byteSizeThreshold,cta::log::LogContext lc); /** * Wait for the inner thread to finish */ void waitThreads(); /** * Start the inner thread */ void startThreads(); /** * Function for a feed-back loop purpose between MigrationTaskInjector and * DiskReadThreadPool. When DiskReadThreadPool::popAndRequestMoreJobs detects * it has not enough jobs to do to, it is class to push a request * in order to (try) fill up the queue. * @param lastCall true if we want the new request to be a last call. * See Request::lastCall */ void requestInjection(bool lastCall); /** * Contact the client to make sure there are really something to do * Something = migration at most maxFiles or at least maxBytes * * @param noFilesToMigrate[out] will be true if it triggered an empty mount because of no files to migrate * @return true if there are jobs to be done, false otherwise */ bool synchronousInjection(bool & noFilesToMigrate); /** * Send an end token in the request queue. There should be no subsequent * calls to requestInjection. */ void finish(); /** * Return the first file to be written's fseq * @return */ uint64_t firstFseqToWrite() const; /** * Public interface allowing to set the error flag. This is used * by the tasks (disk and tape) and the tape thread to indicate * that the session cannot continue. */ void setErrorFlag() { m_errorFlag.set(); } /** * Public interface to the error flag, allowing the disk and tape tasks * to decide whether they should carry on or just free memory. * @return value of the error flag */ bool hasErrorFlag() { return m_errorFlag; } private: /** * Create all the tape-read and write-disk tasks for set of files to retrieve * @param jobs the list of FileToMigrateStructs we have to transform in a pair of task */ void injectBulkMigrations(std::list>& jobs); /*Compute how many blocks are needed for a file of fileSize bytes*/ size_t howManyBlocksNeeded(size_t fileSize,size_t blockCapacity){ return fileSize/blockCapacity + ((fileSize%blockCapacity==0) ? 0 : 1); } /** * It will signal to the disk read thread pool, tape write single thread * and to the mem manager they have to stop their threads(s) */ void signalEndDataMovement(); /** * A request of files to migrate. We request EITHER * - a maximum of nbMaxFiles files * - OR at least byteSizeThreshold bytes. * That means we stop as soon as we have nbMaxFiles files or the cumulated size * is equal or above byteSizeThreshold. */ class Request { public: Request(uint64_t mf, uint64_t mb, bool lc): filesRequested(mf), bytesRequested(mb), lastCall(lc), end(false) {} Request(): filesRequested(-1), bytesRequested(-1), lastCall(true),end(true) {} const uint64_t filesRequested; const uint64_t bytesRequested; /** * True if it is the last call for the set of requests :it means * we don't need to try to get more files to recall * and can send into all the different threads a signal . */ const bool lastCall; /** * True indicates the task injector will not receive any more request. */ const bool end; }; class WorkerThread: public cta::threading::Thread { public: WorkerThread(MigrationTaskInjector & rji): m_parent(rji) {} virtual void run(); private: MigrationTaskInjector & m_parent; } m_thread; ///The memory manager for accessing memory blocks. MigrationMemoryManager & m_memManager; ///the one object that will hold the thread which will be executing ///tape-writing tasks TapeSingleThreadInterface& m_tapeWriter; ///the one object that will hold all the threads which will be executing ///disk-reading tasks DiskReadThreadPool & m_diskReader; /// the client who is sending us jobs cta::ArchiveMount &m_archiveMount; /** * utility member to log some pieces of information */ cta::log::LogContext m_lc; cta::threading::Mutex m_producerProtection; ///all the requests for work we will forward to the client. cta::threading::BlockingQueue m_queue; /** a shared flag among the all tasks related to migration, set as true * as soon a single task encounters a failure. That way we go into a degraded mode * where we only circulate memory without writing anything on tape */ cta::threading::AtomicFlag m_errorFlag; /// The maximum number of files we ask per request. const uint64_t m_maxFiles; /// Same as m_maxFilesReq for size per request. (in bytes)) const uint64_t m_maxBytes; /**The last fseq used on the tape. We should not see this but * IT is computed by subtracting 1 to fSeg of the first file to migrate we * receive. That part is done by the * MigrationTaskInjector.::synchronousInjection. Thus, we compute it into * that function and retrieve/set it within DataTransferSession executeWrite * after we make sure synchronousInjection returned true. To do so, we * need to store it */ uint64_t m_firstFseqToWrite; }; } //end namespace daemon } //end namespace tapeserver } //end namespace tape } //end namespace castor