diff --git a/objectstore/Tape.cpp b/objectstore/Tape.cpp index 5c5ab67c5e74e7d9eeb23d50e7e65040e44524e6..c356a158240878fbd3f55f57349e9b7e5b857985 100644 --- a/objectstore/Tape.cpp +++ b/objectstore/Tape.cpp @@ -70,6 +70,10 @@ void cta::objectstore::Tape::removeIfEmpty() { } remove(); } +void cta::objectstore::Tape::setStoredData(uint64_t bytes) { + checkPayloadWritable(); + m_payload.set_bytesstored(bytes); +} void cta::objectstore::Tape::addStoredData(uint64_t bytes) { checkPayloadWritable(); @@ -81,6 +85,16 @@ uint64_t cta::objectstore::Tape::getStoredData() { return m_payload.bytesstored(); } +void cta::objectstore::Tape::setLastFseq(uint64_t lastFseq) { + checkPayloadWritable(); + m_payload.set_lastfseq(lastFseq); +} + +uint64_t cta::objectstore::Tape::getLastFseq() { + checkPayloadReadable(); + return m_payload.lastfseq(); +} + std::string cta::objectstore::Tape::getVid() { checkPayloadReadable(); return m_payload.vid(); @@ -218,3 +232,11 @@ void cta::objectstore::Tape::releaseBusy() { } + + + + + + + + diff --git a/objectstore/Tape.hpp b/objectstore/Tape.hpp index 90db0db555d6c82ee81218c899c3e6c7e44f17cb..f68bf4caff5df5bea56601e1c561ecf890ba01ad 100644 --- a/objectstore/Tape.hpp +++ b/objectstore/Tape.hpp @@ -78,6 +78,8 @@ public: std::string getVid(); void setStoredData(uint64_t bytes); void addStoredData(uint64_t bytes); + void setLastFseq(uint64_t lastFseq); + uint64_t getLastFseq(); }; }} \ No newline at end of file diff --git a/scheduler/ArchiveJob.cpp b/scheduler/ArchiveJob.cpp index 8bbb5844650f3a8afdc83c4096bac6b5bc2f78a2..09afd1c5fe054289089fd1bd36847a701864502e 100644 --- a/scheduler/ArchiveJob.cpp +++ b/scheduler/ArchiveJob.cpp @@ -17,6 +17,7 @@ */ #include "scheduler/ArchiveJob.hpp" +#include <limits> //------------------------------------------------------------------------------ // destructor @@ -27,20 +28,36 @@ cta::ArchiveJob::~ArchiveJob() throw() { //------------------------------------------------------------------------------ // constructor //------------------------------------------------------------------------------ -cta::ArchiveJob::ArchiveJob(/*ArchiveMount &mount,*/ +cta::ArchiveJob::ArchiveJob(ArchiveMount &mount, + NameServer & ns, const ArchiveFile &archiveFile, const RemotePathAndStatus &remotePathAndStatus, - const TapeFileLocation &tapeFileLocation): - /*mount(mount),*/ + const NameServerTapeFile &nsTapeFile): + m_mount(mount), m_ns(ns), archiveFile(archiveFile), remotePathAndStatus(remotePathAndStatus), - tapeFileLocation(tapeFileLocation) {} + nameServerTapeFile(nsTapeFile) {} //------------------------------------------------------------------------------ // complete //------------------------------------------------------------------------------ void cta::ArchiveJob::complete() { - throw NotImplemented(""); + // First check that the block Id for the file has been set. + if (nameServerTapeFile.tapeFileLocation.blockId == + std::numeric_limits<decltype(nameServerTapeFile.tapeFileLocation.blockId)>::max()) + throw BlockIdNotSet("In cta::ArchiveJob::complete(): Block ID not set"); + // Also check the checksum has been set + if (nameServerTapeFile.checksum.getType() == Checksum::CHECKSUMTYPE_NONE) + throw ChecksumNotSet("In cta::ArchiveJob::complete(): checksum not set"); + // We are good to go to record the data in the persistent storage. + // First make the file safe on tape. + m_dbJob->bumpUpTapeFileCount(nameServerTapeFile.tapeFileLocation.vid, + nameServerTapeFile.tapeFileLocation.fSeq); + // Now record the data in the archiveNS + m_ns.addTapeFile(SecurityIdentity(UserIdentity(std::numeric_limits<uint32_t>::max(), + std::numeric_limits<uint32_t>::max()), ""), archiveFile.path, nameServerTapeFile); + // We can now record the success for the job in the database + m_dbJob->succeed(); } //------------------------------------------------------------------------------ diff --git a/scheduler/ArchiveJob.hpp b/scheduler/ArchiveJob.hpp index 95471802007f8de58c846c49debc911e5ae8e1ff..57d4797ca7ec7b5c20cfb7181078da765ffbce47 100644 --- a/scheduler/ArchiveJob.hpp +++ b/scheduler/ArchiveJob.hpp @@ -22,6 +22,8 @@ #include "common/archiveNS/ArchiveFile.hpp" #include "common/exception/Exception.hpp" #include "common/remoteFS/RemotePathAndStatus.hpp" +#include "scheduler/SchedulerDatabase.hpp" +#include "nameserver/NameServer.hpp" #include <stdint.h> #include <string> @@ -47,7 +49,7 @@ protected: /** * Empty constructor. TODO: to be removed in the future when we put in the reference to the owning mount; */ - ArchiveJob() {} + //ArchiveJob(): m_mount(*((ArchiveMount*)NULL)), m_ns(*((NameServer*)NULL)){} /** * Constructor. @@ -57,18 +59,22 @@ protected: * @param remotePathAndStatus location and properties of the remote file * @param tapeFileLocation the location within the tape */ - ArchiveJob(/*ArchiveMount &mount,*/ + ArchiveJob( + ArchiveMount &mount, + NameServer & ns, const ArchiveFile &archiveFile, const RemotePathAndStatus &remotePathAndStatus, - const TapeFileLocation &tapeFileLocation); + const NameServerTapeFile &nameServerTapeFile); public: /** * Destructor. */ - virtual ~ArchiveJob() throw() = 0; - + virtual ~ArchiveJob() throw(); + + CTA_GENERATE_EXCEPTION_CLASS(BlockIdNotSet); + CTA_GENERATE_EXCEPTION_CLASS(ChecksumNotSet); /** * Indicates that the job was successful and updates the backend store * @@ -92,15 +98,22 @@ public: * of the tape). */ virtual void retry(); - -public: - CTA_GENERATE_EXCEPTION_CLASS(NotImplemented); +private: + std::unique_ptr<cta::SchedulerDatabase::ArchiveJob> m_dbJob; /** * The mount that generated this job */ - //ArchiveMount &mount; + ArchiveMount &m_mount; + + /** + * Reference to the name server + */ + NameServer &m_ns; +public: + + CTA_GENERATE_EXCEPTION_CLASS(NotImplemented); /** * The NS archive file information @@ -111,11 +124,11 @@ public: * The remote file information */ RemotePathAndStatus remotePathAndStatus; - + /** - * The location of the tape file + * The file archive result for the NS */ - TapeFileLocation tapeFileLocation; + NameServerTapeFile nameServerTapeFile; }; // class ArchiveJob diff --git a/scheduler/ArchiveMount.cpp b/scheduler/ArchiveMount.cpp index 17f6b77ce1205c94f64fe4c1dae1aebf7258c082..d9c2ba614d942bba844a0a78f1120b3ede7bb4c7 100644 --- a/scheduler/ArchiveMount.cpp +++ b/scheduler/ArchiveMount.cpp @@ -21,14 +21,14 @@ //------------------------------------------------------------------------------ // constructor //------------------------------------------------------------------------------ -cta::ArchiveMount::ArchiveMount() { +cta::ArchiveMount::ArchiveMount(NameServer & ns): m_ns(ns) { } //------------------------------------------------------------------------------ // constructor //------------------------------------------------------------------------------ -cta::ArchiveMount::ArchiveMount( - std::unique_ptr<SchedulerDatabase::ArchiveMount> dbMount) { +cta::ArchiveMount::ArchiveMount(NameServer & ns, + std::unique_ptr<SchedulerDatabase::ArchiveMount> dbMount): m_ns(ns) { m_dbMount.reset( dynamic_cast<SchedulerDatabase::ArchiveMount*>(dbMount.release())); if(!m_dbMount.get()) { @@ -83,7 +83,15 @@ std::string cta::ArchiveMount::getMountTransactionId() const throw(){ // getNextJob //------------------------------------------------------------------------------ std::unique_ptr<cta::ArchiveJob> cta::ArchiveMount::getNextJob() { - throw NotImplemented(std::string(__FUNCTION__) + ": Not implemented"); + // try and get a new job from the DB side + std::unique_ptr<cta::SchedulerDatabase::ArchiveJob> dbJob(m_dbMount->getNextJob().release()); + if (!dbJob.get()) + return std::unique_ptr<cta::ArchiveJob>(NULL); + // We have something to migrate: prepare the response + std::unique_ptr<cta::ArchiveJob> ret(new ArchiveJob(*this, m_ns, + dbJob->archiveFile, dbJob->remoteFile, dbJob->nameServerTapeFile)); + ret->m_dbJob.reset(dbJob.release()); + return ret; } //------------------------------------------------------------------------------ diff --git a/scheduler/ArchiveMount.hpp b/scheduler/ArchiveMount.hpp index 3dc3a12a8b601ec59b2b5b48729e82ad936098d2..42301a1019507e06ac41bea772ace02aa6882bf0 100644 --- a/scheduler/ArchiveMount.hpp +++ b/scheduler/ArchiveMount.hpp @@ -20,13 +20,13 @@ #include "common/exception/Exception.hpp" #include "scheduler/ArchiveJob.hpp" -#include "scheduler/ArchiveMount.hpp" #include "scheduler/SchedulerDatabase.hpp" #include "scheduler/TapeMount.hpp" #include <memory> namespace cta { + class NameServer; /** * The class driving a retrieve mount. @@ -40,14 +40,14 @@ namespace cta { /** * Constructor. */ - ArchiveMount(); + ArchiveMount(NameServer & ns); /** * Constructor. * * @param dbMount The database representation of this mount. */ - ArchiveMount(std::unique_ptr<cta::SchedulerDatabase::ArchiveMount> dbMount); + ArchiveMount(NameServer & ns, std::unique_ptr<cta::SchedulerDatabase::ArchiveMount> dbMount); public: @@ -125,6 +125,11 @@ namespace cta { * The database representation of this mount. */ std::unique_ptr<cta::SchedulerDatabase::ArchiveMount> m_dbMount; + + /** + * A reference to the name server. + */ + NameServer & m_ns; }; // class ArchiveMount diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index d4c1df4210d11acb8fe8805cafd3c5e60609ef4f..506081f3ec9582ad8bd503abf3d2be7f202d00fb 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -1090,6 +1090,7 @@ std::unique_ptr<SchedulerDatabase::ArchiveMount> am.mountInfo.vid = vid; am.mountInfo.drive = driveName; am.mountInfo.tapePool = tapePool; + am.m_nextFseq = std::numeric_limits<decltype(am.m_nextFseq)>::max(); objectstore::RootEntry re(m_objectStore); objectstore::ScopedSharedLock rel(re); re.fetch(); @@ -1142,8 +1143,11 @@ std::unique_ptr<SchedulerDatabase::ArchiveMount> m_agent.addToOwnership(t.getAddressIfSet()); m_agent.commit(); } + am.m_nextFseq = t.getLastFseq() + 1; + am.mountInfo.vid = t.getVid(); t.setBusy(driveName, objectstore::Tape::MountType::Archive, hostName, startTime, m_agent.getAddressIfSet()); + t.commit(); } // Fill up the mount info privateRet->mountInfo.drive = driveName; @@ -1258,6 +1262,11 @@ auto OStoreDB::ArchiveMount::getNextJob() -> std::unique_ptr<SchedulerDatabase:: tp.commit(); privateRet->archiveFile.path = privateRet->m_atfr.getArchiveFile(); privateRet->remoteFile = privateRet->m_atfr.getRemoteFile(); + privateRet->nameServerTapeFile.tapeFileLocation.fSeq = m_nextFseq++; + privateRet->nameServerTapeFile.tapeFileLocation.copyNb = privateRet->m_copyNb; + privateRet->nameServerTapeFile.tapeFileLocation.vid = mountInfo.vid; + privateRet->nameServerTapeFile.tapeFileLocation.blockId = + std::numeric_limits<decltype(privateRet->nameServerTapeFile.tapeFileLocation.blockId)>::max(); privateRet->m_jobOwned = true; privateRet->m_mountId = mountInfo.mountId; privateRet->m_tapePool = mountInfo.tapePool; @@ -1312,8 +1321,12 @@ void OStoreDB::ArchiveJob::fail() { } } throw NoSuchTapePool("In OStoreDB::ArchiveJob::fail(): could not find the tape pool"); + } + +void OStoreDB::ArchiveJob::bumpUpTapeFileCount(const std::string& vid, uint64_t newFileCount) { } + void OStoreDB::ArchiveJob::succeed() { // Lock the request and set the job as successful. objectstore::ScopedExclusiveLock atfrl(m_atfr); diff --git a/scheduler/OStoreDB/OStoreDB.hpp b/scheduler/OStoreDB/OStoreDB.hpp index 9e384863a787c5a0735c822427ef122b66ff71ca..eb1945973592f83cae786ad3563cd1362cd26866 100644 --- a/scheduler/OStoreDB/OStoreDB.hpp +++ b/scheduler/OStoreDB/OStoreDB.hpp @@ -75,6 +75,7 @@ public: ArchiveMount(objectstore::Backend &, objectstore::Agent &); objectstore::Backend & m_objectStore; objectstore::Agent & m_agent; + uint64_t m_nextFseq; public: virtual const MountInfo & getMountInfo(); virtual std::unique_ptr<ArchiveJob> getNextJob(); @@ -88,6 +89,7 @@ public: CTA_GENERATE_EXCEPTION_CLASS(NoSuchJob); virtual void succeed(); virtual void fail(); + virtual void bumpUpTapeFileCount(const std::string& vid, uint64_t newFileCount); virtual ~ArchiveJob(); private: ArchiveJob(const std::string &, objectstore::Backend &, objectstore::Agent &); diff --git a/scheduler/Scheduler.cpp b/scheduler/Scheduler.cpp index e4263d783365c14d248e47260bb3ea33ad9116b5..d12fef18406dd4239e8804dd3c72e6161e2e55b0 100644 --- a/scheduler/Scheduler.cpp +++ b/scheduler/Scheduler.cpp @@ -848,7 +848,7 @@ std::unique_ptr<cta::TapeMount> cta::Scheduler::getNextMount( t->status.availableToWrite()) { // We have our tape. Try to create the session. Prepare a return value // for it. - std::unique_ptr<ArchiveMount> internalRet(new ArchiveMount); + std::unique_ptr<ArchiveMount> internalRet(new ArchiveMount(m_ns)); // Get the db side of the session try { internalRet->m_dbMount.reset(mountInfo->createArchiveMount(t->vid, diff --git a/scheduler/SchedulerDatabase.hpp b/scheduler/SchedulerDatabase.hpp index 7ac3e67a97f0414a641639d948703bb3b3eae624..30d0ac6dfc843cc9f2e19bf32d6f97bcc9d42f9c 100644 --- a/scheduler/SchedulerDatabase.hpp +++ b/scheduler/SchedulerDatabase.hpp @@ -26,12 +26,14 @@ #include <vector> #include <stdexcept> #include "common/archiveNS/ArchiveFile.hpp" +#include "common/archiveNS/TapeFileLocation.hpp" #include "common/admin/AdminUser.hpp" #include "common/admin/AdminHost.hpp" #include "common/archiveRoutes/ArchiveRoute.hpp" #include "common/remoteFS/RemotePathAndStatus.hpp" #include "scheduler/MountType.hpp" #include "common/MountControl.hpp" +#include "nameserver/NameServerTapeFile.hpp" namespace cta { @@ -176,8 +178,10 @@ public: public: cta::RemotePathAndStatus remoteFile; cta::ArchiveFile archiveFile; + cta::NameServerTapeFile nameServerTapeFile; virtual void succeed() = 0; virtual void fail() = 0; + virtual void bumpUpTapeFileCount(const std::string & vid, uint64_t newFileCount) = 0; virtual ~ArchiveJob() {} }; diff --git a/scheduler/SchedulerTest.cpp b/scheduler/SchedulerTest.cpp index bf79bdd2bff22fc6e2d6d81129779542852d7b1c..5b8547bb3f90a1731c28dce9d8b35b25368666c4 100644 --- a/scheduler/SchedulerTest.cpp +++ b/scheduler/SchedulerTest.cpp @@ -2418,7 +2418,11 @@ TEST_P(SchedulerTest, archive_and_retrieve_new_file) { std::unique_ptr<cta::ArchiveJob> archiveJob; ASSERT_NO_THROW(archiveJob.reset(archiveMount->getNextJob().release())); ASSERT_NE((cta::ArchiveJob*)NULL, archiveJob.get()); - archiveJob->complete(); + archiveJob->nameServerTapeFile.tapeFileLocation.blockId = 1; + archiveJob->nameServerTapeFile.copyNb = archiveJob->nameServerTapeFile.tapeFileLocation.copyNb; + cta::Checksum checksum(cta::Checksum::CHECKSUMTYPE_ADLER32, cta::ByteArray(0x12345687)); + archiveJob->nameServerTapeFile.checksum = checksum; + ASSERT_NO_THROW(archiveJob->complete()); ASSERT_NO_THROW(archiveJob.reset(archiveMount->getNextJob().release())); ASSERT_EQ((cta::ArchiveJob*)NULL, archiveJob.get()); ASSERT_NO_THROW(archiveMount->complete()); diff --git a/tapeserver/castor/tape/tapeserver/daemon/DataTransferSessionTest.cpp b/tapeserver/castor/tape/tapeserver/daemon/DataTransferSessionTest.cpp index 37e6283017639631b172b251f3a74467b456436b..cbc3a97145b40d2de94cb8cdc9cfee422cf44018 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/DataTransferSessionTest.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/DataTransferSessionTest.cpp @@ -143,7 +143,9 @@ protected: class MockArchiveJob: public cta::ArchiveJob { public: - MockArchiveJob() { + MockArchiveJob(): cta::ArchiveJob(*((cta::ArchiveMount *)NULL), + *((cta::NameServer *)NULL), cta::ArchiveFile(), + cta::RemotePathAndStatus(), cta::NameServerTapeFile()) { } ~MockArchiveJob() throw() { @@ -229,7 +231,7 @@ TEST_F(castor_tape_tapeserver_daemon_DataTransferSessionTest, DataTransferSessio std::unique_ptr<cta::RetrieveJob> ftr(new MockRetrieveJob()); std::unique_ptr<cta::ArchiveJob> ftm(new MockArchiveJob()); ftr->tapeFileLocation.fSeq = fseq; - ftm->tapeFileLocation.fSeq = fseq; + ftm->nameServerTapeFile.tapeFileLocation.fSeq = fseq; ftr->archiveFile.fileId = 1000 + fseq; ftm->archiveFile.fileId = 1000 + fseq; castor::tape::tapeFile::WriteFile wf(&ws, *ftm, archiveFileSize); @@ -374,7 +376,7 @@ TEST_F(castor_tape_tapeserver_daemon_DataTransferSessionTest, DataTransferSessio std::unique_ptr<cta::RetrieveJob> ftr(new MockRetrieveJob()); std::unique_ptr<cta::ArchiveJob> ftm_temp(new MockArchiveJob()); ftr->tapeFileLocation.fSeq = fseq; - ftm_temp->tapeFileLocation.fSeq = fseq; + ftm_temp->nameServerTapeFile.tapeFileLocation.fSeq = fseq; ftr->archiveFile.fileId = 1000 + fseq; ftm_temp->archiveFile.fileId = 1000 + fseq; castor::tape::tapeFile::WriteFile wf(&ws, *ftm_temp, archiveFileSize); @@ -570,7 +572,7 @@ TEST_F(castor_tape_tapeserver_daemon_DataTransferSessionTest, DataTransferSessio std::unique_ptr<cta::RetrieveJob> ftr(new MockRetrieveJob()); std::unique_ptr<cta::ArchiveJob> ftm_temp(new MockArchiveJob()); ftr->tapeFileLocation.fSeq = fseq; - ftm_temp->tapeFileLocation.fSeq = fseq; + ftm_temp->nameServerTapeFile.tapeFileLocation.fSeq = fseq; ftr->archiveFile.fileId = 1000 + fseq; ftm_temp->archiveFile.fileId = 1000 + fseq; castor::tape::tapeFile::WriteFile wf(&ws, *ftm_temp, archiveFileSize); diff --git a/tapeserver/castor/tape/tapeserver/daemon/DiskReadTaskTest.cpp b/tapeserver/castor/tape/tapeserver/daemon/DiskReadTaskTest.cpp index 2f04d1372529baeb7905e1ce31523838d2f6b0c4..531e0a242307301c551d0e21401c5bae56e002a6 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/DiskReadTaskTest.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/DiskReadTaskTest.cpp @@ -38,13 +38,16 @@ namespace unitTests{ class TestingArchiveMount: public cta::ArchiveMount { public: - TestingArchiveMount(std::unique_ptr<cta::SchedulerDatabase::ArchiveMount> dbrm): ArchiveMount(std::move(dbrm)) { + TestingArchiveMount(std::unique_ptr<cta::SchedulerDatabase::ArchiveMount> dbrm): + ArchiveMount(*((cta::NameServer *)NULL),std::move(dbrm)) { } }; class TestingArchiveJob: public cta::ArchiveJob { public: - TestingArchiveJob() { + TestingArchiveJob(): cta::ArchiveJob(*((cta::ArchiveMount *)NULL), + *((cta::NameServer *)NULL), cta::ArchiveFile(), + cta::RemotePathAndStatus(), cta::NameServerTapeFile()) { } }; diff --git a/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.cpp b/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.cpp index 06aced3794611080c1c3afe59075e25a2a1835e6..5f513bb42cbc5ee6b69e6e78767fb6175e003abf 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.cpp @@ -60,9 +60,8 @@ MigrationReportPacker::~MigrationReportPacker(){ //reportCompletedJob //------------------------------------------------------------------------------ void MigrationReportPacker::reportCompletedJob( -std::unique_ptr<cta::ArchiveJob> successfulArchiveJob,u_int32_t checksum, - u_int32_t blockId) { - std::unique_ptr<Report> rep(new ReportSuccessful(std::move(successfulArchiveJob),checksum,blockId)); +std::unique_ptr<cta::ArchiveJob> successfulArchiveJob) { + std::unique_ptr<Report> rep(new ReportSuccessful(std::move(successfulArchiveJob))); castor::server::MutexLocker ml(&m_producterProtection); m_fifo.push(rep.release()); } diff --git a/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.hpp b/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.hpp index b88d24dce436a9cbe732644aeb8ae7289fdd8237..c191a151dc97d85eef2d1d62a6f81613063f5869 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.hpp +++ b/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.hpp @@ -55,8 +55,7 @@ public: * of the file. This is 0 (instead of 1) for the first file on the tape (aka * fseq = 1). */ - void reportCompletedJob(std::unique_ptr<cta::ArchiveJob> successfulArchiveJob, - u_int32_t checksum, u_int32_t blockId); + void reportCompletedJob(std::unique_ptr<cta::ArchiveJob> successfulArchiveJob); /** * Create into the MigrationReportPacker a report for the failled migration @@ -102,16 +101,13 @@ private: virtual void execute(MigrationReportPacker& packer)=0; }; class ReportSuccessful : public Report { - const unsigned long m_checksum; - const uint32_t m_blockId; - /** * The successful archive job to be pushed in the report packer queue and reported later */ std::unique_ptr<cta::ArchiveJob> m_successfulArchiveJob; public: - ReportSuccessful(std::unique_ptr<cta::ArchiveJob> successfulArchiveJob, unsigned long checksum, u_int32_t blockId): - m_checksum(checksum),m_blockId(blockId), m_successfulArchiveJob(std::move(successfulArchiveJob)) {} + ReportSuccessful(std::unique_ptr<cta::ArchiveJob> successfulArchiveJob): + m_successfulArchiveJob(std::move(successfulArchiveJob)) {} virtual void execute(MigrationReportPacker& reportPacker); }; class ReportFlush : public Report { diff --git a/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPackerTest.cpp b/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPackerTest.cpp index 5eea729e5d324e642190422a0272590a898bc877..5005c9f6491fc29a6c89a183437fbb40faf79c6e 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPackerTest.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPackerTest.cpp @@ -46,7 +46,9 @@ namespace unitTests { class MockArchiveJob: public cta::ArchiveJob { public: - MockArchiveJob() { + MockArchiveJob(): cta::ArchiveJob(*((cta::ArchiveMount *)NULL), + *((cta::NameServer *)NULL), cta::ArchiveFile(), + cta::RemotePathAndStatus(), cta::NameServerTapeFile()) { } ~MockArchiveJob() throw() { @@ -59,7 +61,7 @@ namespace unitTests { class MockArchiveMount: public cta::ArchiveMount { public: - MockArchiveMount() { + MockArchiveMount(): cta::ArchiveMount(*((cta::NameServer *)NULL)) { const unsigned int nbArchiveJobs = 2; createArchiveJobs(nbArchiveJobs); } @@ -118,8 +120,8 @@ namespace unitTests { tapeserver::daemon::MigrationReportPacker mrp(&tam,lc); mrp.startThreads(); - mrp.reportCompletedJob(std::move(job1),0,0); - mrp.reportCompletedJob(std::move(job2),0,0); + mrp.reportCompletedJob(std::move(job1)); + mrp.reportCompletedJob(std::move(job2)); const tapeserver::drive::compressionStats statsCompress; mrp.reportFlush(statsCompress); @@ -157,8 +159,8 @@ namespace unitTests { tapeserver::daemon::MigrationReportPacker mrp(&tam,lc); mrp.startThreads(); - mrp.reportCompletedJob(std::move(job1),0,0); - mrp.reportCompletedJob(std::move(job2),0,0); + mrp.reportCompletedJob(std::move(job1)); + mrp.reportCompletedJob(std::move(job2)); const std::string error_msg = "ERROR_TEST_MSG"; const castor::exception::Exception ex(error_msg); @@ -206,9 +208,9 @@ namespace unitTests { tapeserver::daemon::MigrationReportPacker mrp(&tam,lc); mrp.startThreads(); - mrp.reportCompletedJob(std::move(migratedBigFile),0,0); - mrp.reportCompletedJob(std::move(migratedFileSmall),0,0); - mrp.reportCompletedJob(std::move(migratedNullFile),0,0); + mrp.reportCompletedJob(std::move(migratedBigFile)); + mrp.reportCompletedJob(std::move(migratedFileSmall)); + mrp.reportCompletedJob(std::move(migratedNullFile)); tapeserver::drive::compressionStats stats; stats.toTape=(100000+1)/3; mrp.reportFlush(stats); diff --git a/tapeserver/castor/tape/tapeserver/daemon/MigrationTaskInjector.cpp b/tapeserver/castor/tape/tapeserver/daemon/MigrationTaskInjector.cpp index 88c55d5b364b5fb0fb900306f6910007425cb56b..ecce73648e23fd2487806c4804d52d62cc0c303d 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/MigrationTaskInjector.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/MigrationTaskInjector.cpp @@ -58,7 +58,7 @@ namespace daemon { LogContext::ScopedParam sp[]={ LogContext::ScopedParam(m_lc, Param("NSHOSTNAME", (*it)->archiveFile.nsHostName)), LogContext::ScopedParam(m_lc, Param("NSFILEID", (*it)->archiveFile.fileId)), - LogContext::ScopedParam(m_lc, Param("fSeq", (*it)->tapeFileLocation.fSeq)), + LogContext::ScopedParam(m_lc, Param("fSeq", (*it)->nameServerTapeFile.tapeFileLocation.fSeq)), LogContext::ScopedParam(m_lc, Param("path", (*it)->archiveFile.path)) }; tape::utils::suppresUnusedVariable(sp); @@ -129,7 +129,7 @@ namespace daemon { m_lc.log(LOG_ERR, "No files to migrate: empty mount"); return false; } else { - m_firstFseqToWrite = jobs.front()->tapeFileLocation.fSeq; + m_firstFseqToWrite = jobs.front()->nameServerTapeFile.tapeFileLocation.fSeq; injectBulkMigrations(jobs); return true; } diff --git a/tapeserver/castor/tape/tapeserver/daemon/TapeWriteTask.cpp b/tapeserver/castor/tape/tapeserver/daemon/TapeWriteTask.cpp index e7dc415c3562402f91fe80b9eb4958b358fc8c30..f411b91c12431092c08ab59ce5f732578bafb80b 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/TapeWriteTask.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/TapeWriteTask.cpp @@ -73,7 +73,7 @@ namespace daemon { .add("NSFILEID",m_archiveJob->archiveFile.fileId) .add("lastKnownFilename",m_archiveJob->archiveFile.path) .add("fileSize",m_archiveJob->archiveFile.size) - .add("fSeq",m_archiveJob->tapeFileLocation.fSeq) + .add("fSeq",m_archiveJob->nameServerTapeFile.tapeFileLocation.fSeq) .add("path",m_archiveJob->remotePathAndStatus.path.getRaw()); // We will clock the stats for the file itself, and eventually add those @@ -90,11 +90,11 @@ namespace daemon { // We will not record errors for an empty string. This will allow us to // prevent counting where error happened upstream. std::string currentErrorToCount = "Error_tapeFSeqOutOfSequenceForWrite"; - session.validateNextFSeq(m_archiveJob->tapeFileLocation.fSeq); + session.validateNextFSeq(m_archiveJob->nameServerTapeFile.tapeFileLocation.fSeq); try { //try to open the session currentErrorToCount = "Error_tapeWriteHeader"; - watchdog.notifyBeginNewJob(m_archiveJob->archiveFile.path, m_archiveJob->archiveFile.fileId, m_archiveJob->tapeFileLocation.fSeq); + watchdog.notifyBeginNewJob(m_archiveJob->archiveFile.path, m_archiveJob->archiveFile.fileId, m_archiveJob->nameServerTapeFile.tapeFileLocation.fSeq); std::unique_ptr<castor::tape::tapeFile::WriteFile> output(openWriteFile(session,lc)); m_taskStats.readWriteTime += timer.secs(castor::utils::Timer::resetCounter); m_taskStats.headerVolume += TapeSessionStats::headerVolumePerFile; @@ -129,8 +129,12 @@ namespace daemon { m_taskStats.headerVolume += TapeSessionStats::trailerVolumePerFile; m_taskStats.filesCount ++; // Record the fSeq in the tape session - session.reportWrittenFSeq(m_archiveJob->tapeFileLocation.fSeq); - reportPacker.reportCompletedJob(std::move(m_archiveJob),ckSum,output->getBlockId()); + session.reportWrittenFSeq(m_archiveJob->nameServerTapeFile.tapeFileLocation.fSeq); + m_archiveJob->nameServerTapeFile.checksum = + cta::Checksum(cta::Checksum::CHECKSUMTYPE_ADLER32, cta::ByteArray(ckSum)); + m_archiveJob->nameServerTapeFile.compressedSize = m_taskStats.dataVolume; + m_archiveJob->nameServerTapeFile.tapeFileLocation.blockId = output->getBlockId(); + reportPacker.reportCompletedJob(std::move(m_archiveJob)); m_taskStats.waitReportingTime += timer.secs(castor::utils::Timer::resetCounter); m_taskStats.totalTime = localTime.secs(); // Log the successful transfer @@ -316,7 +320,7 @@ namespace daemon { .add("fileSize",m_archiveJob->archiveFile.size) .add("NSHOST",m_archiveJob->archiveFile.nsHostName) .add("NSFILEID",m_archiveJob->archiveFile.fileId) - .add("fSeq",m_archiveJob->tapeFileLocation.fSeq) + .add("fSeq",m_archiveJob->nameServerTapeFile.tapeFileLocation.fSeq) .add("lastKnownFilename",m_archiveJob->archiveFile.path) .add("lastModificationTime",m_archiveJob->archiveFile.lastModificationTime); diff --git a/tapeserver/castor/tape/tapeserver/file/File.cpp b/tapeserver/castor/tape/tapeserver/file/File.cpp index 57789eff2e43a00051f7daf0f226bd1a9472ec96..b2dabdcee08f446328823276a86d6a144a09f271 100644 --- a/tapeserver/castor/tape/tapeserver/file/File.cpp +++ b/tapeserver/castor/tape/tapeserver/file/File.cpp @@ -437,11 +437,11 @@ namespace castor { m_open(false), m_nonzeroFileWritten(false), m_numberOfBlocks(0) { // Check the sanity of the parameters. fSeq should be >= 1 - if (0 == m_fileToMigrate.archiveFile.fileId || m_fileToMigrate.tapeFileLocation.fSeq<1) { + if (0 == m_fileToMigrate.archiveFile.fileId || m_fileToMigrate.nameServerTapeFile.tapeFileLocation.fSeq<1) { std::stringstream err; err << "Unexpected fileId in WriteFile::WriteFile (expected != 0, got: " << m_fileToMigrate.archiveFile.fileId << ") or fSeq (expected >=1, got: " - << m_fileToMigrate.tapeFileLocation.fSeq << ")"; + << m_fileToMigrate.nameServerTapeFile.tapeFileLocation.fSeq << ")"; throw castor::exception::InvalidArgument(err.str()); } if(m_session->isCorrupted()) { @@ -456,12 +456,12 @@ namespace castor { std::string fileId; s >> fileId; std::transform(fileId.begin(), fileId.end(), fileId.begin(), ::toupper); - hdr1.fill(fileId, m_session->m_vid, m_fileToMigrate.tapeFileLocation.fSeq); + hdr1.fill(fileId, m_session->m_vid, m_fileToMigrate.nameServerTapeFile.tapeFileLocation.fSeq); hdr2.fill(m_currentBlockSize, m_session->m_compressionEnabled); - uhl1.fill(m_fileToMigrate.tapeFileLocation.fSeq, m_currentBlockSize, m_session->getSiteName(), + uhl1.fill(m_fileToMigrate.nameServerTapeFile.tapeFileLocation.fSeq, m_currentBlockSize, m_session->getSiteName(), m_session->getHostName(), m_session->m_drive.getDeviceInfo()); /* Before writing anything, we record the blockId of the file */ - if (1 == m_fileToMigrate.tapeFileLocation.fSeq) { + if (1 == m_fileToMigrate.nameServerTapeFile.tapeFileLocation.fSeq) { m_blockId = 0; } else { m_blockId = getPosition(); @@ -512,9 +512,9 @@ namespace castor { std::string fileId; s >> fileId; std::transform(fileId.begin(), fileId.end(), fileId.begin(), ::toupper); - eof1.fill(fileId, m_session->m_vid, m_fileToMigrate.tapeFileLocation.fSeq, m_numberOfBlocks); + eof1.fill(fileId, m_session->m_vid, m_fileToMigrate.nameServerTapeFile.tapeFileLocation.fSeq, m_numberOfBlocks); eof2.fill(m_currentBlockSize, m_session->m_compressionEnabled); - utl1.fill(m_fileToMigrate.tapeFileLocation.fSeq, m_currentBlockSize, m_session->getSiteName(), + utl1.fill(m_fileToMigrate.nameServerTapeFile.tapeFileLocation.fSeq, m_currentBlockSize, m_session->getSiteName(), m_session->getHostName(), m_session->m_drive.getDeviceInfo()); m_session->m_drive.writeBlock(&eof1, sizeof(eof1)); m_session->m_drive.writeBlock(&eof2, sizeof(eof2)); diff --git a/tapeserver/castor/tape/tapeserver/file/FileTest.cpp b/tapeserver/castor/tape/tapeserver/file/FileTest.cpp index c166f2644a58cf38f76e955d9aac8384121fe27f..3b1ff0c6f762947295991d0aafbad3fe9690b4b7 100644 --- a/tapeserver/castor/tape/tapeserver/file/FileTest.cpp +++ b/tapeserver/castor/tape/tapeserver/file/FileTest.cpp @@ -31,6 +31,7 @@ #include "castor/exception/Errnum.hpp" #include "scheduler/ArchiveJob.hpp" #include "scheduler/RetrieveJob.hpp" +#include "nameserver/NameServer.hpp" #include <gtest/gtest.h> #include <memory> @@ -45,7 +46,9 @@ namespace UnitTests { class TestingArchiveJob: public cta::ArchiveJob { public: - TestingArchiveJob() { + TestingArchiveJob(): cta::ArchiveJob(*((cta::ArchiveMount *)NULL), + *((cta::NameServer *)NULL), cta::ArchiveFile(), + cta::RemotePathAndStatus(), cta::NameServerTapeFile()) { } }; @@ -59,7 +62,7 @@ namespace UnitTests { fileToRecall.archiveFile.fileId = 1; fileToMigrate.archiveFile.size = 500; fileToMigrate.archiveFile.fileId = 1; - fileToMigrate.tapeFileLocation.fSeq = 1; + fileToMigrate.nameServerTapeFile.tapeFileLocation.fSeq = 1; volInfo.vid= label; //Label castor::tape::tapeFile::LabelSession *ls;