diff --git a/catalogue/TapeFileWritten.hpp b/catalogue/TapeFileWritten.hpp index d5da0e3c6a50ce84e70915cc44bee0d5fdaea8af..2c6e0ca061bc216cbf66555d3b0b6b6c0ae294ac 100644 --- a/catalogue/TapeFileWritten.hpp +++ b/catalogue/TapeFileWritten.hpp @@ -54,7 +54,7 @@ struct TapeFileWritten { * The instance name of the source disk system. */ std::string diskInstance; - + /** * The identifier of the source disk file which is unique within it's host * disk system. Two files from different disk systems may have the same @@ -62,7 +62,7 @@ struct TapeFileWritten { * globally unique, in other words unique within the CTA catalogue. */ std::string diskFileId; - + /** * The path of the source disk file within its host disk system. */ diff --git a/common/dataStructures/ArchiveFileQueueCriteria.hpp b/common/dataStructures/ArchiveFileQueueCriteria.hpp index c662b5742f8886c898900cd214b5b54bb404e802..f0657451c1cff8577bf58bfa672e8cb7646bab47 100644 --- a/common/dataStructures/ArchiveFileQueueCriteria.hpp +++ b/common/dataStructures/ArchiveFileQueueCriteria.hpp @@ -52,6 +52,7 @@ struct ArchiveFileQueueCriteria { const TapeCopyToPoolMap ©ToPoolMap, const MountPolicy &mountPolicy); + // TODO: rename to archiveFileId? /** * The unique archive-file identifier. */ diff --git a/objectstore/ArchiveQueue.cpp b/objectstore/ArchiveQueue.cpp index 69af9851051c3a26eae20e1baea4a69ce50e1017..2a56c4cf479ee5f25524186054b66c2132d5617f 100644 --- a/objectstore/ArchiveQueue.cpp +++ b/objectstore/ArchiveQueue.cpp @@ -43,7 +43,7 @@ std::string ArchiveQueue::dump() { ret << "ArchiveQueue" << std::endl; struct json_object * jo = json_object_new_object(); - json_object_object_add(jo, "name", json_object_new_string(m_payload.tapepoolname().c_str())); + json_object_object_add(jo, "name", json_object_new_string(m_payload.tapepool().c_str())); json_object_object_add(jo, "ArchiveJobsTotalSize", json_object_new_int64(m_payload.archivejobstotalsize())); json_object_object_add(jo, "oldestJobCreationTime", json_object_new_int64(m_payload.oldestjobcreationtime())); { @@ -91,7 +91,7 @@ void ArchiveQueue::initialize(const std::string& name) { // Setup underlying object ObjectOps<serializers::ArchiveQueue, serializers::ArchiveQueue_t>::initialize(); // Setup the object so it's valid - m_payload.set_tapepoolname(name); + m_payload.set_tapepool(name); // set the archive jobs counter to zero m_payload.set_archivejobstotalsize(0); m_payload.set_oldestjobcreationtime(0); @@ -142,14 +142,14 @@ void ArchiveQueue::garbageCollect(const std::string &presumedOwner) { remove(); } -void ArchiveQueue::setName(const std::string& name) { +void ArchiveQueue::setTapePool(const std::string& name) { checkPayloadWritable(); - m_payload.set_tapepoolname(name); + m_payload.set_tapepool(name); } -std::string ArchiveQueue::getName() { +std::string ArchiveQueue::getTapePool() { checkPayloadReadable(); - return m_payload.tapepoolname(); + return m_payload.tapepool(); } void ArchiveQueue::addJob(const ArchiveRequest::JobDump& job, @@ -218,6 +218,7 @@ bool ArchiveQueue::addJobIfNecessary( } void ArchiveQueue::removeJob(const std::string& archiveToFileAddress) { + // TODO: remove the summary of the job from the queue's counts. checkPayloadWritable(); auto * jl=m_payload.mutable_pendingarchivejobs(); bool found = false; @@ -246,9 +247,10 @@ auto ArchiveQueue::dumpJobs() -> std::list<ArchiveQueue::JobDump> { auto & jl=m_payload.pendingarchivejobs(); for (auto j=jl.begin(); j!=jl.end(); j++) { ret.push_back(JobDump()); - ret.back().address = j->address(); - ret.back().size = j->size(); - ret.back().copyNb = j->copynb(); + JobDump & jd = ret.back(); + jd.address = j->address(); + jd.size = j->size(); + jd.copyNb = j->copynb(); } return ret; } diff --git a/objectstore/ArchiveQueue.hpp b/objectstore/ArchiveQueue.hpp index 6b78795d2ad678bde5b7ad22513e14fab7dab20c..26da4bcf460697986f3b127f7eaaeb58294dc0f7 100644 --- a/objectstore/ArchiveQueue.hpp +++ b/objectstore/ArchiveQueue.hpp @@ -34,6 +34,8 @@ namespace cta { namespace objectstore { class GenericObject; class ArchiveQueue: public ObjectOps<serializers::ArchiveQueue, serializers::ArchiveQueue_t> { + // TODO: rename tapepoolname field to tapepool (including in probuf) + public: // Constructor ArchiveQueue(const std::string & address, Backend & os); @@ -44,9 +46,9 @@ public: // In memory initialiser void initialize(const std::string & name); - // Set/get name - void setName(const std::string & name); - std::string getName(); + // Set/get tape pool + void setTapePool(const std::string & name); + std::string getTapePool(); // Archive jobs management =================================================== void addJob(const ArchiveRequest::JobDump & job, diff --git a/objectstore/ArchiveRequest.cpp b/objectstore/ArchiveRequest.cpp index de3ddc8368abaf6d23fe9749f654eb9051aa7c85..bca1bc667eafa1a1b0a7a7c550e7c0382f7e822b 100644 --- a/objectstore/ArchiveRequest.cpp +++ b/objectstore/ArchiveRequest.cpp @@ -131,52 +131,43 @@ void cta::objectstore::ArchiveRequest::setAllJobsPendingNSdeletion() { } } -//------------------------------------------------------------------------------ -// setArchiveFileID -//------------------------------------------------------------------------------ -void cta::objectstore::ArchiveRequest::setArchiveFileID(const uint64_t archiveFileID) { - checkPayloadWritable(); - m_payload.set_archivefileid(archiveFileID); -} - -//------------------------------------------------------------------------------ -// getArchiveFileID -//------------------------------------------------------------------------------ -uint64_t cta::objectstore::ArchiveRequest::getArchiveFileID() { - checkPayloadReadable(); - return m_payload.archivefileid(); -} - -//------------------------------------------------------------------------------ -// setChecksumType -//------------------------------------------------------------------------------ -void cta::objectstore::ArchiveRequest::setChecksumType(const std::string &checksumType) { - checkPayloadWritable(); - m_payload.set_checksumtype(checksumType); -} -//------------------------------------------------------------------------------ -// getChecksumType -//------------------------------------------------------------------------------ -std::string cta::objectstore::ArchiveRequest::getChecksumType() { - checkPayloadReadable(); - return m_payload.checksumtype(); -} -//------------------------------------------------------------------------------ -// setChecksumValue -//------------------------------------------------------------------------------ -void cta::objectstore::ArchiveRequest::setChecksumValue(const std::string &checksumValue) { +void ArchiveRequest::setArchiveFile(const cta::common::dataStructures::ArchiveFile& archiveFile) { checkPayloadWritable(); - m_payload.set_checksumvalue(checksumValue); -} - -//------------------------------------------------------------------------------ -// getChecksumValue -//------------------------------------------------------------------------------ -std::string cta::objectstore::ArchiveRequest::getChecksumValue() { + // TODO: factor out the archivefile structure from the flat ArchiveRequest. + m_payload.set_archivefileid(archiveFile.archiveFileID); + m_payload.set_checksumtype(archiveFile.checksumType); + m_payload.set_checksumvalue(archiveFile.checksumValue); + m_payload.set_creationtime(archiveFile.creationTime); + m_payload.set_diskfileid(archiveFile.diskFileId); + m_payload.mutable_diskfileinfo()->set_group(archiveFile.diskFileInfo.group); + m_payload.mutable_diskfileinfo()->set_owner(archiveFile.diskFileInfo.owner); + m_payload.mutable_diskfileinfo()->set_path(archiveFile.diskFileInfo.path); + m_payload.mutable_diskfileinfo()->set_recoveryblob(archiveFile.diskFileInfo.recoveryBlob); + m_payload.set_diskinstance(archiveFile.diskInstance); + m_payload.set_filesize(archiveFile.fileSize); + m_payload.set_reconcilationtime(archiveFile.reconciliationTime); + m_payload.set_storageclass(archiveFile.storageClass); +} + +cta::common::dataStructures::ArchiveFile ArchiveRequest::getArchiveFile() { checkPayloadReadable(); - return m_payload.checksumvalue(); + cta::common::dataStructures::ArchiveFile ret; + ret.archiveFileID = m_payload.archivefileid(); + ret.checksumType = m_payload.checksumtype(); + ret.checksumValue = m_payload.checksumvalue(); + ret.creationTime = m_payload.creationtime(); + ret.diskFileId = m_payload.diskfileid(); + ret.diskFileInfo.group = m_payload.diskfileinfo().group(); + ret.diskFileInfo.owner = m_payload.diskfileinfo().owner(); + ret.diskFileInfo.path = m_payload.diskfileinfo().path(); + ret.diskFileInfo.recoveryBlob = m_payload.diskfileinfo().recoveryblob(); + ret.diskInstance = m_payload.diskinstance(); + ret.fileSize = m_payload.filesize(); + ret.reconciliationTime = m_payload.reconcilationtime(); + ret.storageClass = m_payload.storageclass(); + return ret; } //------------------------------------------------------------------------------ @@ -230,86 +221,13 @@ cta::common::dataStructures::MountPolicy cta::objectstore::ArchiveRequest::getMo checkPayloadReadable(); cta::common::dataStructures::MountPolicy mountPolicy; auto payloadMountPolicy = m_payload.mountpolicy(); + mountPolicy.name=payloadMountPolicy.name(); mountPolicy.maxDrivesAllowed=payloadMountPolicy.maxdrives(); mountPolicy.archiveMinRequestAge=payloadMountPolicy.minrequestage(); mountPolicy.archivePriority=payloadMountPolicy.priority(); return mountPolicy; } -//------------------------------------------------------------------------------ -// setDiskFileInfo -//------------------------------------------------------------------------------ -void cta::objectstore::ArchiveRequest::setDiskFileInfo(const cta::common::dataStructures::DiskFileInfo &diskFileInfo) { - checkPayloadWritable(); - auto payloadDiskFileInfo = m_payload.mutable_diskfileinfo(); - payloadDiskFileInfo->set_recoveryblob(diskFileInfo.recoveryBlob); - payloadDiskFileInfo->set_group(diskFileInfo.group); - payloadDiskFileInfo->set_owner(diskFileInfo.owner); - payloadDiskFileInfo->set_path(diskFileInfo.path); -} - -//------------------------------------------------------------------------------ -// getDiskFileInfo -//------------------------------------------------------------------------------ -cta::common::dataStructures::DiskFileInfo cta::objectstore::ArchiveRequest::getDiskFileInfo() { - checkPayloadReadable(); - cta::common::dataStructures::DiskFileInfo diskFileInfo; - auto payloadDiskFileInfo = m_payload.diskfileinfo(); - diskFileInfo.recoveryBlob=payloadDiskFileInfo.recoveryblob(); - diskFileInfo.group=payloadDiskFileInfo.group(); - diskFileInfo.owner=payloadDiskFileInfo.owner(); - diskFileInfo.path=payloadDiskFileInfo.path(); - return diskFileInfo; -} - -//------------------------------------------------------------------------------ -// setDiskFileID -//------------------------------------------------------------------------------ -void cta::objectstore::ArchiveRequest::setDiskFileID(const std::string &diskFileID) { - checkPayloadWritable(); - m_payload.set_diskfileid(diskFileID); -} - -//------------------------------------------------------------------------------ -// getDiskFileID -//------------------------------------------------------------------------------ -std::string cta::objectstore::ArchiveRequest::getDiskFileID() { - checkPayloadReadable(); - return m_payload.diskfileid(); -} - -//------------------------------------------------------------------------------ -// setInstance -//------------------------------------------------------------------------------ -void cta::objectstore::ArchiveRequest::setInstance(const std::string &instance) { - checkPayloadWritable(); - m_payload.set_instance(instance); -} - -//------------------------------------------------------------------------------ -// getInstance -//------------------------------------------------------------------------------ -std::string cta::objectstore::ArchiveRequest::getInstance() { - checkPayloadReadable(); - return m_payload.instance(); -} - -//------------------------------------------------------------------------------ -// setFileSize -//------------------------------------------------------------------------------ -void cta::objectstore::ArchiveRequest::setFileSize(const uint64_t fileSize) { - checkPayloadWritable(); - m_payload.set_filesize(fileSize); -} - -//------------------------------------------------------------------------------ -// getFileSize -//------------------------------------------------------------------------------ -uint64_t cta::objectstore::ArchiveRequest::getFileSize() { - checkPayloadReadable(); - return m_payload.filesize(); -} - //------------------------------------------------------------------------------ // setRequester //------------------------------------------------------------------------------ @@ -348,22 +266,6 @@ std::string ArchiveRequest::getSrcURL() { return m_payload.srcurl(); } -//------------------------------------------------------------------------------ -// setStorageClass -//------------------------------------------------------------------------------ -void ArchiveRequest::setStorageClass(const std::string &storageClass) { - checkPayloadWritable(); - m_payload.set_storageclass(storageClass); -} - -//------------------------------------------------------------------------------ -// getStorageClass -//------------------------------------------------------------------------------ -std::string ArchiveRequest::getStorageClass() { - checkPayloadReadable(); - return m_payload.storageclass(); -} - //------------------------------------------------------------------------------ // setCreationLog //------------------------------------------------------------------------------ @@ -506,6 +408,16 @@ void ArchiveRequest::setJobOwner( throw NoSuchJob("In ArchiveRequest::setJobOwner: no such job"); } +std::string ArchiveRequest::getJobOwner(uint16_t copyNumber) { + checkPayloadReadable(); + auto jl = m_payload.jobs(); + auto j=std::find_if(jl.begin(), jl.end(), [&](decltype(*jl.begin())& j2){ return j2.copynb() == copyNumber; }); + if (jl.end() == j) + throw NoSuchJob("In ArchiveRequest::getJobOwner: no such job"); + return j->owner(); +} + + bool ArchiveRequest::finishIfNecessary() { checkPayloadWritable(); // This function is typically called after changing the status of one job @@ -534,7 +446,7 @@ std::string ArchiveRequest::dump() { json_object_object_add(jo, "diskpoolname", json_object_new_string(m_payload.diskpoolname().c_str())); json_object_object_add(jo, "diskpoolthroughput", json_object_new_int64(m_payload.diskpoolthroughput())); json_object_object_add(jo, "diskfileid", json_object_new_string(m_payload.diskfileid().c_str())); - json_object_object_add(jo, "instance", json_object_new_string(m_payload.instance().c_str())); + json_object_object_add(jo, "instance", json_object_new_string(m_payload.diskinstance().c_str())); json_object_object_add(jo, "filesize", json_object_new_int64(m_payload.filesize())); json_object_object_add(jo, "srcurl", json_object_new_string(m_payload.srcurl().c_str())); json_object_object_add(jo, "storageclass", json_object_new_string(m_payload.storageclass().c_str())); diff --git a/objectstore/ArchiveRequest.hpp b/objectstore/ArchiveRequest.hpp index c9cda8b50d1f3002931ea2dedf12e6f2b4c006c0..c78b9e36bf9c6420cc06f309977852e773fd9bd8 100644 --- a/objectstore/ArchiveRequest.hpp +++ b/objectstore/ArchiveRequest.hpp @@ -23,6 +23,7 @@ #include "common/dataStructures/EntryLog.hpp" #include "common/dataStructures/MountPolicy.hpp" #include "common/dataStructures/UserIdentity.hpp" +#include "common/dataStructures/ArchiveFile.hpp" #include "ObjectOps.hpp" #include "objectstore/cta.pb.h" #include <list> @@ -60,18 +61,15 @@ public: CTA_GENERATE_EXCEPTION_CLASS(NoSuchJob); // Set a job ownership void setJobOwner(uint16_t copyNumber, const std::string & owner); + // Get a job owner + std::string getJobOwner(uint16_t copyNumber); // Request management ======================================================== void setSuccessful(); void setFailed(); // =========================================================================== - void setArchiveFileID(const uint64_t archiveFileID); - uint64_t getArchiveFileID(); - - void setChecksumType(const std::string &checksumType); - std::string getChecksumType(); - - void setChecksumValue(const std::string &checksumValue); - std::string getChecksumValue(); + // TODO: ArchiveFile comes with extraneous information. + void setArchiveFile(const cta::common::dataStructures::ArchiveFile& archiveFile); + cta::common::dataStructures::ArchiveFile getArchiveFile(); void setDiskpoolName(const std::string &diskpoolName); std::string getDiskpoolName(); @@ -79,27 +77,13 @@ public: void setDiskpoolThroughput(const uint64_t diskpoolThroughput); uint64_t getDiskpoolThroughput(); - void setDiskFileInfo(const cta::common::dataStructures::DiskFileInfo &diskFileInfo); - cta::common::dataStructures::DiskFileInfo getDiskFileInfo(); - - void setDiskFileID(const std::string &diskFileID); - std::string getDiskFileID(); - - void setInstance(const std::string &instance); - std::string getInstance(); - - void setFileSize(const uint64_t fileSize); - uint64_t getFileSize(); - void setRequester(const cta::common::dataStructures::UserIdentity &requester); cta::common::dataStructures::UserIdentity getRequester(); void setSrcURL(const std::string &srcURL); std::string getSrcURL(); - void setStorageClass(const std::string &storageClass); - std::string getStorageClass(); - + // TODO: rename void setCreationLog(const cta::common::dataStructures::EntryLog &creationLog); cta::common::dataStructures::EntryLog getCreationLog(); diff --git a/objectstore/GarbageCollectorTest.cpp b/objectstore/GarbageCollectorTest.cpp index c5cfc5631ee3769d40c8e4d9e90b088ffbe4c9ef..8136bc1a96c1758cfbb0d84fc94e09beaf4c05ac 100644 --- a/objectstore/GarbageCollectorTest.cpp +++ b/objectstore/GarbageCollectorTest.cpp @@ -307,23 +307,27 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) { // collector. cta::objectstore::ArchiveRequest ar(atfrAddr, be); ar.initialize(); - ar.setArchiveFileID(123456789L); - ar.setDiskFileID("eos://diskFile"); + cta::common::dataStructures::ArchiveFile aFile; + aFile.archiveFileID = 123456789L; + aFile.diskFileId = "eos://diskFile"; + aFile.checksumType = ""; + aFile.checksumValue = ""; + aFile.creationTime = 0; + aFile.reconciliationTime = 0; + aFile.diskFileInfo = cta::common::dataStructures::DiskFileInfo(); + aFile.diskInstance = "eoseos"; + aFile.fileSize = 667; + aFile.storageClass = "sc"; + ar.setArchiveFile(aFile); ar.addJob(1, "ArchiveQueue0", tpAddr[0], 1, 1); ar.addJob(2, "ArchiveQueue1", tpAddr[1], 1, 1); ar.setOwner(agA.getAddressIfSet()); cta::common::dataStructures::MountPolicy mp; ar.setMountPolicy(mp); - ar.setChecksumType(""); - ar.setChecksumValue(""); ar.setDiskpoolName(""); ar.setDiskpoolThroughput(666); - ar.setDiskFileInfo(cta::common::dataStructures::DiskFileInfo()); - ar.setInstance("eoseos"); - ar.setFileSize(667); ar.setRequester(cta::common::dataStructures::UserIdentity("user0", "group0")); ar.setSrcURL("root://eoseos/myFile"); - ar.setStorageClass("sc"); ar.setCreationLog(cta::common::dataStructures::EntryLog("user0", "host0", time(nullptr))); ar.insert(); cta::objectstore::ScopedExclusiveLock atfrl(ar); @@ -349,7 +353,7 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) { policy.archiveMinRequestAge = 0; policy.archivePriority = 1; policy.maxDrivesAllowed = 1; - aq.addJob(jd, ar.getAddressIfSet(), ar.getArchiveFileID(), 1000+pass, policy, time(NULL)); + aq.addJob(jd, ar.getAddressIfSet(), ar.getArchiveFile().archiveFileID, 1000+pass, policy, time(NULL)); aq.commit(); } if (pass < 4) { pass++; continue; } @@ -367,7 +371,7 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) { policy.archiveMinRequestAge = 0; policy.archivePriority = 1; policy.maxDrivesAllowed = 1; - aq.addJob(jd, ar.getAddressIfSet(), ar.getArchiveFileID(), 1000+pass, policy, time(NULL)); + aq.addJob(jd, ar.getAddressIfSet(), ar.getArchiveFile().archiveFileID, 1000+pass, policy, time(NULL)); aq.commit(); } if (pass < 5) { pass++; continue; } diff --git a/objectstore/GenericObject.cpp b/objectstore/GenericObject.cpp index 6b16d746f0290c9de0d0fd0359d8eeb3fe3cb961..c0ac21b6bc08624e11eafec354b99e00fe64e7c5 100644 --- a/objectstore/GenericObject.cpp +++ b/objectstore/GenericObject.cpp @@ -65,7 +65,7 @@ void GenericObject::transplantHeader(ObjectOpsBase& destination) { destination.m_name = m_name; destination.m_nameSet = m_nameSet; destination.m_payloadInterpreted = false; - } +} Backend& GenericObject::objectStore() { return m_objectStore; diff --git a/objectstore/ObjectOps.hpp b/objectstore/ObjectOps.hpp index ad61b9aae3374bdff4ac79f3e3bf3a99907c438c..90ae7d6af47d0ae8a508d5235c57d4d0cbdc69d3 100644 --- a/objectstore/ObjectOps.hpp +++ b/objectstore/ObjectOps.hpp @@ -45,6 +45,7 @@ public: CTA_GENERATE_EXCEPTION_CLASS(NotFetched); CTA_GENERATE_EXCEPTION_CLASS(NotInitialized); CTA_GENERATE_EXCEPTION_CLASS(AddressAlreadySet); + CTA_GENERATE_EXCEPTION_CLASS(InvalidAddress); protected: void checkHeaderWritable() { if (!m_headerInterpreted) @@ -85,6 +86,8 @@ public: void setAddress(const std::string & name) { if (m_nameSet) throw AddressAlreadySet("In ObjectOps::setName: trying to overwrite an already set name"); + if (name.empty()) + throw InvalidAddress("In ObjectOps::setName: empty name"); m_name = name; m_nameSet = true; } @@ -164,6 +167,7 @@ public: CTA_GENERATE_EXCEPTION_CLASS(AlreadyLocked); CTA_GENERATE_EXCEPTION_CLASS(NotLocked); + CTA_GENERATE_EXCEPTION_CLASS(MissingAddress); protected: ScopedLock(): m_objectOps(NULL), m_locked(false) {} @@ -178,6 +182,13 @@ protected: if (!m_locked) throw NotLocked("In ScopedLock::checkLocked: trying to unlock an unlocked lock"); } + void checkObjectAndAddressSet() { + if (!m_objectOps) { + throw MissingAddress("In ScopedLock::checkAddressSet: trying to lock a NULL object"); + } else if (!m_objectOps->m_nameSet || m_objectOps->m_name.empty()) { + throw MissingAddress("In ScopedLock::checkAddressSet: trying to lock an object without address"); + } + } virtual void releaseIfNeeded() { if(!m_locked) return; m_lock.reset(NULL); @@ -195,6 +206,7 @@ public: void lock(ObjectOpsBase & oo) { checkNotLocked(); m_objectOps = & oo; + checkObjectAndAddressSet(); m_lock.reset(m_objectOps->m_objectStore.lockShared(m_objectOps->getAddressIfSet())); m_objectOps->m_locksCount++; m_locked = true; @@ -210,6 +222,7 @@ public: void lock(ObjectOpsBase & oo) { checkNotLocked(); m_objectOps = &oo; + checkObjectAndAddressSet(); m_lock.reset(m_objectOps->m_objectStore.lockExclusive(m_objectOps->getAddressIfSet())); m_objectOps->m_locksCount++; m_objectOps->m_locksForWriteCount++; diff --git a/objectstore/RootEntry.cpp b/objectstore/RootEntry.cpp index d01e825ca696afc6f6f15910034c566228dbe426..2c688688951c537dbe11f1ab1cab247ab5ddc824 100644 --- a/objectstore/RootEntry.cpp +++ b/objectstore/RootEntry.cpp @@ -28,12 +28,14 @@ #include "ProtocolBuffersAlgorithms.hpp" #include <json-c/json.h> +namespace cta { namespace objectstore { + // construtor, when the backend store exists. // Checks the existence and correctness of the root entry -cta::objectstore::RootEntry::RootEntry(Backend & os): +RootEntry::RootEntry(Backend & os): ObjectOps<serializers::RootEntry, serializers::RootEntry_t>(os, "root") {} -cta::objectstore::RootEntry::RootEntry(GenericObject& go): +RootEntry::RootEntry(GenericObject& go): ObjectOps<serializers::RootEntry, serializers::RootEntry_t>(go.objectStore()) { // Here we transplant the generic object into the new object go.transplantHeader(*this); @@ -43,13 +45,13 @@ cta::objectstore::RootEntry::RootEntry(GenericObject& go): // Initialiser. This uses the base object's initialiser and sets the defaults // of payload. -void cta::objectstore::RootEntry::initialize() { +void RootEntry::initialize() { ObjectOps<serializers::RootEntry, serializers::RootEntry_t>::initialize(); // There is nothing to do for the payload. m_payloadInterpreted = true; } -bool cta::objectstore::RootEntry::isEmpty() { +bool RootEntry::isEmpty() { checkPayloadReadable(); if (m_payload.has_driveregisterpointer() && m_payload.driveregisterpointer().address().size()) @@ -69,7 +71,7 @@ bool cta::objectstore::RootEntry::isEmpty() { return true; } -void cta::objectstore::RootEntry::removeIfEmpty() { +void RootEntry::removeIfEmpty() { checkPayloadWritable(); if (!isEmpty()) { throw NotEmpty("In RootEntry::removeIfEmpty(): root entry not empty"); @@ -86,12 +88,12 @@ void cta::objectstore::RootEntry::removeIfEmpty() { // removeOccurences namespace { bool operator==(const std::string &tp, - const cta::objectstore::serializers::ArchiveQueuePointer & tpp) { + const serializers::ArchiveQueuePointer & tpp) { return tpp.name() == tp; } } -std::string cta::objectstore::RootEntry::addOrGetArchiveQueueAndCommit(const std::string& tapePool, Agent& agent) { +std::string RootEntry::addOrGetArchiveQueueAndCommit(const std::string& tapePool, Agent& agent) { checkPayloadWritable(); // Check the tape pool does not already exist try { @@ -128,29 +130,44 @@ std::string cta::objectstore::RootEntry::addOrGetArchiveQueueAndCommit(const std return tapePoolQueueAddress; } -void cta::objectstore::RootEntry::removeArchiveQueueAndCommit(const std::string& tapePool) { +void RootEntry::removeArchiveQueueAndCommit(const std::string& tapePool) { checkPayloadWritable(); // find the address of the tape pool object try { auto tpp = serializers::findElement(m_payload.archivequeuepointers(), tapePool); // Open the tape pool object ArchiveQueue aq (tpp.address(), ObjectOps<serializers::RootEntry, serializers::RootEntry_t>::m_objectStore); - ScopedExclusiveLock tpl(aq); - aq.fetch(); - // Verify this is the tapepool we're looking for. - if (aq.getName() != tapePool) { + ScopedExclusiveLock aql; + try { + aql.lock(aq); + aq.fetch(); + } catch (cta::exception::Exception & ex) { + // The archive queue seems to not be there. Make sure this is the case: + if (aq.exists()) { + // We failed to access the queue, yet it is present. This is an error. + // Let the exception pass through. + throw; + } else { + // The queue object is already gone. We can skip to removing the + // reference from the RootEntry + goto deleteFromRootEntry; + } + } + // Verify this is the archive queue we're looking for. + if (aq.getTapePool() != tapePool) { std::stringstream err; - err << "Unexpected tape pool name found in object pointed to for tape pool: " - << tapePool << " found: " << aq.getName(); - throw WrongTapePoolQueue(err.str()); + err << "Unexpected tape pool name found in archive queue pointed to for tape pool: " + << tapePool << " found: " << aq.getTapePool(); + throw WrongArchiveQueue(err.str()); } - // Check the tape pool is empty + // Check the archive queue is empty if (!aq.isEmpty()) { - throw TapePoolQueueNotEmpty ("In RootEntry::removeTapePoolQueueAndCommit: trying to " + throw ArchivelQueueNotEmpty ("In RootEntry::removeTapePoolQueueAndCommit: trying to " "remove a non-empty tape pool"); } // We can delete the pool aq.remove(); + deleteFromRootEntry: // ... and remove it from our entry serializers::removeOccurences(m_payload.mutable_archivequeuepointers(), tapePool); // We commit for safety and symmetry with the add operation @@ -161,7 +178,7 @@ void cta::objectstore::RootEntry::removeArchiveQueueAndCommit(const std::string& } } -std::string cta::objectstore::RootEntry::getArchiveQueueAddress(const std::string& tapePool) { +std::string RootEntry::getArchiveQueueAddress(const std::string& tapePool) { checkPayloadReadable(); try { auto & tpp = serializers::findElement(m_payload.archivequeuepointers(), tapePool); @@ -171,7 +188,7 @@ std::string cta::objectstore::RootEntry::getArchiveQueueAddress(const std::strin } } -auto cta::objectstore::RootEntry::dumpArchiveQueues() -> std::list<ArchiveQueueDump> { +auto RootEntry::dumpArchiveQueues() -> std::list<ArchiveQueueDump> { checkPayloadReadable(); std::list<ArchiveQueueDump> ret; auto & tpl = m_payload.archivequeuepointers(); @@ -187,7 +204,7 @@ auto cta::objectstore::RootEntry::dumpArchiveQueues() -> std::list<ArchiveQueueD // ================ Drive register manipulation ================================ // ============================================================================= -std::string cta::objectstore::RootEntry::addOrGetDriveRegisterPointerAndCommit( +std::string RootEntry::addOrGetDriveRegisterPointerAndCommit( Agent & agent, const EntryLog & log) { checkPayloadWritable(); // Check if the drive register exists @@ -225,7 +242,7 @@ std::string cta::objectstore::RootEntry::addOrGetDriveRegisterPointerAndCommit( } } -void cta::objectstore::RootEntry::removeDriveRegisterAndCommit() { +void RootEntry::removeDriveRegisterAndCommit() { checkPayloadWritable(); // Get the address of the drive register (nothing to do if there is none) if (!m_payload.has_driveregisterpointer() || @@ -248,7 +265,7 @@ void cta::objectstore::RootEntry::removeDriveRegisterAndCommit() { commit(); } -std::string cta::objectstore::RootEntry::getDriveRegisterAddress() { +std::string RootEntry::getDriveRegisterAddress() { checkPayloadReadable(); if (m_payload.has_driveregisterpointer() && m_payload.driveregisterpointer().address().size()) { @@ -262,7 +279,7 @@ std::string cta::objectstore::RootEntry::getDriveRegisterAddress() { // ================ Agent register manipulation ================================ // ============================================================================= // Get the name of the agent register (or exception if not available) -std::string cta::objectstore::RootEntry::getAgentRegisterAddress() { +std::string RootEntry::getAgentRegisterAddress() { checkPayloadReadable(); // If the registry is defined, return it, job done. if (m_payload.has_agentregisterpointer() && @@ -272,7 +289,7 @@ std::string cta::objectstore::RootEntry::getAgentRegisterAddress() { } // Get the name of a (possibly freshly created) agent register -std::string cta::objectstore::RootEntry::addOrGetAgentRegisterPointerAndCommit(Agent & agent, +std::string RootEntry::addOrGetAgentRegisterPointerAndCommit(Agent & agent, const EntryLog & log) { // Check if the agent register exists try { @@ -318,7 +335,7 @@ std::string cta::objectstore::RootEntry::addOrGetAgentRegisterPointerAndCommit(A } } -void cta::objectstore::RootEntry::removeAgentRegisterAndCommit() { +void RootEntry::removeAgentRegisterAndCommit() { checkPayloadWritable(); // Check that we do have an agent register set. Cleanup a potential intent as // well @@ -353,7 +370,7 @@ void cta::objectstore::RootEntry::removeAgentRegisterAndCommit() { } } -void cta::objectstore::RootEntry::addIntendedAgentRegistry(const std::string& address) { +void RootEntry::addIntendedAgentRegistry(const std::string& address) { checkPayloadWritable(); // We are supposed to have only one intended agent registry at a time. // If we got the lock and there is one entry, this means the previous @@ -361,7 +378,7 @@ void cta::objectstore::RootEntry::addIntendedAgentRegistry(const std::string& ad // When getting here, having a set pointer to the registry is an error. if (m_payload.has_agentregisterpointer() && m_payload.agentregisterpointer().address().size()) { - throw exception::Exception("In cta::objectstore::RootEntry::addIntendedAgentRegistry:" + throw exception::Exception("In RootEntry::addIntendedAgentRegistry:" " pointer to registry already set"); } if (m_payload.agentregisterintent().size()) { @@ -389,7 +406,7 @@ void cta::objectstore::RootEntry::addIntendedAgentRegistry(const std::string& ad // ================ Scheduler global lock manipulation ========================= // ============================================================================= -std::string cta::objectstore::RootEntry::getSchedulerGlobalLock() { +std::string RootEntry::getSchedulerGlobalLock() { checkPayloadReadable(); // If the scheduler lock is defined, return it, job done. if (m_payload.has_schedulerlockpointer() && @@ -399,7 +416,7 @@ std::string cta::objectstore::RootEntry::getSchedulerGlobalLock() { } // Get the name of a (possibly freshly created) scheduler global lock -std::string cta::objectstore::RootEntry::addOrGetSchedulerGlobalLockAndCommit(Agent & agent, +std::string RootEntry::addOrGetSchedulerGlobalLockAndCommit(Agent & agent, const EntryLog & log) { checkPayloadWritable(); // Check if the drive register exists @@ -437,7 +454,7 @@ std::string cta::objectstore::RootEntry::addOrGetSchedulerGlobalLockAndCommit(Ag } } -void cta::objectstore::RootEntry::removeSchedulerGlobalLockAndCommit() { +void RootEntry::removeSchedulerGlobalLockAndCommit() { checkPayloadWritable(); // Get the address of the scheduler lock (nothing to do if there is none) if (!m_payload.has_schedulerlockpointer() || @@ -462,7 +479,7 @@ void cta::objectstore::RootEntry::removeSchedulerGlobalLockAndCommit() { // Dump the root entry -std::string cta::objectstore::RootEntry::dump () { +std::string RootEntry::dump () { checkPayloadReadable(); std::stringstream ret; ret << "RootEntry" << std::endl; @@ -526,3 +543,5 @@ std::string cta::objectstore::RootEntry::dump () { json_object_put(jo); return ret.str(); } + +}} // namespace cta::objectstore diff --git a/objectstore/RootEntry.hpp b/objectstore/RootEntry.hpp index e22bea1a65116ca5f9f54aa8dd70530908b332f8..220a8bce1817e8d93d200ba87cbd7948d9ec0e73 100644 --- a/objectstore/RootEntry.hpp +++ b/objectstore/RootEntry.hpp @@ -51,8 +51,8 @@ public: void removeIfEmpty(); // ArchiveQueue Manipulations ===================================================== - CTA_GENERATE_EXCEPTION_CLASS(TapePoolQueueNotEmpty); - CTA_GENERATE_EXCEPTION_CLASS(WrongTapePoolQueue); + CTA_GENERATE_EXCEPTION_CLASS(ArchivelQueueNotEmpty); + CTA_GENERATE_EXCEPTION_CLASS(WrongArchiveQueue); /** This function implicitly creates the tape pool structure and updates * the pointer to it. It needs to implicitly commit the object to the store. */ std::string addOrGetArchiveQueueAndCommit(const std::string & tapePool, Agent & agent); @@ -60,6 +60,7 @@ public: * Fails if it not empty*/ CTA_GENERATE_EXCEPTION_CLASS(NoSuchTapePoolQueue); void removeArchiveQueueAndCommit(const std::string & tapePool); + void removeArchiveQueueIfAddressMatchesAndCommit(const std::string & tapePool, const std::string & archiveQueueAddress); std::string getArchiveQueueAddress(const std::string & tapePool); class ArchiveQueueDump { public: diff --git a/objectstore/cta.proto b/objectstore/cta.proto index 54c726a99453941728ed9e0ce7f9ea1ac4bb8701..3410b9fab1946aa4de089726509e812d2e09d0b9 100644 --- a/objectstore/cta.proto +++ b/objectstore/cta.proto @@ -340,11 +340,13 @@ message ArchiveRequest { required MountPolicy mountPolicy = 8995; required string checksumtype = 9000; required string checksumvalue = 9010; + required uint64 creationtime = 9015; + required uint64 reconcilationtime = 9017; required string diskpoolName = 9020; required uint64 diskpoolThroughput = 9030; required DiskFileInfo diskFileInfo = 9040; required string diskFileID = 9050; - required string instance = 9055; + required string diskinstance = 9055; required uint64 fileSize = 9060; required User requester = 9070; required string srcURL = 9080; @@ -374,7 +376,7 @@ message ValueCountPair { } message ArchiveQueue { - required string tapepoolname = 10000; + required string tapepool = 10000; repeated ArchiveJobPointer pendingarchivejobs = 10010; repeated ArchiveJobPointer orphanedarchivejobsnscreation = 10020; repeated ArchiveJobPointer orphanedarchivejobsnsdeletion = 10030; diff --git a/scheduler/ArchiveJob.cpp b/scheduler/ArchiveJob.cpp index 7b5f918b324e2127b556736144cd16b43bf53111..dfa77f4bc4b313676e2e43f97b0b44348a0d1660 100644 --- a/scheduler/ArchiveJob.cpp +++ b/scheduler/ArchiveJob.cpp @@ -17,6 +17,7 @@ */ #include "scheduler/ArchiveJob.hpp" +#include "scheduler/ArchiveMount.hpp" #include <limits> //------------------------------------------------------------------------------ @@ -31,11 +32,11 @@ cta::ArchiveJob::~ArchiveJob() throw() { cta::ArchiveJob::ArchiveJob(ArchiveMount &mount, catalogue::Catalogue & catalogue, const common::dataStructures::ArchiveFile &archiveFile, - const RemotePathAndStatus &remotePathAndStatus, + const std::string &srcURL, const common::dataStructures::TapeFile &tapeFile): m_mount(mount), m_catalogue(catalogue), archiveFile(archiveFile), - remotePathAndStatus(remotePathAndStatus), + srcURL(srcURL), tapeFile(tapeFile) {} //------------------------------------------------------------------------------ @@ -47,12 +48,15 @@ void cta::ArchiveJob::complete() { std::numeric_limits<decltype(tapeFile.blockId)>::max()) throw BlockIdNotSet("In cta::ArchiveJob::complete(): Block ID not set"); // Also check the checksum has been set - if (!archiveFile.checksumType.size() || !archiveFile.checksumValue.size()) - throw ChecksumNotSet("In cta::ArchiveJob::complete(): checksum not set"); + if (archiveFile.checksumType.empty() || archiveFile.checksumValue.empty() || + tapeFile.checksumType.empty() || tapeFile.checksumValue.empty()) + throw ChecksumNotSet("In cta::ArchiveJob::complete(): checksums not set"); + // And matches + if (archiveFile.checksumType != tapeFile.checksumType || + archiveFile.checksumValue != tapeFile.checksumValue) + throw ChecksumMismatch("In cta::ArchiveJob::complete(): checksum mismatch"); // We are good to go to record the data in the persistent storage. - // First make the file safe on tape. - m_dbJob->bumpUpTapeFileCount(tapeFile.fSeq); - // Now record the data in the archiveNS. The checksum will be validated if already + // Record the data in the archiveNS. The checksum will be validated if already // present, of inserted if not. catalogue::TapeFileWritten fileReport; fileReport.archiveFileId = archiveFile.archiveFileID; @@ -61,17 +65,18 @@ void cta::ArchiveJob::complete() { fileReport.checksumValue = tapeFile.checksumValue; fileReport.compressedSize = tapeFile.compressedSize; fileReport.copyNb = tapeFile.copyNb; - //TODO fileReport.diskFileGroup - //TODO fileReport.diskFilePath - //TODO fileReport.diskFileRecoveryBlob - //TODO fileReport.diskFileUser - //TODO fileReport.diskInstance + fileReport.diskFileId = archiveFile.diskFileId; + fileReport.diskFileUser = archiveFile.diskFileInfo.owner; + fileReport.diskFileGroup = archiveFile.diskFileInfo.group; + fileReport.diskFilePath = archiveFile.diskFileInfo.path; + fileReport.diskFileRecoveryBlob = archiveFile.diskFileInfo.recoveryBlob; + fileReport.diskInstance = archiveFile.diskInstance; fileReport.fSeq = tapeFile.fSeq; fileReport.size = archiveFile.fileSize; - //TODO fileReport.storageClassName + fileReport.storageClassName = archiveFile.storageClass; + fileReport.tapeDrive = m_mount.getDrive(); fileReport.vid = tapeFile.vid; - //TODO fileReport.tapeDrive - m_catalogue.fileWrittenToTape(fileReport); + m_catalogue.fileWrittenToTape (fileReport); //m_ns.addTapeFile(SecurityIdentity(UserIdentity(std::numeric_limits<uint32_t>::max(), // std::numeric_limits<uint32_t>::max()), ""), archiveFile.fileId, nameServerTapeFile); // We can now record the success for the job in the database diff --git a/scheduler/ArchiveJob.hpp b/scheduler/ArchiveJob.hpp index ce132bd9d00d9ad40211be5b5929df7db1155cf2..988b0522166fef6c00ab998fe83a15328a159102 100644 --- a/scheduler/ArchiveJob.hpp +++ b/scheduler/ArchiveJob.hpp @@ -55,7 +55,7 @@ protected: ArchiveMount &mount, catalogue::Catalogue & catalogue, const common::dataStructures::ArchiveFile &archiveFile, - const RemotePathAndStatus &remotePathAndStatus, + const std::string &srcURL, const common::dataStructures::TapeFile &tapeFile); public: @@ -67,6 +67,7 @@ public: CTA_GENERATE_EXCEPTION_CLASS(BlockIdNotSet); CTA_GENERATE_EXCEPTION_CLASS(ChecksumNotSet); + CTA_GENERATE_EXCEPTION_CLASS(ChecksumMismatch); /** * Indicates that the job was successful and updates the backend store * @@ -115,7 +116,7 @@ public: /** * The remote file information */ - RemotePathAndStatus remotePathAndStatus; + std::string srcURL; /** * The file archive result for the NS diff --git a/scheduler/ArchiveMount.cpp b/scheduler/ArchiveMount.cpp index bdb959a7b029dd401de94fea03ff5fd1815067cd..ca90b7e1b1b932a9baedb8f5e5073f526a40e7ab 100644 --- a/scheduler/ArchiveMount.cpp +++ b/scheduler/ArchiveMount.cpp @@ -52,6 +52,14 @@ std::string cta::ArchiveMount::getVid() const { return m_dbMount->mountInfo.vid; } +//------------------------------------------------------------------------------ +// getDrive +//------------------------------------------------------------------------------ +std::string cta::ArchiveMount::getDrive() const { + return m_dbMount->mountInfo.drive; +} + + //------------------------------------------------------------------------------ // getPoolName //------------------------------------------------------------------------------ @@ -90,7 +98,7 @@ std::unique_ptr<cta::ArchiveJob> cta::ArchiveMount::getNextJob() { return std::unique_ptr<cta::ArchiveJob>(); // We have something to archive: prepare the response std::unique_ptr<cta::ArchiveJob> ret(new ArchiveJob(*this, m_catalogue, - dbJob->archiveFile, dbJob->remoteFile, dbJob->tapeFile)); + dbJob->archiveFile, dbJob->srcURL, dbJob->tapeFile)); ret->m_dbJob.reset(dbJob.release()); return ret; } diff --git a/scheduler/ArchiveMount.hpp b/scheduler/ArchiveMount.hpp index ea1b65c24661442dc208584771a5c9df6b082a40..a1e36c68e276e26d147aa628ff5cbda52a96d64d 100644 --- a/scheduler/ArchiveMount.hpp +++ b/scheduler/ArchiveMount.hpp @@ -66,6 +66,12 @@ namespace cta { * @return The volume identifier of the tape to be mounted. */ virtual std::string getVid() const; + + /** + * Returns the drive namn + * @return The drive name + */ + virtual std::string getDrive() const; /** * Returns the mount transaction id. diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index 05c6af0483c9c30103c557d23d9fe3bb6ef818b9..b5dc04394b8b44e2bc111565a0cb06b064d152bb 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -269,22 +269,28 @@ void OStoreDB::queue(const cta::common::dataStructures::ArchiveRequest &request, const cta::common::dataStructures::ArchiveFileQueueCriteria &criteria) { assertAgentSet(); // Construct the return value immediately - cta::objectstore::ArchiveRequest ar(m_agent->nextId("ArchiveRequest"), m_objectStore); - ar.initialize(); - ar.setArchiveFileID(criteria.fileId); - ar.setChecksumType(request.checksumType); - ar.setChecksumValue(request.checksumValue); - ar.setCreationLog(request.creationLog); - ar.setDiskpoolName(request.diskpoolName); - ar.setDiskpoolThroughput(request.diskpoolThroughput); - ar.setDiskFileInfo(request.diskFileInfo); - ar.setDiskFileID(request.diskFileID); - ar.setFileSize(request.fileSize); - ar.setInstance(request.instance); - ar.setMountPolicy(criteria.mountPolicy); - ar.setRequester(request.requester); - ar.setSrcURL(request.srcURL); - ar.setStorageClass(request.storageClass); + cta::objectstore::ArchiveRequest aReq(m_agent->nextId("ArchiveRequest"), m_objectStore); + aReq.initialize(); + // Summarize all as an archiveFile + cta::common::dataStructures::ArchiveFile aFile; + aFile.archiveFileID = criteria.fileId; + aFile.checksumType = request.checksumType; + aFile.checksumValue = request.checksumValue; + // TODO: fully fledged archive file should not be the representation of the request. + aFile.creationTime = std::numeric_limits<decltype(aFile.creationTime)>::min(); + aFile.reconciliationTime = std::numeric_limits<decltype(aFile.creationTime)>::min(); + aFile.diskFileId = request.diskFileID; + aFile.diskFileInfo = request.diskFileInfo; + aFile.diskInstance = request.instance; + aFile.fileSize = request.fileSize; + aFile.storageClass = request.storageClass; + aReq.setArchiveFile(aFile); + aReq.setMountPolicy(criteria.mountPolicy); + aReq.setDiskpoolName(request.diskpoolName); + aReq.setDiskpoolThroughput(request.diskpoolThroughput); + aReq.setRequester(request.requester); + aReq.setSrcURL(request.srcURL); + aReq.setCreationLog(request.creationLog); // We will need to identity tapepools is order to construct the request RootEntry re(m_objectStore); ScopedExclusiveLock rel(re); @@ -294,7 +300,7 @@ void OStoreDB::queue(const cta::common::dataStructures::ArchiveRequest &request, std::string aqaddr = re.addOrGetArchiveQueueAndCommit(copy.second, *m_agent); const uint32_t hardcodedRetriesWithinMount = 3; const uint32_t hardcodedTotalRetries = 6; - ar.addJob(copy.first, copy.second, aqaddr, hardcodedRetriesWithinMount, hardcodedTotalRetries); + aReq.addJob(copy.first, copy.second, aqaddr, hardcodedRetriesWithinMount, hardcodedTotalRetries); jl.push_back(cta::objectstore::ArchiveRequest::JobDump()); jl.back().copyNb = copy.first; jl.back().tapePool = copy.second; @@ -307,29 +313,30 @@ void OStoreDB::queue(const cta::common::dataStructures::ArchiveRequest &request, { objectstore::ScopedExclusiveLock al(*m_agent); m_agent->fetch(); - m_agent->addToOwnership(ar.getAddressIfSet()); + m_agent->addToOwnership(aReq.getAddressIfSet()); m_agent->commit(); } - ar.setOwner(m_agent->getAddressIfSet()); - ar.insert(); - ScopedExclusiveLock arl(ar); + aReq.setOwner(m_agent->getAddressIfSet()); + aReq.insert(); + ScopedExclusiveLock arl(aReq); // We can now enqueue the requests std::list<std::string> linkedTapePools; try { - for (auto &j: ar.dumpJobs()) { + for (auto &j: aReq.dumpJobs()) { objectstore::ArchiveQueue aq(j.ArchiveQueueAddress, m_objectStore); ScopedExclusiveLock aql(aq); aq.fetch(); if (aq.getOwner() != re.getAddressIfSet()) throw NoSuchArchiveQueue("In OStoreDB::queue: non-existing archive queue found " "(dangling pointer): canceling request creation."); - aq.addJob(j, ar.getAddressIfSet(), ar.getArchiveFileID(), - ar.getFileSize(), ar.getMountPolicy(), - ar.getCreationLog().time); + aq.addJob(j, aReq.getAddressIfSet(), aReq.getArchiveFile().archiveFileID, + aReq.getArchiveFile().fileSize, aReq.getMountPolicy(), + aReq.getCreationLog().time); // Now that we have the tape pool handy, get the retry limits from it and // assign them to the job aq.commit(); linkedTapePools.push_back(j.ArchiveQueueAddress); + aReq.setJobOwner(j.copyNb, aq.getAddressIfSet()); } } catch (NoSuchArchiveQueue &) { // Unlink the request from already connected tape pools @@ -337,22 +344,22 @@ void OStoreDB::queue(const cta::common::dataStructures::ArchiveRequest &request, objectstore::ArchiveQueue aq(*tpa, m_objectStore); ScopedExclusiveLock aql(aq); aq.fetch(); - aq.removeJob(ar.getAddressIfSet()); + aq.removeJob(aReq.getAddressIfSet()); aq.commit(); - ar.remove(); + aReq.remove(); } throw; } // The request is now fully set. As it's multi-owned, we do not set the owner, // just to disown it from the agent. - ar.setOwner(""); - ar.commit(); + aReq.setOwner(""); + aReq.commit(); arl.release(); // And remove reference from the agent { objectstore::ScopedExclusiveLock al(*m_agent); m_agent->fetch(); - m_agent->removeFromOwnership(ar.getAddressIfSet()); + m_agent->removeFromOwnership(aReq.getAddressIfSet()); m_agent->commit(); } } @@ -375,7 +382,7 @@ void OStoreDB::deleteArchiveRequest(const common::dataStructures::SecurityIdenti objectstore::ArchiveRequest ar(ajp->address, m_objectStore); ScopedSharedLock atfrl(ar); ar.fetch(); - if (ar.getArchiveFileID() == fileId) { + if (ar.getArchiveFile().archiveFileID == fileId) { atfrl.release(); objectstore::ScopedExclusiveLock al(*m_agent); m_agent->fetch(); @@ -431,7 +438,7 @@ std::unique_ptr<SchedulerDatabase::ArchiveToFileRequestCancelation> objectstore::ArchiveRequest tar(arp->address, m_objectStore); objectstore::ScopedSharedLock tatfrl(tar); tar.fetch(); - if (tar.getArchiveFileID() == fileId) { + if (tar.getArchiveFile().archiveFileID == fileId) { // Point the agent to the request ScopedExclusiveLock agl(*m_agent); m_agent->fetch(); @@ -566,21 +573,21 @@ std::list<cta::common::dataStructures::ArchiveJob> } if (!copyndFound) continue; ret.push_back(cta::common::dataStructures::ArchiveJob()); - ret.back().archiveFileID = osar.getArchiveFileID(); + ret.back().archiveFileID = osar.getArchiveFile().archiveFileID; ret.back().copyNumber = copynb; ret.back().tapePool = tpp.tapePool; - ret.back().request.checksumType = osar.getChecksumType(); - ret.back().request.checksumValue = osar.getChecksumValue(); + ret.back().request.checksumType = osar.getArchiveFile().checksumType; + ret.back().request.checksumValue = osar.getArchiveFile().checksumValue; ret.back().request.creationLog = osar.getCreationLog(); - ret.back().request.diskFileID = osar.getDiskFileID(); + ret.back().request.diskFileID = osar.getArchiveFile().diskFileId; ret.back().request.diskpoolName = osar.getDiskpoolName(); ret.back().request.diskpoolThroughput = osar.getDiskpoolThroughput(); - ret.back().request.diskFileInfo = osar.getDiskFileInfo(); - ret.back().request.fileSize = osar.getFileSize(); - ret.back().request.instance = osar.getInstance(); + ret.back().request.diskFileInfo = osar.getArchiveFile().diskFileInfo; + ret.back().request.fileSize = osar.getArchiveFile().fileSize; + ret.back().request.instance = osar.getArchiveFile().diskInstance; ret.back().request.requester = osar.getRequester(); ret.back().request.srcURL = osar.getSrcURL(); - ret.back().request.storageClass = osar.getStorageClass(); + ret.back().request.storageClass = osar.getArchiveFile().storageClass; } return ret; } @@ -618,21 +625,21 @@ std::map<std::string, std::list<common::dataStructures::ArchiveJob> > } if (!copyndFound) continue; ret[tpp.tapePool].push_back(cta::common::dataStructures::ArchiveJob()); - ret[tpp.tapePool].back().archiveFileID = osar.getArchiveFileID(); + ret[tpp.tapePool].back().archiveFileID = osar.getArchiveFile().archiveFileID; ret[tpp.tapePool].back().copyNumber = copynb; ret[tpp.tapePool].back().tapePool = tpp.tapePool; - ret[tpp.tapePool].back().request.checksumType = osar.getChecksumType(); - ret[tpp.tapePool].back().request.checksumValue = osar.getChecksumValue(); + ret[tpp.tapePool].back().request.checksumType = osar.getArchiveFile().checksumType; + ret[tpp.tapePool].back().request.checksumValue = osar.getArchiveFile().checksumValue; ret[tpp.tapePool].back().request.creationLog = osar.getCreationLog(); - ret[tpp.tapePool].back().request.diskFileID = osar.getDiskFileID(); + ret[tpp.tapePool].back().request.diskFileID = osar.getArchiveFile().diskFileId; ret[tpp.tapePool].back().request.diskpoolName = osar.getDiskpoolName(); ret[tpp.tapePool].back().request.diskpoolThroughput = osar.getDiskpoolThroughput(); - ret[tpp.tapePool].back().request.diskFileInfo = osar.getDiskFileInfo(); - ret[tpp.tapePool].back().request.fileSize = osar.getFileSize(); - ret[tpp.tapePool].back().request.instance = osar.getInstance(); + ret[tpp.tapePool].back().request.diskFileInfo = osar.getArchiveFile().diskFileInfo; + ret[tpp.tapePool].back().request.fileSize = osar.getArchiveFile().fileSize; + ret[tpp.tapePool].back().request.instance = osar.getArchiveFile().diskInstance; ret[tpp.tapePool].back().request.requester = osar.getRequester(); ret[tpp.tapePool].back().request.srcURL = osar.getSrcURL(); - ret[tpp.tapePool].back().request.storageClass = osar.getStorageClass(); + ret[tpp.tapePool].back().request.storageClass = osar.getArchiveFile().storageClass; } } return ret; @@ -645,13 +652,13 @@ void OStoreDB::queue(const cta::common::dataStructures::RetrieveRequest& rqst, // assertAgentSet(); // // Check at least one potential tape copy is provided. // // In order to post the job, construct it first. -// objectstore::RetrieveToFileRequest rtfr(m_agent->nextId("RetrieveToFileRequest"), m_objectStore); -// rtfr.initialize(); -// rtfr.setArchiveFile(rqst.getArchiveFile()); -// rtfr.setRemoteFile(rqst.getRemoteFile()); -// rtfr.setPriority(rqst.priority); -// rtfr.setEntryLog(rqst.entryLog); -// rtfr.setSize(rqst.getArchiveFile().size); +// objectstore::RetrieveRequest rReq(m_agent->nextId("RetrieveToFileRequest"), m_objectStore); +// rReq.initialize(); +// rReq.setArchiveFile(rqst.()); +// rReq.setRemoteFile(rqst.getRemoteFile()); +// rReq.setPriority(rqst.priority); +// rReq.setEntryLog(rqst.entryLog); +// rReq.setSize(rqst.getArchiveFile().size); // // We will need to identity tapes is order to construct the request. // // First load all the tapes information in a memory map // std::map<std::string, std::string> vidToAddress; @@ -677,7 +684,7 @@ void OStoreDB::queue(const cta::common::dataStructures::RetrieveRequest& rqst, // // Add all the tape copies to the request // try { // for (auto tc=rqst.getTapeCopies().begin(); tc!=rqst.getTapeCopies().end(); tc++) { -// rtfr.addJob(*tc, vidToAddress.at(tc->vid)); +// rReq.addJob(*tc, vidToAddress.at(tc->vid)); // } // } catch (std::out_of_range &) { // throw NoSuchTape("In OStoreDB::queue(RetrieveToFile): tape not found"); @@ -730,17 +737,17 @@ void OStoreDB::queue(const cta::common::dataStructures::RetrieveRequest& rqst, // jd.tapeAddress = vidToAddress.at(selectedVid); // jd.fseq = selectedFseq; // jd.blockid = selectedBlockid; -// tp.addJob(jd, rtfr.getAddressIfSet(), rqst.getArchiveFile().size, rqst.priority, rqst.creationLog.time); +// tp.addJob(jd, rReq.getAddressIfSet(), rqst.getArchiveFile().size, rqst.priority, rqst.creationLog.time); // tp.commit(); // } // // The request is now fully set. It belongs to the tape. -// rtfr.setOwner(vidToAddress.at(selectedVid)); -// rtfr.insert(); +// rReq.setOwner(vidToAddress.at(selectedVid)); +// rReq.insert(); // // And remove reference from the agent // { // objectstore::ScopedExclusiveLock al(*m_agent); // m_agent->fetch(); -// m_agent->removeFromOwnership(rtfr.getAddressIfSet()); +// m_agent->removeFromOwnership(rReq.getAddressIfSet()); // m_agent->commit(); // } } @@ -984,20 +991,111 @@ const SchedulerDatabase::ArchiveMount::MountInfo& OStoreDB::ArchiveMount::getMou auto OStoreDB::ArchiveMount::getNextJob() -> std::unique_ptr<SchedulerDatabase::ArchiveJob> { // Find the next file to archive - // Get the tape pool - throw cta::exception::Exception(std::string("Not implemented: ") + __PRETTY_FUNCTION__); + // Get the archive queue objectstore::RootEntry re(m_objectStore); objectstore::ScopedSharedLock rel(re); re.fetch(); - auto tpl = re.dumpArchiveQueues(); + auto aql = re.dumpArchiveQueues(); rel.release(); - std::string tpAddress; - for (auto tpp = tpl.begin(); tpp != tpl.end(); tpp++) { - if (tpp->tapePool == mountInfo.tapePool) - tpAddress = tpp->address; + std::string aqAddress; + for (auto & aqp : aql) { + if (aqp.tapePool == mountInfo.tapePool) + aqAddress = aqp.address; } - if (!tpAddress.size()) - throw NoSuchArchiveQueue("In OStoreDB::ArchiveMount::getNextJob(): tape pool not found"); + // The archive queue is gone, there is no more job + if (!aqAddress.size()) + return std::unique_ptr<SchedulerDatabase::ArchiveJob>(); + // Try and open the archive queue. It could be gone by now. + try { + objectstore::ArchiveQueue aq(aqAddress, m_objectStore); + objectstore::ScopedExclusiveLock aql(aq); + try { + aq.fetch(); + } catch (cta::exception::Exception & ex) { + // The queue is now absent. We can remove its reference in the root entry. + // A new queue could have been added in the mean time, and be non-empty. + // We will then fail to remove from the RootEntry (non-fatal). + // TODO: We still conclude that the queue is empty on this unlikely event. + // (cont'd): A better approach would be to retart the process of this function + // from scratch. + rel.lock(re); + re.fetch(); + try { + re.removeArchiveQueueAndCommit(mountInfo.tapePool); + } catch (RootEntry::ArchivelQueueNotEmpty & ex) { + // TODO: improve: if we fail here we could retry to fetch a job. + return std::unique_ptr<SchedulerDatabase::ArchiveJob>(); + } + } + // Pop jobs until we find one actually belonging to the queue. + // Any job not really belonging is an uncommitted pop, which we will + // re-do here. + while (aq.dumpJobs().size()) { + // First take a lock on and download the job + // If the request is not around anymore, we will just move the the next + // Prepare the return value + auto job=aq.dumpJobs().front(); + std::unique_ptr<OStoreDB::ArchiveJob> privateRet(new OStoreDB::ArchiveJob( + job.address, m_objectStore, m_agent, *this)); + privateRet->tapeFile.copyNb = job.copyNb; + objectstore::ScopedExclusiveLock arl; + try { + arl.lock(privateRet->m_archiveRequest); + privateRet->m_archiveRequest.fetch(); + // If the archive job does not belong to the queue, it's again a missed pop + if (privateRet->m_archiveRequest.getJobOwner(job.copyNb) != aq.getAddressIfSet()) { + aq.removeJob(privateRet->m_archiveRequest.getAddressIfSet()); + continue; + } + } catch (cta::exception::Exception &) { + // we failed to access the object. It might be missing. + // Just pop this job from the pool and move to the next. + aq.removeJob(privateRet->m_archiveRequest.getAddressIfSet()); + // Commit in case we do not pass by again. + aq.commit(); + continue; + } + // Take ownership of the job + // Add to ownership + objectstore::ScopedExclusiveLock al(m_agent); + m_agent.fetch(); + m_agent.addToOwnership(privateRet->m_archiveRequest.getAddressIfSet()); + m_agent.commit(); + al.release(); + // Make the ownership official (for this job within the request) + privateRet->m_archiveRequest.setJobOwner(job.copyNb, m_agent.getAddressIfSet()); + privateRet->m_archiveRequest.commit(); + // Remove the job from the tape pool queue + aq.removeJob(privateRet->m_archiveRequest.getAddressIfSet()); + // We can commit and release the tape pool lock, we will only fill up + // memory structure from here on. + aq.commit(); + aql.release(); + privateRet->archiveFile = privateRet->m_archiveRequest.getArchiveFile(); + privateRet->srcURL = privateRet->m_archiveRequest.getSrcURL(); + privateRet->tapeFile.fSeq = ++nbFilesCurrentlyOnTape; + privateRet->tapeFile.copyNb = job.copyNb; + privateRet->tapeFile.vid = mountInfo.vid; + privateRet->tapeFile.blockId = + std::numeric_limits<decltype(privateRet->tapeFile.blockId)>::max(); + privateRet->m_jobOwned = true; + privateRet->m_mountId = mountInfo.mountId; + privateRet->m_tapePool = mountInfo.tapePool; + return std::unique_ptr<SchedulerDatabase::ArchiveJob> (privateRet.release()); + } + // If we get here, we exhausted the queue. We can now remove it. + // removeArchiveQueueAndCommit is safe, as it checks whether the queue is empty + // before deleting it. + aq.remove(); + objectstore::RootEntry re(m_objectStore); + objectstore::ScopedExclusiveLock rel (re); + re.removeArchiveQueueAndCommit(mountInfo.tapePool); + return std::unique_ptr<SchedulerDatabase::ArchiveJob>(); + } catch (cta::exception::Exception & ex){ + return std::unique_ptr<SchedulerDatabase::ArchiveJob>(); + } +} + // Open the archive queue // objectstore::TapePool tp(tpAddress, m_objectStore); // objectstore::ScopedExclusiveLock tplock(tp); // tp.fetch(); @@ -1051,40 +1149,23 @@ auto OStoreDB::ArchiveMount::getNextJob() -> std::unique_ptr<SchedulerDatabase:: // return std::unique_ptr<SchedulerDatabase::ArchiveJob> (privateRet.release()); // } // return std::unique_ptr<SchedulerDatabase::ArchiveJob>(); -} void OStoreDB::ArchiveMount::complete(time_t completionTime) { - // When the session is complete, we can reset the status of the tape and the - // drive + // When the session is complete, we can reset the status of the drive. + // Tape will be implicitly released // Reset the drive - throw cta::exception::Exception(std::string("Not implemented: ") + __PRETTY_FUNCTION__); -// objectstore::RootEntry re(m_objectStore); -// objectstore::ScopedSharedLock rel(re); -// re.fetch(); -// objectstore::DriveRegister dr(re.getDriveRegisterAddress(), m_objectStore); -// objectstore::ScopedExclusiveLock drl(dr); -// dr.fetch(); -// // Reset the drive state. -// dr.reportDriveStatus(mountInfo.drive, mountInfo.logicalLibrary, -// cta::common::DriveStatus::Up, completionTime, -// cta::MountType::NONE, 0, -// 0, 0, 0, "", ""); -// dr.commit(); -// // Find the tape and unbusy it. -// objectstore::TapePool tp (re.getTapePoolAddress(mountInfo.tapePool), m_objectStore); -// rel.release(); -// objectstore::ScopedSharedLock tpl(tp); -// tp.fetch(); -// objectstore::Tape t(tp.getTapeAddress(mountInfo.vid), m_objectStore); -// objectstore::ScopedExclusiveLock tl(t); -// tpl.release(); -// t.fetch(); -// t.releaseBusy(); -// t.commit(); -// objectstore::ScopedExclusiveLock agl(m_agent); -// m_agent.fetch(); -// m_agent.removeFromOwnership(t.getAddressIfSet()); -// m_agent.commit(); + objectstore::RootEntry re(m_objectStore); + objectstore::ScopedSharedLock rel(re); + re.fetch(); + objectstore::DriveRegister dr(re.getDriveRegisterAddress(), m_objectStore); + objectstore::ScopedExclusiveLock drl(dr); + dr.fetch(); + // Reset the drive state. + dr.reportDriveStatus(mountInfo.drive, mountInfo.logicalLibrary, + cta::common::DriveStatus::Up, completionTime, + cta::MountType::NONE, 0, + 0, 0, 0, "", ""); + dr.commit(); } OStoreDB::ArchiveJob::ArchiveJob(const std::string& jobAddress, @@ -1315,7 +1396,7 @@ void OStoreDB::ArchiveJob::succeed() { objectstore::ScopedExclusiveLock atfrl(m_archiveRequest); m_archiveRequest.fetch(); std::string atfrAddress = m_archiveRequest.getAddressIfSet(); - if (m_archiveRequest.setJobSuccessful(m_copyNb)) { + if (m_archiveRequest.setJobSuccessful(tapeFile.copyNb)) { m_archiveRequest.remove(); } else { m_archiveRequest.commit(); diff --git a/scheduler/OStoreDB/OStoreDB.hpp b/scheduler/OStoreDB/OStoreDB.hpp index 935fa2778c43746796a1a82c9cdaa414f13cbc5d..3356743052f197c50ec375972a9027eeed56dfad 100644 --- a/scheduler/OStoreDB/OStoreDB.hpp +++ b/scheduler/OStoreDB/OStoreDB.hpp @@ -91,7 +91,7 @@ public: /* === Archive Job Handling =============================================== */ class ArchiveJob: public SchedulerDatabase::ArchiveJob { - friend class ArchiveMount; + friend class OStoreDB::ArchiveMount; public: CTA_GENERATE_EXCEPTION_CLASS(JobNowOwned); CTA_GENERATE_EXCEPTION_CLASS(NoSuchJob); @@ -103,7 +103,6 @@ public: ArchiveJob(const std::string &, objectstore::Backend &, objectstore::Agent &, ArchiveMount &); bool m_jobOwned; - uint16_t m_copyNb; uint64_t m_mountId; std::string m_tapePool; objectstore::Backend & m_objectStore; diff --git a/scheduler/SchedulerDatabase.hpp b/scheduler/SchedulerDatabase.hpp index c17a53c3d0098186015f86fcfdfca9dd673c802b..cbe2705d378355a8bcab29b10d29254dd6ca73fe 100644 --- a/scheduler/SchedulerDatabase.hpp +++ b/scheduler/SchedulerDatabase.hpp @@ -175,7 +175,7 @@ public: */ class ArchiveJob { public: - cta::RemotePathAndStatus remoteFile; + std::string srcURL; cta::common::dataStructures::ArchiveFile archiveFile; cta::common::dataStructures::TapeFile tapeFile; virtual void succeed() = 0; diff --git a/scheduler/SchedulerTest.cpp b/scheduler/SchedulerTest.cpp index 22d90bef8b977c3fd12452e7460216d5da29bd35..edc7d8d45eb43d6b43817bf5a30a45ac2072f567 100644 --- a/scheduler/SchedulerTest.cpp +++ b/scheduler/SchedulerTest.cpp @@ -342,8 +342,8 @@ TEST_P(SchedulerTest, DISABLED_archive_and_retrieve_new_file) { diskFileInfo.owner="cms_user"; diskFileInfo.path="path/to/file"; cta::common::dataStructures::ArchiveRequest request; - request.checksumType="Adler32"; - request.checksumValue="1111"; + request.checksumType="adler32"; + request.checksumValue="1234abcd"; request.creationLog=creationLog; request.diskpoolName="diskpool1"; request.diskpoolThroughput=200*1000*1000; @@ -394,24 +394,23 @@ TEST_P(SchedulerTest, DISABLED_archive_and_retrieve_new_file) { // Emulate a tape server by asking for a mount and then a file (and succeed // the transfer) std::unique_ptr<cta::TapeMount> mount; - ASSERT_NO_THROW(mount.reset(scheduler.getNextMount(s_libraryName, "drive0").release())); + mount.reset(scheduler.getNextMount(s_libraryName, "drive0").release()); ASSERT_NE((cta::TapeMount*)NULL, mount.get()); ASSERT_EQ(cta::MountType::ARCHIVE, mount.get()->getMountType()); std::unique_ptr<cta::ArchiveMount> archiveMount; - ASSERT_NO_THROW(archiveMount.reset(dynamic_cast<cta::ArchiveMount*>(mount.release()))); + archiveMount.reset(dynamic_cast<cta::ArchiveMount*>(mount.release())); ASSERT_NE((cta::ArchiveMount*)NULL, archiveMount.get()); std::unique_ptr<cta::ArchiveJob> archiveJob; - ASSERT_NO_THROW(archiveJob.reset(archiveMount->getNextJob().release())); + archiveJob.reset(archiveMount->getNextJob().release()); ASSERT_NE((cta::ArchiveJob*)NULL, archiveJob.get()); archiveJob->tapeFile.blockId = 1; archiveJob->tapeFile.fSeq = 1; - cta::Checksum checksum(cta::Checksum::CHECKSUMTYPE_ADLER32, 0x12345687); archiveJob->tapeFile.checksumType = "adler32"; archiveJob->tapeFile.checksumValue = "1234abcd"; - ASSERT_NO_THROW(archiveJob->complete()); - ASSERT_NO_THROW(archiveJob.reset(archiveMount->getNextJob().release())); + archiveJob->complete(); + archiveJob.reset(archiveMount->getNextJob().release()); ASSERT_EQ((cta::ArchiveJob*)NULL, archiveJob.get()); - ASSERT_NO_THROW(archiveMount->complete()); + archiveMount->complete(); } { diff --git a/scheduler/testingMocks/MockArchiveJob.hpp b/scheduler/testingMocks/MockArchiveJob.hpp index 6405330b6a021e441dded5ab8304027263f7a02b..0d536a4913ebe3b211cacde6230c6e968986480e 100644 --- a/scheduler/testingMocks/MockArchiveJob.hpp +++ b/scheduler/testingMocks/MockArchiveJob.hpp @@ -29,7 +29,7 @@ namespace cta { int failures; MockArchiveJob(cta::ArchiveMount & am, cta::catalogue::Catalogue &catalogue): cta::ArchiveJob(am, catalogue, cta::common::dataStructures::ArchiveFile(), - cta::RemotePathAndStatus(), cta::common::dataStructures::TapeFile()), + "", cta::common::dataStructures::TapeFile()), completes(0), failures(0) {} ~MockArchiveJob() throw() {} diff --git a/tapeserver/castor/tape/tapeserver/daemon/DiskReadTask.cpp b/tapeserver/castor/tape/tapeserver/daemon/DiskReadTask.cpp index fbbc265200d2136710cc2a26fdb50d59fbd3794e..6e81dbb12d832794026ebdcee11ff5673d20234a 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/DiskReadTask.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/DiskReadTask.cpp @@ -39,7 +39,7 @@ DiskReadTask::DiskReadTask(DataConsumer & destination, m_nextTask(destination),m_archiveJob(archiveJob), m_numberOfBlock(numberOfBlock),m_errorFlag(errorFlag) { - m_archiveJobCachedInfo.remotePath = m_archiveJob->remotePathAndStatus.path.getRaw(); + m_archiveJobCachedInfo.remotePath = m_archiveJob->srcURL; m_archiveJobCachedInfo.fileId = m_archiveJob->archiveFile.archiveFileID; } @@ -68,9 +68,9 @@ void DiskReadTask::execute(log::LogContext& lc, diskFile::DiskFileFactory & file checkMigrationFailing(); currentErrorToCount = "Error_diskOpenForRead"; std::unique_ptr<tape::diskFile::ReadFile> sourceFile( - fileFactory.createReadFile(m_archiveJob->remotePathAndStatus.path.getRaw())); + fileFactory.createReadFile(m_archiveJob->srcURL)); log::ScopedParamContainer URLcontext(lc); - URLcontext.add("path", m_archiveJob->remotePathAndStatus.path.getRaw()) + URLcontext.add("path", m_archiveJob->srcURL) .add("actualURL", sourceFile->URL()); currentErrorToCount = "Error_diskFileToReadSizeMismatch"; if(migratingFileSize != sourceFile->size()){ diff --git a/tapeserver/castor/tape/tapeserver/daemon/DiskReadTaskTest.cpp b/tapeserver/castor/tape/tapeserver/daemon/DiskReadTaskTest.cpp index d6287d6b4fee83de3fb8f1b1e2dea9dbbd027efc..b3c870c0b9d09eaff3cb817771a74860fb36e630 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/DiskReadTaskTest.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/DiskReadTaskTest.cpp @@ -41,7 +41,7 @@ namespace unitTests{ public: TestingArchiveJob(): cta::ArchiveJob(*((cta::ArchiveMount *)NULL), *((cta::catalogue::Catalogue *)NULL), cta::common::dataStructures::ArchiveFile(), - cta::RemotePathAndStatus(), cta::common::dataStructures::TapeFile()) { + "", cta::common::dataStructures::TapeFile()) { } }; @@ -123,7 +123,7 @@ namespace unitTests{ TestingArchiveJob file; file.archiveFile.fileSize = fileSize; - file.remotePathAndStatus.path.setPath(url); + file.srcURL = url; const int blockNeeded=fileSize/mm.blockCapacity()+((fileSize%mm.blockCapacity()==0) ? 0 : 1); int value=std::ceil(1024*2000./blockSize); diff --git a/tapeserver/castor/tape/tapeserver/daemon/MigrationTaskInjector.cpp b/tapeserver/castor/tape/tapeserver/daemon/MigrationTaskInjector.cpp index 87b05c552a26727b51f7b73162a760f6ff3ea49b..4b2265070fe99c97599fdeacfa37c327059134d2 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/MigrationTaskInjector.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/MigrationTaskInjector.cpp @@ -58,7 +58,7 @@ namespace daemon { LogContext::ScopedParam sp[]={ LogContext::ScopedParam(m_lc, Param("NSFILEID", (*it)->archiveFile.archiveFileID)), LogContext::ScopedParam(m_lc, Param("fSeq", (*it)->tapeFile.fSeq)), - LogContext::ScopedParam(m_lc, Param("path", (*it)->remotePathAndStatus.path.getRaw())) + LogContext::ScopedParam(m_lc, Param("path", (*it)->srcURL)) }; tape::utils::suppresUnusedVariable(sp); diff --git a/tapeserver/castor/tape/tapeserver/daemon/TapeWriteTask.cpp b/tapeserver/castor/tape/tapeserver/daemon/TapeWriteTask.cpp index d9a86bff7bd7368dfb6dc19fdd799c2bd3ba086f..0963d94a3f5d84d87806ca548926be0b4a544f0a 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/TapeWriteTask.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/TapeWriteTask.cpp @@ -48,7 +48,7 @@ namespace daemon { m_archiveJob(archiveJob),m_memManager(mm), m_fifo(blockCount), m_blockCount(blockCount),m_errorFlag(errorFlag), m_archiveFile(m_archiveJob->archiveFile), m_tapeFile(m_archiveJob->tapeFile), - m_remotePathAndStatus(m_archiveJob->remotePathAndStatus) + m_srcURL(m_archiveJob->srcURL) { //register its fifo to the memory manager as a client in order to get mem block mm.addClient(&m_fifo); @@ -73,7 +73,7 @@ namespace daemon { params.add("fileId",m_archiveJob->archiveFile.archiveFileID) .add("fileSize",m_archiveJob->archiveFile.fileSize) .add("fSeq",m_archiveJob->tapeFile.fSeq) - .add("path",m_archiveJob->remotePathAndStatus.path.getRaw()); + .add("diskURL",m_archiveJob->srcURL); // We will clock the stats for the file itself, and eventually add those // stats to the session's. diff --git a/tapeserver/castor/tape/tapeserver/daemon/TapeWriteTask.hpp b/tapeserver/castor/tape/tapeserver/daemon/TapeWriteTask.hpp index 0a10f1eefd493b680149eb30746b94010a0f5e17..b727ddc6ccdf685973d500e9a848ae83bb277369 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/TapeWriteTask.hpp +++ b/tapeserver/castor/tape/tapeserver/daemon/TapeWriteTask.hpp @@ -192,7 +192,7 @@ private: /** * The remote file information */ - cta::RemotePathAndStatus m_remotePathAndStatus; + std::string m_srcURL; }; diff --git a/tapeserver/castor/tape/tapeserver/file/FileTest.cpp b/tapeserver/castor/tape/tapeserver/file/FileTest.cpp index 91efd5d198dcb7df6e911c610d11cef66e340b78..8fd1047c08374d63d28ca2dd6a168c2353be5af8 100644 --- a/tapeserver/castor/tape/tapeserver/file/FileTest.cpp +++ b/tapeserver/castor/tape/tapeserver/file/FileTest.cpp @@ -49,7 +49,7 @@ namespace unitTests { public: TestingArchiveJob(): cta::ArchiveJob(*((cta::ArchiveMount *)NULL), *((cta::catalogue::Catalogue *)NULL), cta::common::dataStructures::ArchiveFile(), - cta::RemotePathAndStatus(), cta::common::dataStructures::TapeFile()) { + "", cta::common::dataStructures::TapeFile()) { } };