diff --git a/ReleaseNotes.md b/ReleaseNotes.md index 58eb9c9cf52a745dcd8e0d4caf92999e0c540ab5..3bfe1cbb56f5a0741a0bf8b884468a31a61ab23b 100644 --- a/ReleaseNotes.md +++ b/ReleaseNotes.md @@ -1,3 +1,17 @@ +# v4.7.2-1 + +## Summary +### Upgrade Instructions +### Features +- cta/CTA#1161 - Tape server refactoring, "Decide where m_reportPacker.setTapeDone() should be called" +### Bug fixes +- cta/CTA#950 - Eliminate race condition preventing the drive to go down on failure +- cta/CTA#1160 - Fix DrainingToDisk stale status in case if there is DiskWriteThreadPool thread left +- cta/operations#708 - Fix "Should run cleaner but VID is missing. Putting the drive down" + +### Building and Packaging +### Catalogue Schema + # v4.7.1-1 ## Summary diff --git a/disk/CryptoPPTest.cpp b/disk/CryptoPPTest.cpp index eba77b8d8564020640495369781dc470f687f036..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 100644 --- a/disk/CryptoPPTest.cpp +++ b/disk/CryptoPPTest.cpp @@ -1,244 +0,0 @@ -/* - * @project The CERN Tape Archive (CTA) - * @copyright Copyright © 2021-2022 CERN - * @license This program is free software, distributed under the terms of the GNU General Public - * Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING". You can - * redistribute it and/or modify it under the terms of the GPL Version 3, or (at your - * option) any later version. - * - * This program is distributed in the hope that it will be useful, but WITHOUT ANY - * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A - * PARTICULAR PURPOSE. See the GNU General Public License for more details. - * - * In applying this licence, CERN does not waive the privileges and immunities - * granted to it by virtue of its status as an Intergovernmental Organization or - * submit itself to any jurisdiction. - */ - -#include <gtest/gtest.h> -#include "common/threading/Thread.hpp" -#include "disk/DiskFileImplementations.hpp" -#include "disk/DiskFile.hpp" -#include "disk/RadosStriperPool.hpp" -#include <cryptopp/base64.h> -#include <cryptopp/osrng.h> - - -namespace unitTests { - class CryptoPPThread: public cta::threading::Thread { - public: - void setKey(const CryptoPP::RSA::PrivateKey & key) { m_key = key; } - private: - virtual void run() { - for (int i=0; i<100; i++) { - std::string payload = "Some payload... And some more..."; - std::string signature = cta::disk::CryptoPPSigner::sign( - payload, m_key); - } - } - CryptoPP::RSA::PrivateKey m_key; - }; - - class PEMKeyString { - public: - PEMKeyString(const std::string & keyString) { - // Import the key - const std::string HEADER = "-----BEGIN RSA PRIVATE KEY-----"; - const std::string FOOTER = "-----END RSA PRIVATE KEY-----"; - - size_t pos1, pos2; - pos1 = keyString.find(HEADER); - if(pos1 == std::string::npos) - throw cta::exception::Exception( - "In DiskFileFactory::xrootCryptoPPPrivateKey, PEM header not found"); - - pos2 = keyString.find(FOOTER, pos1+1); - if(pos2 == std::string::npos) - throw cta::exception::Exception( - "In DiskFileFactory::xrootCryptoPPPrivateKey, PEM footer not found"); - - // Start position and length - pos1 = pos1 + HEADER.length(); - pos2 = pos2 - pos1; - std::string keystr = keyString.substr(pos1, pos2); - - // Base64 decode, place in a ByteQueue - CryptoPP::ByteQueue queue; - CryptoPP::Base64Decoder decoder; - - decoder.Attach(new CryptoPP::Redirector(queue)); - decoder.Put((const byte*)keystr.data(), keystr.length()); - decoder.MessageEnd(); - - // Get the key - m_key.BERDecodePrivateKey(queue, false /*paramsPresent*/, queue.MaxRetrievable()); - - // BERDecodePrivateKey is a void function. Here's the only check - // we have regarding the DER bytes consumed. - if(!queue.IsEmpty()) - throw cta::exception::Exception( - "In DiskFileFactory::xrootCryptoPPPrivateKey, garbage at end of key"); - - CryptoPP::AutoSeededRandomPool prng; - bool valid = m_key.Validate(prng, 3); - if(!valid) - throw cta::exception::Exception( - "In DiskFileFactory::xrootCryptoPPPrivateKey, RSA private key is not valid"); - } - operator CryptoPP::RSA::PrivateKey() { - return m_key; - } - private: - CryptoPP::RSA::PrivateKey m_key; - }; - - // A random key from a bare "openssl genrsa". - std::string somePrivateKey = - "-----BEGIN RSA PRIVATE KEY-----\n" - "MIICXQIBAAKBgQDMGFBHGra0lHWyXA9oiwaMSPqKjv7tNuxAL3oUe+SbXBUJX6Nh\n" - "oMh/uo71jSQpyEozxkTsHkwNCMAPq+fBlsjGFHoNkiAjH68zwzCILM3XDQx5+ztH\n" - "e1att+niTVzLgrBy8R729Vgyvv/ToghLdwrJ+witd1YNPHHoZH5amLsHLwIDAQAB\n" - "AoGBAJ7y0JKP23sHpCIkUFu66n6W14jRlPhprdTPJOSPGJtmO3vxX+zIq13OjUfv\n" - "hBqGQkPQRh0d+1yrU+jgmL3MEM/OYJRm0iKAom3x28a9Rn2c+6tr7synBJMT3b8t\n" - "c5ShEGU2diNR3VrjyRDLQplaWY+1txLp+5jZ86C10M5y22QBAkEA9hYkGHJ0q9B6\n" - "16bHJvEUE8VbdFdZBPCE5HmJBoambFBeQgseuxi9fY3byrVvrRKgn8t7tocz0Ans\n" - "8ND+QTWAUQJBANRRHpBv13JmmBN0SK57rw3bDz27CnDbCMN2/omC1Ykb1g3L+JvU\n" - "VupF4zHo54VXIMXwRVQ8dSmTLxYEVMepr38CQC3Y/iyX1mjUVK6s4dm9fJIaaOmK\n" - "BInJDdlLU14l5Ae2CXmgfL864sLrlRF1MDM8jzR2QrxFAEA4OS68oUIg56ECQQCR\n" - "W0gVkrxpshuDliT8b+kVD1iL5rXrNcn2KE1zT4Np7wjJQU/fP6yRj29QCCgZfeEO\n" - "IsUUOp/r6rxd0nFIkL95AkA3oztzUVis7R4g5gO9UXCJzhlIY7y67coAbBHrqiHa\n" - "QeMkZnkN4sib4C4U2GanmAI05U05AIjDgp9lx3EYSulY\n" - "-----END RSA PRIVATE KEY-----\n"; - - TEST(castor_CryptoPP, multiThreading) { - // Read the key - PEMKeyString privateKey(somePrivateKey); - - // Run the threads - std::vector<CryptoPPThread> m_threads; - m_threads.resize(3); - for (std::vector<CryptoPPThread>::iterator i=m_threads.begin(); - i!=m_threads.end(); i++) { - i->setKey(privateKey); - i->start(); - } - for (std::vector<CryptoPPThread>::iterator i=m_threads.begin(); - i!=m_threads.end(); i++) - i->wait(); - } - - class CryptoPPKeyThread: public cta::threading::Thread { - private: - virtual void run() { - for (int i=0; i<5; i++) { - // Read keys in parallel and in a loop to test MT protection of the - // key reading, not protected here. - PEMKeyString CryptoPPKey(somePrivateKey); - } - } - }; - - TEST(castor_CryptoPP, multiThreadingKeyRead) { - // Run the threads - std::vector<CryptoPPKeyThread> m_threads; - m_threads.resize(3); - for (std::vector<CryptoPPKeyThread>::iterator i=m_threads.begin(); - i!=m_threads.end(); i++) { - i->start(); - } - for (std::vector<CryptoPPKeyThread>::iterator i=m_threads.begin(); - i!=m_threads.end(); i++) - i->wait(); - } - - class castor_CryptoPPDiskFileFactory: public cta::threading::Thread { - public: - void setPath(const std::string & path) { - m_keyPath = path; - } - private: - virtual void run() { - cta::disk::RadosStriperPool striperPool; - cta::disk::DiskFileFactory dff(m_keyPath, 0, - striperPool); - for (int i=0; i<5; i++) { - // Read keys in parallel and in a loop to test MT protection of the - // key reading, not protected here. - dff.xrootPrivateKey(); - } - } - std::string m_keyPath; - }; - - class TempFileForXrootKey { - public: - TempFileForXrootKey(const std::string & content) { - char path[100]; - strncpy(path, "/tmp/castorUnitTestPrivateKeyXXXXXX", 100); - int tmpFileFd = mkstemp(path); - cta::exception::Errnum::throwOnMinusOne(tmpFileFd, "Error creating a temporary file"); - m_path = path; - try { - cta::exception::Errnum::throwOnMinusOne(write(tmpFileFd, content.c_str(), content.size())); - } catch (...) { - close (tmpFileFd); - ::unlink(m_path.c_str()); - throw; - } - close (tmpFileFd); - } - ~TempFileForXrootKey() { - ::unlink(m_path.c_str()); - } - const std::string & path() { - return m_path; - } - private: - std::string m_path; - }; - - TEST(castor_CryptoPP, multiThreadingFileFactoryKeyRead) { - // Create the key file - TempFileForXrootKey keyFile(somePrivateKey); - // Run the threads - std::vector<castor_CryptoPPDiskFileFactory> m_threads; - m_threads.resize(3); - for (std::vector<castor_CryptoPPDiskFileFactory>::iterator i=m_threads.begin(); - i!=m_threads.end(); i++) { - i->setPath(keyFile.path()); - } - for (std::vector<castor_CryptoPPDiskFileFactory>::iterator i=m_threads.begin(); - i!=m_threads.end(); i++) { - i->start(); - } - for (std::vector<castor_CryptoPPDiskFileFactory>::iterator i=m_threads.begin(); - i!=m_threads.end(); i++) { - i->wait(); - } - } - - TEST(castor_CryptoPP, agreesWithOpenSSL) { - // Import the key for CryptoPP - PEMKeyString CryptoPPKey(somePrivateKey); - std::string msg("Any random message will do!"); - // This is the output of: - // echo -n 'Any random message will do!' | openssl dgst -sha1 -sign ~/testRSAPrivate.pem | openssl enc -base64 | tr -d '\n' ; echo - std::string osslSign("bfqLxACTFS7fMKH5ewNUOaglRlIGCEPWGhx4fRPErFGHtuCi2yWlYFsXIfjBxOT+yCyKRpTnZWGJTbcP72eT7os2qCqIOejAM3nTcsChHN5f3UyADvsi1f7C3DqhYVKVFQPaBdb3zm8IBHsFjmu2EzVE5juc1C9L+ztVmoABptw="); - std::string CryptoPPSign(cta::disk::CryptoPPSigner::sign(msg,CryptoPPKey)); - // std::cout << CryptoPPSign << std::endl; - ASSERT_EQ(osslSign,CryptoPPSign); - // A few examples generated with openssl's command line: - // string=`dd if=/dev/urandom bs=40 count=1 2>/dev/null | openssl enc -base64 | tr -d 'n\'` - // echo $string - // openssl dgst -sha1 -sign ~/testRSAPrivate.pem | openssl enc -base64 | tr -d '\n' ; echo - // - // ~/testRSAPrivate.pem contains the same content as the variable somePrivateKey. - - ASSERT_EQ("O8nSHzVPXyNRGRu8vaQ+CrqJjlv28qdsiFCTjlmeMFgc/aEnlJq+2b2q5al7BiHmPAFOd6fUkvn8xlFBm9IUlPFENhPLuMKJGqRSBndE7At0t+/vbS9UVnKiuOjrFepXo8JOvbt7lpqfp3jBwrQE5OZpOT92Nh8GXlpiCksvoxI=", - cta::disk::CryptoPPSigner::sign("rsoCb+KBj9j9Xk6wr5Cgh+TVuI3eDZHTzD9z8pTFjBPEjdyfiTFIeQ==",CryptoPPKey)); - ASSERT_EQ("uf8Cgkwtmh9K1VFLfeIfkZFQMd/pfSGCHNYPByH0nm0COPHNQAkYEI38ez9DS43fsIBVU9Gdrs0x50dVIntzawgzDrjp8YJIeARJF73he8M+6/FUgWJumNMoDE8fdvgiBaCFTv4+di5vtSb/abKVfqY9IbUPSDByxYjDKKI0OF0=", - cta::disk::CryptoPPSigner::sign("MtvFsd09F8UQNpwsULF6eMyVkRDIU+uAvBXyJs/LoNM5HrjoJgZrig==",CryptoPPKey)); - ASSERT_EQ("EzSR5Fd1kfmdrVhCiYgoWQ7E1MSdv8OYng3L7LepCfS9OStlEFTkJcMezt4VRqUZnarlcIZ0yPAvrmOUscjrAOAbqA0rMYKsvHnAwd19RaH54QZhtRCDwMloxpuLmUC1cmyJ/PAdRoMYCoHiMVr7yQw0CnVJ5168MUe5o0v3swY=", - cta::disk::CryptoPPSigner::sign("v3lPb49U+Zz+DNdzoTf2R8AU+AFP+/9/7nLlJV1+HNf3Z+Nzl/HuiQ==",CryptoPPKey)); - } -} diff --git a/scheduler/ArchiveMount.cpp b/scheduler/ArchiveMount.cpp index 70f739be77a773de58028fd8e14a9b1b7e5244b2..c1d43594851de1f93d52ceb1c35892b420a3b8d8 100644 --- a/scheduler/ArchiveMount.cpp +++ b/scheduler/ArchiveMount.cpp @@ -310,20 +310,10 @@ void cta::ArchiveMount::reportJobsBatchTransferred(std::queue<std::unique_ptr<ct // complete //------------------------------------------------------------------------------ void cta::ArchiveMount::complete() { - // Just set the session as complete in the DB. - m_dbMount->complete(time(nullptr)); - // and record we are done with the mount + // Just record we are done with the mount m_sessionRunning = false; } -//------------------------------------------------------------------------------ -// abort -//------------------------------------------------------------------------------ -void cta::ArchiveMount::abort(const std::string& reason) { - complete(); - setDriveStatus(cta::common::dataStructures::DriveStatus::Down, reason); -} - //------------------------------------------------------------------------------ // destructor //------------------------------------------------------------------------------ @@ -333,7 +323,7 @@ cta::ArchiveMount::~ArchiveMount() noexcept = default; // setDriveStatus() //------------------------------------------------------------------------------ void cta::ArchiveMount::setDriveStatus(cta::common::dataStructures::DriveStatus status, const std::optional<std::string>& reason) { - m_dbMount->setDriveStatus(status, time(nullptr), reason); + m_dbMount->setDriveStatus(status, getMountType(), time(nullptr), reason); } //------------------------------------------------------------------------------ diff --git a/scheduler/ArchiveMount.hpp b/scheduler/ArchiveMount.hpp index ca589d49b16742c59b4a0497935bd5c2a7a7df95..24700ea476ad9adc8e3a83088c81ccd46e3f981c 100644 --- a/scheduler/ArchiveMount.hpp +++ b/scheduler/ArchiveMount.hpp @@ -99,12 +99,7 @@ namespace cta { * Indicates that the mount was completed. * This function is overridden in MockArchiveMount for unit tests. */ - virtual void complete(); - - /** - * Indicates that the mount was cancelled. - */ - void abort(const std::string& reason) override; + void complete() override; /** * Report a drive status change diff --git a/scheduler/LabelMount.cpp b/scheduler/LabelMount.cpp index 54bfc25756a32ae102c40771cfb39197a370b762..cf3f0dfbbcf6aa8e868e7b6754181280dab7f66b 100644 --- a/scheduler/LabelMount.cpp +++ b/scheduler/LabelMount.cpp @@ -24,7 +24,7 @@ LabelMount::LabelMount(catalogue::Catalogue& catalogue): m_catalogue(catalogue) // TODO } -void LabelMount::abort(const std::string& reason) { +void LabelMount::complete() { throw 0; // TODO } diff --git a/scheduler/LabelMount.hpp b/scheduler/LabelMount.hpp index eb9f655f650ca5a017af16fa064b5a852a29d09b..4920a5faf088be3dcf01aa2f1d35651727e72ad7 100644 --- a/scheduler/LabelMount.hpp +++ b/scheduler/LabelMount.hpp @@ -82,9 +82,9 @@ namespace cta { /** - * Indicates that the mount was cancelled. + * Indicates that the mount was completed. */ - void abort(const std::string& reason) override; + void complete() override; /** * Report a drive status change diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index 89c10f8fa0f7b77eed8728ea9e05c4831a8a670c..061ad038db011d99af16e3ecd7db1c896f7bfcd6 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -3399,29 +3399,6 @@ std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::ArchiveMoun } } -//------------------------------------------------------------------------------ -// OStoreDB::ArchiveMount::complete() -//------------------------------------------------------------------------------ -void OStoreDB::ArchiveMount::complete(time_t completionTime) { - // When the session is complete, we can reset the status of the drive. - // Tape will be implicitly released - // Reset the drive state. - common::dataStructures::DriveInfo driveInfo; - driveInfo.driveName=mountInfo.drive; - driveInfo.logicalLibrary=mountInfo.logicalLibrary; - driveInfo.host=mountInfo.host; - ReportDriveStatusInputs inputs; - inputs.mountType = common::dataStructures::MountType::NoMount; - inputs.mountSessionId = mountInfo.mountId; - inputs.reportTime = completionTime; - inputs.status = common::dataStructures::DriveStatus::Up; - inputs.vid = mountInfo.vid; - inputs.tapepool = mountInfo.tapePool; - inputs.vo = mountInfo.vo; - log::LogContext lc(m_oStoreDB.m_logger); - m_oStoreDB.m_tapeDrivesState->updateDriveStatus(driveInfo, inputs, lc); -} - //------------------------------------------------------------------------------ // OStoreDB::ArchiveJob::ArchiveJob() //------------------------------------------------------------------------------ @@ -3595,33 +3572,13 @@ bool OStoreDB::RetrieveMount::reserveDiskSpace(const cta::DiskSpaceReservationRe return true; } -//------------------------------------------------------------------------------ -// OStoreDB::RetrieveMount::complete() -//------------------------------------------------------------------------------ -void OStoreDB::RetrieveMount::complete(time_t completionTime) { - // When the session is complete, we can reset the status of the tape and the - // drive - // Reset the drive state. - common::dataStructures::DriveInfo driveInfo; - driveInfo.driveName=mountInfo.drive; - driveInfo.logicalLibrary=mountInfo.logicalLibrary; - driveInfo.host=mountInfo.host; - ReportDriveStatusInputs inputs; - inputs.mountType = common::dataStructures::MountType::NoMount; - inputs.mountSessionId = mountInfo.mountId; - inputs.reportTime = completionTime; - inputs.status = common::dataStructures::DriveStatus::Up; - inputs.vid = mountInfo.vid; - inputs.tapepool = mountInfo.tapePool; - inputs.vo = mountInfo.vo; - log::LogContext lc(m_oStoreDB.m_logger); - m_oStoreDB.m_tapeDrivesState->updateDriveStatus(driveInfo, inputs, lc); -} - //------------------------------------------------------------------------------ // OStoreDB::RetrieveMount::setDriveStatus() //------------------------------------------------------------------------------ -void OStoreDB::RetrieveMount::setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime, const std::optional<std::string> & reason) { +void OStoreDB::RetrieveMount::setDriveStatus(cta::common::dataStructures::DriveStatus status, + cta::common::dataStructures::MountType mountType, + time_t completionTime, + const std::optional<std::string>& reason) { // We just report the drive status as instructed by the tape thread. // Reset the drive state. common::dataStructures::DriveInfo driveInfo; @@ -3629,7 +3586,7 @@ void OStoreDB::RetrieveMount::setDriveStatus(cta::common::dataStructures::DriveS driveInfo.logicalLibrary=mountInfo.logicalLibrary; driveInfo.host=mountInfo.host; ReportDriveStatusInputs inputs; - inputs.mountType = common::dataStructures::MountType::Retrieve; + inputs.mountType = mountType; inputs.mountSessionId = mountInfo.mountId; inputs.reportTime = completionTime; inputs.status = status; @@ -3837,8 +3794,10 @@ void OStoreDB::RetrieveMount::flushAsyncSuccessReports(std::list<cta::SchedulerD //------------------------------------------------------------------------------ // OStoreDB::ArchiveMount::setDriveStatus() //------------------------------------------------------------------------------ -void OStoreDB::ArchiveMount::setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime, - const std::optional<std::string> & reason) { +void OStoreDB::ArchiveMount::setDriveStatus(cta::common::dataStructures::DriveStatus status, + cta::common::dataStructures::MountType mountType, + time_t completionTime, + const std::optional<std::string>& reason) { // We just report the drive status as instructed by the tape thread. // Reset the drive state. common::dataStructures::DriveInfo driveInfo; @@ -3846,8 +3805,7 @@ void OStoreDB::ArchiveMount::setDriveStatus(cta::common::dataStructures::DriveSt driveInfo.logicalLibrary = mountInfo.logicalLibrary; driveInfo.host = mountInfo.host; ReportDriveStatusInputs inputs; - inputs.mountType = m_queueType == common::dataStructures::JobQueueType::JobsToTransferForUser - ? common::dataStructures::MountType::ArchiveForUser : common::dataStructures::MountType::ArchiveForRepack; + inputs.mountType = mountType; inputs.mountSessionId = mountInfo.mountId; inputs.reportTime = completionTime; inputs.status = status; diff --git a/scheduler/OStoreDB/OStoreDB.hpp b/scheduler/OStoreDB/OStoreDB.hpp index 96688c41f86fd26edd44f405f99be6f71218fe03..9ece54e18492d4a037e64ecf9ef2908639f9e031 100644 --- a/scheduler/OStoreDB/OStoreDB.hpp +++ b/scheduler/OStoreDB/OStoreDB.hpp @@ -251,9 +251,10 @@ class OStoreDB: public SchedulerDatabase { const MountInfo & getMountInfo() override; std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob>> getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, log::LogContext &logContext) override; - void complete(time_t completionTime) override; - void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime, - const std::optional<std::string> & reason = std::nullopt) override; + + void + setDriveStatus(cta::common::dataStructures::DriveStatus status, cta::common::dataStructures::MountType mountType, + time_t completionTime, const std::optional<std::string>& reason = std::nullopt) override; void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) override; public: void setJobBatchTransferred( @@ -321,10 +322,9 @@ class OStoreDB: public SchedulerDatabase { std::set<DiskSystemToSkip> m_diskSystemsToSkip; public: - /// Public but non overriding function used by retrieve jobs (on failure to transfer): - void complete(time_t completionTime) override; - void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime, - const std::optional<std::string> & reason = std::nullopt) override; + void + setDriveStatus(cta::common::dataStructures::DriveStatus status, cta::common::dataStructures::MountType mountType, + time_t completionTime, const std::optional<std::string>& reason = std::nullopt) override; void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) override; void addDiskSystemToSkip(const SchedulerDatabase::RetrieveMount::DiskSystemToSkip &diskSystemToSkip) override; void flushAsyncSuccessReports(std::list<cta::SchedulerDatabase::RetrieveJob*>& jobsBatch, log::LogContext& lc) override; diff --git a/scheduler/RetrieveMount.cpp b/scheduler/RetrieveMount.cpp index ceb8b31ebef02d422ae7b64b8aa0d229c4bd8bf8..aa62be23087d8cb5c0b2dc713701735810d7d187 100644 --- a/scheduler/RetrieveMount.cpp +++ b/scheduler/RetrieveMount.cpp @@ -304,14 +304,8 @@ cta::disk::DiskReporter* cta::RetrieveMount::createDiskReporter(std::string& URL void cta::RetrieveMount::tapeComplete() { m_tapeRunning = false; if (!m_diskRunning) { - // Just set the session as complete in the DB. - m_dbMount->complete(time(NULL)); - // and record we are done with the mount + // Just record we are done with the mount m_sessionRunning = false; - } else { - // This is a special case: we have to report the tape server is draining - // its memory to disk - setDriveStatus(cta::common::dataStructures::DriveStatus::DrainingToDisk); } } @@ -321,17 +315,15 @@ void cta::RetrieveMount::tapeComplete() { void cta::RetrieveMount::diskComplete() { m_diskRunning = false; if (!m_tapeRunning) { - // Just set the session as complete in the DB. - m_dbMount->complete(time(NULL)); - // and record we are done with the mount + // Just record we are done with the mount m_sessionRunning = false; } } //------------------------------------------------------------------------------ -// abort() +// complete() //------------------------------------------------------------------------------ -void cta::RetrieveMount::abort(const std::string&) { +void cta::RetrieveMount::complete() { diskComplete(); tapeComplete(); } @@ -340,7 +332,7 @@ void cta::RetrieveMount::abort(const std::string&) { // setDriveStatus() //------------------------------------------------------------------------------ void cta::RetrieveMount::setDriveStatus(cta::common::dataStructures::DriveStatus status, const std::optional<std::string> & reason) { - m_dbMount->setDriveStatus(status, time(NULL), reason); + m_dbMount->setDriveStatus(status, getMountType(), time(nullptr), reason); } //------------------------------------------------------------------------------ diff --git a/scheduler/RetrieveMount.hpp b/scheduler/RetrieveMount.hpp index a75fd29d8181e76ccbc1c0c909b47f89fca50001..0f5acd8d5600eff9e843b825622b6844dfe70d60 100644 --- a/scheduler/RetrieveMount.hpp +++ b/scheduler/RetrieveMount.hpp @@ -168,7 +168,7 @@ namespace cta { * Indicates that the we should cancel the mount (equivalent to diskComplete * + tapeComeplete). */ - virtual void abort(const std::string& reason); + virtual void complete(); /** * Tests whether all threads are complete diff --git a/scheduler/Scheduler.cpp b/scheduler/Scheduler.cpp index bbac0f37aecfec4c59e07e4d1a8a6997ffc9be9a..cebafd6df9a0da3c5605170d0d96ab4bdbb5595c 100644 --- a/scheduler/Scheduler.cpp +++ b/scheduler/Scheduler.cpp @@ -207,7 +207,7 @@ std::string Scheduler::queueRetrieve( // in situations where files with multiple tape copies are being recalled. { - const time_t secondsSinceEpoch = time(NULL); + const time_t secondsSinceEpoch = time(nullptr); const uint64_t daysSinceEpoch = secondsSinceEpoch / (60*60*24); const uint64_t indexToChose = daysSinceEpoch % queueCriteria.archiveFile.tapeFiles.size(); @@ -900,10 +900,10 @@ void Scheduler::reportDriveConfig(const cta::tape::daemon::TpconfigLine& tpConfi // reportDriveStatus //------------------------------------------------------------------------------ void Scheduler::reportDriveStatus(const common::dataStructures::DriveInfo& driveInfo, - common::dataStructures::MountType type, common::dataStructures::DriveStatus status, log::LogContext & lc) { - // TODO: mount type should be transmitted too. + common::dataStructures::MountType type, common::dataStructures::DriveStatus status, + log::LogContext& lc) { utils::Timer t; - m_tapeDrivesState->reportDriveStatus(driveInfo, type, status, time(NULL), lc); + m_tapeDrivesState->reportDriveStatus(driveInfo, type, status, time(nullptr), lc); auto schedulerDbTime = t.secs(); if (schedulerDbTime > 1) { log::ScopedParamContainer spc(lc); @@ -1136,7 +1136,7 @@ void Scheduler::sortAndGetTapesForMountInfo(std::unique_ptr<SchedulerDatabase::T mountPassesACriteria = true; if (m->filesQueued / (1 + effectiveExistingMountsForThisTapepool) >= minFilesToWarrantAMount) mountPassesACriteria = true; - if (!effectiveExistingMountsForThisTapepool && ((time(NULL) - m->oldestJobStartTime) > m->minRequestAge)) + if (!effectiveExistingMountsForThisTapepool && ((time(nullptr) - m->oldestJobStartTime) > m->minRequestAge)) mountPassesACriteria = true; if (m->sleepingMount) { sleepingMount = true; @@ -1161,8 +1161,8 @@ void Scheduler::sortAndGetTapesForMountInfo(std::unique_ptr<SchedulerDatabase::T .add("minBytesToWarrantMount", minBytesToWarrantAMount) .add("filesQueued", m->filesQueued) .add("minFilesToWarrantMount", minFilesToWarrantAMount) - .add("oldestJobAge", time(NULL) - m->oldestJobStartTime) - .add("youngestJobAge", time(NULL) - m->youngestJobStartTime) + .add("oldestJobAge", time(nullptr) - m->oldestJobStartTime) + .add("youngestJobAge", time(nullptr) - m->youngestJobStartTime) .add("minArchiveRequestAge", m->minRequestAge) .add("voReadMaxDrives",voOfThisPotentialMount.readMaxDrives) .add("voWriteMaxDrives",voOfThisPotentialMount.writeMaxDrives) @@ -1194,8 +1194,8 @@ void Scheduler::sortAndGetTapesForMountInfo(std::unique_ptr<SchedulerDatabase::T .add("minBytesToWarrantMount", m_minBytesToWarrantAMount) .add("filesQueued", m->filesQueued) .add("minFilesToWarrantMount", m_minFilesToWarrantAMount) - .add("oldestJobAge", time(NULL) - m->oldestJobStartTime) - .add("youngestJobAge", time(NULL) - m->youngestJobStartTime) + .add("oldestJobAge", time(nullptr) - m->oldestJobStartTime) + .add("youngestJobAge", time(nullptr) - m->youngestJobStartTime) .add("minArchiveRequestAge", m->minRequestAge) .add("maxDrives", maxDrives) .add("voReadMaxDrives",voOfThisPotentialMount.readMaxDrives) @@ -1373,8 +1373,8 @@ bool Scheduler::getNextMountDryRun(const std::string& logicalLibraryName, const .add("minBytesToWarrantMount", m_minBytesToWarrantAMount) .add("filesQueued", m->filesQueued) .add("minFilesToWarrantMount", m_minFilesToWarrantAMount) - .add("oldestJobAge", time(NULL) - m->oldestJobStartTime) - .add("youngestJobAge", time(NULL) - m->youngestJobStartTime) + .add("oldestJobAge", time(nullptr) - m->oldestJobStartTime) + .add("youngestJobAge", time(nullptr) - m->youngestJobStartTime) .add("minArchiveRequestAge", m->minRequestAge) .add("getMountInfoTime", getMountInfoTime) .add("getTapeInfoTime", getTapeInfoTime) @@ -1416,8 +1416,8 @@ bool Scheduler::getNextMountDryRun(const std::string& logicalLibraryName, const .add("minBytesToWarrantMount", m_minBytesToWarrantAMount) .add("filesQueued", m->filesQueued) .add("minFilesToWarrantMount", m_minFilesToWarrantAMount) - .add("oldestJobAge", time(NULL) - m->oldestJobStartTime) - .add("youngestJobAge", time(NULL) - m->youngestJobStartTime) + .add("oldestJobAge", time(nullptr) - m->oldestJobStartTime) + .add("youngestJobAge", time(nullptr) - m->youngestJobStartTime) .add("minArchiveRequestAge", m->minRequestAge) .add("getMountInfoTime", getMountInfoTime) .add("getTapeInfoTime", getTapeInfoTime) @@ -1566,8 +1566,8 @@ auto logicalLibrary = getLogicalLibrary(logicalLibraryName,getLogicalLibrariesTi .add("minBytesToWarrantMount", m_minBytesToWarrantAMount) .add("filesQueued", m->filesQueued) .add("minFilesToWarrantMount", m_minFilesToWarrantAMount) - .add("oldestJobAge", time(NULL) - m->oldestJobStartTime) - .add("youngestJobAge", time(NULL) - m->youngestJobStartTime) + .add("oldestJobAge", time(nullptr) - m->oldestJobStartTime) + .add("youngestJobAge", time(nullptr) - m->youngestJobStartTime) .add("minArchiveRequestAge", m->minRequestAge) .add("getMountInfoTime", getMountInfoTime) .add("queueTrimingTime", queueTrimingTime) @@ -1645,8 +1645,8 @@ auto logicalLibrary = getLogicalLibrary(logicalLibraryName,getLogicalLibrariesTi .add("minBytesToWarrantMount", m_minBytesToWarrantAMount) .add("filesQueued", m->filesQueued) .add("minFilesToWarrantMount", m_minFilesToWarrantAMount) - .add("oldestJobAge", time(NULL) - m->oldestJobStartTime) - .add("youngestJobAge", time(NULL) - m->youngestJobStartTime) + .add("oldestJobAge", time(nullptr) - m->oldestJobStartTime) + .add("youngestJobAge", time(nullptr) - m->youngestJobStartTime) .add("minArchiveRequestAge", m->minRequestAge) .add("getMountInfoTime", getMountInfoTime) .add("queueTrimingTime", queueTrimingTime) diff --git a/scheduler/SchedulerDatabase.hpp b/scheduler/SchedulerDatabase.hpp index 627ad90f37e1a1c3e164a299f0a534d8a8a9cfe5..178c90a05351dfd2f96010c7c1b5abee6178c9df 100644 --- a/scheduler/SchedulerDatabase.hpp +++ b/scheduler/SchedulerDatabase.hpp @@ -197,9 +197,10 @@ class SchedulerDatabase { virtual const MountInfo & getMountInfo() = 0; virtual std::list<std::unique_ptr<ArchiveJob>> getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, log::LogContext& logContext) = 0; - virtual void complete(time_t completionTime) = 0; - virtual void setDriveStatus(common::dataStructures::DriveStatus status, time_t completionTime, - const std::optional<std::string> & reason = std::nullopt) = 0; + + virtual void setDriveStatus(common::dataStructures::DriveStatus status, common::dataStructures::MountType mountType, + time_t completionTime, const std::optional<std::string>& reason = std::nullopt) = 0; + virtual void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) = 0; virtual void setJobBatchTransferred( std::list<std::unique_ptr<cta::SchedulerDatabase::ArchiveJob>> & jobsBatch, log::LogContext & lc) = 0; @@ -438,10 +439,10 @@ class SchedulerDatabase { const std::string &fetchEosFreeSpaceScript, log::LogContext& logContext) = 0; virtual void requeueJobBatch(std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob>>& jobBatch, log::LogContext& logContext) = 0; - - virtual void complete(time_t completionTime) = 0; - virtual void setDriveStatus(common::dataStructures::DriveStatus status, time_t completionTime, - const std::optional<std::string> & reason = std::nullopt) = 0; + + virtual void setDriveStatus(common::dataStructures::DriveStatus status, common::dataStructures::MountType mountType, + time_t completionTime, const std::optional<std::string> & reason = std::nullopt) = 0; + virtual void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) = 0; virtual void flushAsyncSuccessReports(std::list<cta::SchedulerDatabase::RetrieveJob *> & jobsBatch, log::LogContext & lc) = 0; diff --git a/scheduler/SchedulerDatabaseTest.cpp b/scheduler/SchedulerDatabaseTest.cpp index 2a9e5b5c4d45e521f5c0008948f0f4c5d0f03088..57496c8a60d6f2c10f1e4c334ea8553b4a7fae15 100644 --- a/scheduler/SchedulerDatabaseTest.cpp +++ b/scheduler/SchedulerDatabaseTest.cpp @@ -231,7 +231,6 @@ TEST_P(SchedulerDatabaseTest, createManyArchiveJobs) { } #endif ASSERT_EQ(filesToDo, count); - am->complete(time(nullptr)); am.reset(nullptr); moutInfo.reset(nullptr); #ifdef LOOPING_TEST @@ -307,7 +306,6 @@ TEST_P(SchedulerDatabaseTest, createManyArchiveJobs) { done = true; } ASSERT_EQ(filesToDo2, count); - am->complete(time(nullptr)); am.reset(nullptr); moutInfo.reset(nullptr); } @@ -525,7 +523,6 @@ TEST_P(SchedulerDatabaseTest, popAndRequeueArchiveRequests) { ASSERT_EQ(aj.request.creationLog.time, creationTime); } - am->complete(time(nullptr)); am.reset(nullptr); moutInfo.reset(nullptr); } @@ -617,7 +614,6 @@ TEST_P(SchedulerDatabaseTest, popAndRequeueRetrieveRequests) { } } - rm->complete(time(nullptr)); rm.reset(nullptr); mountInfo.reset(nullptr); } @@ -707,7 +703,6 @@ TEST_P(SchedulerDatabaseTest, popRetrieveRequestsWithDisksytem) { rm->flushAsyncSuccessReports(jobBatch, lc); rjb.clear(); ASSERT_EQ(0, rm->getNextJobBatch(20,20*1000, lc).size()); - rm->complete(time(nullptr)); rm.reset(nullptr); moutInfo.reset(nullptr); } @@ -800,7 +795,6 @@ TEST_P(SchedulerDatabaseTest, popRetrieveRequestsWithBackpressure) { //reserving disk space will fail (not enough disk space, backpressure is triggered) ASSERT_FALSE(rm->reserveDiskSpace(reservationRequest, "", lc)); } - rm->complete(time(nullptr)); rm.reset(nullptr); moutInfo.reset(nullptr); auto mi = db.getMountInfoNoLock(cta::SchedulerDatabase::PurposeGetMountInfo::GET_NEXT_MOUNT,lc); @@ -892,7 +886,6 @@ TEST_P(SchedulerDatabaseTest, popRetrieveRequestsWithDiskSystemNotFetcheable) { //reserving disk space will fail because the disk instance is not reachable, causing backpressure ASSERT_FALSE(rm->reserveDiskSpace(reservationRequest, "", lc)); } - rm->complete(time(nullptr)); rm.reset(nullptr); mountInfo.reset(nullptr); auto mi = db.getMountInfoNoLock(cta::SchedulerDatabase::PurposeGetMountInfo::GET_NEXT_MOUNT,lc); diff --git a/scheduler/SchedulerTest.cpp b/scheduler/SchedulerTest.cpp index 55f85ada7cc2353624e759ce8a333725bba07cae..6de3a27c3d8d4777371f1747e3884cf2d47a1be2 100644 --- a/scheduler/SchedulerTest.cpp +++ b/scheduler/SchedulerTest.cpp @@ -86,21 +86,21 @@ public: class FailedToGetCatalogue: public std::exception { public: - const char *what() const throw() { + const char *what() const noexcept { return "Failed to get catalogue"; } }; class FailedToGetScheduler: public std::exception { public: - const char *what() const throw() { + const char *what() const noexcept { return "Failed to get scheduler"; } }; class FailedToGetSchedulerDB: public std::exception { public: - const char *what() const throw() { + const char *what() const noexcept { return "Failed to get object store db."; } }; @@ -515,8 +515,7 @@ TEST_P(SchedulerTest, archive_report_and_retrieve_new_file) { } const std::string driveName = "tape_drive"; - - catalogue.tapeLabelled(s_vid, "tape_drive"); + catalogue.tapeLabelled(s_vid, driveName); { // Emulate a tape server by asking for a mount and then a file (and succeed the transfer) @@ -525,12 +524,12 @@ TEST_P(SchedulerTest, archive_report_and_retrieve_new_file) { cta::common::dataStructures::DriveInfo driveInfo = { driveName, "myHost", s_libraryName }; scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down, lc); scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Up, lc); - mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + mount.reset(scheduler.getNextMount(s_libraryName, driveName, lc).release()); //Test that no mount is available when a logical library is disabled ASSERT_EQ(nullptr, mount.get()); catalogue.setLogicalLibraryDisabled(s_adminOnAdminHost,s_libraryName,false); //continue our test - mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + mount.reset(scheduler.getNextMount(s_libraryName, driveName, lc).release()); ASSERT_NE(nullptr, mount.get()); ASSERT_EQ(cta::common::dataStructures::MountType::ArchiveForUser, mount.get()->getMountType()); auto & osdb=getSchedulerDB(); @@ -561,7 +560,7 @@ TEST_P(SchedulerTest, archive_report_and_retrieve_new_file) { } { - // Emulate the the reporter process reporting successful transfer to tape to the disk system + // Emulate the reporter process reporting successful transfer to tape to the disk system auto jobsToReport = scheduler.getNextArchiveJobsToReportBatch(10, lc); ASSERT_NE(0, jobsToReport.size()); disk::DiskReporterFactory factory; @@ -625,7 +624,7 @@ TEST_P(SchedulerTest, archive_report_and_retrieve_new_file) { { // Emulate a tape server by asking for a mount and then a file (and succeed the transfer) std::unique_ptr<cta::TapeMount> mount; - mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + mount.reset(scheduler.getNextMount(s_libraryName, driveName, lc).release()); ASSERT_NE(nullptr, mount.get()); ASSERT_EQ(cta::common::dataStructures::MountType::Retrieve, mount.get()->getMountType()); std::unique_ptr<cta::RetrieveMount> retrieveMount; @@ -1068,12 +1067,12 @@ TEST_P(SchedulerTest, archive_report_and_retrieve_new_dual_copy_file) { scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down, lc); scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Up, lc); - mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + mount.reset(scheduler.getNextMount(s_libraryName, driveName, lc).release()); //Test that no mount is available when a logical library is disabled ASSERT_EQ(nullptr, mount.get()); catalogue.setLogicalLibraryDisabled(s_adminOnAdminHost,s_libraryName,false); //continue our test - mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + mount.reset(scheduler.getNextMount(s_libraryName, driveName, lc).release()); ASSERT_NE(nullptr, mount.get()); ASSERT_EQ(cta::common::dataStructures::MountType::ArchiveForUser, mount.get()->getMountType()); auto & osdb=getSchedulerDB(); @@ -1139,12 +1138,12 @@ TEST_P(SchedulerTest, archive_report_and_retrieve_new_dual_copy_file) { cta::common::dataStructures::DriveInfo driveInfo = { driveName, "myHost", s_libraryName }; scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down, lc); scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Up, lc); - mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + mount.reset(scheduler.getNextMount(s_libraryName, driveName, lc).release()); //Test that no mount is available when a logical library is disabled ASSERT_EQ(nullptr, mount.get()); catalogue.setLogicalLibraryDisabled(s_adminOnAdminHost,s_libraryName,false); //continue our test - mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + mount.reset(scheduler.getNextMount(s_libraryName, driveName, lc).release()); ASSERT_NE(nullptr, mount.get()); ASSERT_EQ(cta::common::dataStructures::MountType::ArchiveForUser, mount.get()->getMountType()); auto & osdb=getSchedulerDB(); @@ -1175,7 +1174,7 @@ TEST_P(SchedulerTest, archive_report_and_retrieve_new_dual_copy_file) { } { - // Emulate the the reporter process reporting successful transfer to tape to the disk system + // Emulate the reporter process reporting successful transfer to tape to the disk system auto jobsToReport = scheduler.getNextArchiveJobsToReportBatch(10, lc); ASSERT_NE(0, jobsToReport.size()); disk::DiskReporterFactory factory; @@ -1273,7 +1272,7 @@ TEST_P(SchedulerTest, archive_report_and_retrieve_new_dual_copy_file) { { // Emulate a tape server by asking for a mount and then a file (and succeed the transfer) std::unique_ptr<cta::TapeMount> mount; - mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + mount.reset(scheduler.getNextMount(s_libraryName, driveName, lc).release()); ASSERT_NE(nullptr, mount.get()); ASSERT_EQ(cta::common::dataStructures::MountType::Retrieve, mount.get()->getMountType()); std::unique_ptr<cta::RetrieveMount> retrieveMount; @@ -1367,7 +1366,7 @@ TEST_P(SchedulerTest, archive_and_retrieve_failure) { const std::string driveName = "tape_drive"; - catalogue.tapeLabelled(s_vid, "tape_drive"); + catalogue.tapeLabelled(s_vid, driveName); { // Emulate a tape server by asking for a mount and then a file (and succeed the transfer) @@ -1376,7 +1375,7 @@ TEST_P(SchedulerTest, archive_and_retrieve_failure) { cta::common::dataStructures::DriveInfo driveInfo = { driveName, "myHost", s_libraryName }; scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down, lc); scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Up, lc); - mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + mount.reset(scheduler.getNextMount(s_libraryName, driveName, lc).release()); ASSERT_NE(nullptr, mount.get()); ASSERT_EQ(cta::common::dataStructures::MountType::ArchiveForUser, mount.get()->getMountType()); auto & osdb=getSchedulerDB(); @@ -1475,7 +1474,7 @@ TEST_P(SchedulerTest, archive_and_retrieve_failure) { { // Emulate a tape server by asking for a mount and then a file std::unique_ptr<cta::TapeMount> mount; - mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + mount.reset(scheduler.getNextMount(s_libraryName, driveName, lc).release()); ASSERT_NE(nullptr, mount.get()); ASSERT_EQ(cta::common::dataStructures::MountType::Retrieve, mount.get()->getMountType()); std::unique_ptr<cta::RetrieveMount> retrieveMount; @@ -1525,7 +1524,7 @@ TEST_P(SchedulerTest, archive_and_retrieve_failure) { // The failed queue should be empty auto retrieveJobFailedList = scheduler.getNextRetrieveJobsFailedBatch(10,lc); ASSERT_EQ(0, retrieveJobFailedList.size()); - // Emulate the the reporter process + // Emulate the reporter process auto jobsToReport = scheduler.getNextRetrieveJobsToReportBatch(10, lc); ASSERT_EQ(1, jobsToReport.size()); disk::DiskReporterFactory factory; @@ -1616,7 +1615,7 @@ TEST_P(SchedulerTest, archive_and_retrieve_report_failure) { const std::string driveName = "tape_drive"; - catalogue.tapeLabelled(s_vid, "tape_drive"); + catalogue.tapeLabelled(s_vid, driveName); { // Emulate a tape server by asking for a mount and then a file (and succeed the transfer) @@ -1625,7 +1624,7 @@ TEST_P(SchedulerTest, archive_and_retrieve_report_failure) { cta::common::dataStructures::DriveInfo driveInfo = { driveName, "myHost", s_libraryName }; scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down, lc); scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Up, lc); - mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + mount.reset(scheduler.getNextMount(s_libraryName, driveName, lc).release()); ASSERT_NE(nullptr, mount.get()); ASSERT_EQ(cta::common::dataStructures::MountType::ArchiveForUser, mount.get()->getMountType()); auto & osdb=getSchedulerDB(); @@ -1656,7 +1655,7 @@ TEST_P(SchedulerTest, archive_and_retrieve_report_failure) { } { - // Emulate the the reporter process reporting successful transfer to tape to the disk system + // Emulate the reporter process reporting successful transfer to tape to the disk system auto jobsToReport = scheduler.getNextArchiveJobsToReportBatch(10, lc); ASSERT_NE(0, jobsToReport.size()); disk::DiskReporterFactory factory; @@ -1724,7 +1723,7 @@ TEST_P(SchedulerTest, archive_and_retrieve_report_failure) { { // Emulate a tape server by asking for a mount and then a file std::unique_ptr<cta::TapeMount> mount; - mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + mount.reset(scheduler.getNextMount(s_libraryName, driveName, lc).release()); ASSERT_NE(nullptr, mount.get()); ASSERT_EQ(cta::common::dataStructures::MountType::Retrieve, mount.get()->getMountType()); std::unique_ptr<cta::RetrieveMount> retrieveMount; @@ -1841,7 +1840,7 @@ TEST_P(SchedulerTest, retry_archive_until_max_reached) { scheduler.waitSchedulerDbSubthreadsComplete(); // Create the environment for the migration to happen (library + tape) - const std::string libraryComment = "Library comment"; + const std::string libraryComment = "Library comment"; const bool libraryIsDisabled = false; catalogue.createLogicalLibrary(s_adminOnAdminHost, s_libraryName, libraryIsDisabled, libraryComment); @@ -1857,12 +1856,13 @@ TEST_P(SchedulerTest, retry_archive_until_max_reached) { catalogue.createTape(s_adminOnAdminHost, tape); } - catalogue.tapeLabelled(s_vid, "tape_drive"); + const std::string driveName = "tape_drive"; + catalogue.tapeLabelled(s_vid, driveName); { // Emulate a tape server by asking for a mount and then a file std::unique_ptr<cta::TapeMount> mount; - mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + mount.reset(scheduler.getNextMount(s_libraryName, driveName, lc).release()); ASSERT_NE(nullptr, mount.get()); ASSERT_EQ(cta::common::dataStructures::MountType::ArchiveForUser, mount.get()->getMountType()); std::unique_ptr<cta::ArchiveMount> archiveMount; @@ -2266,13 +2266,15 @@ TEST_P(SchedulerTest, expandRepackRequest) { } scheduler.waitSchedulerDbSubthreadsComplete(); + const std::string driveName = "tape_drive"; + //Now, we need to simulate a retrieve for each file { // Emulate a tape server by asking for nbTapesForTest mount and then all files for(uint64_t i = 1; i<= nbTapesForTest ;++i) { std::unique_ptr<cta::TapeMount> mount; - mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + mount.reset(scheduler.getNextMount(s_libraryName, driveName, lc).release()); ASSERT_NE(nullptr, mount.get()); ASSERT_EQ(cta::common::dataStructures::MountType::Retrieve, mount.get()->getMountType()); std::unique_ptr<cta::RetrieveMount> retrieveMount; @@ -2551,9 +2553,10 @@ TEST_P(SchedulerTest, expandRepackRequestRetrieveFailed) { scheduler.waitSchedulerDbSubthreadsComplete(); } + const std::string driveName = "tape_drive"; { std::unique_ptr<cta::TapeMount> mount; - mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + mount.reset(scheduler.getNextMount(s_libraryName, driveName, lc).release()); ASSERT_NE(nullptr, mount.get()); ASSERT_EQ(cta::common::dataStructures::MountType::Retrieve, mount.get()->getMountType()); std::unique_ptr<cta::RetrieveMount> retrieveMount; @@ -2600,7 +2603,7 @@ TEST_P(SchedulerTest, expandRepackRequestRetrieveFailed) { { for(int i = 0; i < 5; ++i){ std::unique_ptr<cta::TapeMount> mount; - mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + mount.reset(scheduler.getNextMount(s_libraryName, driveName, lc).release()); ASSERT_NE(nullptr, mount.get()); ASSERT_EQ(cta::common::dataStructures::MountType::Retrieve, mount.get()->getMountType()); std::unique_ptr<cta::RetrieveMount> retrieveMount; @@ -2796,9 +2799,10 @@ TEST_P(SchedulerTest, expandRepackRequestArchiveSuccess) { scheduler.expandRepackRequest(repackRequestToExpand,tl,t,lc); scheduler.waitSchedulerDbSubthreadsComplete(); } + const std::string driveName = "tape_drive"; { std::unique_ptr<cta::TapeMount> mount; - mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + mount.reset(scheduler.getNextMount(s_libraryName, driveName, lc).release()); ASSERT_NE(nullptr, mount.get()); ASSERT_EQ(cta::common::dataStructures::MountType::Retrieve, mount.get()->getMountType()); std::unique_ptr<cta::RetrieveMount> retrieveMount; @@ -2848,7 +2852,7 @@ TEST_P(SchedulerTest, expandRepackRequestArchiveSuccess) { { scheduler.waitSchedulerDbSubthreadsComplete(); std::unique_ptr<cta::TapeMount> mount; - mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + mount.reset(scheduler.getNextMount(s_libraryName, driveName, lc).release()); ASSERT_NE(nullptr, mount.get()); ASSERT_EQ(cta::common::dataStructures::MountType::ArchiveForRepack, mount.get()->getMountType()); @@ -3047,9 +3051,10 @@ TEST_P(SchedulerTest, expandRepackRequestArchiveFailed) { scheduler.expandRepackRequest(repackRequestToExpand,tl,t,lc); scheduler.waitSchedulerDbSubthreadsComplete(); } + const std::string driveName = "tape_drive"; { std::unique_ptr<cta::TapeMount> mount; - mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + mount.reset(scheduler.getNextMount(s_libraryName, driveName, lc).release()); ASSERT_NE(nullptr, mount.get()); ASSERT_EQ(cta::common::dataStructures::MountType::Retrieve, mount.get()->getMountType()); std::unique_ptr<cta::RetrieveMount> retrieveMount; @@ -3099,7 +3104,7 @@ TEST_P(SchedulerTest, expandRepackRequestArchiveFailed) { { scheduler.waitSchedulerDbSubthreadsComplete(); std::unique_ptr<cta::TapeMount> mount; - mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + mount.reset(scheduler.getNextMount(s_libraryName, driveName, lc).release()); ASSERT_NE(nullptr, mount.get()); ASSERT_EQ(cta::common::dataStructures::MountType::ArchiveForRepack, mount.get()->getMountType()); @@ -3171,7 +3176,7 @@ TEST_P(SchedulerTest, expandRepackRequestArchiveFailed) { } } std::unique_ptr<cta::TapeMount> mount; - mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + mount.reset(scheduler.getNextMount(s_libraryName, driveName, lc).release()); ASSERT_NE(nullptr, mount.get()); ASSERT_EQ(cta::common::dataStructures::MountType::ArchiveForRepack, mount.get()->getMountType()); std::unique_ptr<cta::ArchiveMount> archiveMount; @@ -3526,19 +3531,20 @@ TEST_P(SchedulerTest, noMountIsTriggeredWhenTapeIsDisabled) { std::string disabledReason = "Disabled reason"; catalogue.setTapeDisabled(admin,vid,disabledReason); + const std::string driveName = "tape_drive"; //No mount should be returned by getNextMount - ASSERT_EQ(nullptr,scheduler.getNextMount(s_libraryName, "drive0", lc)); + ASSERT_EQ(nullptr,scheduler.getNextMount(s_libraryName, driveName, lc)); //enable the tape catalogue.modifyTapeState(admin,vid,common::dataStructures::Tape::ACTIVE,std::nullopt); //A mount should be returned by getNextMount - ASSERT_NE(nullptr,scheduler.getNextMount(s_libraryName,"drive0",lc)); + ASSERT_NE(nullptr,scheduler.getNextMount(s_libraryName,driveName,lc)); //disable the tape catalogue.setTapeDisabled(admin,vid,disabledReason); - ASSERT_EQ(nullptr,scheduler.getNextMount(s_libraryName,"drive0",lc)); + ASSERT_EQ(nullptr,scheduler.getNextMount(s_libraryName,driveName,lc)); //create repack mount policy const std::string mountPolicyName = s_repackMountPolicyName; @@ -3589,7 +3595,7 @@ TEST_P(SchedulerTest, noMountIsTriggeredWhenTapeIsDisabled) { * and the mount policy name begins with repack * We will then get the Repack AND USER jobs from the getNextJobBatch */ - auto nextMount = scheduler.getNextMount(s_libraryName,"drive0",lc); + auto nextMount = scheduler.getNextMount(s_libraryName,driveName,lc); ASSERT_NE(nullptr,nextMount); std::unique_ptr<cta::RetrieveMount> retrieveMount; retrieveMount.reset(dynamic_cast<cta::RetrieveMount*>(nextMount.release())); @@ -3703,7 +3709,7 @@ TEST_P(SchedulerTest, emptyMountIsTriggeredWhenCancelledRetrieveRequest) { catalogue.setTapeDisabled(admin,vid,disabledReason); //No mount should be returned by getNextMount - ASSERT_EQ(nullptr,scheduler.getNextMount(s_libraryName, "drive0", lc)); + ASSERT_EQ(nullptr,scheduler.getNextMount(s_libraryName, driveName, lc)); //abort the retrieve request { @@ -3722,7 +3728,7 @@ TEST_P(SchedulerTest, emptyMountIsTriggeredWhenCancelledRetrieveRequest) { } //A mount should be returned by getNextMount - auto retrieveMount = scheduler.getNextMount(s_libraryName,"drive0",lc); + auto retrieveMount = scheduler.getNextMount(s_libraryName,driveName,lc); ASSERT_NE(nullptr,retrieveMount); } @@ -3812,7 +3818,7 @@ TEST_P(SchedulerTest, DISABLED_archiveReportMultipleAndQueueRetrievesWithActivit std::string vid = s_vid + std::to_string(i); tape.vid = vid; catalogue.createTape(s_adminOnAdminHost, tape); - catalogue.tapeLabelled(vid, "tape_drive"); + catalogue.tapeLabelled(vid, driveName); } @@ -3824,8 +3830,8 @@ TEST_P(SchedulerTest, DISABLED_archiveReportMultipleAndQueueRetrievesWithActivit scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down, lc); scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Up, lc); for (auto i = 0; i < NUMBER_OF_FILES; i++) { - i=i; - mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + (void) i; // ignore unused variable + mount.reset(scheduler.getNextMount(s_libraryName, driveName, lc).release()); ASSERT_NE(nullptr, mount.get()); ASSERT_EQ(cta::common::dataStructures::MountType::ArchiveForUser, mount.get()->getMountType()); auto & osdb=getSchedulerDB(); @@ -3857,7 +3863,7 @@ TEST_P(SchedulerTest, DISABLED_archiveReportMultipleAndQueueRetrievesWithActivit } { - // Emulate the the reporter process reporting successful transfer to tape to the disk system + // Emulate the reporter process reporting successful transfer to tape to the disk system // The jobs get reported by tape, so we need to report 10*1 file (one per tape). for (auto i = 0; i < NUMBER_OF_FILES; i++) { auto jobsToReport = scheduler.getNextArchiveJobsToReportBatch(10, lc); @@ -4114,10 +4120,10 @@ TEST_P(SchedulerTest, expandRepackRequestAddCopiesOnly) { ASSERT_EQ(10*archiveFileSize,rr.getTotalStatsFile().totalBytesToRetrieve); } } - + const std::string driveName = "tape_drive"; { std::unique_ptr<cta::TapeMount> mount; - mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + mount.reset(scheduler.getNextMount(s_libraryName, driveName, lc).release()); ASSERT_NE(nullptr, mount.get()); ASSERT_EQ(cta::common::dataStructures::MountType::Retrieve, mount.get()->getMountType()); std::unique_ptr<cta::RetrieveMount> retrieveMount; @@ -4170,7 +4176,7 @@ TEST_P(SchedulerTest, expandRepackRequestAddCopiesOnly) { { //The first mount given by the scheduler should be the vidDestination1 that belongs to the tapepool1 std::unique_ptr<cta::TapeMount> mount; - mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + mount.reset(scheduler.getNextMount(s_libraryName, driveName, lc).release()); ASSERT_NE(nullptr, mount.get()); ASSERT_EQ(cta::common::dataStructures::MountType::ArchiveForRepack, mount.get()->getMountType()); @@ -4188,7 +4194,7 @@ TEST_P(SchedulerTest, expandRepackRequestAddCopiesOnly) { { //Second mount should be the vidDestination2 that belongs to the tapepool2 std::unique_ptr<cta::TapeMount> mount; - mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + mount.reset(scheduler.getNextMount(s_libraryName, driveName, lc).release()); ASSERT_NE(nullptr, mount.get()); ASSERT_EQ(cta::common::dataStructures::MountType::ArchiveForRepack, mount.get()->getMountType()); @@ -4550,10 +4556,10 @@ TEST_P(SchedulerTest, expandRepackRequestMoveAndAddCopies){ ASSERT_EQ(10*archiveFileSize,rr.getTotalStatsFile().totalBytesToRetrieve); } } - + const std::string driveName = "tape_drive"; { std::unique_ptr<cta::TapeMount> mount; - mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + mount.reset(scheduler.getNextMount(s_libraryName, driveName, lc).release()); ASSERT_NE(nullptr, mount.get()); ASSERT_EQ(cta::common::dataStructures::MountType::Retrieve, mount.get()->getMountType()); std::unique_ptr<cta::RetrieveMount> retrieveMount; @@ -4606,7 +4612,7 @@ TEST_P(SchedulerTest, expandRepackRequestMoveAndAddCopies){ { //The first mount given by the scheduler should be the vidMove that belongs to the TapePool tapepool std::unique_ptr<cta::TapeMount> mount; - mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + mount.reset(scheduler.getNextMount(s_libraryName, driveName, lc).release()); ASSERT_NE(nullptr, mount.get()); ASSERT_EQ(cta::common::dataStructures::MountType::ArchiveForRepack, mount.get()->getMountType()); @@ -4624,7 +4630,7 @@ TEST_P(SchedulerTest, expandRepackRequestMoveAndAddCopies){ { //Second mount should be the vidDestination1 that belongs to the tapepool std::unique_ptr<cta::TapeMount> mount; - mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + mount.reset(scheduler.getNextMount(s_libraryName, driveName, lc).release()); ASSERT_NE(nullptr, mount.get()); ASSERT_EQ(cta::common::dataStructures::MountType::ArchiveForRepack, mount.get()->getMountType()); @@ -4642,7 +4648,7 @@ TEST_P(SchedulerTest, expandRepackRequestMoveAndAddCopies){ { //Third mount should be the vidDestination2 that belongs to the same tapepool as the repacked tape std::unique_ptr<cta::TapeMount> mount; - mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + mount.reset(scheduler.getNextMount(s_libraryName, driveName, lc).release()); ASSERT_NE(nullptr, mount.get()); ASSERT_EQ(cta::common::dataStructures::MountType::ArchiveForRepack, mount.get()->getMountType()); @@ -4838,9 +4844,11 @@ TEST_P(SchedulerTest, cancelRepackRequest) { scheduler.expandRepackRequest(repackRequestToExpand,tl,t,lc); scheduler.waitSchedulerDbSubthreadsComplete(); + + const std::string driveName = "tape_drive"; { std::unique_ptr<cta::TapeMount> mount; - mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + mount.reset(scheduler.getNextMount(s_libraryName, driveName, lc).release()); std::unique_ptr<cta::RetrieveMount> retrieveMount; retrieveMount.reset(dynamic_cast<cta::RetrieveMount*>(mount.release())); std::unique_ptr<cta::RetrieveJob> retrieveJob; @@ -4966,9 +4974,11 @@ TEST_P(SchedulerTest, getNextMountEmptyArchiveForRepackIfNbFilesQueuedIsLessThan sorter.flushAll(lc); + const std::string driveName = "tape_drive"; + //As the scheduler minFilesToWarrantAMount is 5 and there is 5 ArchiveForRepack jobs queued - //the call to getNextMount should return an nullptr (10 files mini to have an ArchiveForRepack mount) - ASSERT_EQ(nullptr,scheduler.getNextMount(s_libraryName,"drive0",lc)); + //the call to getNextMount should return a nullptr (10 files mini to have an ArchiveForRepack mount) + ASSERT_EQ(nullptr,scheduler.getNextMount(s_libraryName,driveName,lc)); for(uint64_t i = s_minFilesToWarrantAMount; i < 2 * s_minFilesToWarrantAMount; ++i) { std::shared_ptr<cta::objectstore::ArchiveRequest> ar(new cta::objectstore::ArchiveRequest(agentReference.nextId("RepackSubRequest"),backend)); @@ -5002,7 +5012,7 @@ TEST_P(SchedulerTest, getNextMountEmptyArchiveForRepackIfNbFilesQueuedIsLessThan //As there is now 10 files in the queue, the getNextMount method should return an ArchiveMount //with 10 files in it - std::unique_ptr<cta::TapeMount> tapeMount = scheduler.getNextMount(s_libraryName,"drive0",lc); + std::unique_ptr<cta::TapeMount> tapeMount = scheduler.getNextMount(s_libraryName,driveName,lc); ASSERT_NE(nullptr,tapeMount); cta::ArchiveMount * archiveMount = dynamic_cast<cta::ArchiveMount *>(tapeMount.get()); archiveMount->getNextJobBatch(2 * s_minFilesToWarrantAMount,2 * s_minBytesToWarrantAMount, lc); @@ -5239,9 +5249,11 @@ TEST_P(SchedulerTest, repackRetrieveRequestsFailToFetchDiskSystem){ scheduler.expandRepackRequest(repackRequestToExpand,tl,t,lc); scheduler.waitSchedulerDbSubthreadsComplete(); + const std::string driveName = "tape_drive"; + { std::unique_ptr<cta::TapeMount> mount; - mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + mount.reset(scheduler.getNextMount(s_libraryName, driveName, lc).release()); ASSERT_NE(nullptr, mount.get()); ASSERT_EQ(cta::common::dataStructures::MountType::Retrieve, mount.get()->getMountType()); std::unique_ptr<cta::RetrieveMount> retrieveMount; @@ -5253,11 +5265,11 @@ TEST_P(SchedulerTest, repackRetrieveRequestsFailToFetchDiskSystem){ reservationRequest.addRequest(job->diskSystemName().value(), job->archiveFile.fileSize); } ASSERT_EQ(10,jobBatch.size()); - auto diskSpaceReservedBefore = catalogue.getTapeDrive("drive0").value().reservedBytes; + auto diskSpaceReservedBefore = catalogue.getTapeDrive(driveName).value().reservedBytes; //Trying to reserve disk space should result in 10 jobs should fail ASSERT_FALSE(retrieveMount->reserveDiskSpace(reservationRequest, lc)); //No extra disk space was reserved - auto diskSpaceReservedAfter = catalogue.getTapeDrive("drive0").value().reservedBytes; + auto diskSpaceReservedAfter = catalogue.getTapeDrive(driveName).value().reservedBytes; ASSERT_EQ(diskSpaceReservedAfter, diskSpaceReservedBefore); } /* @@ -5778,7 +5790,7 @@ TEST_P(SchedulerTest, archiveMaxDrivesVoInFlightChangeScheduleMount){ const std::string driveName = "tape_drive"; - catalogue.tapeLabelled(s_vid, "tape_drive"); + catalogue.tapeLabelled(s_vid, driveName); log::DummyLogger dl("", ""); @@ -5954,7 +5966,7 @@ TEST_P(SchedulerTest, retrieveArchiveAllTypesMaxDrivesVoInFlightChangeScheduleMo const bool logicalLibraryIsDisabled = false; catalogue.createLogicalLibrary(s_adminOnAdminHost, s_libraryName, logicalLibraryIsDisabled, "Create logical library"); - //This tape will contains files for triggering a Retrieve + //This tape will contain files for triggering a Retrieve auto tape1 = getDefaultTape(); catalogue.createTape(s_adminOnAdminHost, tape1); @@ -5983,7 +5995,7 @@ TEST_P(SchedulerTest, retrieveArchiveAllTypesMaxDrivesVoInFlightChangeScheduleMo catalogue.modifyStorageClassNbCopies(s_adminOnAdminHost,storageClass.name,storageClass.nbCopies); - //Create the a new archive routes for the second copy + //Create the new archive routes for the second copy catalogue.createArchiveRoute(s_adminOnAdminHost,storageClass.name,2,newTapepool,"ArchiveRoute2"); const std::string tapeDrive = "tape_drive"; diff --git a/scheduler/TapeMount.hpp b/scheduler/TapeMount.hpp index 7464089ef2e33b12cd5373606071a925b8e52c7c..ede9f93329e321d861874a5326d0021c85615f4c 100644 --- a/scheduler/TapeMount.hpp +++ b/scheduler/TapeMount.hpp @@ -86,9 +86,9 @@ namespace cta { uint64_t getCapacityInBytes() const; /** - * Indicates that the mount was aborted. + * Indicates that the mount was completed. */ - virtual void abort(const std::string& reason) = 0; + virtual void complete() = 0; /** * Report a drive status change diff --git a/scheduler/TapeMountDummy.hpp b/scheduler/TapeMountDummy.hpp index 108294366e44798fea51489b3b94fabe26e966ab..bbf7eda2146f5343d6df7b9eaa1685f764f98968 100644 --- a/scheduler/TapeMountDummy.hpp +++ b/scheduler/TapeMountDummy.hpp @@ -27,7 +27,7 @@ namespace cta { * Null returning functions do nothing. */ class TapeMountDummy: public TapeMount { - void abort(const std::string&) override {}; + void complete() override {}; std::string getMountTransactionId() const override { throw exception::Exception("In DummyTapeMount::getMountTransactionId() : not implemented"); } diff --git a/scheduler/testingMocks/MockRetrieveMount.hpp b/scheduler/testingMocks/MockRetrieveMount.hpp index 499789d1ae4b2b6270ac0d9a33c2c945ee56bb9f..4a1dba9968645687b2e465881fa3740a3d48f281 100644 --- a/scheduler/testingMocks/MockRetrieveMount.hpp +++ b/scheduler/testingMocks/MockRetrieveMount.hpp @@ -60,7 +60,7 @@ namespace cta { return "VDSTK11"; } - void abort(const std::string&) override { completes ++; } + void complete() override { completes ++; } void diskComplete() override { completes ++;} diff --git a/tapeserver/castor/tape/tapeserver/daemon/DataTransferSession.cpp b/tapeserver/castor/tape/tapeserver/daemon/DataTransferSession.cpp index 5c9e9b878bc50281cd69659e3ae7c26fafa42249..79e5290f8fc0bb27ea51ee8b3555a1bb5417aebf 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/DataTransferSession.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/DataTransferSession.cpp @@ -42,12 +42,14 @@ //------------------------------------------------------------------------------ //Constructor //------------------------------------------------------------------------------ -castor::tape::tapeserver::daemon::DataTransferSession::DataTransferSession(const std::string& hostname, cta::log::Logger& log, +castor::tape::tapeserver::daemon::DataTransferSession::DataTransferSession(const std::string& hostname, + cta::log::Logger& log, System::virtualWrapper& sysWrapper, const cta::tape::daemon::TpconfigLine& driveConfig, cta::mediachanger::MediaChangerFacade& mc, cta::tape::daemon::TapedProxy& initialProcess, - cta::server::ProcessCap& capUtils, const DataTransferConfig& castorConf, + cta::server::ProcessCap& capUtils, + const DataTransferConfig& castorConf, cta::Scheduler& scheduler) : m_log(log), m_sysWrapper(sysWrapper), @@ -116,7 +118,8 @@ castor::tape::tapeserver::daemon::DataTransferSession::execute() { downUpTransition = true; // We wait a bit before polling the scheduler again. // TODO: parametrize the duration? - m_scheduler.reportDriveStatus(m_driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down, + m_scheduler.reportDriveStatus(m_driveInfo, cta::common::dataStructures::MountType::NoMount, + cta::common::dataStructures::DriveStatus::Down, lc); sleep(5); } @@ -126,27 +129,31 @@ castor::tape::tapeserver::daemon::DataTransferSession::execute() { } catch (cta::Scheduler::NoSuchDrive& e) { // The object store does not even know about this drive. We will report our state // (default status is down). - m_scheduler.reportDriveStatus(m_driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down, lc); + m_scheduler.reportDriveStatus(m_driveInfo, cta::common::dataStructures::MountType::NoMount, + cta::common::dataStructures::DriveStatus::Down, lc); } } // If we get here after seeing a down desired state, we are transitioning from // down to up. In such a case, we will run an empty if (downUpTransition) { downUpTransition = false; - m_scheduler.reportDriveStatus(m_driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Probing, + m_scheduler.reportDriveStatus(m_driveInfo, cta::common::dataStructures::MountType::NoMount, + cta::common::dataStructures::DriveStatus::Probing, lc); castor::tape::tapeserver::daemon::EmptyDriveProbe emptyDriveProbe(m_log, m_driveConfig, m_sysWrapper); lc.log(cta::log::INFO, "Transition from down to up detected. Will check if a tape is in the drive."); if (!emptyDriveProbe.driveIsEmpty()) { - m_scheduler.reportDriveStatus(m_driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down, lc); + m_scheduler.reportDriveStatus(m_driveInfo, cta::common::dataStructures::MountType::NoMount, + cta::common::dataStructures::DriveStatus::Down, lc); cta::common::dataStructures::SecurityIdentity securityIdentity; cta::common::dataStructures::DesiredDriveState driveState; driveState.up = false; driveState.forceDown = false; std::string errorMsg = "A tape was detected in the drive. Putting the drive down."; std::optional<std::string> probeErrorMsg = emptyDriveProbe.getProbeErrorMsg(); - if (probeErrorMsg) + if (probeErrorMsg) { errorMsg = probeErrorMsg.value(); + } int logLevel = cta::log::ERR; driveState.setReasonFromLogMsg(logLevel, errorMsg); m_scheduler.setDesiredDriveState(securityIdentity, m_driveConfig.unitName, driveState, lc); @@ -160,12 +167,15 @@ castor::tape::tapeserver::daemon::DataTransferSession::execute() { // 2b) Get initial mount information std::unique_ptr<cta::TapeMount> tapeMount; // As getting next mount could be long, we report the drive as up immediately. - m_scheduler.reportDriveStatus(m_driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Up, lc); - tapeServerReporter.reportState(cta::tape::session::SessionState::Scheduling, cta::tape::session::SessionType::Undetermined); + m_scheduler.reportDriveStatus(m_driveInfo, cta::common::dataStructures::MountType::NoMount, + cta::common::dataStructures::DriveStatus::Up, lc); + tapeServerReporter.reportState(cta::tape::session::SessionState::Scheduling, + cta::tape::session::SessionType::Undetermined); try { - if (m_scheduler.getNextMountDryRun(m_driveConfig.logicalLibrary, m_driveConfig.unitName, lc)) + if (m_scheduler.getNextMountDryRun(m_driveConfig.logicalLibrary, m_driveConfig.unitName, lc)) { tapeMount = m_scheduler.getNextMount(m_driveConfig.logicalLibrary, m_driveConfig.unitName, lc); + } } catch (cta::exception::Exception& e) { cta::log::ScopedParamContainer localParams(lc); std::string exceptionMsg = e.getMessageValue(); @@ -173,7 +183,8 @@ castor::tape::tapeserver::daemon::DataTransferSession::execute() { localParams.add("errorMessage", exceptionMsg); lc.log(logLevel, "Error while scheduling new mount. Putting the drive down. Stack trace follows."); lc.logBacktrace(logLevel, e.backtrace()); - m_scheduler.reportDriveStatus(m_driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down, lc); + m_scheduler.reportDriveStatus(m_driveInfo, cta::common::dataStructures::MountType::NoMount, + cta::common::dataStructures::DriveStatus::Down, lc); cta::common::dataStructures::SecurityIdentity cliId; cta::common::dataStructures::DesiredDriveState driveState; driveState.up = false; @@ -185,7 +196,8 @@ castor::tape::tapeserver::daemon::DataTransferSession::execute() { // No mount to be done found, that was fast... if (!tapeMount) { lc.log(cta::log::DEBUG, "No new mount found. (sleeping 10 seconds)"); - m_scheduler.reportDriveStatus(m_driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Up, lc); + m_scheduler.reportDriveStatus(m_driveInfo, cta::common::dataStructures::MountType::NoMount, + cta::common::dataStructures::DriveStatus::Up, lc); sleep(10); goto schedule; // return MARK_DRIVE_AS_UP; @@ -234,7 +246,8 @@ castor::tape::tapeserver::daemon::DataTransferSession::executeRead(cta::log::Log // findDrive does not throw exceptions (it catches them to log errors) // A nullptr is returned on failure retrieveMount->setFetchEosFreeSpaceScript(m_castorConf.fetchEosFreeSpaceScript); - std::unique_ptr<castor::tape::tapeserver::drive::DriveInterface> drive(findDrive(m_driveConfig, logContext, retrieveMount)); + std::unique_ptr<castor::tape::tapeserver::drive::DriveInterface> drive( + findDrive(m_driveConfig, logContext, retrieveMount)); if (!drive) { reporter.bailout(); @@ -250,15 +263,12 @@ castor::tape::tapeserver::daemon::DataTransferSession::executeRead(cta::log::Log RecallMemoryManager memoryManager(m_castorConf.nbBufs, m_castorConf.bufsz, logContext); - //we retrieved the detail from the client in execute, so at this point - //we can already report ! - reporter.reportState(cta::tape::session::SessionState::Mounting, - cta::tape::session::SessionType::Retrieve); - TapeReadSingleThread readSingleThread(*drive, m_mediaChanger, reporter, m_volInfo, - m_castorConf.bulkRequestRecallMaxFiles, m_capUtils, watchDog, logContext, reportPacker, + m_castorConf.bulkRequestRecallMaxFiles, m_capUtils, watchDog, logContext, + reportPacker, m_castorConf.useLbp, m_castorConf.useRAO, m_castorConf.useEncryption, - m_castorConf.externalEncryptionKeyScript, *retrieveMount, m_castorConf.tapeLoadTimeout); + m_castorConf.externalEncryptionKeyScript, *retrieveMount, + m_castorConf.tapeLoadTimeout); DiskWriteThreadPool threadPool(m_castorConf.nbDiskThreads, reportPacker, @@ -283,7 +293,8 @@ castor::tape::tapeserver::daemon::DataTransferSession::executeRead(cta::log::Log // The RecallTaskInjector and the TapeReadSingleThread share the promise if (m_castorConf.useRAO) { - castor::tape::tapeserver::rao::RAOParams raoDataConfig(m_castorConf.useRAO, m_castorConf.raoLtoAlgorithm, m_castorConf.raoLtoAlgorithmOptions, + castor::tape::tapeserver::rao::RAOParams raoDataConfig(m_castorConf.useRAO, m_castorConf.raoLtoAlgorithm, + m_castorConf.raoLtoAlgorithmOptions, m_volInfo.vid); taskInjector.initRAO(raoDataConfig, &m_scheduler.getCatalogue()); } @@ -304,18 +315,23 @@ castor::tape::tapeserver::daemon::DataTransferSession::executeRead(cta::log::Log reportPacker.startThreads(); taskInjector.startThreads(); reporter.startThreads(); - // This thread is now going to be idle until the system unwinds at the end - // of the session - // All client notifications are done by the report packer, including the - // end of session + // This thread is now going to be idle until the system unwinds at the end of the session + // All client notifications are done by the report packer, including the end of session taskInjector.waitThreads(); threadPool.waitThreads(); - reportPacker.setDiskDone(); readSingleThread.waitThreads(); - reportPacker.setTapeDone(); reportPacker.waitThread(); reporter.waitThreads(); watchDog.stopAndWaitThread(); + + // If the disk thread finished the last, it leaves the drive in DrainingToDisk state + // Return the drive back to UP state + if (m_scheduler.getDriveState(m_driveInfo.driveName, &logContext)->driveStatus == + cta::common::dataStructures::DriveStatus::DrainingToDisk) { + m_scheduler.reportDriveStatus(m_driveInfo, cta::common::dataStructures::MountType::Retrieve, + cta::common::dataStructures::DriveStatus::Up, logContext); + } + return readSingleThread.getHardwareStatus(); } else { @@ -342,14 +358,17 @@ castor::tape::tapeserver::daemon::DataTransferSession::executeRead(cta::log::Log cta::log::LogContext::ScopedParam sp1(logContext, errorMessageParam); try { - retrieveMount->abort("Aborted: empty recall mount"); + retrieveMount->complete(); watchDog.updateStats(TapeSessionStats()); watchDog.reportStats(); std::list<cta::log::Param> paramList{errorMessageParam, mountIdParam, mountTypeParam, statusParam}; m_initialProcess.addLogParams(m_driveConfig.unitName, paramList); cta::log::LogContext::ScopedParam sp08(logContext, cta::log::Param("MountTransactionId", mountId)); - cta::log::LogContext::ScopedParam sp11(logContext, cta::log::Param("errorMessage", "Aborted: empty recall mount")); + cta::log::LogContext::ScopedParam sp11(logContext, + cta::log::Param("errorMessage", "Aborted: empty recall mount")); logContext.log(priority, "Notified client of end session with error"); + m_scheduler.reportDriveStatus(m_driveInfo, cta::common::dataStructures::MountType::NoMount, + cta::common::dataStructures::DriveStatus::Up, logContext); } catch (cta::exception::Exception& ex) { cta::log::LogContext::ScopedParam sp12(logContext, cta::log::Param("notificationError", ex.getMessageValue())); logContext.log(cta::log::ERR, "Failed to notified client of end session with error"); @@ -371,8 +390,12 @@ castor::tape::tapeserver::daemon::DataTransferSession::executeWrite(cta::log::Lo // in order to get the task injector ready to check if we actually have a // file to migrate. // 1) Get hold of the drive error logs are done inside the findDrive function - std::unique_ptr<castor::tape::tapeserver::drive::DriveInterface> drive(findDrive(m_driveConfig, logContext, archiveMount)); - if (!drive) return MARK_DRIVE_AS_DOWN; + std::unique_ptr<castor::tape::tapeserver::drive::DriveInterface> drive( + findDrive(m_driveConfig, logContext, archiveMount)); + if (!drive) { + reporter.bailout(); + return MARK_DRIVE_AS_DOWN; + } // Once we got hold of the drive, we can run the session { //dereferencing configLine is safe, because if configLine were not valid, @@ -417,29 +440,10 @@ castor::tape::tapeserver::daemon::DataTransferSession::executeWrite(cta::log::Lo if (taskInjector.synchronousInjection(noFilesToMigrate)) { const uint64_t firstFseqFromClient = taskInjector.firstFseqToWrite(); - //the last fseq written on the tape is the first file's fseq minus one + // The last fseq written on the tape is the first file's fseq minus one writeSingleThread.setlastFseq(firstFseqFromClient - 1); - //we retrieved the detail from the client in execute, so at this point - //we can report we will mount the tape. - // TODO: create a "StartingSession" state as the mounting will happen in - // the to-be-created tape thread. - try { - reporter.reportState(cta::tape::session::SessionState::Mounting, - cta::tape::session::SessionType::Archive); - } catch (cta::exception::Exception& e) { - cta::log::LogContext::ScopedParam sp1(logContext, cta::log::Param("errorMessage", e.getMessage().str())); - logContext.log(cta::log::INFO, "Aborting the session after problem with mount details. Notifying the client."); - reportPacker.synchronousReportEndWithErrors(e.getMessageValue(), 666, logContext); - return MARK_DRIVE_AS_UP; - } catch (...) { - logContext.log(cta::log::INFO, "Aborting the session after problem with mount details (unknown exception). Notifying the client."); - reportPacker.synchronousReportEndWithErrors("Unknown exception while checking session parameters with VMGR", 666, logContext); - return MARK_DRIVE_AS_UP; - } - - // We have something to do: start the session by starting all the - // threads. + // We have something to do: start the session by starting all the threads. memoryManager.startThreads(); threadPool.startThreads(); watchDog.startThread(); @@ -450,18 +454,17 @@ castor::tape::tapeserver::daemon::DataTransferSession::executeWrite(cta::log::Lo reporter.startThreads(); // Synchronise with end of threads taskInjector.waitThreads(); - reportPacker.waitThread(); writeSingleThread.waitThreads(); threadPool.waitThreads(); memoryManager.waitThreads(); + reportPacker.waitThread(); reporter.waitThreads(); watchDog.stopAndWaitThread(); return writeSingleThread.getHardwareStatus(); } else { - // Just log this was an empty mount and that's it. The memory management - // will be deallocated automatically. + // Just log this was an empty mount and that's it. The memory management will be deallocated automatically. int priority = cta::log::ERR; std::string status = "failure"; if (noFilesToMigrate) { @@ -490,6 +493,8 @@ castor::tape::tapeserver::daemon::DataTransferSession::executeWrite(cta::log::Lo logContext.log(cta::log::ERR, "Failed to notified client of end session with error"); } // Empty mount, hardware safe + m_scheduler.reportDriveStatus(m_driveInfo, cta::common::dataStructures::MountType::NoMount, + cta::common::dataStructures::DriveStatus::Up, logContext); return MARK_DRIVE_AS_UP; } } @@ -499,7 +504,8 @@ castor::tape::tapeserver::daemon::DataTransferSession::executeWrite(cta::log::Lo //DataTransferSession::executeLabel //------------------------------------------------------------------------------ castor::tape::tapeserver::daemon::Session::EndOfSessionAction -castor::tape::tapeserver::daemon::DataTransferSession::executeLabel(cta::log::LogContext& logContext, cta::LabelMount *labelMount) { +castor::tape::tapeserver::daemon::DataTransferSession::executeLabel(cta::log::LogContext& logContext, + cta::LabelMount *labelMount) { throw cta::exception::Exception("In DataTransferSession::executeLabel(): not implemented"); // TODO } @@ -522,7 +528,8 @@ castor::tape::tapeserver::daemon::DataTransferSession::executeLabel(cta::log::Lo * @return the drive if found, nullptr otherwise */ castor::tape::tapeserver::drive::DriveInterface * -castor::tape::tapeserver::daemon::DataTransferSession::findDrive(const cta::tape::daemon::TpconfigLine& driveConfig, cta::log::LogContext& logContext, +castor::tape::tapeserver::daemon::DataTransferSession::findDrive(const cta::tape::daemon::TpconfigLine& driveConfig, + cta::log::LogContext& logContext, cta::TapeMount *mount) { // Find the drive in the system's SCSI devices castor::tape::SCSI::DeviceVector dv(m_sysWrapper); @@ -537,10 +544,13 @@ castor::tape::tapeserver::daemon::DataTransferSession::findDrive(const cta::tape std::stringstream errMsg; const std::string headerErrMsg = "Drive not found on this path"; errMsg << headerErrMsg << logContext; - mount->abort(headerErrMsg + e.getMessageValue()); - cta::log::LogContext::ScopedParam sp10(logContext, cta::log::Param("tapebridgeTransId", mount->getMountTransactionId())); + mount->complete(); + cta::log::LogContext::ScopedParam sp10(logContext, + cta::log::Param("tapebridgeTransId", mount->getMountTransactionId())); cta::log::LogContext::ScopedParam sp13(logContext, cta::log::Param("errorMessage", errMsg.str())); logContext.log(cta::log::ERR, "Notified client of end session with error"); + m_scheduler.reportDriveStatus(m_driveInfo, cta::common::dataStructures::MountType::NoMount, + cta::common::dataStructures::DriveStatus::Down, logContext); return nullptr; } catch (cta::exception::Exception& e) { // We could not find this drive in the system's SCSI devices @@ -551,10 +561,13 @@ castor::tape::tapeserver::daemon::DataTransferSession::findDrive(const cta::tape std::stringstream errMsg; const std::string headerErrMsg = "Error looking to path to tape drive: "; errMsg << headerErrMsg << logContext; - mount->abort(headerErrMsg + e.getMessageValue()); - cta::log::LogContext::ScopedParam sp11(logContext, cta::log::Param("tapebridgeTransId", mount->getMountTransactionId())); + mount->complete(); + cta::log::LogContext::ScopedParam sp11(logContext, + cta::log::Param("tapebridgeTransId", mount->getMountTransactionId())); cta::log::LogContext::ScopedParam sp14(logContext, cta::log::Param("errorMessage", errMsg.str())); logContext.log(cta::log::ERR, "Notified client of end session with error"); + m_scheduler.reportDriveStatus(m_driveInfo, cta::common::dataStructures::MountType::NoMount, + cta::common::dataStructures::DriveStatus::Down, logContext); return nullptr; } catch (...) { // We could not find this drive in the system's SCSI devices @@ -564,16 +577,19 @@ castor::tape::tapeserver::daemon::DataTransferSession::findDrive(const cta::tape std::stringstream errMsg; const std::string headerErrMsg = "Unexpected exception while looking for drive"; errMsg << headerErrMsg << logContext; - mount->abort(headerErrMsg); - cta::log::LogContext::ScopedParam sp10(logContext, cta::log::Param("tapebridgeTransId", mount->getMountTransactionId())); + mount->complete(); + cta::log::LogContext::ScopedParam sp10(logContext, + cta::log::Param("tapebridgeTransId", mount->getMountTransactionId())); cta::log::LogContext::ScopedParam sp13(logContext, cta::log::Param("errorMessage", errMsg.str())); logContext.log(cta::log::ERR, "Notified client of end session with error"); + m_scheduler.reportDriveStatus(m_driveInfo, cta::common::dataStructures::MountType::NoMount, + cta::common::dataStructures::DriveStatus::Down, logContext); return nullptr; } try { std::unique_ptr<castor::tape::tapeserver::drive::DriveInterface> drive; drive.reset(castor::tape::tapeserver::drive::createDrive(driveInfo, m_sysWrapper)); - if (drive) drive->config = driveConfig; + if (drive) { drive->config = driveConfig; } return drive.release(); } catch (cta::exception::Exception& e) { // We could not find this drive in the system's SCSI devices @@ -584,10 +600,13 @@ castor::tape::tapeserver::daemon::DataTransferSession::findDrive(const cta::tape std::stringstream errMsg; const std::string headerErrMsg = "Error opening tape drive"; errMsg << headerErrMsg << logContext; - mount->abort(headerErrMsg); - cta::log::LogContext::ScopedParam sp11(logContext, cta::log::Param("tapebridgeTransId", mount->getMountTransactionId())); + mount->complete(); + cta::log::LogContext::ScopedParam sp11(logContext, + cta::log::Param("tapebridgeTransId", mount->getMountTransactionId())); cta::log::LogContext::ScopedParam sp14(logContext, cta::log::Param("errorMessage", errMsg.str())); logContext.log(cta::log::ERR, "Notified client of end session with error"); + m_scheduler.reportDriveStatus(m_driveInfo, cta::common::dataStructures::MountType::NoMount, + cta::common::dataStructures::DriveStatus::Down, logContext); return nullptr; } catch (...) { // We could not find this drive in the system's SCSI devices @@ -597,10 +616,13 @@ castor::tape::tapeserver::daemon::DataTransferSession::findDrive(const cta::tape std::stringstream errMsg; const std::string headerErrMsg = "Unexpected exception while opening drive"; errMsg << headerErrMsg << logContext; - mount->abort(headerErrMsg); - cta::log::LogContext::ScopedParam sp10(logContext, cta::log::Param("tapebridgeTransId", mount->getMountTransactionId())); + mount->complete(); + cta::log::LogContext::ScopedParam sp10(logContext, + cta::log::Param("tapebridgeTransId", mount->getMountTransactionId())); cta::log::LogContext::ScopedParam sp13(logContext, cta::log::Param("errorMessage", errMsg.str())); logContext.log(cta::log::ERR, "Notified client of end session with error"); + m_scheduler.reportDriveStatus(m_driveInfo, cta::common::dataStructures::MountType::NoMount, + cta::common::dataStructures::DriveStatus::Down, logContext); return nullptr; } } diff --git a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTaskTest.cpp b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTaskTest.cpp index e28aefd2947db5958c4def6127d476a2170a1b08..8b901b647a9aff2e4aa8800755cd8d84e31bf0ed 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTaskTest.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTaskTest.cpp @@ -39,8 +39,12 @@ namespace unitTests{ std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, cta::log::LogContext& logContext) override { throw std::runtime_error("Not implemented");} - void complete(time_t completionTime) override { throw std::runtime_error("Not implemented"); } - void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime, const std::optional<std::string> & reason) override { throw std::runtime_error("Not implemented"); } + void + setDriveStatus(cta::common::dataStructures::DriveStatus status, cta::common::dataStructures::MountType mountType, + time_t completionTime, const std::optional<std::string>& reason) override { + throw std::runtime_error("Not implemented"); + } + void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) override { throw std::runtime_error("Not implemented"); } void flushAsyncSuccessReports(std::list<cta::SchedulerDatabase::RetrieveJob*>& jobsBatch, cta::log::LogContext& lc) override { throw std::runtime_error("Not implemented"); } void addDiskSystemToSkip(const cta::SchedulerDatabase::RetrieveMount::DiskSystemToSkip &diskSystemToSkip) override { throw std::runtime_error("Not implemented"); } @@ -84,9 +88,20 @@ namespace unitTests{ cta::threading::MutexLocker ml(m_mutex); endSessionsWithError++; } - void setDiskDone() override {} - - void setTapeDone() override {} + void setDiskDone() override { + cta::threading::MutexLocker mutexLocker(m_mutex); + m_diskThreadComplete = true; + } + + void setTapeDone() override { + cta::threading::MutexLocker mutexLocker(m_mutex); + m_tapeThreadComplete = true; + } + + bool allThreadsDone() override { + cta::threading::MutexLocker mutexLocker(m_mutex); + return m_tapeThreadComplete && m_diskThreadComplete; + } MockRecallReportPacker(cta::RetrieveMount *rm,cta::log::LogContext lc): RecallReportPacker(rm,lc), completeJobs(0), failedJobs(0), @@ -96,6 +111,8 @@ namespace unitTests{ int failedJobs; int endSessions; int endSessionsWithError; + bool m_tapeThreadComplete; + bool m_diskThreadComplete; }; TEST(castor_tape_tapeserver_daemon, DiskWriteTaskFailedBlock){ diff --git a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPool.cpp b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPool.cpp index 1494e5d18ac85144ab7090c91055b3b6e0ecb3f9..c2a9e22f0cd22dd9fc7739b6b903fc636e652202 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPool.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPool.cpp @@ -66,7 +66,7 @@ DiskWriteThreadPool::~DiskWriteThreadPool() { // DiskWriteThreadPool::startThreads //------------------------------------------------------------------------------ void DiskWriteThreadPool::startThreads() { - for (const auto & m_thread : m_threads) { + for (const auto& m_thread: m_threads) { m_thread->start(); } m_lc.log(cta::log::INFO, "Starting threads in DiskWriteThreadPool::DiskWriteThreadPool"); @@ -76,7 +76,7 @@ void DiskWriteThreadPool::startThreads() { // DiskWriteThreadPool::waitThreads //------------------------------------------------------------------------------ void DiskWriteThreadPool::waitThreads() { - for (const auto & m_thread : m_threads) { + for (const auto& m_thread: m_threads) { m_thread->wait(); } m_lc.log(cta::log::INFO, "All DiskWriteThreadPool threads are now complete"); @@ -134,7 +134,8 @@ void DiskWriteThreadPool::logWithStat(int level, const std::string& message) { .add("poolAverageDiskPerformanceMBps", m_pooldStat.transferTime ? 1.0 * m_pooldStat.dataVolume / 1000 / 1000 / m_pooldStat.transferTime : 0.0) .add("poolOpenRWCloseToTransferTimeRatio", - m_pooldStat.transferTime ? (m_pooldStat.openingTime + m_pooldStat.readWriteTime + m_pooldStat.closingTime) / m_pooldStat.transferTime + m_pooldStat.transferTime ? + (m_pooldStat.openingTime + m_pooldStat.readWriteTime + m_pooldStat.closingTime) / m_pooldStat.transferTime : 0.0); m_lc.log(level, message); } @@ -174,17 +175,22 @@ void DiskWriteThreadPool::DiskWriteWorkerThread::run() { logWithStat(cta::log::INFO, "Finishing DiskWriteWorkerThread"); m_parentThreadPool.addThreadStats(m_threadStat); if (0 == --m_parentThreadPool.m_nbActiveThread) { + // Notify all disk threads are finished + m_parentThreadPool.m_reporter.setDiskDone(); // In the last Thread alive, report end of session - if (m_parentThreadPool.m_failedWriteCount == 0) { - m_parentThreadPool.m_reporter.reportEndOfSession(m_lc); - m_parentThreadPool.logWithStat(cta::log::INFO, "As last exiting DiskWriteWorkerThread, reported a successful end of session"); - } - else { - m_parentThreadPool.m_reporter.reportEndOfSessionWithErrors("End of recall session with error(s)", 666, m_lc); - cta::log::ScopedParamContainer params(m_lc); - params.add("errorCount", m_parentThreadPool.m_failedWriteCount); - m_parentThreadPool.logWithStat(cta::log::INFO, "As last exiting DiskWriteWorkerThread, reported an end of session with errors"); + // Otherwise end of session will be reported in the tape thread + if (m_parentThreadPool.m_reporter.allThreadsDone()) { + if (m_parentThreadPool.m_failedWriteCount == 0) { + m_parentThreadPool.m_reporter.reportEndOfSession(m_lc); + m_parentThreadPool.logWithStat(cta::log::INFO, + "As last exiting DiskWriteWorkerThread, reported a successful end of session"); + } + else { + m_parentThreadPool.m_reporter.reportEndOfSessionWithErrors("End of recall session with error(s)", 666, m_lc); + m_parentThreadPool.logWithStat(cta::log::INFO, + "As last exiting DiskWriteWorkerThread, reported an end of session with errors"); + } } const double deliveryTime = m_parentThreadPool.m_totalTime.secs(); m_parentThreadPool.m_watchdog.updateStatsDeliveryTime(deliveryTime); @@ -213,8 +219,9 @@ logWithStat(int level, const std::string& msg) { .add("threadAverageDiskPerformanceMBps", m_threadStat.transferTime ? 1.0 * m_threadStat.dataVolume / 1000 / 1000 / m_threadStat.transferTime : 0.0) .add("threadOpenRWCloseToTransferTimeRatio", - m_threadStat.transferTime ? (m_threadStat.openingTime + m_threadStat.readWriteTime + m_threadStat.closingTime) / - m_threadStat.transferTime : 0.0); + m_threadStat.transferTime ? + (m_threadStat.openingTime + m_threadStat.readWriteTime + m_threadStat.closingTime) / + m_threadStat.transferTime : 0.0); m_lc.log(level, msg); } } diff --git a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPoolTest.cpp b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPoolTest.cpp index d49c5bb0fbbb093d3f49b8af41073da55b0f09e5..65d5f71520b28869a066992704f070ce41d65a64 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPoolTest.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPoolTest.cpp @@ -33,8 +33,11 @@ namespace unitTests{ const MountInfo & getMountInfo() override { throw std::runtime_error("Not implemented"); } std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, cta::log::LogContext& logContext) override { throw std::runtime_error("Not implemented");} - void complete(time_t completionTime) override { throw std::runtime_error("Not implemented"); } - void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime, const std::optional<std::string> & reason) override { throw std::runtime_error("Not implemented"); } + void setDriveStatus(cta::common::dataStructures::DriveStatus status, cta::common::dataStructures::MountType mountType, + time_t completionTime, const std::optional<std::string>& reason) override { + throw std::runtime_error("Not implemented"); + } + void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) override { throw std::runtime_error("Not implemented"); } void flushAsyncSuccessReports(std::list<cta::SchedulerDatabase::RetrieveJob*>& jobsBatch, cta::log::LogContext& lc) override { throw std::runtime_error("Not implemented"); } void addDiskSystemToSkip(const cta::SchedulerDatabase::RetrieveMount::DiskSystemToSkip &diskSystemToSkip) override { throw std::runtime_error("Not implemented"); } @@ -82,6 +85,7 @@ namespace unitTests{ MockRecallReportPacker(cta::RetrieveMount *rm, cta::log::LogContext lc): RecallReportPacker(rm,lc), completeJobs(0), failedJobs(0), endSessions(0), endSessionsWithError(0) {} + cta::threading::Mutex m_mutex; int completeJobs; int failedJobs; @@ -109,9 +113,10 @@ namespace unitTests{ castor::messages::TapeserverProxyDummy tspd; cta::TapeMountDummy tmd; RecallWatchDog rwd(1,1,tspd,tmd,"", lc); - + DiskWriteThreadPool dwtp(2,report,rwd,lc,"/dev/null", 0); dwtp.startThreads(); + report.setTapeDone(); for(int i=0;i<5;++i){ std::unique_ptr<TestingRetrieveJob> fileToRecall(new TestingRetrieveJob()); @@ -130,7 +135,7 @@ namespace unitTests{ t->pushDataBlock(nullptr); dwtp.push(t); } - + dwtp.finish(); dwtp.waitThreads(); ASSERT_EQ(5, report.completeJobs); diff --git a/tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.cpp b/tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.cpp index 0423e61f1e5371ccb35440c40bce662f23d02909..142c22eab641653fe9333e468a6e81f288478f09 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.cpp @@ -115,18 +115,6 @@ void RecallReportPacker::reportEndOfSessionWithErrors(const std::string& msg, in m_fifo.push(new ReportEndofSessionWithErrors(msg, error_code)); } - -//------------------------------------------------------------------------------ -//reportTestGoingToEnd -//------------------------------------------------------------------------------ -void RecallReportPacker::reportTestGoingToEnd(cta::log::LogContext& lc) { - cta::log::ScopedParamContainer params(lc); - params.add("type", "ReportTestGoingToEnd"); - lc.log(cta::log::DEBUG, "In RecallReportPacker::reportTestGoingToEnd(), pushing a report."); - cta::threading::MutexLocker ml(m_producterProtection); - m_fifo.push(new ReportTestGoingToEnd()); -} - //------------------------------------------------------------------------------ //ReportSuccessful::execute //------------------------------------------------------------------------------ @@ -147,7 +135,6 @@ void RecallReportPacker::ReportSuccessful::execute(RecallReportPacker& parent) { //ReportEndofSession::execute //------------------------------------------------------------------------------ void RecallReportPacker::ReportEndofSession::execute(RecallReportPacker& reportPacker) { - reportPacker.setDiskDone(); if (!reportPacker.errorHappened()) { reportPacker.m_lc.log(cta::log::INFO, "Nominal RecallReportPacker::EndofSession has been reported"); if (reportPacker.m_watchdog) { @@ -199,7 +186,6 @@ bool RecallReportPacker::ReportDriveStatus::goingToEnd() { //ReportEndofSessionWithErrors::execute //------------------------------------------------------------------------------ void RecallReportPacker::ReportEndofSessionWithErrors::execute(RecallReportPacker& parent) { - parent.setDiskDone(); if (parent.m_errorHappened) { LogContext::ScopedParam sp(parent.m_lc, Param("errorCode", m_error_code)); parent.m_lc.log(cta::log::ERR, m_message); @@ -436,7 +422,7 @@ bool RecallReportPacker::isDiskDone() { } //------------------------------------------------------------------------------ -//reportDiskDone() +//setDiskDone() //------------------------------------------------------------------------------ void RecallReportPacker::setDiskDone() { cta::threading::MutexLocker mutexLocker(m_mutex); @@ -444,9 +430,10 @@ void RecallReportPacker::setDiskDone() { } //------------------------------------------------------------------------------ -//reportDiskDone() +//allThreadsDone() //------------------------------------------------------------------------------ bool RecallReportPacker::allThreadsDone() { + cta::threading::MutexLocker mutexLocker(m_mutex); return m_tapeThreadComplete && m_diskThreadComplete; } diff --git a/tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.hpp b/tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.hpp index 99d5f228d24843de72f2a1f5a873cb8ea20157e1..084e6aacc1b8308085d67a9f1ada4af5d7c7c405 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.hpp +++ b/tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.hpp @@ -41,7 +41,7 @@ public: */ RecallReportPacker(cta::RetrieveMount *retrieveMount, cta::log::LogContext& lc); - virtual ~RecallReportPacker(); + ~RecallReportPacker() override; /** * Create into the RecallReportPacker a report for the successful migration @@ -66,12 +66,6 @@ public: */ virtual void reportEndOfSession(cta::log::LogContext& lc); - /** - * Function for testing purposes. It is used to tell the report packer that this is the last report - * @param lc log context provided by the calling thread. - */ - virtual void reportTestGoingToEnd(cta::log::LogContext& lc); - /** * Create into the RecallReportPacker a report for an erroneous end of session * @param msg The error message @@ -131,32 +125,20 @@ private: //inner classes use to store content while receiving a report class Report { public: - virtual ~Report() {} + virtual ~Report() = default; virtual void execute(RecallReportPacker& packer) = 0; virtual bool goingToEnd() { return false; } }; - class ReportTestGoingToEnd : public Report { - public: - ReportTestGoingToEnd() {} - - void execute(RecallReportPacker& reportPacker) override { - reportPacker.m_retrieveMount->diskComplete(); - reportPacker.m_retrieveMount->tapeComplete(); - } - - bool goingToEnd() override { return true; } - }; - class ReportSuccessful : public Report { /** * The successful retrieve job to be reported immediately */ std::unique_ptr<cta::RetrieveJob> m_successfulRetrieveJob; public: - ReportSuccessful(std::unique_ptr<cta::RetrieveJob> successfulRetrieveJob) : + explicit ReportSuccessful(std::unique_ptr<cta::RetrieveJob> successfulRetrieveJob) : m_successfulRetrieveJob(std::move(successfulRetrieveJob)) {} void execute(RecallReportPacker& reportPacker) override; @@ -190,7 +172,7 @@ private: class ReportEndofSession : public Report { public: - ReportEndofSession() {} + ReportEndofSession() = default; void execute(RecallReportPacker& reportPacker) override; @@ -213,7 +195,7 @@ private: class WorkerThread : public cta::threading::Thread { RecallReportPacker& m_parent; public: - WorkerThread(RecallReportPacker& parent); + explicit WorkerThread(RecallReportPacker& parent); void run() override; } m_workerThread; @@ -257,9 +239,6 @@ private: /* * Proceed finish procedure for async execute for all reports. - * - * @param reportedSuccessfuly The successful reports to check - * @return The number of reports proceeded */ void fullCheckAndFinishAsyncExecute(); diff --git a/tapeserver/castor/tape/tapeserver/daemon/RecallReportPackerTest.cpp b/tapeserver/castor/tape/tapeserver/daemon/RecallReportPackerTest.cpp index fb8ca66c59072ee97e0f91cccbe6748ef2a66ca8..20492b9c518b9ae6b88d5349a640d16668da99e9 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/RecallReportPackerTest.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/RecallReportPackerTest.cpp @@ -86,11 +86,12 @@ TEST_F(castor_tape_tapeserver_daemon_RecallReportPackerTest, RecallReportPackerN rrp.reportCompletedJob(std::move(job1), lc); rrp.reportCompletedJob(std::move(job2), lc); - + rrp.reportDriveStatus(cta::common::dataStructures::DriveStatus::Unmounting, std::nullopt, lc); - + + rrp.setTapeDone(); + rrp.setDiskDone(); rrp.reportEndOfSession(lc); -// rrp.reportTestGoingToEnd(); rrp.waitThread(); std::string temp = log.getLog(); @@ -142,8 +143,9 @@ TEST_F(castor_tape_tapeserver_daemon_RecallReportPackerTest, RecallReportPackerB rrp.reportDriveStatus(cta::common::dataStructures::DriveStatus::Unmounting, std::nullopt, lc); + rrp.setTapeDone(); + rrp.setDiskDone(); rrp.reportEndOfSession(lc); -// rrp.reportTestGoingToEnd(); rrp.waitThread(); const std::string temp = log.getLog(); diff --git a/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjectorTest.cpp b/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjectorTest.cpp index 0c6ddb0dbc0e203b13d5cb30ecbee0261c1f8a51..a701b70f121cb34694e6c3a07525964ac8a2b421 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjectorTest.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjectorTest.cpp @@ -47,9 +47,9 @@ namespace unitTests void TearDown() { } - + }; // class castor_tape_tapeserver_daemonTest - + struct MockRecallReportPacker : public RecallReportPacker { void reportCompletedJob(std::unique_ptr<cta::RetrieveJob> successfulRetrieveJob, cta::log::LogContext& lc) override { cta::threading::MutexLocker ml(m_mutex); @@ -77,12 +77,12 @@ namespace unitTests int endSessions; int endSessionsWithError; }; - + class FakeDiskWriteThreadPool: public DiskWriteThreadPool { public: using DiskWriteThreadPool::m_tasks; - FakeDiskWriteThreadPool(RecallReportPacker &rrp, RecallWatchDog &rwd, + FakeDiskWriteThreadPool(RecallReportPacker &rrp, RecallWatchDog &rwd, cta::log::LogContext & lc): DiskWriteThreadPool(1,rrp, rwd,lc,"/dev/null", 0){} @@ -97,8 +97,8 @@ namespace unitTests FakeSingleTapeReadThread(tapeserver::drive::DriveInterface& drive, cta::mediachanger::MediaChangerFacade & mc, tapeserver::daemon::TapeServerReporter & tsr, - const tapeserver::daemon::VolumeInfo& volInfo, - cta::server::ProcessCap& cap, + const tapeserver::daemon::VolumeInfo& volInfo, + cta::server::ProcessCap& cap, const uint32_t tapeLoadTimeout, cta::log::LogContext & lc): TapeSingleThreadInterface<TapeReadTask>(drive, mc, tsr, volInfo,cap, lc, false, "", tapeLoadTimeout){} @@ -109,12 +109,12 @@ namespace unitTests delete m_tasks.pop(); } } - - virtual void run () + + virtual void run () { m_tasks.push(nullptr); } - + virtual void push(TapeReadTask* t){ m_tasks.push(t); } @@ -123,21 +123,25 @@ namespace unitTests protected: virtual void logSCSIMetrics() {}; }; - + class TestingDatabaseRetrieveMount: public cta::SchedulerDatabase::RetrieveMount { const MountInfo & getMountInfo() override { throw std::runtime_error("Not implemented"); } std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, cta::log::LogContext& logContext) override { throw std::runtime_error("Not implemented");} - void complete(time_t completionTime) override { throw std::runtime_error("Not implemented"); } - void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime,const std::optional<std::string> & reason) override { throw std::runtime_error("Not implemented"); } + + void + setDriveStatus(cta::common::dataStructures::DriveStatus status, cta::common::dataStructures::MountType mountType, + time_t completionTime, const std::optional<std::string>& reason) override { + throw std::runtime_error("Not implemented"); + } void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) override { throw std::runtime_error("Not implemented"); } void flushAsyncSuccessReports(std::list<cta::SchedulerDatabase::RetrieveJob*>& jobsBatch, cta::log::LogContext& lc) override { throw std::runtime_error("Not implemented"); } void addDiskSystemToSkip(const cta::SchedulerDatabase::RetrieveMount::DiskSystemToSkip &diskSystemToSkip) override { throw std::runtime_error("Not implemented"); } void requeueJobBatch(std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob>>& jobBatch, cta::log::LogContext& logContext) override { throw std::runtime_error("Not implemented"); } void putQueueToSleep(const std::string &diskSystemName, const uint64_t sleepTime, cta::log::LogContext &logContext) override { throw std::runtime_error("Not implemented"); } - bool reserveDiskSpace(const cta::DiskSpaceReservationRequest &request, const std::string &fetchEosFreeSpaceScript, cta::log::LogContext& logContext) override { throw std::runtime_error("Not implemented"); } + bool reserveDiskSpace(const cta::DiskSpaceReservationRequest &request, const std::string &fetchEosFreeSpaceScript, cta::log::LogContext& logContext) override { throw std::runtime_error("Not implemented"); } }; - + TEST_F(castor_tape_tapeserver_daemonTest, RecallTaskInjectorNominal) { const int nbJobs=15; const int maxNbJobsInjectedAtOnce = 6; @@ -145,12 +149,12 @@ namespace unitTests cta::log::LogContext lc(log); RecallMemoryManager mm(50U, 50U, lc); castor::tape::tapeserver::drive::FakeDrive drive; - + auto catalogue = cta::catalogue::DummyCatalogue(); cta::MockRetrieveMount trm(catalogue); trm.createRetrieveJobs(nbJobs); //EXPECT_CALL(trm, internalGetNextJob()).Times(nbJobs+1); - + castor::messages::TapeserverProxyDummy tspd; cta::TapeMountDummy tmd; RecallWatchDog rwd(1,1,tspd,tmd,"",lc); @@ -205,7 +209,7 @@ namespace unitTests delete tapeReadTask; } } - + TEST_F(castor_tape_tapeserver_daemonTest, RecallTaskInjectorNoFiles) { cta::log::StringLogger log("dummy","castor_tape_tapeserver_daemon_RecallTaskInjectorTest",cta::log::DEBUG); cta::log::LogContext lc(log); @@ -215,7 +219,7 @@ namespace unitTests cta::MockRetrieveMount trm(catalogue); trm.createRetrieveJobs(0); //EXPECT_CALL(trm, internalGetNextJob()).Times(1); //no work: single call to getnextjob - + castor::messages::TapeserverProxyDummy tspd; cta::TapeMountDummy tmd; RecallWatchDog rwd(1,1,tspd,tmd,"",lc); @@ -224,12 +228,12 @@ namespace unitTests FakeDiskWriteThreadPool diskWrite(mrrp,rwd,lc); cta::log::DummyLogger dummyLog("dummy","dummy"); cta::mediachanger::MediaChangerFacade mc(dummyLog); - castor::messages::TapeserverProxyDummy initialProcess; + castor::messages::TapeserverProxyDummy initialProcess; castor::tape::tapeserver::daemon::VolumeInfo volume; volume.vid="V12345"; volume.mountType=cta::common::dataStructures::MountType::Retrieve; cta::server::ProcessCapDummy cap; - castor::tape::tapeserver::daemon::TapeServerReporter tsr(initialProcess, cta::tape::daemon::TpconfigLine(), "0.0.0.0", volume, lc); + castor::tape::tapeserver::daemon::TapeServerReporter tsr(initialProcess, cta::tape::daemon::TpconfigLine(), "0.0.0.0", volume, lc); FakeSingleTapeReadThread tapeRead(drive, mc, tsr, volume, cap, 60, lc); tapeserver::daemon::RecallTaskInjector rti(mm, tapeRead, diskWrite, trm, 6, blockSize, lc); diff --git a/tapeserver/castor/tape/tapeserver/daemon/TapeReadSingleThread.cpp b/tapeserver/castor/tape/tapeserver/daemon/TapeReadSingleThread.cpp index e2663015fbb3ec2c31d739edfe6e8c5864b5cb3e..1269b3de2ffc1fff840a343c5e21d54371aa1199 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/TapeReadSingleThread.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/TapeReadSingleThread.cpp @@ -40,7 +40,8 @@ castor::tape::tapeserver::daemon::TapeReadSingleThread::TapeReadSingleThread( const cta::RetrieveMount& retrieveMount, const uint32_t tapeLoadTimeout) : TapeSingleThreadInterface<TapeReadTask>(drive, mediaChanger, reporter, volInfo, - capUtils, logContext, useEncryption, externalEncryptionKeyScript, tapeLoadTimeout), + capUtils, logContext, useEncryption, externalEncryptionKeyScript, + tapeLoadTimeout), m_maxFilesRequest(maxFilesRequest), m_watchdog(watchdog), m_reportPacker(reportPacker), @@ -52,7 +53,8 @@ castor::tape::tapeserver::daemon::TapeReadSingleThread::TapeReadSingleThread( //TapeCleaning::~TapeCleaning() //------------------------------------------------------------------------------ castor::tape::tapeserver::daemon::TapeReadSingleThread::TapeCleaning::~TapeCleaning() { - m_this.m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::CleaningUp, std::nullopt, m_this.m_logContext); + m_this.m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::CleaningUp, std::nullopt, + m_this.m_logContext); // Tell everyone to wrap up the session // We now acknowledge to the task injector that read reached the end. There @@ -64,8 +66,9 @@ castor::tape::tapeserver::daemon::TapeReadSingleThread::TapeCleaning::~TapeClean // Disable encryption (or at least try) try { - if (m_this.m_encryptionControl.disable(m_this.m_drive)) + if (m_this.m_encryptionControl.disable(m_this.m_drive)) { m_this.m_logContext.log(cta::log::INFO, "Turned encryption off before unmounting"); + } } catch (cta::exception::Exception& ex) { cta::log::ScopedParamContainer scoped(m_this.m_logContext); scoped.add("exceptionError", ex.getMessageValue()); @@ -100,34 +103,59 @@ castor::tape::tapeserver::daemon::TapeReadSingleThread::TapeCleaning::~TapeClean } catch (cta::exception::TimeOut&) {} if (!m_this.m_drive.hasTapeInPlace()) { m_this.m_logContext.log(cta::log::INFO, "TapeReadSingleThread: No tape to unload"); - goto done; + m_this.m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::Up, std::nullopt, + m_this.m_logContext); + m_this.m_reporter.reportState(cta::tape::session::SessionState::ShuttingDown, + cta::tape::session::SessionType::Retrieve); + + //then we terminate the global status m_reporter + m_this.m_reporter.finish(); + return; } - m_this.m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::Unloading, std::nullopt, m_this.m_logContext); + m_this.m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::Unloading, std::nullopt, + m_this.m_logContext); m_this.m_drive.unloadTape(); m_this.m_logContext.log(cta::log::INFO, "TapeReadSingleThread: Tape unloaded"); m_this.m_stats.unloadTime += m_timer.secs(cta::utils::Timer::resetCounter); // And return the tape to the library currentErrorToCount = "Error_tapeDismount"; - m_this.m_reporter.reportState(cta::tape::session::SessionState::Unmounting, cta::tape::session::SessionType::Retrieve); - m_this.m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::Unmounting, std::nullopt, m_this.m_logContext); + m_this.m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::Unmounting, std::nullopt, + m_this.m_logContext); + m_this.m_reporter.reportState(cta::tape::session::SessionState::Unmounting, + cta::tape::session::SessionType::Retrieve); m_this.m_mediaChanger.dismountTape(m_this.m_volInfo.vid, m_this.m_drive.config.librarySlot()); m_this.m_drive.disableLogicalBlockProtection(); - m_this.m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::Up, std::nullopt, m_this.m_logContext); - m_this.m_stats.unmountTime += m_timer.secs(cta::utils::Timer::resetCounter); m_this.m_logContext.log(cta::log::INFO, "TapeReadSingleThread : tape unmounted"); - // Report SessionState::ShuttingDown if all tasks finished or - // SessionState::DrainingToDisk if there is any DiskWriteWorkerThread still active - m_this.m_reporter.reportTapeUnmountedForRetrieve(); + m_this.m_stats.unmountTime += m_timer.secs(cta::utils::Timer::resetCounter); + + // Report drive UP if disk threads are done + // Report drive DrainingToDisk if there are disk write threads still active + if (m_this.m_reportPacker.allThreadsDone()) { + m_this.m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::Up, std::nullopt, + m_this.m_logContext); + m_this.m_reporter.reportState(cta::tape::session::SessionState::ShuttingDown, + cta::tape::session::SessionType::Retrieve); + } + else { + m_this.m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::DrainingToDisk, std::nullopt, + m_this.m_logContext); + m_this.m_reporter.reportState(cta::tape::session::SessionState::DrainingToDisk, + cta::tape::session::SessionType::Retrieve); + } + m_this.m_stats.waitReportingTime += m_timer.secs(cta::utils::Timer::resetCounter); } catch (const cta::exception::Exception& ex) { // Notify something failed during the cleaning m_this.m_hardwareStatus = Session::MARK_DRIVE_AS_DOWN; const int logLevel = cta::log::ERR; const std::string errorMsg = "Exception in TapeReadSingleThread-TapeCleaning when unmounting/unloading the tape. Putting the drive down."; - std::optional<std::string> reason = cta::common::dataStructures::DesiredDriveState::generateReasonFromLogMsg(logLevel, errorMsg); - m_this.m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::Down, reason, m_this.m_logContext); + std::optional<std::string> reason = cta::common::dataStructures::DesiredDriveState::generateReasonFromLogMsg( + logLevel, errorMsg); + m_this.m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::Down, reason, + m_this.m_logContext); + m_this.m_reporter.reportState(cta::tape::session::SessionState::Fatal, cta::tape::session::SessionType::Retrieve); cta::log::ScopedParamContainer scoped(m_this.m_logContext); scoped.add("exceptionMessage", ex.getMessageValue()); m_this.m_logContext.log(logLevel, errorMsg); @@ -141,11 +169,14 @@ castor::tape::tapeserver::daemon::TapeReadSingleThread::TapeCleaning::~TapeClean } catch (...) {} } catch (...) { // Notify something failed during the cleaning - const int logLevel = cta::log::ERR; - const std::string errorMsg = "Non-Castor exception in TapeReadSingleThread-TapeCleaning when unmounting the tape. Putting the drive down."; - std::optional<std::string> reason = cta::common::dataStructures::DesiredDriveState::generateReasonFromLogMsg(logLevel, errorMsg); m_this.m_hardwareStatus = Session::MARK_DRIVE_AS_DOWN; - m_this.m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::Down, reason, m_this.m_logContext); + const int logLevel = cta::log::ERR; + const std::string errorMsg = "Non-CTA exception in TapeReadSingleThread-TapeCleaning when unmounting the tape. Putting the drive down."; + std::optional<std::string> reason = cta::common::dataStructures::DesiredDriveState::generateReasonFromLogMsg( + logLevel, errorMsg); + m_this.m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::Down, reason, + m_this.m_logContext); + m_this.m_reporter.reportState(cta::tape::session::SessionState::Fatal, cta::tape::session::SessionType::Retrieve); m_this.m_logContext.log(logLevel, errorMsg); try { if (!currentErrorToCount.empty()) { @@ -153,7 +184,7 @@ castor::tape::tapeserver::daemon::TapeReadSingleThread::TapeCleaning::~TapeClean } } catch (...) {} } - done: + //then we terminate the global status m_reporter m_this.m_reporter.finish(); } @@ -228,19 +259,23 @@ void castor::tape::tapeserver::daemon::TapeReadSingleThread::run() { // of crash m_watchdog.updateThreadTimer(totalTimer); - //pair of brackets to create an artificial scope for the tapeCleaner + // Pair of brackets to create an artificial scope for the tapeCleaner { - //log and notify + // Log and notify m_logContext.log(cta::log::INFO, "Starting tape read thread"); // The tape will be loaded // it has to be unloaded, unmounted at all cost -> RAII // will also take care of the TapeServerReporter and of RecallTaskInjector TapeCleaning tapeCleaner(*this, timer); + // Before anything, the tape should be mounted m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::Mounting, std::nullopt, m_logContext); + m_reporter.reportState(cta::tape::session::SessionState::Mounting, cta::tape::session::SessionType::Retrieve); + std::ostringstream ossLabelFormat; ossLabelFormat << std::showbase << std::internal << std::setfill('0') << std::hex << std::setw(4) << static_cast<unsigned int>(m_volInfo.labelFormat); + cta::log::ScopedParamContainer params(m_logContext); params.add("vo", m_retrieveMount.getVo()); params.add("mediaType", m_retrieveMount.getMediaType()); @@ -251,8 +286,9 @@ void castor::tape::tapeserver::daemon::TapeReadSingleThread::run() { params.add("vendor", m_retrieveMount.getVendor()); params.add("capacityInBytes", m_retrieveMount.getCapacityInBytes()); m_logContext.log(cta::log::INFO, "Tape session started for read"); - mountTapeReadOnly(); + currentErrorToCount = "Error_tapeLoad"; + mountTapeReadOnly(); cta::utils::Timer tapeLoadTimer; waitForDrive(); double tapeLoadTime = tapeLoadTimer.secs(); @@ -328,8 +364,6 @@ void castor::tape::tapeserver::daemon::TapeReadSingleThread::run() { } } - m_reporter.reportState(cta::tape::session::SessionState::Running, - cta::tape::session::SessionType::Retrieve); m_stats.waitReportingTime += timer.secs(cta::utils::Timer::resetCounter); // Then we will loop on the tasks as they get from // the task injector @@ -341,7 +375,9 @@ void castor::tape::tapeserver::daemon::TapeReadSingleThread::run() { // From now on, the tasks will identify problems when executed. currentErrorToCount = ""; std::unique_ptr<TapeReadTask> task; - m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::Transferring, std::nullopt, m_logContext); + m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::Transferring, std::nullopt, + m_logContext); + m_reporter.reportState(cta::tape::session::SessionState::Running, cta::tape::session::SessionType::Retrieve); while (true) { //get a task task.reset(popAndRequestMoreJobs()); @@ -358,7 +394,8 @@ void castor::tape::tapeserver::daemon::TapeReadSingleThread::run() { m_watchdog.updateStatsWithoutDeliveryTime(m_stats); // The session could have been corrupted (failed positioning) if (readSession->isCorrupted()) { - throw cta::exception::Exception("Session corrupted: exiting task execution loop in TapeReadSingleThread. Cleanup will follow."); + throw cta::exception::Exception( + "Session corrupted: exiting task execution loop in TapeReadSingleThread. Cleanup will follow."); } } } @@ -368,13 +405,27 @@ void castor::tape::tapeserver::daemon::TapeReadSingleThread::run() { cta::log::ScopedParamContainer params(m_logContext); params.add("status", "success"); m_stats.totalTime = totalTimer.secs(); - m_reportPacker.setTapeDone(); - m_reportPacker.setTapeComplete(); logWithStat(cta::log::INFO, "Tape thread complete", params); // Report one last time the stats, after unloading/unmounting. m_watchdog.updateStatsWithoutDeliveryTime(m_stats); - // end of session + log are reported by the last active disk thread - // in DiskWriteThreadPool::DiskWriteWorkerThread::run() + + // End of session and log are reported by the last active disk thread + // in DiskWriteThreadPool::DiskWriteWorkerThread::run() if it finishes after this TapeReadSingleThread + // Otherwise end of session is reported here + m_reportPacker.setTapeDone(); + m_reportPacker.setTapeComplete(); + + if (m_reportPacker.allThreadsDone()) { + // If disk threads finished before (for example, due to write error), report end of session + if (!m_watchdog.errorHappened()) { + m_reportPacker.reportEndOfSession(m_logContext); + m_logContext.log(cta::log::INFO, + "Both DiskWriteWorkerThread and TapeReadSingleThread existed, reported a successful end of session"); + } + else { + m_reportPacker.reportEndOfSessionWithErrors("End of recall session with error(s)", 666, m_logContext); + } + } } catch (const cta::exception::Exception& e) { // We can still update the session stats one last time (unmount timings // should have been updated by the RAII cleaner/unmounter). @@ -388,8 +439,7 @@ void castor::tape::tapeserver::daemon::TapeReadSingleThread::run() { params.add("status", "error") .add("ErrorMessage", e.getMessageValue()); m_stats.totalTime = totalTimer.secs(); - logWithStat(cta::log::INFO, "Tape thread complete for reading", - params); + logWithStat(cta::log::ERR, "Tape thread complete for reading", params); // Also transmit the error step to the watchdog if (!currentErrorToCount.empty()) { m_watchdog.addToErrorCount(currentErrorToCount); @@ -403,6 +453,22 @@ void castor::tape::tapeserver::daemon::TapeReadSingleThread::run() { task->reportCancellationToDiskTask(); delete task; } + + // Notify tape thread is finished to gracefully end the session + m_reportPacker.setTapeDone(); + m_reportPacker.setTapeComplete(); + + if (m_reportPacker.allThreadsDone()) { + // If disk threads finished before (for example, due to write error), report end of session + if (!m_watchdog.errorHappened()) { + m_reportPacker.reportEndOfSession(m_logContext); + m_logContext.log(cta::log::INFO, + "Both DiskWriteWorkerThread and TapeReadSingleThread existed, reported a successful end of session"); + } + else { + m_reportPacker.reportEndOfSessionWithErrors("End of recall session with error(s)", 666, m_logContext); + } + } } } diff --git a/tapeserver/castor/tape/tapeserver/daemon/TapeServerReporter.cpp b/tapeserver/castor/tape/tapeserver/daemon/TapeServerReporter.cpp index fa60e2e9c6ccba0b21498f61bb44e6a0ad7bdcf5..27dcc605742201b969b6f0c62f8eee16e7906229 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/TapeServerReporter.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/TapeServerReporter.cpp @@ -90,20 +90,6 @@ void TapeServerReporter::reportState(cta::tape::session::SessionState state, m_fifo.push(new ReportStateChange(state, type)); } -//------------------------------------------------------------------------------ -//reportTapeUnmountedForRetrieve -//------------------------------------------------------------------------------ -void TapeServerReporter::reportTapeUnmountedForRetrieve() { - m_fifo.push(new ReportTapeUnmountedForRetrieve()); -} - -//------------------------------------------------------------------------------ -//reportDiskCompleteForRetrieve -//------------------------------------------------------------------------------ -void TapeServerReporter::reportDiskCompleteForRetrieve() { - m_fifo.push(new ReportDiskCompleteForRetrieve()); -} - //------------------------------------------------------------------------------ //run //------------------------------------------------------------------------------ @@ -146,32 +132,6 @@ void TapeServerReporter::ReportStateChange::execute(TapeServerReporter& parent) parent.m_tapeserverProxy.reportState(m_state, m_type, parent.m_volume.vid); } -//------------------------------------------------------------------------------ -// ReportTapeUnmountedForRetrieve::execute()) -//------------------------------------------------------------------------------ -void TapeServerReporter::ReportTapeUnmountedForRetrieve::execute(TapeServerReporter& parent) { - parent.m_tapeUnmountedForRecall = true; - if (parent.m_diskCompleteForRecall) { - parent.m_tapeserverProxy.reportState(cta::tape::session::SessionState::ShuttingDown, - cta::tape::session::SessionType::Retrieve, parent.m_volume.vid); - } - else { - parent.m_tapeserverProxy.reportState(cta::tape::session::SessionState::DrainingToDisk, - cta::tape::session::SessionType::Retrieve, parent.m_volume.vid); - } -} - -//------------------------------------------------------------------------------ -// ReportDiskCompleteForRetrieve::execute()) -//------------------------------------------------------------------------------ -void TapeServerReporter::ReportDiskCompleteForRetrieve::execute(TapeServerReporter& parent) { - parent.m_diskCompleteForRecall = true; - if (parent.m_tapeUnmountedForRecall) { - parent.m_tapeserverProxy.reportState(cta::tape::session::SessionState::ShuttingDown, - cta::tape::session::SessionType::Retrieve, parent.m_volume.vid); - } -} - } } } diff --git a/tapeserver/castor/tape/tapeserver/daemon/TapeServerReporter.hpp b/tapeserver/castor/tape/tapeserver/daemon/TapeServerReporter.hpp index 95be2f17b9836200bb7eeb1a4434f494f7203c1f..053e739bfc300336a94f8f0b728ae63c3c1a2a5f 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/TapeServerReporter.hpp +++ b/tapeserver/castor/tape/tapeserver/daemon/TapeServerReporter.hpp @@ -68,18 +68,6 @@ public: void reportState(cta::tape::session::SessionState state, cta::tape::session::SessionType type); - /** - * Special function managing the special case of retrieves, where disk and - * tape thread can finish in different orders (tape part) - */ - void reportTapeUnmountedForRetrieve(); - - /** - * Special function managing the special case of retrieves, where disk and - * tape thread can finish in different orders (disk part) - */ - void reportDiskCompleteForRetrieve(); - //------------------------------------------------------------------------------ //start and wait for thread to finish void startThreads(); @@ -119,16 +107,6 @@ private: cta::tape::session::SessionType m_type; }; - class ReportTapeUnmountedForRetrieve : public Report { - public: - void execute(TapeServerReporter&) override; - }; - - class ReportDiskCompleteForRetrieve : public Report { - public: - void execute(TapeServerReporter&) override; - }; - /** * Inherited from Thread, it will do the job : pop a request, execute it * and delete it diff --git a/tapeserver/castor/tape/tapeserver/daemon/TapeWriteSingleThread.cpp b/tapeserver/castor/tape/tapeserver/daemon/TapeWriteSingleThread.cpp index 94cebf8578907397c03da76c0601744972c6cdf3..9580736e7ef9391ff3e59fd11f37f951883ff68b 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/TapeWriteSingleThread.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/TapeWriteSingleThread.cpp @@ -100,7 +100,14 @@ castor::tape::tapeserver::daemon::TapeWriteSingleThread::TapeCleaning::~TapeClea } catch (cta::exception::TimeOut&) {} if (!m_this.m_drive.hasTapeInPlace()) { m_this.m_logContext.log(cta::log::INFO, "TapeWriteSingleThread: No tape to unload"); - goto done; + m_this.m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::Up, std::nullopt, + m_this.m_logContext); + m_this.m_reporter.reportState(cta::tape::session::SessionState::ShuttingDown, + cta::tape::session::SessionType::Retrieve); + + //then we terminate the global status m_reporter + m_this.m_reporter.finish(); + return; } m_this.m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::Unloading, std::nullopt, m_this.m_logContext); @@ -110,13 +117,15 @@ castor::tape::tapeserver::daemon::TapeWriteSingleThread::TapeCleaning::~TapeClea // And return the tape to the library currentErrorToCount = "Error_tapeDismount"; - m_this.m_reporter.reportState(cta::tape::session::SessionState::Unmounting, cta::tape::session::SessionType::Archive); m_this.m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::Unmounting, std::nullopt, m_this.m_logContext); + m_this.m_reporter.reportState(cta::tape::session::SessionState::Unmounting, cta::tape::session::SessionType::Archive); m_this.m_mediaChanger.dismountTape(m_this.m_volInfo.vid, m_this.m_drive.config.librarySlot()); m_this.m_drive.disableLogicalBlockProtection(); - m_this.m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::Up, std::nullopt, m_this.m_logContext); m_this.m_stats.unmountTime += m_timer.secs(cta::utils::Timer::resetCounter); m_this.m_logContext.log(cta::log::INFO, "TapeWriteSingleThread : tape unmounted"); + + // We know we are the last thread, just report the drive Up + m_this.m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::Up, std::nullopt, m_this.m_logContext); m_this.m_reporter.reportState(cta::tape::session::SessionState::ShuttingDown, cta::tape::session::SessionType::Archive); m_this.m_stats.waitReportingTime += m_timer.secs(cta::utils::Timer::resetCounter); @@ -126,8 +135,11 @@ castor::tape::tapeserver::daemon::TapeWriteSingleThread::TapeCleaning::~TapeClea m_this.m_hardwareStatus = Session::MARK_DRIVE_AS_DOWN; const int logLevel = cta::log::ERR; const std::string errorMsg = "Exception in TapeWriteSingleThread-TapeCleaning when unmounting/unloading the tape. Putting the drive down."; - std::optional<std::string> reason = cta::common::dataStructures::DesiredDriveState::generateReasonFromLogMsg(logLevel, errorMsg); - m_this.m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::Down, reason, m_this.m_logContext); + std::optional<std::string> reason = cta::common::dataStructures::DesiredDriveState::generateReasonFromLogMsg( + logLevel, errorMsg); + m_this.m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::Down, reason, + m_this.m_logContext); + m_this.m_reporter.reportState(cta::tape::session::SessionState::Fatal, cta::tape::session::SessionType::Archive); cta::log::ScopedParamContainer scoped(m_this.m_logContext); scoped.add("exceptionMessage", ex.getMessageValue()); m_this.m_logContext.log(logLevel, errorMsg); @@ -141,11 +153,14 @@ castor::tape::tapeserver::daemon::TapeWriteSingleThread::TapeCleaning::~TapeClea } catch (...) {} } catch (...) { // Notify something failed during the cleaning - const int logLevel = cta::log::ERR; - const std::string errorMsg = "Non-Castor exception in TapeWriteSingleThread-TapeCleaning when unmounting the tape. Putting the drive down."; - std::optional<std::string> reason = cta::common::dataStructures::DesiredDriveState::generateReasonFromLogMsg(logLevel, errorMsg); m_this.m_hardwareStatus = Session::MARK_DRIVE_AS_DOWN; - m_this.m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::Down, reason, m_this.m_logContext); + const int logLevel = cta::log::ERR; + const std::string errorMsg = "Non-CTA exception in TapeWriteSingleThread-TapeCleaning when unmounting the tape. Putting the drive down."; + std::optional<std::string> reason = cta::common::dataStructures::DesiredDriveState::generateReasonFromLogMsg( + logLevel, errorMsg); + m_this.m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::Down, reason, + m_this.m_logContext); + m_this.m_reporter.reportState(cta::tape::session::SessionState::Fatal, cta::tape::session::SessionType::Archive); m_this.m_logContext.log(logLevel, errorMsg); try { if (!currentErrorToCount.empty()) { @@ -153,7 +168,7 @@ castor::tape::tapeserver::daemon::TapeWriteSingleThread::TapeCleaning::~TapeClea } } catch (...) {} } - done: + //then we terminate the global status m_reporter m_this.m_reporter.finish(); } @@ -212,10 +227,11 @@ tapeFlush(const std::string& message, uint64_t bytes, uint64_t files, //------------------------------------------------------------------------ // logAndCheckTapeAlertsForWrite //------------------------------------------------------------------------------ -bool castor::tape::tapeserver::daemon::TapeWriteSingleThread:: -logAndCheckTapeAlertsForWrite() { +bool castor::tape::tapeserver::daemon::TapeWriteSingleThread::logAndCheckTapeAlertsForWrite() { std::vector<uint16_t> tapeAlertCodes = m_drive.getTapeAlertCodes(); - if (tapeAlertCodes.empty()) return false; + if (tapeAlertCodes.empty()) { + return false; + } size_t alertNumber = 0; // Log tape alerts in the logs. std::vector<std::string> tapeAlerts = m_drive.getTapeAlerts(tapeAlertCodes); @@ -279,9 +295,9 @@ void castor::tape::tapeserver::daemon::TapeWriteSingleThread::run() { // of crash m_watchdog.updateThreadTimer(totalTimer); - //pair of brackets to create an artificial scope for the tape cleaning + // Pair of brackets to create an artificial scope for the tape cleaning { - //log and notify + // Log and notify m_logContext.log(cta::log::INFO, "Starting tape write thread"); // The tape will be loaded @@ -290,6 +306,7 @@ void castor::tape::tapeserver::daemon::TapeWriteSingleThread::run() { TapeCleaning cleaner(*this, timer); // Before anything, the tape should be mounted m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::Mounting, std::nullopt, m_logContext); + m_reporter.reportState(cta::tape::session::SessionState::Mounting, cta::tape::session::SessionType::Archive); cta::log::ScopedParamContainer params(m_logContext); params.add("vo", m_archiveMount.getVo()); params.add("mediaType", m_archiveMount.getMediaType()); @@ -381,8 +398,6 @@ void castor::tape::tapeserver::daemon::TapeWriteSingleThread::run() { } } - m_reporter.reportState(cta::tape::session::SessionState::Running, - cta::tape::session::SessionType::Archive); m_stats.waitReportingTime += timer.secs(cta::utils::Timer::resetCounter); uint64_t bytes = 0; uint64_t files = 0; @@ -390,6 +405,7 @@ void castor::tape::tapeserver::daemon::TapeWriteSingleThread::run() { currentErrorToCount = ""; std::unique_ptr<TapeWriteTask> task; m_reportPacker.reportDriveStatus(cta::common::dataStructures::DriveStatus::Transferring, std::nullopt, m_logContext); + m_reporter.reportState(cta::tape::session::SessionState::Running, cta::tape::session::SessionType::Archive); while (true) { //get a task task.reset(m_tasks.pop()); @@ -483,7 +499,6 @@ void castor::tape::tapeserver::daemon::TapeWriteSingleThread::run() { std::string errorMessage(e.getMessageValue()); int errorCode(666); // Override if we got en ENOSPC error (end of tape) - // This is try { const auto& errnum = dynamic_cast<const cta::exception::Errnum&> (e); @@ -497,8 +512,7 @@ void castor::tape::tapeserver::daemon::TapeWriteSingleThread::run() { params.add("status", "error") .add("ErrorMessage", errorMessage); m_stats.totalTime = totalTimer.secs(); - logWithStats(cta::log::INFO, "Tape thread complete for writing", - params); + logWithStats(cta::log::ERR, "Tape thread complete for writing", params); m_reportPacker.reportEndOfSessionWithErrors(errorMessage, errorCode, m_logContext); } } diff --git a/tapeserver/castor/tape/tapeserver/daemon/TaskWatchDog.hpp b/tapeserver/castor/tape/tapeserver/daemon/TaskWatchDog.hpp index ae324f2f4c1ae2f244eff73f10d7f0f527e8d6dc..21d4d7897ec8f16cbaaae9bf160764af598bb041 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/TaskWatchDog.hpp +++ b/tapeserver/castor/tape/tapeserver/daemon/TaskWatchDog.hpp @@ -483,7 +483,7 @@ private: .add("fileId", m_fileId) .add("fileId",m_fileId) .add("fSeq",m_fSeq); - m_lc.log(cta::log::WARNING, "No tape block movement for too long"); + m_lc.log(cta::log::ERR, "No tape block movement for too long"); } public: @@ -535,7 +535,7 @@ private: params.add("TimeSinceLastBlockMove", m_blockMovementTimer.secs()) .add("fileId",m_fileId) .add("fSeq",m_fSeq); - m_lc.log(cta::log::WARNING, "No tape block movement for too long"); + m_lc.log(cta::log::ERR, "No tape block movement for too long"); } public: diff --git a/tapeserver/daemon/DriveHandler.cpp b/tapeserver/daemon/DriveHandler.cpp index 522f5cc975c75307b5df78b04d144934daa96e5a..e591dd5fc2d2e1292e878954069e87360e08e379 100644 --- a/tapeserver/daemon/DriveHandler.cpp +++ b/tapeserver/daemon/DriveHandler.cpp @@ -310,21 +310,29 @@ SubprocessHandler::ProcessingStatus DriveHandler::processEvent() { switch (static_cast<SessionState>(message.sessionstate())) { case SessionState::Checking: - return processChecking(message); + m_processingStatus = processChecking(message); + break; case SessionState::Scheduling: - return processScheduling(message); + m_processingStatus = processScheduling(message); + break; case SessionState::Mounting: - return processMounting(message); + m_processingStatus = processMounting(message); + break; case SessionState::Running: - return processRunning(message); + m_processingStatus = processRunning(message); + break; case SessionState::Unmounting: - return processUnmounting(message); + m_processingStatus = processUnmounting(message); + break; case SessionState::DrainingToDisk: - return processDrainingToDisk(message); + m_processingStatus = processDrainingToDisk(message); + break; case SessionState::ShuttingDown: - return processShuttingDown(message); + m_processingStatus = processShuttingDown(message); + break; case SessionState::Fatal: - return processFatal(message); + m_processingStatus = processFatal(message); + break; default: { exception::Exception ex; ex.getMessage() << "In DriveHandler::processEvent(): unexpected session state:" @@ -483,6 +491,11 @@ SubprocessHandler::ProcessingStatus DriveHandler::processRunning(serializers::Wa "WARNING: In DriveHandler::processMounting(): unexpected previous state/type."); } + // On state change reset the data movement counter + if (m_sessionState != static_cast<SessionState>(message.sessionstate())) { + m_lastDataMovementTime=std::chrono::steady_clock::now(); + } + m_sessionVid = message.vid(); return m_processingStatus; }