Commit 37c687d5 authored by Daniele Kruse's avatar Daniele Kruse
Browse files

Partial commit to make the status update mechanism work (many many changes),...

Partial commit to make the status update mechanism work (many many changes), compiles and passes unit tests now
parent 656aa531
......@@ -117,3 +117,10 @@ void cta::ArchiveMount::abort() {
//------------------------------------------------------------------------------
cta::ArchiveMount::~ArchiveMount() throw() {
}
//------------------------------------------------------------------------------
// setDriveStatus()
//------------------------------------------------------------------------------
void cta::ArchiveMount::setDriveStatus(cta::DriveStatus status) {
m_dbMount->setDriveStatus(status, time(NULL));
}
......@@ -84,6 +84,11 @@ namespace cta {
* Indicates that the mount was cancelled.
*/
virtual void abort();
/**
* Report a drive status change
*/
virtual void setDriveStatus(cta::DriveStatus status);
CTA_GENERATE_EXCEPTION_CLASS(SessionNotRunning);
/**
......
......@@ -1601,7 +1601,24 @@ void OStoreDB::RetrieveMount::setDriveStatus(cta::DriveStatus status, time_t com
dr.reportDriveStatus(mountInfo.drive, mountInfo.logicalLibrary,
status, completionTime,
cta::MountType::RETRIEVE, 0,
0, 0, 0, "", "");
0, 0, 0, getMountInfo().vid, getMountInfo().tapePool);
dr.commit();
}
void OStoreDB::ArchiveMount::setDriveStatus(cta::DriveStatus status, time_t completionTime) {
// We just report the drive status as instructed by the tape thread.
// Get the drive register
objectstore::RootEntry re(m_objectStore);
objectstore::ScopedSharedLock rel(re);
re.fetch();
objectstore::DriveRegister dr(re.getDriveRegisterAddress(), m_objectStore);
objectstore::ScopedExclusiveLock drl(dr);
dr.fetch();
// Reset the drive state.
dr.reportDriveStatus(mountInfo.drive, mountInfo.logicalLibrary,
status, completionTime,
cta::MountType::ARCHIVE, 0,
0, 0, 0, getMountInfo().vid, getMountInfo().tapePool);
dr.commit();
}
......
......@@ -84,6 +84,7 @@ public:
virtual const MountInfo & getMountInfo();
virtual std::unique_ptr<ArchiveJob> getNextJob();
virtual void complete(time_t completionTime);
virtual void setDriveStatus(cta::DriveStatus status, time_t completionTime);
};
/* === Archive Job Handling =============================================== */
......
......@@ -858,6 +858,7 @@ std::unique_ptr<cta::TapeMount> cta::Scheduler::getNextMount(
Utils::getShortHostname(),
time(NULL)).release());
internalRet->m_sessionRunning = true;
internalRet->setDriveStatus(cta::DriveStatus::Starting);
return std::unique_ptr<TapeMount> (internalRet.release());
} catch (cta::exception::Exception & ex) {
continue;
......@@ -882,6 +883,7 @@ std::unique_ptr<cta::TapeMount> cta::Scheduler::getNextMount(
internalRet->m_sessionRunning = true;
internalRet->m_diskRunning = true;
internalRet->m_tapeRunning = true;
internalRet->setDriveStatus(cta::DriveStatus::Starting);
return std::unique_ptr<TapeMount> (internalRet.release());
} catch (cta::exception::Exception & ex) {
std::string debug=ex.getMessageValue();
......
......@@ -171,6 +171,7 @@ public:
virtual const MountInfo & getMountInfo() = 0;
virtual std::unique_ptr<ArchiveJob> getNextJob() = 0;
virtual void complete(time_t completionTime) = 0;
virtual void setDriveStatus(DriveStatus status, time_t completionTime) = 0;
virtual ~ArchiveMount() {}
uint32_t nbFilesCurrentlyOnTape;
};
......
......@@ -25,48 +25,48 @@
namespace cta {
class MockRetrieveMount: public cta::RetrieveMount {
public:
int getJobs;
int completes;
MockRetrieveMount(): getJobs(0), completes(0) {}
public:
int getJobs;
int completes;
MockRetrieveMount(): getJobs(0), completes(0) {}
~MockRetrieveMount() throw() {
}
~MockRetrieveMount() throw() {
}
std::unique_ptr<cta::RetrieveJob> getNextJob() {
getJobs++;
if(m_jobs.empty()) {
return std::unique_ptr<cta::RetrieveJob>();
} else {
std::unique_ptr<cta::RetrieveJob> job = std::move(m_jobs.front());
m_jobs.pop_front();
return job;
}
}
virtual std::string getMountTransactionId() const {
return "1234567890";
std::unique_ptr<cta::RetrieveJob> getNextJob() {
getJobs++;
if(m_jobs.empty()) {
return std::unique_ptr<cta::RetrieveJob>();
} else {
std::unique_ptr<cta::RetrieveJob> job = std::move(m_jobs.front());
m_jobs.pop_front();
return job;
}
virtual void complete() { completes ++; }
virtual void abort() { completes ++; }
void diskComplete() {};
}
virtual std::string getMountTransactionId() const {
return "1234567890";
}
virtual void complete() { completes ++; }
virtual void abort() { completes ++; }
void diskComplete() { completes ++;};
void tapeComplete() {};
bool bothSidesComplete() { return false; }
void tapeComplete() {};
bool bothSidesComplete() { return false; }
private:
private:
std::list<std::unique_ptr<cta::RetrieveJob>> m_jobs;
public:
void createRetrieveJobs(const unsigned int nbJobs) {
for(unsigned int i = 0; i < nbJobs; i++) {
m_jobs.push_back(std::unique_ptr<cta::RetrieveJob>(
new MockRetrieveJob(*this)));
}
std::list<std::unique_ptr<cta::RetrieveJob>> m_jobs;
public:
void createRetrieveJobs(const unsigned int nbJobs) {
for(unsigned int i = 0; i < nbJobs; i++) {
m_jobs.push_back(std::unique_ptr<cta::RetrieveJob>(
new MockRetrieveJob(*this)));
}
}; // class MockRetrieveMount
}
}; // class MockRetrieveMount
}
\ No newline at end of file
......@@ -153,7 +153,7 @@ castor::tape::tapeserver::daemon::Session::EndOfSessionAction
tsr.gotReadMountDetailsFromClient();
TapeReadSingleThread trst(*drive, m_mc, tsr, m_volInfo,
m_castorConf.bulkRequestRecallMaxFiles,m_capUtils,rwd,lc);
m_castorConf.bulkRequestRecallMaxFiles,m_capUtils,rwd,lc,rrp);
DiskWriteThreadPool dwtp(m_castorConf.nbDiskThreads,
rrp,
......@@ -188,8 +188,8 @@ castor::tape::tapeserver::daemon::Session::EndOfSessionAction
// All client notifications are done by the report packer, including the
// end of session
rti.waitThreads();
rrp.waitThread();
dwtp.waitThreads();
rrp.waitThread();
trst.waitThreads();
tsr.waitThreads();
rwd.stopAndWaitThread();
......
......@@ -90,6 +90,7 @@ void DiskWriteThreadPool::waitThreads() {
(*i)->wait();
}
m_lc.log(LOG_INFO, "All DiskWriteThreadPool threads are now complete");
m_reporter.setDiskDone();
}
//------------------------------------------------------------------------------
......
......@@ -28,6 +28,7 @@
#include "serrno.h"
#include <signal.h>
#include <iostream>
namespace{
struct failedReportRecallResult : public castor::exception::Exception{
......@@ -138,7 +139,6 @@ void RecallReportPacker::ReportEndofSession::execute(RecallReportPacker& parent)
//ReportEndofSession::goingToEnd
//------------------------------------------------------------------------------
bool RecallReportPacker::ReportEndofSession::goingToEnd(RecallReportPacker& packer) {
packer.setDiskDone();
return packer.allThreadsDone();
}
......@@ -169,7 +169,6 @@ void RecallReportPacker::ReportEndofSessionWithErrors::execute(RecallReportPacke
//ReportEndofSessionWithErrors::goingToEnd
//------------------------------------------------------------------------------
bool RecallReportPacker::ReportEndofSessionWithErrors::goingToEnd(RecallReportPacker& packer) {
packer.setDiskDone();
return packer.allThreadsDone();
}
......
......@@ -95,6 +95,9 @@ TEST_F(castor_tape_tapeserver_daemonTest, RecallReportPackerNominal) {
rrp.reportCompletedJob(std::move(job1));
rrp.reportCompletedJob(std::move(job2));
rrp.setDiskDone();
rrp.setTapeDone();
rrp.reportEndOfSession();
rrp.waitThread();
......@@ -146,6 +149,9 @@ TEST_F(castor_tape_tapeserver_daemonTest, RecallReportPackerBadBadEnd) {
const castor::exception::Exception ex(error_msg);
job3->failureMessage = ex.getMessageValue();
rrp.reportFailedJob(std::move(job3));
rrp.setDiskDone();
rrp.setTapeDone();
rrp.reportEndOfSession();
rrp.waitThread();
......
......@@ -33,11 +33,13 @@ castor::tape::tapeserver::daemon::TapeReadSingleThread::TapeReadSingleThread(
uint64_t maxFilesRequest,
castor::server::ProcessCap& capUtils,
RecallWatchDog& watchdog,
castor::log::LogContext& lc) :
castor::log::LogContext& lc,
RecallReportPacker &rrp) :
TapeSingleThreadInterface<TapeReadTask>(drive, mc, initialProcess, volInfo,
capUtils, lc),
m_maxFilesRequest(maxFilesRequest),
m_watchdog(watchdog) {}
m_watchdog(watchdog),
m_rrp(rrp){}
//------------------------------------------------------------------------------
//TapeCleaning::~TapeCleaning()
......@@ -113,6 +115,7 @@ castor::tape::tapeserver::daemon::TapeReadSingleThread::TapeCleaning::~TapeClean
done:
//then we terminate the global status reporter
m_this.m_initialProcess.finish();
m_this.m_rrp.setTapeDone();
}
//------------------------------------------------------------------------------
......
......@@ -29,6 +29,7 @@
#include "castor/server/Threading.hpp"
#include "castor/tape/tapeserver/drive/DriveInterface.hpp"
#include "castor/tape/tapeserver/file/File.hpp"
#include "castor/tape/tapeserver/daemon/RecallReportPacker.hpp"
#include "castor/tape/tapeserver/daemon/RecallTaskInjector.hpp"
#include "castor/tape/tapeserver/daemon/TapeServerReporter.hpp"
#include "castor/tape/tapeserver/daemon/TaskWatchDog.hpp"
......@@ -63,7 +64,8 @@ public:
uint64_t maxFilesRequest,
castor::server::ProcessCap &capUtils,
RecallWatchDog& watchdog,
castor::log::LogContext & lc);
castor::log::LogContext & lc,
RecallReportPacker &rrp);
/**
* Set the task injector. Has to be done that way (and not in the constructor)
......@@ -133,6 +135,9 @@ private:
/// Reference to the watchdog, used in run()
RecallWatchDog& m_watchdog;
/// Reference to the RecallReportPacker, used to update tape/drive state during recall
RecallReportPacker & m_rrp;
/// Helper virtual function to access the watchdog from parent class
virtual void countTapeLogError(const std::string & error) {
m_watchdog.addToErrorCount(error);
......
......@@ -85,7 +85,7 @@ TapeServerReporter::TapeServerReporter(
//------------------------------------------------------------------------------
void TapeServerReporter::tapeMountedForWrite(){
m_fifo.push(
new ReportTapeMounterForWrite()
new ReportTapeMountedForWrite()
);
}
//------------------------------------------------------------------------------
......@@ -166,7 +166,7 @@ TapeServerReporter::TapeServerReporter(
//------------------------------------------------------------------------------
// ReportTapeMounterForWrite::execute
//------------------------------------------------------------------------------
void TapeServerReporter::ReportTapeMounterForWrite::
void TapeServerReporter::ReportTapeMountedForWrite::
execute(TapeServerReporter& parent){
parent.m_tapeserverProxy.tapeMountedForMigration(parent.m_volume.vid,
parent.m_unitName);
......
......@@ -126,7 +126,7 @@ private:
public:
virtual void execute(TapeServerReporter&);
};
class ReportTapeMounterForWrite : public Report {
class ReportTapeMountedForWrite : public Report {
public:
virtual void execute(TapeServerReporter&);
};
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment