diff --git a/objectstore/ArchiveToFileRequest.cpp b/objectstore/ArchiveToFileRequest.cpp index 750c01c7e1798158716699ef3fefa8137ed803a9..658acc98df11d5665a3c9b060b77c66b538a277d 100644 --- a/objectstore/ArchiveToFileRequest.cpp +++ b/objectstore/ArchiveToFileRequest.cpp @@ -88,7 +88,7 @@ auto cta::objectstore::ArchiveToFileRequest::dumpJobs() -> std::list<JobDump> { ret.push_back(JobDump()); ret.back().copyNb = j->copynb(); ret.back().tapePool = j->tapepool(); - ret.back().tapePool = j->tapepooladdress(); + ret.back().tapePoolAddress = j->tapepooladdress(); } return ret; } diff --git a/objectstore/ObjectOps.cpp b/objectstore/ObjectOps.cpp index 3e450b6f1c1d65c4ba7759c2490976b6083426c4..2d19c66ef52e411993cbd6e9185a5acca04378ac 100644 --- a/objectstore/ObjectOps.cpp +++ b/objectstore/ObjectOps.cpp @@ -32,6 +32,7 @@ namespace cta { namespace objectstore { MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(DriveRegister); MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(Tape); MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(ArchiveToFileRequest); + MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(RetrieveToFileRequest); #undef MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID }} \ No newline at end of file diff --git a/objectstore/RetrieveToFileRequest.cpp b/objectstore/RetrieveToFileRequest.cpp index 59e4db31b9f9f0fbb2923edf3262ce6a78a3853c..a11ca956e07b69818452f61a23e7be528ce7a451 100644 --- a/objectstore/RetrieveToFileRequest.cpp +++ b/objectstore/RetrieveToFileRequest.cpp @@ -16,4 +16,89 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#include "RetrieveToFileRequest.hpp" \ No newline at end of file +#include "RetrieveToFileRequest.hpp" +#include "GenericObject.hpp" +#include "CreationLog.hpp" + +cta::objectstore::RetrieveToFileRequest::RetrieveToFileRequest( + const std::string& address, Backend& os): + ObjectOps<serializers::RetrieveToFileRequest>(os, address) { } + +cta::objectstore::RetrieveToFileRequest::RetrieveToFileRequest(GenericObject& go): + ObjectOps<serializers::RetrieveToFileRequest>(go.objectStore()) { + // Here we transplant the generic object into the new object + go.transplantHeader(*this); + // And interpret the header. + getPayloadFromHeader(); +} + +void cta::objectstore::RetrieveToFileRequest::initialize() { + // Setup underlying object + ObjectOps<serializers::RetrieveToFileRequest>::initialize(); + // This object is good to go (to storage) + m_payloadInterpreted = true; +} + +void cta::objectstore::RetrieveToFileRequest::addJob(uint16_t copyNumber, + const std::string& tape, const std::string& tapeaddress) { + checkPayloadWritable(); + auto *j = m_payload.add_jobs(); + j->set_copynb(copyNumber); + j->set_status(serializers::RetrieveJobStatus::RJS_LinkingToTape); + j->set_tape(tape); + j->set_tapeaddress(tapeaddress); + j->set_totalretries(0); + j->set_retrieswithinmount(0); +} + +void cta::objectstore::RetrieveToFileRequest::setArchiveFile( + const std::string& archiveFile) { + checkPayloadWritable(); + m_payload.set_archivefile(archiveFile); +} + +void cta::objectstore::RetrieveToFileRequest::setRemoteFile( + const std::string& remoteFile) { + checkHeaderReadable(); + m_payload.set_remotefile(remoteFile); +} + +void cta::objectstore::RetrieveToFileRequest::setPriority(uint64_t priority) { + checkPayloadWritable(); + m_payload.set_priority(priority); +} + +void cta::objectstore::RetrieveToFileRequest::setLog( + const objectstore::CreationLog& creationLog) { + checkPayloadReadable(); + creationLog.serialize(*m_payload.mutable_log()); +} + +void cta::objectstore::RetrieveToFileRequest::setRetrieveToDirRequestAddress( + const std::string& dirRequestAddress) { + checkPayloadWritable(); + m_payload.set_retrievetodiraddress(dirRequestAddress); +} + +auto cta::objectstore::RetrieveToFileRequest::dumpJobs() -> std::list<JobDump> { + checkPayloadReadable(); + std::list<JobDump> ret; + auto & jl = m_payload.jobs(); + for (auto j=jl.begin(); j!=jl.end(); j++) { + ret.push_back(JobDump()); + ret.back().copyNb = j->copynb(); + ret.back().tape = j->tape(); + ret.back().tapeAddress = j->tapeaddress(); + } + return ret; +} + +uint64_t cta::objectstore::RetrieveToFileRequest::getSize() { + checkPayloadWritable(); + return m_payload.size(); +} + +void cta::objectstore::RetrieveToFileRequest::setSize(uint64_t size) { + checkPayloadWritable(); + m_payload.set_size(size); +} \ No newline at end of file diff --git a/objectstore/RetrieveToFileRequest.hpp b/objectstore/RetrieveToFileRequest.hpp index a5c2cab3d57bc2bea57b34776993fe7d92c21001..c2d95ce6bfb45c84331d82c536812890bb5a946d 100644 --- a/objectstore/RetrieveToFileRequest.hpp +++ b/objectstore/RetrieveToFileRequest.hpp @@ -37,10 +37,14 @@ public: void addJob(uint16_t copyNumber, const std::string & tape, const std::string & tapeaddress); void setArchiveFile(const std::string & archiveFile); + std::string getArchiveFile(); void setRemoteFile (const std::string & remoteFile); + std::string getRemoteFile(); void setPriority (uint64_t priority); void setLog (const objectstore::CreationLog& creationLog); void setRetrieveToDirRequestAddress(const std::string & dirRequestAddress); + void setSize(uint64_t size); + uint64_t getSize(); class JobDump { public: uint16_t copyNb; diff --git a/objectstore/Tape.cpp b/objectstore/Tape.cpp index dc3b27e9f82bec66084ff12c8efaa91e903726ad..4f88c97c1978135482bc6a385743b604ccb744f3 100644 --- a/objectstore/Tape.cpp +++ b/objectstore/Tape.cpp @@ -91,7 +91,10 @@ std::string cta::objectstore::Tape::dump() { return ret.str(); } - - - - +void cta::objectstore::Tape::addJob(const RetrieveToFileRequest::JobDump& job, + const std::string & retrieveToFileAddress, uint64_t size) { + checkPayloadWritable(); + auto * j = m_payload.add_retrievaljobs(); + j->set_address(retrieveToFileAddress); + j->set_size(size); +} diff --git a/objectstore/Tape.hpp b/objectstore/Tape.hpp index ca188a48c8e76b66c6f1253bd5680922df72263f..c0c2cabf800e4f7c19f7a871fd958d0cf60b9e23 100644 --- a/objectstore/Tape.hpp +++ b/objectstore/Tape.hpp @@ -20,6 +20,7 @@ #include "ObjectOps.hpp" #include "objectstore/cta.pb.h" +#include "RetrieveToFileRequest.hpp" namespace cta { namespace objectstore { @@ -38,6 +39,10 @@ public: void removeIfEmpty(); std::string dump(); + // Retrieval jobs management ================================================== + void addJob(const RetrieveToFileRequest::JobDump & job, + const std::string & retrieveToFileAddress, uint64_t size); + // -- Stored data counting --------------------------------------------------- uint64_t getStoredData(); std::string getVid(); diff --git a/objectstore/cta.proto b/objectstore/cta.proto index 9bd1627b78481aed192c9a237e277ef10d198cb6..1f6f858a53c37f9f2602221a66e312c3898e9ed8 100644 --- a/objectstore/cta.proto +++ b/objectstore/cta.proto @@ -10,6 +10,7 @@ enum ObjectType { DriveRegister_t = 4; Tape_t = 5; ArchiveToFileRequest_t = 6; + RetrieveToFileRequest_t = 7; GenericObject_t = 1000; } @@ -155,7 +156,7 @@ message ArchiveJobPointer { required string address = 3002; } -message RetrievalJobPointer { +message RetrieveJobPointer { required uint64 size = 3101; required string address = 3102; } @@ -262,10 +263,11 @@ message RetrieveJobEntry { message RetrieveToFileRequest { required string remotefile = 4800; required string archivefile = 4801; - repeated RetrieveJobEntry routes = 4802; - required uint64 priority = 4803; - required CreationLog log = 4804; - optional string retrievetodiraddress = 4805; + required uint64 size = 4802; + repeated RetrieveJobEntry jobs = 4803; + required uint64 priority = 4804; + required CreationLog log = 4805; + optional string retrievetodiraddress = 4806; } message RetrieveToDirRequest { diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index 455e73cd177fcb86260bbc2581230793cb3fd71d..989ba487e1b4551fd980232f4851c819da1898a3 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -527,7 +527,18 @@ void OStoreDB::queue(const cta::RetrieveToFileRequest& rqst) { objectstore::Tape tp(j->tapeAddress, m_objectStore); ScopedExclusiveLock tpl(tp); tp.fetch(); - //tp.addJob(j); + tp.addJob(*j, rtfr.getAddressIfSet(), rtfr.getSize()); + tp.commit(); + } + // The request is now fully set. As it's multi-owned, we do not set the owner + rtfr.setOwner(""); + rtfr.commit(); + // And remove reference from the agent + { + objectstore::ScopedExclusiveLock al(*m_agent); + m_agent->fetch(); + m_agent->removeFromOwnership(rtfr.getAddressIfSet()); + m_agent->commit(); } }