Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
dCache
cta
Commits
9b63c8fb
Commit
9b63c8fb
authored
Jul 15, 2014
by
David COME
Browse files
Fixed some missing comments or their style
parent
9cd7d8c7
Changes
4
Hide whitespace changes
Inline
Side-by-side
castor/tape/tapeserver/daemon/MigrationReportPacker.hpp
View file @
9b63c8fb
...
...
@@ -81,7 +81,11 @@ public:
*/
void
reportEndOfSessionWithErrors
(
const
std
::
string
msg
,
int
error_code
);
/**
* Create into the MigrationReportPacker a report signaling we have stuck on
* that particlar file without moving for to long
* @param file
*/
void
reportStuckOn
(
FileStruct
&
file
);
void
startThreads
()
{
m_workerThread
.
start
();
}
...
...
castor/tape/tapeserver/daemon/ReportPackerInterface.hpp
View file @
9b63c8fb
...
...
@@ -141,7 +141,12 @@ template <class PlaceHolder> class ReportPackerInterface{
*/
std
::
auto_ptr
<
FileReportList
>
m_listReports
;
public:
virtual
void
reportStuckOn
(
FileStruct
&
file
)
=
0
;
/**
* Put a message to into the queue to notify we have been stuck on
* the given file
*/
virtual
void
reportStuckOn
(
FileStruct
&
file
)
=
0
;
};
}}}}
...
...
castor/tape/tapeserver/daemon/TaskWatchDog.hpp
View file @
9b63c8fb
...
...
@@ -36,22 +36,73 @@ namespace castor {
namespace
tape
{
namespace
tapeserver
{
namespace
daemon
{
/**
* Templated class for watching tape read or write operation
*/
template
<
class
placeHolder
>
class
TaskWatchDog
:
private
castor
::
tape
::
threading
::
Thread
{
typedef
typename
ReportPackerInterface
<
placeHolder
>::
FileStruct
FileStruct
;
/*
* Number of blocks we moved since the last update. Has to be atomic because it is
* updated from the outside
*/
castor
::
tape
::
threading
::
AtomicCounter
<
uint64_t
>
m_nbOfMemblocksMoved
;
/*
* Utility member to send heartbeat notifications at a given period.
*/
timeval
m_previousReportTime
;
/*
* When was the last time we have been notified by the tape working thread
*/
castor
::
tape
::
threading
::
AtomicVariable
<
timeval
>
m_previousNotifiedTime
;
const
double
m_periodToReport
;
//in second
const
double
m_stuckPeriod
;
//in second
/*
* How often to we send heartbeat notifications (in second)
* Currently hard coded in the destructor
*/
const
double
m_periodToReport
;
/*
* How long to we have to wait before saying we are stuck (in second)
* Currently hard coded in the destructor
*/
const
double
m_stuckPeriod
;
/*
* Atomic flag to stop the thread's loop
*/
castor
::
tape
::
threading
::
AtomicFlag
m_stopFlag
;
/*
* The proxy that will receive or heartbeat notifications
*/
messages
::
TapeserverProxy
&
m_initialProcess
;
/*
* An interace on the report packer for sending a message to the client if we
* were to be stuck
*/
ReportPackerInterface
<
placeHolder
>&
m_reportPacker
;
/*
* The file we are operating
*/
FileStruct
m_file
;
/*
* Is the system at _this very moment_ reading or writing on tape
*/
bool
m_fileBeingMoved
;
/*
* Logging system
*/
log
::
LogContext
m_lc
;
/**
* Thread;s loop
*/
void
run
(){
timeval
currentTime
;
while
(
!
m_stopFlag
)
{
...
...
@@ -59,6 +110,7 @@ template <class placeHolder> class TaskWatchDog : private castor::tape::threadin
timeval
diffTimeStuck
=
castor
::
utils
::
timevalAbsDiff
(
currentTime
,
m_previousNotifiedTime
);
double
diffTimeStuckd
=
castor
::
utils
::
timevalToDouble
(
diffTimeStuck
);
if
(
diffTimeStuckd
>
m_stuckPeriod
&&
m_fileBeingMoved
){
m_reportPacker
.
reportStuckOn
(
m_file
);
break
;
...
...
@@ -76,12 +128,23 @@ template <class placeHolder> class TaskWatchDog : private castor::tape::threadin
}
}
}
/*
*
*/
void
updateStuckTime
(){
timeval
tmpTime
;
castor
::
utils
::
getTimeOfDay
(
&
tmpTime
);
m_previousNotifiedTime
=
tmpTime
;
}
public:
/**
* Constructor
* @param initialProcess The proxy we use for sending heartbeat
* @param reportPacker
* @param lc To log the events
*/
TaskWatchDog
(
messages
::
TapeserverProxy
&
initialProcess
,
ReportPackerInterface
<
placeHolder
>&
reportPacker
,
log
::
LogContext
lc
)
:
m_nbOfMemblocksMoved
(
0
),
m_periodToReport
(
2
),
m_stuckPeriod
(
60
*
10
),
...
...
@@ -91,23 +154,45 @@ template <class placeHolder> class TaskWatchDog : private castor::tape::threadin
castor
::
utils
::
getTimeOfDay
(
&
m_previousReportTime
);
updateStuckTime
();
}
/**
* notify the wtachdog a mem block has been moved
*/
void
notify
(){
updateStuckTime
();
m_nbOfMemblocksMoved
++
;
}
/**
* Start the thread
*/
void
startThread
(){
start
();
}
/**
* Ask tp stop the watchdog thread and join it
*/
void
stopAndWaitThread
(){
m_stopFlag
.
set
();
wait
();
}
/**
* Notify the watchdog which file we are operating
* @param file
*/
void
notifyBeginNewJob
(
const
FileStruct
&
file
){
m_file
=
file
;
m_fileBeingMoved
=
true
;
}
/**
* Notify the watchdog we have finished operating on the current file
*/
void
fileFinished
(){
m_fileBeingMoved
=
false
;
m_file
=
FileStruct
();
}
};
...
...
castor/tape/tapeserver/threading/BlockingQueue.hpp
View file @
9b63c8fb
...
...
@@ -53,7 +53,11 @@ public:
BlockingQueue
(){}
~
BlockingQueue
()
{}
/**
* Copy the concent of e and push into the queue
* @param e
*/
void
push
(
const
C
&
e
)
{
{
MutexLocker
ml
(
&
m_mutex
);
...
...
@@ -62,7 +66,9 @@ public:
m_sem
.
release
();
}
///Return the next value of the queue and remove it
/**
* Return the next value of the queue and remove it
*/
C
pop
()
{
m_sem
.
acquire
();
return
popCriticalSection
();
...
...
@@ -81,24 +87,34 @@ public:
return
ret
;
}
///return the number of elements currently in the queue
/**
* return the number of elements currently in the queue
*/
size_t
size
()
const
{
MutexLocker
ml
(
&
m_mutex
);
return
m_queue
.
size
();
}
private:
/// holds data of the queue
/**
* holds data of the queue
*/
std
::
queue
<
C
>
m_queue
;
///Used for blocking a consumer thread as long as the queue is empty
/**
* Used for blocking a consumer thread as long as the queue is empty
*/
Semaphore
m_sem
;
///used for locking-operation thus providing thread-safety
/**
* used for locking-operation thus providing thread-safety
*/
mutable
Mutex
m_mutex
;
///Thread and exception safe pop. Optionally atomically extracts the size
// of the queue after pop
/**
* Thread and exception safe pop. Optionally atomically extracts the size
* of the queue after pop
*/
C
popCriticalSection
(
size_t
*
sz
=
NULL
)
{
MutexLocker
ml
(
&
m_mutex
);
C
ret
=
m_queue
.
front
();
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment