Commit fc3a5ea0 authored by Daniele Kruse's avatar Daniele Kruse
Browse files

WIP: Started working on object store retrievals

parent fb40700b
......@@ -20,6 +20,7 @@ add_library (CTAObjectStore SHARED
TapePool.cpp
Tape.cpp
ArchiveToFileRequest.cpp
RetrieveToFileRequest.cpp
DriveRegister.cpp
BackendVFS.cpp
BackendRados.cpp
......
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "RetrieveToFileRequest.hpp"
\ No newline at end of file
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "ObjectOps.hpp"
#include "objectstore/cta.pb.h"
#include <list>
namespace cta { namespace objectstore {
class Backend;
class Agent;
class GenericObject;
class CreationLog;
class RetrieveToFileRequest: public ObjectOps<serializers::RetrieveToFileRequest> {
public:
RetrieveToFileRequest(const std::string & address, Backend & os);
RetrieveToFileRequest(GenericObject & go);
void initialize();
void addJob(uint16_t copyNumber, const std::string & tape,
const std::string & tapeaddress);
void setArchiveFile(const std::string & archiveFile);
void setRemoteFile (const std::string & remoteFile);
void setPriority (uint64_t priority);
void setLog (const objectstore::CreationLog& creationLog);
void setRetrieveToDirRequestAddress(const std::string & dirRequestAddress);
class JobDump {
public:
uint16_t copyNb;
std::string tape;
std::string tapeAddress;
};
std::list<JobDump> dumpJobs();
};
}}
......@@ -128,7 +128,7 @@ public:
};
std::list<LibraryDump> dumpLibraries();
// TapePoolManipulations =====================================================
// TapePool Manipulations =====================================================
CTA_GENERATE_EXCEPTION_CLASS(TapePoolNotEmpty);
CTA_GENERATE_EXCEPTION_CLASS(WrongTapePool);
/** This function implicitly creates the tape pool structure and updates
......
......@@ -195,7 +195,7 @@ message Tape {
repeated RetrievalJobPointer retrievaljobs = 4305;
}
// ------------- Jobs ----------------------------------------------------------
// ------------- Archive Jobs --------------------------------------------------
// The status of the individual archive jobs. The jobs are materialised
// by table entries in the ArchiveToFileRequest.
......@@ -227,13 +227,52 @@ message ArchiveToFileRequest {
optional string archivetodiraddress = 4505;
}
message ArchivalToDirRequest {
message ArchiveToDirRequest {
required string archivedir = 4600;
repeated string filerequestsaddresses = 4601;
required uint64 priority = 4602;
required CreationLog log = 4603;
}
// ------------- Retrieve Jobs -------------------------------------------------
// The status of the individual retrieve jobs. The jobs are materialised
// by table entries in the RetrieveToFileRequest.
// This life cycle represented by the following enum
enum RetrieveJobStatus {
RJS_LinkingToTape = 0;
RJS_Pending = 1;
RJS_Selected = 2;
RJS_Complete = 3;
RJS_Failed = 99;
}
message RetrieveJobEntry {
required uint32 copynb = 4700;
required string tape = 4701;
required string tapeaddress = 4702;
required RetrieveJobStatus status = 4703;
required uint32 totalretries = 4704;
required uint32 retrieswithinmount = 4705;
}
message RetrieveToFileRequest {
required string remotefile = 4800;
required string archivefile = 4801;
repeated RetrieveJobEntry routes = 4802;
required uint64 priority = 4803;
required CreationLog log = 4804;
optional string retrievetodiraddress = 4805;
}
message RetrieveToDirRequest {
required string retrievedir = 4900;
repeated string filerequestsaddresses = 4901;
required uint64 priority = 4902;
required CreationLog log = 4903;
}
// ------------- Drives handling ----------------------------------------------
message DriveRegister {
......
......@@ -22,6 +22,7 @@
#include "objectstore/TapePool.hpp"
#include "objectstore/Tape.hpp"
#include "objectstore/ArchiveToFileRequest.hpp"
#include "objectstore/RetrieveToFileRequest.hpp"
#include "common/exception/Exception.hpp"
#include "scheduler/AdminHost.hpp"
#include "scheduler/AdminUser.hpp"
......@@ -29,10 +30,16 @@
#include "scheduler/ArchiveRequest.hpp"
#include "scheduler/ArchiveToFileRequest.hpp"
#include "scheduler/LogicalLibrary.hpp"
#include "scheduler/RetrieveToFileRequest.hpp"
#include "scheduler/StorageClass.hpp"
#include "scheduler/TapePool.hpp"
#include "scheduler/Tape.hpp"
#include "ArchiveToDirRequest.hpp"
#include "RetrieveToFileRequest.hpp"
#include "TapeCopyLocation.hpp"
#include <algorithm>
#include <stdlib.h> /* srand, rand */
#include <time.h> /* time */
namespace cta {
......@@ -430,7 +437,10 @@ void OStoreDB::queue(const cta::ArchiveToFileRequest& rqst) {
}
void OStoreDB::queue(const ArchiveToDirRequest& rqst) {
throw exception::Exception("Not Implemented");
auto & archiveToFileRequests = rqst.getArchiveToFileRequests();
for(auto req=archiveToFileRequests.begin(); req!=archiveToFileRequests.end(); req++) {
queue(*req);
}
}
void OStoreDB::deleteArchiveRequest(const SecurityIdentity& requester,
......@@ -453,8 +463,54 @@ std::list<ArchiveToTapeCopyRequest>
throw exception::Exception("Not Implemented");
}
void OStoreDB::queue(const RetrieveToFileRequest& rqst) {
void OStoreDB::queue(const cta::RetrieveToFileRequest& rqst) {
throw exception::Exception("Not Implemented");
// In order to post the job, construct it first.
objectstore::RetrieveToFileRequest rtfr(m_agent->nextId("RetrieveToFileRequest"), m_objectStore);
rtfr.initialize();
rtfr.setArchiveFile(rqst.getArchiveFile());
rtfr.setRemoteFile(rqst.getRemoteFile());
rtfr.setPriority(rqst.getPriority());
rtfr.setLog(rqst.getCreationLog());
// We will need to identity tapes is order to construct the request
RootEntry re(m_objectStore);
ScopedSharedLock rel(re);
re.fetch();
auto & tc = rqst.getTapeCopies();
auto numberOfCopies = tc.size();
srand (time(NULL));
auto chosenCopyNumber = rand() % numberOfCopies;
auto it = tc.begin();
std::advance(it, chosenCopyNumber);
auto tapePools = re.dumpTapePools();
std::string tapeAddress = "";
for(auto pool=tapePools.begin(); pool!=tapePools.end(); pool++) {
objectstore::TapePool tp(pool->address, m_objectStore);
auto tapes = tp.dumpTapes();
for(auto tape=tapes.begin(); tape!=tapes.end(); tape++) {
if(tape->vid==it->getVid()) {
tapeAddress = tape->address;
break;
}
}
}
rtfr.addJob(chosenCopyNumber, it->getVid(), tapeAddress);
auto jl = rtfr.dumpJobs();
if (!jl.size()) {
throw RetrieveRequestHasNoCopies("In OStoreDB::queue: the retrieve to file request has no copies");
}
// We successfully prepared the object. Time to create it and plug it to
// the tree.
rtfr.setOwner(m_agent->getAddressIfSet());
rtfr.insert();
ScopedExclusiveLock rtfrl(rtfr);
// We can now plug the request onto its tape
for (auto j=jl.begin(); j!=jl.end(); j++) {
objectstore::Tape tp(j->tapeAddress, m_objectStore);
ScopedExclusiveLock tpl(tp);
tp.fetch();
//tp.addJob(j);
}
}
void OStoreDB::queue(const RetrieveToDirRequest& rqst) {
......
......@@ -139,6 +139,7 @@ public:
virtual std::list<ArchiveToTapeCopyRequest> getArchiveRequests(const std::string& tapePoolName) const;
/* === Retrieve requests handling ======================================== */
CTA_GENERATE_EXCEPTION_CLASS(RetrieveRequestHasNoCopies);
virtual void queue(const RetrieveToFileRequest& rqst_);
virtual void queue(const RetrieveToDirRequest& rqst);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment