Commit f45265f2 authored by Eric Cano's avatar Eric Cano
Browse files

Implemented Scheduler::getRetrieveRequests() (and changed its signature).

parent 02aeaa41
......@@ -39,16 +39,18 @@ void cta::objectstore::RetrieveToFileRequest::initialize() {
m_payloadInterpreted = true;
}
void cta::objectstore::RetrieveToFileRequest::addJob(uint16_t copyNumber,
const std::string& tape, const std::string& tapeaddress) {
void cta::objectstore::RetrieveToFileRequest::addJob(const cta::TapeFileLocation & tapeFileLocation,
const std::string& tapeaddress) {
checkPayloadWritable();
auto *j = m_payload.add_jobs();
j->set_copynb(copyNumber);
j->set_copynb(tapeFileLocation.copyNb);
j->set_status(serializers::RetrieveJobStatus::RJS_LinkingToTape);
j->set_tape(tape);
j->set_tape(tapeFileLocation.vid);
j->set_tapeaddress(tapeaddress);
j->set_totalretries(0);
j->set_retrieswithinmount(0);
j->set_blockid(tapeFileLocation.blockId);
j->set_fseq(tapeFileLocation.fSeq);
}
void cta::objectstore::RetrieveToFileRequest::setArchiveFile(
......
......@@ -21,6 +21,7 @@
#include "ObjectOps.hpp"
#include "objectstore/cta.pb.h"
#include <list>
#include "common/archiveNS/TapeFileLocation.hpp"
namespace cta { namespace objectstore {
......@@ -34,7 +35,7 @@ public:
RetrieveToFileRequest(const std::string & address, Backend & os);
RetrieveToFileRequest(GenericObject & go);
void initialize();
void addJob(uint16_t copyNumber, const std::string & tape,
void addJob(const cta::TapeFileLocation & tapeFileLocation,
const std::string & tapeaddress);
void setArchiveFile(const std::string & archiveFile);
std::string getArchiveFile();
......@@ -51,6 +52,8 @@ public:
uint16_t copyNb;
std::string tape;
std::string tapeAddress;
uint64_t fseq;
uint64_t blockid;
};
std::list<JobDump> dumpJobs();
};
......
......@@ -18,6 +18,7 @@
#include "Tape.hpp"
#include "GenericObject.hpp"
#include "CreationLog.hpp"
cta::objectstore::Tape::Tape(const std::string& address, Backend& os):
ObjectOps<serializers::Tape>(os, address) { }
......@@ -31,9 +32,11 @@ cta::objectstore::Tape::Tape(GenericObject& go):
}
void cta::objectstore::Tape::initialize(const std::string &name,
const std::string &logicallibrary) {
const std::string &logicallibrary, const cta::CreationLog & creationLog) {
ObjectOps<serializers::Tape>::initialize();
// Set the reguired fields
objectstore::CreationLog oscl(creationLog);
oscl.serialize(*m_payload.mutable_log());
m_payload.set_vid(name);
m_payload.set_bytesstored(0);
m_payload.set_lastfseq(0);
......@@ -70,6 +73,20 @@ void cta::objectstore::Tape::removeIfEmpty() {
}
remove();
}
cta::CreationLog cta::objectstore::Tape::getCreationLog() {
checkPayloadReadable();
objectstore::CreationLog oscl;
oscl.deserialize(m_payload.log());
return cta::CreationLog(oscl);
}
void cta::objectstore::Tape::setCreationLog(const cta::CreationLog& creationLog) {
checkPayloadWritable();
objectstore::CreationLog oscl(creationLog);
oscl.serialize(*m_payload.mutable_log());
}
void cta::objectstore::Tape::setStoredData(uint64_t bytes) {
checkPayloadWritable();
m_payload.set_bytesstored(bytes);
......@@ -144,6 +161,7 @@ void cta::objectstore::Tape::addJob(const RetrieveToFileRequest::JobDump& job,
auto * j = m_payload.add_retrievejobs();
j->set_address(retrieveToFileAddress);
j->set_size(size);
j->set_copynb(job.copyNb);
}
std::string cta::objectstore::Tape::getLogicalLibrary() {
......@@ -161,6 +179,37 @@ cta::objectstore::Tape::JobsSummary cta::objectstore::Tape::getJobsSummary() {
return ret;
}
auto cta::objectstore::Tape::dumpAndFetchRetrieveRequests()
-> std::list<RetrieveRequestDump> {
checkPayloadReadable();
std::list<RetrieveRequestDump> ret;
auto & rjl = m_payload.retrievejobs();
for (auto rj=rjl.begin(); rj!=rjl.end(); rj++) {
try {
cta::objectstore::RetrieveToFileRequest rtfr(rj->address(),m_objectStore);
objectstore::ScopedSharedLock rtfrl(rtfr);
rtfr.fetch();
ret.push_back(RetrieveRequestDump());
auto & retReq = ret.back();
retReq.archiveFile = rtfr.getArchiveFile();
retReq.remoteFile = rtfr.getRemoteFile();
// Find the copy number from the list of jobs
retReq.activeCopyNb = rj->copynb();
auto jl = rtfr.dumpJobs();
for (auto j=jl.begin(); j!= jl.end(); j++) {
retReq.tapeCopies.push_back(TapeFileLocation());
auto & retJob = retReq.tapeCopies.back();
retJob.blockId = j->blockid;
retJob.copyNb = j->copyNb;
retJob.fSeq = j->fseq;
retJob.vid = j->tape;
}
} catch (cta::exception::Exception &) {}
}
return ret;
}
bool cta::objectstore::Tape::isArchived() {
checkPayloadReadable();
return m_payload.archived();
......
......@@ -21,6 +21,8 @@
#include "ObjectOps.hpp"
#include "objectstore/cta.pb.h"
#include "RetrieveToFileRequest.hpp"
#include "common/archiveNS/TapeFileLocation.hpp"
#include "scheduler/RetrieveRequestDump.hpp"
namespace cta { namespace objectstore {
......@@ -32,12 +34,15 @@ class Tape: public ObjectOps<serializers::Tape> {
public:
Tape(const std::string & address, Backend & os);
Tape(GenericObject & go);
void initialize(const std::string & vid, const std::string &logicalLibrary);
void initialize(const std::string & vid, const std::string &logicalLibrary,
const cta::CreationLog & creationLog);
void garbageCollect();
bool isEmpty();
CTA_GENERATE_EXCEPTION_CLASS(NotEmpty);
void removeIfEmpty();
std::string dump();
cta::CreationLog getCreationLog();
void setCreationLog(const cta::CreationLog & creationLog);
// Tape location management ==================================================
std::string getLogicalLibrary();
......@@ -72,6 +77,7 @@ public:
uint64_t priority;
};
JobsSummary getJobsSummary();
std::list<RetrieveRequestDump> dumpAndFetchRetrieveRequests();
// -- Stored data counting ---------------------------------------------------
uint64_t getStoredData();
......
......@@ -90,7 +90,7 @@ std::string cta::objectstore::TapePool::addOrGetTapeAndCommit(const std::string&
agent.commit();
// The create the tape object
Tape t(tapeAddress, ObjectOps<serializers::TapePool>::m_objectStore);
t.initialize(vid, logicalLibraryName);
t.initialize(vid, logicalLibraryName, creationLog);
t.setOwner(agent.getAddressIfSet());
t.setBackupOwner(getAddressIfSet());
t.insert();
......@@ -141,6 +141,21 @@ void cta::objectstore::TapePool::removeTapeAndCommit(const std::string& vid) {
}
}
auto cta::objectstore::TapePool::dumpTapes() -> std::list<TapeBasicDump>{
checkPayloadReadable();
std::list<TapeBasicDump> ret;
auto & tl = m_payload.tapes();
for (auto tp=tl.begin(); tp!=tl.end(); tp++) {
ret.push_back(TapeBasicDump());
ret.back().address = tp->address();
ret.back().vid = tp->vid();
ret.back().capacityInBytes = tp->capacity();
ret.back().logicalLibraryName = tp->library();
ret.back().log.deserialize(tp->log());
}
return ret;
}
auto cta::objectstore::TapePool::dumpTapesAndFetchStatus() -> std::list<TapeDump>{
checkPayloadReadable();
std::list<TapeDump> ret;
......
......@@ -56,6 +56,15 @@ public:
CTA_GENERATE_EXCEPTION_CLASS(WrongTape);
void removeTapeAndCommit(const std::string &vid);
std::string getTapeAddress(const std::string &vid);
class TapeBasicDump {
public:
std::string vid;
std::string address;
std::string logicalLibraryName;
uint64_t capacityInBytes;
objectstore::CreationLog log;
};
std::list<TapeBasicDump> dumpTapes();
class TapeDump {
public:
std::string vid;
......
......@@ -31,7 +31,8 @@ TEST(ObjectStore, TapeBasicAccess) {
{
// Try to create the tape entry
cta::objectstore::Tape t(tapeAddress, be);
t.initialize("V12345", "LIB0");
cta::CreationLog cl(cta::UserIdentity(123,456), "testHost", time(NULL), "Unit test");
t.initialize("V12345", "LIB0", cl);
t.insert();
}
{
......
......@@ -189,6 +189,7 @@ message ArchiveJobPointer {
message RetrieveJobPointer {
required uint64 size = 3101;
required string address = 3102;
required uint32 copynb = 3103;
}
// ------------- Mount criteria and quota -------------------------------------
......@@ -259,6 +260,8 @@ message Tape {
required bool disabled = 4314;
required bool readonly = 4315;
required bool full = 4316;
// Creation log
required CreationLog log = 4317;
}
// ------------- Archive Jobs --------------------------------------------------
......@@ -331,9 +334,11 @@ message RetrieveJobEntry {
required uint32 copynb = 4700;
required string tape = 4701;
required string tapeaddress = 4702;
required RetrieveJobStatus status = 4703;
required uint32 totalretries = 4704;
required uint32 retrieswithinmount = 4705;
required uint64 fseq = 4703;
required uint64 blockid = 4704;
required RetrieveJobStatus status = 4705;
required uint32 totalretries = 4706;
required uint32 retrieswithinmount = 4707;
}
message RetrieveToFileRequest {
......
......@@ -20,7 +20,6 @@ set (CTA_SCHEDULER_SRC_FILES
MountType.cpp
PositioningMethod.cpp
RetrieveJob.cpp
RetrieveFromTapeCopyRequest.cpp
RetrieveMount.cpp
RetrieveRequest.cpp
RetrieveToDirRequest.cpp
......
......@@ -35,7 +35,7 @@
#include "scheduler/ArchiveToTapeCopyRequest.hpp"
#include "scheduler/DummyScheduler.hpp"
#include "scheduler/LogicalLibrary.hpp"
#include "scheduler/RetrieveFromTapeCopyRequest.hpp"
#include "scheduler/RetrieveRequestDump.hpp"
#include "scheduler/RetrieveMount.hpp"
#include "scheduler/RetrieveToDirRequest.hpp"
#include "scheduler/RetrieveToFileRequest.hpp"
......@@ -85,18 +85,18 @@ void cta::DummyScheduler::deleteArchiveRequest(
//------------------------------------------------------------------------------
// getRetrieveRequests
//------------------------------------------------------------------------------
std::map<cta::Tape, std::list<cta::RetrieveFromTapeCopyRequest> > cta::
std::map<cta::Tape, std::list<cta::RetrieveRequestDump> > cta::
DummyScheduler::getRetrieveRequests(const SecurityIdentity &requester) const {
return std::map<cta::Tape, std::list<cta::RetrieveFromTapeCopyRequest> >();
return std::map<cta::Tape, std::list<cta::RetrieveRequestDump> >();
}
//------------------------------------------------------------------------------
// getRetrieveRequests
//------------------------------------------------------------------------------
std::list<cta::RetrieveFromTapeCopyRequest> cta::DummyScheduler::getRetrieveRequests(
std::list<cta::RetrieveRequestDump> cta::DummyScheduler::getRetrieveRequests(
const SecurityIdentity &requester,
const std::string &vid) const {
return std::list<cta::RetrieveFromTapeCopyRequest>();
return std::list<cta::RetrieveRequestDump>();
}
//------------------------------------------------------------------------------
......
......@@ -88,7 +88,7 @@ public:
* @return all of the queued retrieve requests. The returned requsts are
* grouped by tape and then sorted by creation time, oldest first.
*/
std::map<Tape, std::list<RetrieveFromTapeCopyRequest> > getRetrieveRequests(
std::map<Tape, std::list<RetrieveRequestDump> > getRetrieveRequests(
const SecurityIdentity &requester) const;
/**
......@@ -100,7 +100,7 @@ public:
* @return The queued retrieve requests for the specified tape. The
* returned requests are sorted by creation time, oldest first.
*/
std::list<RetrieveFromTapeCopyRequest> getRetrieveRequests(
std::list<RetrieveRequestDump> getRetrieveRequests(
const SecurityIdentity &requester,
const std::string &vid) const;
......
......@@ -987,7 +987,7 @@ void OStoreDB::queue(const cta::RetrieveToFileRequest& rqst) {
// Add all the tape copies to the request
try {
for (auto tc=rqst.getTapeCopies().begin(); tc!=rqst.getTapeCopies().end(); tc++) {
rtfr.addJob(tc->copyNb, tc->vid, vidToAddress.at(tc->vid));
rtfr.addJob(*tc, vidToAddress.at(tc->vid));
}
} catch (std::out_of_range &) {
throw NoSuchTape("In OStoreDB::queue(RetrieveToFile): tape not found");
......@@ -1054,12 +1054,46 @@ void OStoreDB::queue(const RetrieveToDirRequest& rqst) {
}
}
std::list<RetrieveFromTapeCopyRequest> OStoreDB::getRetrieveRequests(const std::string& vid) const {
std::list<RetrieveRequestDump> OStoreDB::getRetrieveRequests(const std::string& vid) const {
throw exception::Exception("Not Implemented");
}
std::map<cta::Tape, std::list<RetrieveFromTapeCopyRequest> > OStoreDB::getRetrieveRequests() const {
throw exception::Exception("Not Implemented");
std::map<cta::Tape, std::list<RetrieveRequestDump> > OStoreDB::getRetrieveRequests() const {
std::map<cta::Tape, std::list<RetrieveRequestDump> > ret;
// Get list of tape pools and then tapes
objectstore::RootEntry re(m_objectStore);
objectstore::ScopedSharedLock rel(re);
re.fetch();
auto tpl=re.dumpTapePools();
rel.release();
for (auto tpp = tpl.begin(); tpp != tpl.end(); tpp++) {
// Get the list of tapes for the tape pool
objectstore::TapePool tp(tpp->address, m_objectStore);
objectstore::ScopedSharedLock tplock(tp);
tp.fetch();
auto tl = tp.dumpTapes();
for (auto tptr = tl.begin(); tptr!= tl.end(); tptr++) {
// Get the list of retrieve requests for the tape.
objectstore::Tape t(tptr->address, m_objectStore);
objectstore::ScopedSharedLock tlock(t);
t.fetch();
auto jobs = t.dumpAndFetchRetrieveRequests();
// If the list is not empty, add to the map.
if (jobs.size()) {
cta::Tape tkey;
// TODO tkey.capacityInBytes;
tkey.creationLog = t.getCreationLog();
// TODO tkey.dataOnTapeInBytes;
tkey.logicalLibraryName = t.getLogicalLibrary();
tkey.nbFiles = t.getLastFseq();
// TODO tkey.status
tkey.tapePoolName = tp.getName();
tkey.vid = t.getVid();
ret[tkey] = std::move(jobs);
}
}
}
return ret;
}
void OStoreDB::deleteRetrieveRequest(const SecurityIdentity& requester,
......
......@@ -250,9 +250,9 @@ public:
virtual void queue(const RetrieveToDirRequest& rqst);
virtual std::list<RetrieveFromTapeCopyRequest> getRetrieveRequests(const std::string& vid) const;
virtual std::list<RetrieveRequestDump> getRetrieveRequests(const std::string& vid) const;
virtual std::map<Tape, std::list<RetrieveFromTapeCopyRequest> > getRetrieveRequests() const;
virtual std::map<Tape, std::list<RetrieveRequestDump> > getRetrieveRequests() const;
virtual void deleteRetrieveRequest(const SecurityIdentity& requester,
const std::string& remoteFile);
......
......@@ -25,7 +25,7 @@
#include "common/archiveNS/Tape.hpp"
#include "scheduler/LogicalLibrary.hpp"
#include "scheduler/ArchiveToTapeCopyRequest.hpp"
#include "scheduler/RetrieveFromTapeCopyRequest.hpp"
#include "scheduler/RetrieveRequestDump.hpp"
#include "objectstore/RootEntry.hpp"
#include "objectstore/Agent.hpp"
#include "objectstore/BackendVFS.hpp"
......@@ -183,11 +183,11 @@ public:
return m_OStoreDB.getLogicalLibraries();
}
virtual std::map<Tape, std::list<RetrieveFromTapeCopyRequest> > getRetrieveRequests() const {
virtual std::map<Tape, std::list<RetrieveRequestDump> > getRetrieveRequests() const {
return m_OStoreDB.getRetrieveRequests();
}
virtual std::list<RetrieveFromTapeCopyRequest> getRetrieveRequests(const std::string& vid) const {
virtual std::list<RetrieveRequestDump> getRetrieveRequests(const std::string& vid) const {
return m_OStoreDB.getRetrieveRequests(vid);
}
......
/*
* The CERN Tape Archive (CTA) project
* The CERN Tape Retrieve (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
......@@ -18,70 +18,12 @@
#pragma once
#include "scheduler/RetrieveRequest.hpp"
#include "common/archiveNS/TapeFileLocation.hpp"
#include <list>
#include <string>
namespace cta {
/**
* Class representing a user request to retrieve a single tape copy of an
* archived file to a single remote file.
*/
struct RetrieveFromTapeCopyRequest: public RetrieveRequest {
/**
* Constructor.
*/
RetrieveFromTapeCopyRequest();
/**
* Destructor.
*/
~RetrieveFromTapeCopyRequest() throw();
/**
* Constructor.
*
* @param archiveFile The full path of the source archive file.
* @param copyNb The tape copy number.
* @param tapeCopy The location of the tape copy.
* @param remoteFile The URL of the destination remote file.
* @param priority The priority of the request.
* @param user The identity of the user who made the request.
* @param creationTime Optionally the absolute time at which the user request
* was created. If no value is given then the current time is used.
*/
RetrieveFromTapeCopyRequest(
const std::string &archiveFile,
const uint64_t copyNb,
const TapeFileLocation &tapeCopy,
const std::string &remoteFile,
const uint64_t priority,
const CreationLog &creationLog);
/**
* The full path of the source archive file.
*/
std::string archiveFile;
/**
* The tape copy number.
*/
uint64_t copyNb;
/**
* The location of the copy on tape.
*/
TapeFileLocation tapeCopy;
/**
* The URL of the destination remote file.
*/
std::string remoteFile;
}; // struct RetrieveFromTapeCopyRequest
} // namespace cta
enum class RetrieveJobStatus {
LinkingToTape = 0,
Pending = 1,
Selected = 2,
Complete = 3,
Failed = 99
};
}
......@@ -16,34 +16,27 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "scheduler/RetrieveFromTapeCopyRequest.hpp"
#pragma once
#include "common/archiveNS/TapeFileLocation.hpp"
#include "common/CreationLog.hpp"
#include <list>
#include <string>
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
cta::RetrieveFromTapeCopyRequest::RetrieveFromTapeCopyRequest() {
}
namespace cta {
//------------------------------------------------------------------------------
// destructor
//------------------------------------------------------------------------------
cta::RetrieveFromTapeCopyRequest::~RetrieveFromTapeCopyRequest() throw() {
}
/**
* Class representing a user request to retrieve a single archive file to a
* single remote file.
*/
struct RetrieveRequestDump {
uint64_t priority; /**< The priority of the request. */
CreationLog creationLog; /**< The time at which the request was created. */
std::string archiveFile; /**< he full path of the source archive file. */
uint64_t activeCopyNb; /**< The tape copy number currenty considered for retrieve. */
std::list<TapeFileLocation> tapeCopies; /**<The location of the copies on tape. */
std::string remoteFile; /**< The URL of the destination remote file. */
}; // struct RetrieveFromTapeCopyRequest
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
cta::RetrieveFromTapeCopyRequest::RetrieveFromTapeCopyRequest(
const std::string &archiveFile,
const uint64_t copyNb,
const TapeFileLocation &tapeCopy,
const std::string &remoteFile,
const uint64_t priority,
const CreationLog & creationLog):
RetrieveRequest(priority, creationLog),
archiveFile(archiveFile),
copyNb(copyNb),
tapeCopy(tapeCopy),
remoteFile(remoteFile) {
}
} // namespace cta
......@@ -34,7 +34,7 @@
#include "scheduler/ArchiveToFileRequest.hpp"
#include "scheduler/ArchiveToTapeCopyRequest.hpp"
#include "scheduler/LogicalLibrary.hpp"
#include "scheduler/RetrieveFromTapeCopyRequest.hpp"
#include "scheduler/RetrieveRequestDump.hpp"
#include "scheduler/RetrieveMount.hpp"
#include "scheduler/RetrieveToDirRequest.hpp"
#include "scheduler/RetrieveToFileRequest.hpp"
......@@ -113,7 +113,7 @@ void cta::Scheduler::deleteArchiveRequest(
//------------------------------------------------------------------------------
// getRetrieveRequests
//------------------------------------------------------------------------------
std::map<cta::Tape, std::list<cta::RetrieveFromTapeCopyRequest> > cta::
std::map<cta::Tape, std::list<cta::RetrieveRequestDump> > cta::
Scheduler::getRetrieveRequests(const SecurityIdentity &requester) const {
return m_db.getRetrieveRequests();
}
......@@ -121,7 +121,7 @@ std::map<cta::Tape, std::list<cta::RetrieveFromTapeCopyRequest> > cta::
//------------------------------------------------------------------------------
// getRetrieveRequests
//------------------------------------------------------------------------------
std::list<cta::RetrieveFromTapeCopyRequest> cta::Scheduler::getRetrieveRequests(
std::list<cta::RetrieveRequestDump> cta::Scheduler::getRetrieveRequests(
const SecurityIdentity &requester,
const std::string &vid) const {
return m_db.getRetrieveRequests(vid);
......
......@@ -42,7 +42,7 @@ class LogicalLibrary;
class NameServer;
class RemoteNS;
class RemotePathAndStatus;
class RetrieveFromTapeCopyRequest;
class RetrieveRequestDump;
class RetrieveToDirRequest;
class RetrieveToFileRequest;
class SchedulerDatabase;
......@@ -138,7 +138,7 @@ public:
* @return all of the queued retrieve requests. The returned requsts are
* grouped by tape and then sorted by creation time, oldest first.
*/
virtual std::map<Tape, std::list<RetrieveFromTapeCopyRequest> >
virtual std::map<Tape, std::list<RetrieveRequestDump> >
getRetrieveRequests(const SecurityIdentity &requester) const;
/**
......@@ -150,7 +150,7 @@ public:
* @return The queued retrieve requests for the specified tape. The
* returned requests are sorted by creation time, oldest first.
*/
virtual std::list<RetrieveFromTapeCopyRequest> getRetrieveRequests(