Commit d523ec22 authored by Eric Cano's avatar Eric Cano
Browse files

Added DataTransferSession to cta-taped.

parent 446d6ddc
......@@ -19,6 +19,8 @@
#pragma once
#include "scheduler/MountType.hpp"
#include <string>
#include <stdint.h>
namespace cta { namespace common {
......
......@@ -59,42 +59,43 @@ namespace cta {
*
* @return The type of this tape mount.
*/
virtual MountType::Enum getMountType() const;
MountType::Enum getMountType() const override;
/**
* Returns the volume identifier of the tape to be mounted.
*
* @return The volume identifier of the tape to be mounted.
*/
virtual std::string getVid() const;
std::string getVid() const override;
/**
* Returns the drive namn
* Returns the drive name
* @return The drive name
*/
virtual std::string getDrive() const;
std::string getDrive() const;
/**
* Returns the mount transaction id.
*
* @return The mount transaction id.
*/
virtual std::string getMountTransactionId() const;
std::string getMountTransactionId() const override;
/**
* Indicates that the mount was completed.
* This function is overridden in MockArchiveMount for unit tests.
*/
virtual void complete();
/**
* Indicates that the mount was cancelled.
*/
virtual void abort();
void abort() override;
/**
* Report a drive status change
*/
virtual void setDriveStatus(cta::common::DriveStatus status);
void setDriveStatus(cta::common::DriveStatus status) override;
/**
* Report that the tape is full.
......@@ -122,7 +123,7 @@ namespace cta {
*
* @return The mount transaction id.
*/
virtual uint32_t getNbFiles() const;
uint32_t getNbFiles() const override;
/**
* Destructor.
......
......@@ -21,7 +21,8 @@ set (CTA_SCHEDULER_SRC_FILES
MountType.cpp
UserRequest.cpp
OStoreDB/OStoreDB.cpp
OStoreDB/OStoreDBWithAgent.cpp)
OStoreDB/OStoreDBWithAgent.cpp
LabelMount.cpp)
add_library (ctascheduler SHARED
${CTA_SCHEDULER_SRC_FILES})
......
......@@ -31,7 +31,8 @@ namespace cta {
enum Enum {
NONE,
ARCHIVE,
RETRIEVE};
RETRIEVE,
LABEL};
/**
* Thread safe method that returns the string representation of the
......
......@@ -79,7 +79,7 @@ namespace cta {
*
* @return The mount transaction id.
*/
virtual uint32_t getNbFiles() const;
uint32_t getNbFiles() const override;
/**
* Report a drive status change
......
......@@ -327,6 +327,12 @@ public:
virtual void fail() = 0;
virtual ~RetrieveJob() {}
};
/*============ Label management: user side =================================*/
// TODO
/*============ Label management: tape server side ==========================*/
class LabelMount {}; // TODO
/*============ Session management ==========================================*/
/**
......@@ -334,7 +340,7 @@ public:
* comparison between mounts.
*/
struct PotentialMount {
cta::MountType::Enum type; /**< Is this an archive or retireve? */
cta::MountType::Enum type; /**< Is this an archive, retireve or label? */
std::string vid; /**< The tape VID (for a retieve) */
std::string tapePool; /**< The name of the tape pool for both archive and retrieve */
uint64_t priority; /**< The priority for the mount, defined as the highest priority of all queued jobs */
......
......@@ -19,6 +19,7 @@
#pragma once
#include "scheduler/MountType.hpp"
#include "common/DriveState.hpp"
#include <string>
......@@ -43,14 +44,14 @@ namespace cta {
* @return The volume identifier of the tape to be mounted.
*/
virtual std::string getVid() const = 0;
/**
* Returns the mount transaction id.
*
* @return The mount transaction id.
*/
virtual std::string getMountTransactionId() const = 0;
/**
* Returns the mount transaction id.
*
......@@ -62,11 +63,16 @@ namespace cta {
* Indicates that the mount was aborted.
*/
virtual void abort() = 0;
/**
* Report a drive status change
*/
virtual void setDriveStatus(cta::common::DriveStatus status) = 0;
/**
* Destructor.
*/
virtual ~TapeMount() throw() = 0;
virtual ~TapeMount() throw();
}; // class TapeMount
......
......@@ -49,7 +49,7 @@ namespace cta {
}
}
void complete() {
void complete() override {
completes++;
}
......
......@@ -48,14 +48,6 @@ struct DataTransferConfig {
*/
uint32_t nbBufs;
/**
* What to do in case of a bad MIR on load. The valid options are:
* REPAIR : SPACE to EOD, REWIND
* IGNORE : simply go on
* CANCEL : cancel the request
*/
std::string tapeBadMIRHandlingRepair;
/**
* When the tapebridged daemon requests the tapegatewayd daemon for a set of
* files to migrate to tape, this parameter defines the maximum number of
......
......@@ -119,6 +119,8 @@ castor::tape::tapeserver::daemon::Session::EndOfSessionAction
return executeRead(lc, dynamic_cast<cta::RetrieveMount *>(tapeMount.get()));
case cta::MountType::ARCHIVE:
return executeWrite(lc, dynamic_cast<cta::ArchiveMount *>(tapeMount.get()));
case cta::MountType::LABEL:
return executeLabel(lc, dynamic_cast<cta::LabelMount *>(tapeMount.get()));
default:
return MARK_DRIVE_AS_UP;
}
......@@ -326,6 +328,16 @@ castor::tape::tapeserver::daemon::Session::EndOfSessionAction
}
}
//------------------------------------------------------------------------------
//DataTransferSession::executeWrite
//------------------------------------------------------------------------------
castor::tape::tapeserver::daemon::Session::EndOfSessionAction
castor::tape::tapeserver::daemon::DataTransferSession::executeLabel(cta::log::LogContext& lc, cta::LabelMount* labelMount) {
throw 0;
// TODO
}
//------------------------------------------------------------------------------
//DataTransferSession::findDrive
//------------------------------------------------------------------------------
......
......@@ -25,16 +25,17 @@
#include "common/log/LogContext.hpp"
#include "common/log/Logger.hpp"
#include "castor/mediachanger/MediaChangerFacade.hpp"
#include "tapeserver/castor/mediachanger/MediaChangerFacade.hpp"
#include "tapeserver/daemon/TapedProxy.hpp"
#include "common/processCap/ProcessCap.hpp"
#include "castor/tape/tapeserver/daemon/DataTransferConfig.hpp"
#include "castor/tape/tapeserver/daemon/DriveConfig.hpp"
#include "castor/tape/tapeserver/daemon/Session.hpp"
#include "castor/tape/tapeserver/daemon/TapeSingleThreadInterface.hpp"
#include "castor/tape/tapeserver/system/Wrapper.hpp"
#include "DataTransferConfig.hpp"
#include "DriveConfig.hpp"
#include "Session.hpp"
#include "TapeSingleThreadInterface.hpp"
#include "tapeserver/castor/tape/tapeserver/system/Wrapper.hpp"
#include "scheduler/ArchiveMount.hpp"
#include "scheduler/RetrieveMount.hpp"
#include "scheduler/LabelMount.hpp"
#include "scheduler/Scheduler.hpp"
#include "scheduler/TapeMount.hpp"
......@@ -118,6 +119,8 @@ namespace daemon {
EndOfSessionAction executeRead(cta::log::LogContext & lc, cta::RetrieveMount *retrieveMount);
/** sub-part of execute for a write session */
EndOfSessionAction executeWrite(cta::log::LogContext & lc, cta::ArchiveMount *archiveMount);
/** sub-part of execute for a label session */
EndOfSessionAction executeLabel(cta::log::LogContext & lc, cta::LabelMount *labelMount);
/** Reference to the MediaChangerFacade, allowing the mounting of the tape
* by the library. It will be used exclusively by the tape thread. */
castor::mediachanger::MediaChangerFacade & m_mc;
......
......@@ -28,15 +28,15 @@
#pragma once
#include "castor/mediachanger/MediaChangerFacade.hpp"
#include "tapeserver/castor/mediachanger/MediaChangerFacade.hpp"
#include "common/log/LogContext.hpp"
#include "common/threading/BlockingQueue.hpp"
#include "common/processCap/ProcessCap.hpp"
#include "common/threading/Threading.hpp"
#include "castor/tape/tapeserver/daemon/Session.hpp"
#include "castor/tape/tapeserver/daemon/TapeSessionStats.hpp"
#include "castor/tape/tapeserver/daemon/VolumeInfo.hpp"
#include "castor/tape/tapeserver/drive/DriveInterface.hpp"
#include "Session.hpp"
#include "TapeSessionStats.hpp"
#include "VolumeInfo.hpp"
#include "tapeserver/castor/tape/tapeserver/drive/DriveInterface.hpp"
#include "common/Timer.hpp"
namespace castor {
......
......@@ -17,6 +17,7 @@
*/
#include "DriveHandler.hpp"
#include "DriveHandlerProxy.hpp"
#include "common/log/LogContext.hpp"
#include "common/exception/Errnum.hpp"
#include "tapeserver/daemon/WatchdogMessage.pb.h"
......@@ -26,7 +27,15 @@
#include "tapeserver/castor/acs/Constants.hpp"
#include "tapeserver/castor/mediachanger/MmcProxyLog.hpp"
#include "tapeserver/castor/tape/tapeserver/daemon/CleanerSession.hpp"
#include "tapeserver/castor/tape/tapeserver/daemon/DataTransferSession.hpp"
#include "tapeserver/castor/legacymsg/RmcProxyTcpIp.hpp"
#include "objectstore/Backend.hpp"
#include "objectstore/BackendFactory.hpp"
#include "objectstore/BackendVFS.hpp"
#include "objectstore/BackendPopulator.hpp"
#include "scheduler/OStoreDB/OStoreDBWithAgent.hpp"
#include "rdbms/Login.hpp"
#include "catalogue/CatalogueFactory.hpp"
#include <unistd.h>
#include <signal.h>
#include <sys/wait.h>
......@@ -679,8 +688,83 @@ int DriveHandler::runChild() {
60);
return cleanerSession.execute();
} else {
// TODO
throw 0;
// The next session will be a normal session
// Capabilities management.
cta::server::ProcessCap capUtils;
// Mounting management.
const int sizeOfIOThreadPoolForZMQ = 1;
castor::messages::SmartZmqContext zmqContext(
castor::messages::SmartZmqContext::instantiateZmqContext(sizeOfIOThreadPoolForZMQ,
m_processManager.logContext().logger()));
castor::messages::AcsProxyZmq acs(castor::acs::ACS_PORT, zmqContext.get());
castor::mediachanger::MmcProxyLog mmc(m_processManager.logContext().logger());
// The network timeout of rmc communications should be several minutes due
// to the time it takes to mount and unmount tapes
const int rmcNetTimeout = 600; // Timeout in seconds
castor::legacymsg::RmcProxyTcpIp rmc(RMC_PORT, rmcNetTimeout,
RMC_MAXRQSTATTEMPTS);
castor::mediachanger::MediaChangerFacade mediaChangerFacade(acs, mmc, rmc);
castor::tape::System::realWrapper sWrapper;
castor::tape::tapeserver::daemon::DriveConfig driveConfig;
driveConfig.m_unitName = m_configLine.unitName;
driveConfig.m_librarySlot = castor::mediachanger::LibrarySlotParser::parse(m_configLine.librarySlot);
driveConfig.m_devFilename = m_configLine.devFilename;
driveConfig.m_logicalLibrary = m_configLine.logicalLibrary;
cta::tape::daemon::DriveHandlerProxy driveHandlerProxy(*m_socketPair);
castor::tape::tapeserver::daemon::DataTransferConfig dataTransferConfig;
dataTransferConfig.bufsz = m_tapedConfig.bufferSize.value();
dataTransferConfig.bulkRequestMigrationMaxBytes =
m_tapedConfig.archiveFetchBytesFiles.value().maxBytes;
dataTransferConfig.bulkRequestMigrationMaxFiles =
m_tapedConfig.archiveFetchBytesFiles.value().maxFiles;
dataTransferConfig.bulkRequestRecallMaxBytes =
m_tapedConfig.retrieveFetchBytesFiles.value().maxBytes;
dataTransferConfig.bulkRequestRecallMaxFiles =
m_tapedConfig.retrieveFetchBytesFiles.value().maxFiles;
dataTransferConfig.maxBytesBeforeFlush =
m_tapedConfig.archiveFlushBytesFiles.value().maxBytes;
dataTransferConfig.maxFilesBeforeFlush =
m_tapedConfig.archiveFlushBytesFiles.value().maxFiles;
dataTransferConfig.nbBufs = m_tapedConfig.bufferCount.value();
dataTransferConfig.nbDiskThreads = m_tapedConfig.nbDiskThreads.value();
dataTransferConfig.remoteFileProtocol = "XROOT";
dataTransferConfig.useLbp = true;
dataTransferConfig.xrootPrivateKey = "";
std::unique_ptr<cta::objectstore::Backend> backend(
cta::objectstore::BackendFactory::createBackend(m_tapedConfig.objectStoreURL.value()).release());
// If the backend is a VFS, make sure we don't delete it on exit.
// If not, nevermind.
try {
dynamic_cast<cta::objectstore::BackendVFS &>(*backend).noDeleteOnExit();
} catch (std::bad_cast &){}
cta::objectstore::BackendPopulator backendPopulator(*backend);
cta::OStoreDBWithAgent osdb(*backend, backendPopulator.getAgentReference());
const cta::rdbms::Login catalogueLogin = cta::rdbms::Login::parseFile("/etc/cta/cta_catalogue_db.conf");
const uint64_t nbConns = 1;
std::unique_ptr<cta::catalogue::Catalogue> catalogue(cta::catalogue::CatalogueFactory::create(catalogueLogin, nbConns));
cta::Scheduler scheduler(*catalogue, osdb, 5, 2*1000*1000); //TODO: we have hardcoded the mount policy parameters here temporarily we will remove them once we know where to put them
castor::tape::tapeserver::daemon::DataTransferSession dataTransferSession(
cta::utils::getShortHostname(),
m_processManager.logContext().logger(),
sWrapper,
driveConfig,
mediaChangerFacade,
driveHandlerProxy,
capUtils,
dataTransferConfig,
scheduler);
return dataTransferSession.execute();
}
}
......
......@@ -27,7 +27,32 @@ DriveHandlerProxy::DriveHandlerProxy(server::SocketPair& socketPair): m_socketPa
void DriveHandlerProxy::addLogParams(const std::string& unitName, const std::list<cta::log::Param>& params) {
throw exception::Exception(std::string(__FUNCTION__) + " not implemented");
// TODO
}
void DriveHandlerProxy::deleteLogParams(const std::string& unitName, const std::list<std::string>& paramNames) {
// TODO
throw 0;
}
void DriveHandlerProxy::labelError(const std::string& unitName, const std::string& message) {
// TODO
throw 0;
}
void DriveHandlerProxy::reportHeartbeat(uint64_t totalTapeBytesMoved, uint64_t totalDiskBytesMoved) {
// TODO
throw 0;
}
void DriveHandlerProxy::reportState(const cta::tape::session::SessionState state, const cta::tape::session::SessionType type, const std::string& vid) {
// TODO
throw 0;
}
DriveHandlerProxy::~DriveHandlerProxy() { }
}}} // namespace cta::tape::daemon
......@@ -35,7 +35,7 @@ public:
* @param sopcketPair Reference to the socket pair.
*/
DriveHandlerProxy(server::SocketPair & sopcketPair);
virtual ~DriveHandlerProxy() {}
virtual ~DriveHandlerProxy();
void reportState(const cta::tape::session::SessionState state, const cta::tape::session::SessionType type, const std::string& vid) override;
void reportHeartbeat(uint64_t totalTapeBytesMoved, uint64_t totalDiskBytesMoved) override;
void addLogParams(const std::string& unitName, const std::list<cta::log::Param>& params) override;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment