Commit e36e5cf4 authored by David COME's avatar David COME
Browse files

Merge remote branch 'origin/tapeserver' into tapeserver

Conflicts:
	castor/tape/tapeserver/daemon/DiskWriteThreadPool.hpp
parents 16511704 0427db07
......@@ -33,33 +33,60 @@ using namespace castor::exception;
SErrnum::SErrnum(std::string what):Exception("") {
m_serrnum = serrno;
m_errnum = errno;
SErrnumConstructorBottomHalf(what);
}
SErrnum::SErrnum(int err, std::string what):Exception("") {
m_serrnum = err;
m_errnum = errno;
SErrnumConstructorBottomHalf(what);
}
void SErrnum::SErrnumConstructorBottomHalf(const std::string & what) {
char buf[100];
if(sstrerror_r(m_serrnum, buf, sizeof(buf))) {
// sstrerror_r() failed
const int new_errno = errno;
std::stringstream w;
w << "SErrno=" << m_serrnum << ". In addition, failed to read the corresponding error string (sstrerror_r gave errno="
<< new_errno << ")";
m_sstrerror = w.str();
// Castor-style error reporting is not for the faint of the heart.
// If we get serrno = 0, we should rely on the errnum instead
if(m_serrnum) {
// Normal case, serrnum is not zero.
if(sstrerror_r(m_serrnum, buf, sizeof(buf))) {
// sstrerror_r() failed
const int new_errno = errno;
std::stringstream w;
w << "SErrno=" << m_serrnum << ". In addition, failed to read the corresponding error string (sstrerror_r gave errno="
<< new_errno << ")";
m_sstrerror = w.str();
} else {
// sstrerror_r() succeeded
m_sstrerror = buf;
}
std::stringstream w2;
if (what.size())
w2 << what << " ";
w2 << "SErrno=" << m_serrnum << ": " << m_sstrerror;
getMessage().str(w2.str());
} else {
// sstrerror_r() succeeded
m_sstrerror = buf;
// degraded case, serrnum is indeed 0, we fall back on the normal error reporting.
std::stringstream w;
w << "Serrno=0. Failling back to errno=" << m_errnum;
char buf[100];
if(sstrerror_r(m_errnum, buf, sizeof(buf))) {
// sstrerror_r() failed
const int new_errno = errno;
w << ". In addition, failed to read the corresponding error string (sstrerror_r gave errno="
<< new_errno << ")";
} else {
// sstrerror_r() succeeded
w << ". "<< buf;
}
std::stringstream w2;
if (what.size())
w2 << what << " ";
w2 << w.str();
getMessage().str(w2.str());
}
std::stringstream w2;
if (what.size())
w2 << what << " ";
w2 << "SErrno=" << m_serrnum << ": " << m_sstrerror;
getMessage().str(w2.str());
}
void SErrnum::throwOnReturnedErrno (int err, std::string context) {
......
......@@ -45,6 +45,7 @@ namespace exception {
protected:
void SErrnumConstructorBottomHalf(const std::string & what);
int m_serrnum;
int m_errnum;
std::string m_sstrerror;
};
}
......
#include "castor/tape/tapeserver/daemon/DiskWriteThreadPool.hpp"
#include "log.h"
#include <memory>
#include <sstream>
namespace castor {
......@@ -11,28 +12,32 @@ namespace daemon {
ReportPackerInterface<detail::Recall>& report,castor::log::LogContext lc):
m_maxFilesReq(maxFilesReq), m_maxBytesReq(maxBlocksReq),
m_reporter(report),m_lc(lc)
{
{
m_lc.pushOrReplace(castor::log::Param("threadCount", nbThread));
m_lc.log(LOG_INFO, "Creating threads in DiskWriteThreadPool::DiskWriteThreadPool");
for(int i=0; i<nbThread; i++) {
DiskWriteWorkerThread * thr = new DiskWriteWorkerThread(*this);
m_threads.push_back(thr);
}
}
DiskWriteThreadPool::~DiskWriteThreadPool() {
DiskWriteThreadPool::~DiskWriteThreadPool() {
while (m_threads.size()) {
delete m_threads.back();
m_threads.pop_back();
}
}
void DiskWriteThreadPool::startThreads() {
m_lc.log(LOG_INFO, "Starting threads in DiskWriteThreadPool::DiskWriteThreadPool");
for (std::vector<DiskWriteWorkerThread *>::iterator i=m_threads.begin();
i != m_threads.end(); i++) {
(*i)->startThreads();
(*i)->start();
}
}
void DiskWriteThreadPool::waitThreads() {
for (std::vector<DiskWriteWorkerThread *>::iterator i=m_threads.begin();
i != m_threads.end(); i++) {
(*i)->waitThreads();
(*i)->wait();
}
}
void DiskWriteThreadPool::push(DiskWriteTaskInterface *t) {
......@@ -95,36 +100,34 @@ namespace daemon {
return ret;
}
void DiskWriteThreadPool::DiskWriteWorkerThread::run() {
m_lc.log(LOG_INFO, "Starting disk write thread");
std::auto_ptr<DiskWriteTaskInterface> task;
while(1) {
task.reset(_this. m_tasks.pop());
task.reset(m_parentThreadPool. m_tasks.pop());
if (NULL!=task.get()) {
if(false==task->execute(_this.m_reporter,lc)) {
++failledWritting;
if(false==task->execute(m_parentThreadPool.m_reporter,m_lc)) {
++m_parentThreadPool.m_failedWriteCount;
}
} //end of task!=NULL
else {
log::LogContext::ScopedParam param(lc, log::Param("threadID", threadID));
lc.log(LOG_INFO,"Disk write thread finishing");
log::LogContext::ScopedParam param(m_lc, log::Param("threadID", m_threadID));
m_lc.log(LOG_INFO,"Disk write thread finishing");
break;
}
} //enf of while(1)
if(0 == --m_nbActiveThread){
if(0 == --m_parentThreadPool.m_nbActiveThread){
//Im the last Thread alive, report end of session
if(failledWritting==0){
_this.m_reporter.reportEndOfSession();
if(m_parentThreadPool.m_failedWriteCount==0){
m_parentThreadPool.m_reporter.reportEndOfSession();
//TODO
// _this.m_jobInjector->end();
}
else{
_this.m_reporter.reportEndOfSessionWithErrors("A thread failed to write a file",SEINTERNAL);
m_parentThreadPool.m_reporter.reportEndOfSessionWithErrors("A thread failed to write a file",SEINTERNAL);
}
}
m_lc.log(LOG_INFO, "Disk write thread exiting");
}
tape::threading::AtomicCounter<int> DiskWriteThreadPool::DiskWriteWorkerThread::m_nbActiveThread(0);
tape::threading::AtomicCounter<int> DiskWriteThreadPool::DiskWriteWorkerThread::failledWritting(0);
}}}}
......@@ -64,28 +64,26 @@ private:
*/
DiskWriteTaskInterface * popAndRequestMoreJobs() ;
tape::threading::AtomicCounter<int> m_nbActiveThread;
tape::threading::AtomicCounter<int> m_failedWriteCount;
class DiskWriteWorkerThread: private castor::tape::threading::Thread {
public:
DiskWriteWorkerThread(DiskWriteThreadPool & manager):
threadID(m_nbActiveThread++),_this(manager),lc(_this.m_lc)
m_threadID(manager.m_nbActiveThread++),m_parentThreadPool(manager),m_lc(m_parentThreadPool.m_lc)
{
log::LogContext::ScopedParam param(lc, log::Param("threadID", threadID));
lc.log(LOG_INFO,"DiskWrite Thread created");
// This thread Id will remain for the rest of the thread's lifetime (and
// also context's lifetime) so ne need for a scope.
m_lc.pushOrReplace(log::Param("threadID", m_threadID));
m_lc.log(LOG_INFO,"DiskWrite Thread created");
}
void startThreads() { start(); }
void waitThreads() { wait(); }
void start() { castor::tape::threading::Thread::start(); }
void wait() { castor::tape::threading::Thread::wait(); }
private:
//counter to generate threadID and to know how many thread are still doing something
static tape::threading::AtomicCounter<int> m_nbActiveThread;
//counter to know how many files failed at writing among the different threads
//and choose the right endOfsession to call (with error or not))
static tape::threading::AtomicCounter<int> failledWritting;
const int threadID;
DiskWriteThreadPool & _this;
castor::log::LogContext lc;
const int m_threadID;
DiskWriteThreadPool & m_parentThreadPool;
castor::log::LogContext m_lc;
virtual void run();
};
......
......@@ -132,6 +132,7 @@ TEST(tapeServer, MountSessionGoodday) {
castorConf.rtcopydNbBufs = 10;
castorConf.tapebridgeBulkRequestRecallMaxBytes = UINT64_C(100)*1000*1000*1000;
castorConf.tapebridgeBulkRequestRecallMaxFiles = 1000;
castorConf.tapeserverdDiskThreads = 1;
MountSession sess(VDQMjob, logger, mockSys, tpConfig, castorConf);
sess.execute();
simRun.wait();
......
......@@ -36,7 +36,7 @@ namespace threading {
* the work to be done and the reporting of the completed work, in parallel
*/
template <class T> struct AtomicCounter{
AtomicCounter(T init): m_val(init) {};
AtomicCounter(T init = 0): m_val(init) {};
T operator ++ () {
threading::MutexLocker ml(&m_mutex);
return ++m_val;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment