/*
* @project The CERN Tape Archive (CTA)
* @copyright Copyright(C) 2003-2021 CERN
* @license This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see .
*/
#pragma once
#include
#include "common/log/LogContext.hpp"
#include "common/threading/BlockingQueue.hpp"
#include "common/threading/Thread.hpp"
#include "scheduler/RetrieveJob.hpp"
#include "scheduler/RetrieveMount.hpp"
#include "castor/tape/tapeserver/drive/DriveInterface.hpp"
#include "castor/tape/tapeserver/RAO/RAOParams.hpp"
#include "castor/tape/tapeserver/RAO/RAOManager.hpp"
#include
namespace castor{
namespace tape{
//forward declarations
namespace tapegateway {
class FileToRecallStruct;
}
namespace tapeserver{
namespace client {
class ClientInterface;
}
namespace daemon {
//forward declaration
class RecallMemoryManager;
class DiskWriteThreadPool;
class TapeReadTask;
//forward declaration of template class
template class TapeSingleThreadInterface;
/**
* This classis responsible for creating the tasks in case of a recall job
*/
class RecallTaskInjector {
public:
/**
* Constructor
* @param mm the memory manager from whom the TRT will be pulling blocks
* @param tapeReader the one object that will hold the thread which will be executing
* tape-reading tasks
* @param diskWriter the one object that will hold all the threads which will be executing
* disk-writing tasks
* @param client The one that will give us files to recall
* @param filesPerRequest number of files we request from the client at once
* @param bytesPerRequest number of bytes we request from the client at once
* we may request to the client. at once
* @param lc copied because of the threading mechanism
*/
RecallTaskInjector(RecallMemoryManager & mm,
TapeSingleThreadInterface & tapeReader,
DiskWriteThreadPool & diskWriter,cta::RetrieveMount &retrieveMount,
uint64_t filesPerRequest, uint64_t bytesPerRequest,cta::log::LogContext lc);
virtual ~RecallTaskInjector();
/**
* Function for a feed-back loop purpose between RecallTaskInjector and
* TapeReadSingleThread. When TapeReadSingleThread::popAndRequestMoreJobs detects
* it has not enough jobs to do to, it will push a request, that when executed
* will ask the client to try to fill up the queue.
* @param lastCall true if we want the new request to be a last call.
* See Request::lastCall
*/
virtual void requestInjection(bool lastCall);
/**
* Send an end token in the request queue. There should be no subsequent
* calls to requestInjection.
*/
void finish();
/**
* Contact the client to make sure there are really something to do
* Something = recall at most maxFiles or at least maxBytes
*
* @param noFilesToRecall will be true if noFilesWere popped from the queue.
* @return true if there are jobs to be done, false otherwise
*/
bool synchronousFetch(bool & noFilesToRecall);
/**
* Wait for the inner thread to finish
*/
void waitThreads();
/**
* Start the inner thread
*/
void startThreads();
/**
* Set the drive interface in use
* @param di - Drive interface
*/
void setDriveInterface(castor::tape::tapeserver::drive::DriveInterface *di);
/**
* Initialize Recommended Access Order parameters
*/
void initRAO(const castor::tape::tapeserver::rao::RAOParams & dataConfig, cta::catalogue::Catalogue * catalogue);
void waitForPromise();
void setPromise();
/**
* This method will tell the TapeReadSingleThread that the
* first batch of tasks has been injected
* by the RecallTaskInjector
*/
void setFirstTasksInjectedPromise();
/**
* This method will be called by the TapeReadSingleThread
* so that TapeReadSingleThread will wait the first batch
* of tasks to be injected by the RecallTaskInjector
*/
void waitForFirstTasksInjectedPromise();
private:
/**
* It will signal to the disk read thread pool, tape write single thread
* and to the mem manager they have to stop their threads(s)
*/
void signalEndDataMovement();
/**
* It will delete all remaining tasks
*/
void deleteAllTasks();
/**
* Create all the tape-read and write-disk tasks for set of files to retrieve
*/
void injectBulkRecalls();
/**
* A request of files to recall. We request EITHER
* - a maximum of nbMaxFiles files
* - OR at least byteSizeThreshold bytes.
* That means we stop as soon as we have nbMaxFiles files or the cumulated size
* is equal or above byteSizeThreshold.
*/
class Request {
public:
Request(uint64_t mf, uint64_t mb, bool lc):
filesRequested(mf), bytesRequested(mb), lastCall(lc),end(false) {}
Request():
filesRequested(0), bytesRequested(0), lastCall(true),end(true) {}
const uint64_t filesRequested;
const uint64_t bytesRequested;
/**
* True if it is the last call for the set of requests :it means
* we don't need to try to get more files to recall
* and can send into all the different threads a signal .
*/
const bool lastCall;
/**
* Set as true if the loop back process notified to us the end
*/
const bool end;
};
class WorkerThread: public cta::threading::Thread {
public:
WorkerThread(RecallTaskInjector & rji): m_parent(rji) {}
virtual void run();
private:
RecallTaskInjector & m_parent;
} m_thread;
///The memory manager for accessing memory blocks.
RecallMemoryManager & m_memManager;
///the one object that will hold the thread which will be executing
///tape-reading tasks
TapeSingleThreadInterface & m_tapeReader;
///the one object that will hold all the threads which will be executing
///disk-writing tasks
DiskWriteThreadPool & m_diskWriter;
/// the client who is sending us jobs
cta::RetrieveMount &m_retrieveMount;
/// Drive interface needed for performing Recommended Access Order query
castor::tape::tapeserver::drive::DriveInterface * m_drive;
std::vector> m_jobs;
/**
* utility member to log some pieces of information
*/
cta::log::LogContext m_lc;
cta::threading::Mutex m_producerProtection;
cta::threading::BlockingQueue m_queue;
//maximal number of files requested. at once
const uint64_t m_maxFiles;
//maximal number of cumulated byte requested. at once
const uint64_t m_maxBytes;
/**
* The RAO manager to perofrm RAO operations
*/
castor::tape::tapeserver::rao::RAOManager m_raoManager;
/** Number of jobs to be fetched before the tape is mounted.
* The desired number is m_raoLimits.maxSupported
*/
unsigned int m_fetched;
/**
* The promise for reordering the read tasks according to RAO by the
* RecallTaskInjector. The tasks to be run are placed in the m_tasks queue
*/
std::promise m_raoPromise;
std::future m_raoFuture;
std::promise m_firstTasksInjectedPromise;
std::future m_firstTasksInjectedFuture;
bool m_promiseFirstTaskInjectedSet = false;
};
} //end namespace daemon
} //end namespace tapeserver
} //end namespace tape
} //end namespace castor