Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
dCache
cta
Commits
4f6101c3
Commit
4f6101c3
authored
Aug 12, 2015
by
Daniele Kruse
Browse files
Started to replace the reporting client (tapegateway) with the new one (mount and job)
parent
e946eb39
Changes
13
Hide whitespace changes
Inline
Side-by-side
tapeserver/castor/tape/tapeserver/daemon/DataTransferSession.cpp
View file @
4f6101c3
...
...
@@ -38,9 +38,9 @@
#include
"castor/tape/tapeserver/drive/DriveInterface.hpp"
#include
"castor/tape/tapeserver/SCSI/Device.hpp"
#include
"log.h"
#include
"scheduler/TapeMount.hpp"
#include
"serrno.h"
#include
"stager_client_commandline.h"
#include
"scheduler/RetrieveMount.hpp"
#include
<google/protobuf/stubs/common.h>
#include
<memory>
...
...
@@ -114,9 +114,9 @@ castor::tape::tapeserver::daemon::Session::EndOfSessionAction
// Depending on the type of session, branch into the right execution
switch
(
m_volInfo
.
mountType
)
{
case
cta
::
MountType
::
RETRIEVE
:
return
executeRead
(
lc
);
return
executeRead
(
lc
,
dynamic_cast
<
cta
::
RetrieveMount
*>
(
tapeMount
.
get
())
);
case
cta
::
MountType
::
ARCHIVE
:
return
executeWrite
(
lc
);
return
executeWrite
(
lc
,
dynamic_cast
<
cta
::
ArchiveMount
*>
(
tapeMount
.
get
())
);
default:
return
MARK_DRIVE_AS_UP
;
}
...
...
@@ -125,20 +125,19 @@ castor::tape::tapeserver::daemon::Session::EndOfSessionAction
//DataTransferSession::executeRead
//------------------------------------------------------------------------------
castor
::
tape
::
tapeserver
::
daemon
::
Session
::
EndOfSessionAction
castor
::
tape
::
tapeserver
::
daemon
::
DataTransferSession
::
executeRead
(
log
::
LogContext
&
lc
)
{
castor
::
tape
::
tapeserver
::
daemon
::
DataTransferSession
::
executeRead
(
log
::
LogContext
&
lc
,
cta
::
RetrieveMount
*
retrieveMount
)
{
// We are ready to start the session. We need to create the whole machinery
// in order to get the task injector ready to check if we actually have a
// file to recall.
// findDrive does not throw exceptions (it catches them to log errors)
// A NULL pointer is returned on failure
std
::
unique_ptr
<
castor
::
tape
::
tapeserver
::
drive
::
DriveInterface
>
drive
(
findDrive
(
m_driveConfig
,
lc
));
if
(
!
drive
.
get
())
return
MARK_DRIVE_AS_DOWN
;
// We can now start instantiating all the components of the data path
{
// Allocate all the elements of the memory management (in proper order
// to refer them to each other)
RecallReportPacker
rrp
(
m_clientProxy
,
RecallReportPacker
rrp
(
retrieveMount
,
m_castorConf
.
bulkRequestRecallMaxFiles
,
lc
);
rrp
.
disableBulk
();
//no bulk needed anymore
...
...
@@ -222,7 +221,7 @@ castor::tape::tapeserver::daemon::Session::EndOfSessionAction
//------------------------------------------------------------------------------
castor
::
tape
::
tapeserver
::
daemon
::
Session
::
EndOfSessionAction
castor
::
tape
::
tapeserver
::
daemon
::
DataTransferSession
::
executeWrite
(
log
::
LogContext
&
lc
)
{
log
::
LogContext
&
lc
,
cta
::
ArchiveMount
*
archiveMount
)
{
// We are ready to start the session. We need to create the whole machinery
// in order to get the task injector ready to check if we actually have a
// file to migrate.
...
...
@@ -238,7 +237,7 @@ castor::tape::tapeserver::daemon::Session::EndOfSessionAction
MigrationMemoryManager
mm
(
m_castorConf
.
nbBufs
,
m_castorConf
.
bufsz
,
lc
);
MigrationReportPacker
mrp
(
m_clientProxy
,
lc
);
MigrationReportPacker
mrp
(
archiveMount
,
lc
);
MigrationWatchDog
mwd
(
15
,
60
*
10
,
m_intialProcess
,
m_driveConfig
.
getUnitName
(),
lc
);
TapeWriteSingleThread
twst
(
*
drive
.
get
(),
m_mc
,
...
...
tapeserver/castor/tape/tapeserver/daemon/DataTransferSession.hpp
View file @
4f6101c3
...
...
@@ -34,7 +34,10 @@
#include
"castor/tape/tapeserver/daemon/Session.hpp"
#include
"castor/tape/tapeserver/daemon/TapeSingleThreadInterface.hpp"
#include
"castor/tape/tapeserver/system/Wrapper.hpp"
#include
"scheduler/ArchiveMount.hpp"
#include
"scheduler/RetrieveMount.hpp"
#include
"scheduler/Scheduler.hpp"
#include
"scheduler/TapeMount.hpp"
namespace
castor
{
namespace
legacymsg
{
...
...
@@ -119,9 +122,9 @@ namespace daemon {
const
DriveConfig
&
driveConfig
,
log
::
LogContext
&
lc
);
/** sub-part of execute for the read sessions */
EndOfSessionAction
executeRead
(
log
::
LogContext
&
lc
);
EndOfSessionAction
executeRead
(
log
::
LogContext
&
lc
,
cta
::
RetrieveMount
*
retrieveMount
);
/** sub-part of execute for a write session */
EndOfSessionAction
executeWrite
(
log
::
LogContext
&
lc
);
EndOfSessionAction
executeWrite
(
log
::
LogContext
&
lc
,
cta
::
ArchiveMount
*
archiveMount
);
/** sub-part of execute for a dump session */
void
executeDump
(
log
::
LogContext
&
lc
);
/** Reference to the MediaChangerFacade, allowing the mounting of the tape
...
...
tapeserver/castor/tape/tapeserver/daemon/DiskWriteTaskTest.cpp
View file @
4f6101c3
...
...
@@ -33,6 +33,8 @@
#include
"castor/log/StringLogger.hpp"
#include
"castor/tape/tapeserver/daemon/MigrationMemoryManager.hpp"
#include
"castor/tape/tapeserver/daemon/MemBlock.hpp"
#include
"scheduler/mockDB/MockSchedulerDatabase.hpp"
#include
"serrno.h"
#include
<gtest/gtest.h>
...
...
@@ -46,8 +48,8 @@ namespace unitTests{
MOCK_METHOD3
(
reportFailedJob
,
void
(
const
FileStruct
&
,
const
std
::
string
&
,
int
));
MOCK_METHOD0
(
reportEndOfSession
,
void
());
MOCK_METHOD2
(
reportEndOfSessionWithErrors
,
void
(
const
std
::
string
,
int
));
MockRecallReportPacker
(
ClientInterface
&
client
,
castor
::
log
::
LogContext
lc
)
:
RecallReportPacker
(
client
,
1
,
lc
){}
MockRecallReportPacker
(
cta
::
RetrieveMount
*
rm
,
castor
::
log
::
LogContext
lc
)
:
RecallReportPacker
(
rm
,
1
,
lc
){}
};
TEST
(
castor_tape_tapeserver_daemon
,
DiskWriteTaskFailledBlock
){
...
...
@@ -57,7 +59,8 @@ namespace unitTests{
castor
::
log
::
StringLogger
log
(
"castor_tape_tapeserver_daemon_DiskWriteTaskFailledBlock"
);
castor
::
log
::
LogContext
lc
(
log
);
MockRecallReportPacker
report
(
client
,
lc
);
std
::
unique_ptr
<
cta
::
MockSchedulerDatabase
>
mdb
(
new
cta
::
MockSchedulerDatabase
);
MockRecallReportPacker
report
(
dynamic_cast
<
cta
::
RetrieveMount
*>
((
mdb
->
getNextMount
(
"ll"
,
"drive"
)).
get
()),
lc
);
EXPECT_CALL
(
report
,
reportFailedJob
(
_
,
_
,
_
));
RecallMemoryManager
mm
(
10
,
100
,
lc
);
DiskFileFactory
fileFactory
(
"RFIO"
,
""
,
0
);
...
...
tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPoolTest.cpp
View file @
4f6101c3
...
...
@@ -32,6 +32,7 @@
#include
"castor/log/StringLogger.hpp"
#include
"castor/tape/tapeserver/daemon/MigrationMemoryManager.hpp"
#include
"castor/tape/tapeserver/daemon/MemBlock.hpp"
#include
"scheduler/mockDB/MockSchedulerDatabase.hpp"
#include
<gtest/gtest.h>
namespace
unitTests
{
...
...
@@ -42,8 +43,8 @@ namespace unitTests{
MOCK_METHOD3
(
reportFailedJob
,
void
(
const
FileStruct
&
,
const
std
::
string
&
,
int
));
MOCK_METHOD0
(
reportEndOfSession
,
void
());
MOCK_METHOD2
(
reportEndOfSessionWithErrors
,
void
(
const
std
::
string
,
int
));
MockRecallReportPacker
(
ClientInterface
&
client
,
castor
::
log
::
LogContext
lc
)
:
RecallReportPacker
(
client
,
1
,
lc
){}
MockRecallReportPacker
(
cta
::
RetrieveMount
*
rm
,
castor
::
log
::
LogContext
lc
)
:
RecallReportPacker
(
rm
,
1
,
lc
){}
};
struct
MockTaskInjector
:
public
RecallTaskInjector
{
...
...
@@ -56,7 +57,8 @@ namespace unitTests{
castor
::
log
::
StringLogger
log
(
"castor_tape_tapeserver_daemon_DiskWriteThreadPoolTest"
);
castor
::
log
::
LogContext
lc
(
log
);
MockRecallReportPacker
report
(
client
,
lc
);
std
::
unique_ptr
<
cta
::
MockSchedulerDatabase
>
mdb
(
new
cta
::
MockSchedulerDatabase
);
MockRecallReportPacker
report
(
dynamic_cast
<
cta
::
RetrieveMount
*>
((
mdb
->
getNextMount
(
"ll"
,
"drive"
)).
get
()),
lc
);
EXPECT_CALL
(
report
,
reportCompletedJob
(
_
,
_
,
_
)).
Times
(
5
);
//EXPECT_CALL(tskInjectorl,requestInjection(_,_,_)).Times(2);
EXPECT_CALL
(
report
,
reportEndOfSession
()).
Times
(
1
);
...
...
tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.cpp
View file @
4f6101c3
...
...
@@ -47,10 +47,10 @@ namespace daemon {
//------------------------------------------------------------------------------
//Constructor
//------------------------------------------------------------------------------
MigrationReportPacker
::
MigrationReportPacker
(
c
lient
::
ClientInterface
&
tg
,
MigrationReportPacker
::
MigrationReportPacker
(
c
ta
::
ArchiveMount
*
archiveMount
,
castor
::
log
::
LogContext
lc
)
:
ReportPackerInterface
<
detail
::
Migration
>
(
tg
,
lc
),
m_workerThread
(
*
this
),
m_errorHappened
(
false
),
m_continue
(
true
)
{
ReportPackerInterface
<
detail
::
Migration
>
(
lc
),
m_workerThread
(
*
this
),
m_errorHappened
(
false
),
m_continue
(
true
)
,
m_archiveMount
(
archiveMount
)
{
}
//------------------------------------------------------------------------------
//Destructore
...
...
@@ -112,28 +112,29 @@ void MigrationReportPacker::synchronousReportEndWithErrors(const std::string msg
//ReportSuccessful::execute
//------------------------------------------------------------------------------
void
MigrationReportPacker
::
ReportSuccessful
::
execute
(
MigrationReportPacker
&
reportPacker
){
std
::
unique_ptr
<
tapegateway
::
FileMigratedNotificationStruct
>
successMigration
(
new
tapegateway
::
FileMigratedNotificationStruct
);
successMigration
->
setFseq
(
m_migratedFile
.
fseq
());
successMigration
->
setFileTransactionId
(
m_migratedFile
.
fileTransactionId
());
successMigration
->
setId
(
m_migratedFile
.
id
());
successMigration
->
setNshost
(
m_migratedFile
.
nshost
());
successMigration
->
setFileid
(
m_migratedFile
.
fileid
());
successMigration
->
setBlockId0
((
m_blockId
>>
24
)
&
0xFF
);
successMigration
->
setBlockId1
((
m_blockId
>>
16
)
&
0xFF
);
successMigration
->
setBlockId2
((
m_blockId
>>
8
)
&
0xFF
);
successMigration
->
setBlockId3
((
m_blockId
>>
0
)
&
0xFF
);
//WARNING; Ad-hoc name of the ChecksumName !!");
successMigration
->
setChecksumName
(
"adler32"
);
successMigration
->
setChecksum
(
m_checksum
);
successMigration
->
setFileSize
(
m_migratedFile
.
fileSize
());
// successMigration->setBlockId0(m_migratedFile.BlockId0());
// successMigration->setBlockId1();
// successMigration->setBlockId2();
// successMigration->setBlockId3();
reportPacker
.
m_listReports
->
addSuccessfulMigrations
(
successMigration
.
release
());
// std::unique_ptr<tapegateway::FileMigratedNotificationStruct> successMigration(new tapegateway::FileMigratedNotificationStruct);
// successMigration->setFseq(m_migratedFile.fseq());
// successMigration->setFileTransactionId(m_migratedFile.fileTransactionId());
// successMigration->setId(m_migratedFile.id());
// successMigration->setNshost(m_migratedFile.nshost());
// successMigration->setFileid(m_migratedFile.fileid());
// successMigration->setBlockId0((m_blockId >> 24) & 0xFF);
// successMigration->setBlockId1((m_blockId >> 16) & 0xFF);
// successMigration->setBlockId2((m_blockId >> 8) & 0xFF);
// successMigration->setBlockId3((m_blockId >> 0) & 0xFF);
// //WARNING; Ad-hoc name of the ChecksumName !!");
// successMigration->setChecksumName("adler32");
// successMigration->setChecksum(m_checksum);
//
// successMigration->setFileSize(m_migratedFile.fileSize());
//
//// successMigration->setBlockId0(m_migratedFile.BlockId0());
//// successMigration->setBlockId1();
//// successMigration->setBlockId2();
//// successMigration->setBlockId3();
//
// reportPacker.m_listReports->addSuccessfulMigrations(successMigration.release());
reportPacker
.
m_successfulArchiveJobs
.
push
(
std
::
move
(
m_successfulArchiveJob
));
}
//------------------------------------------------------------------------------
//ReportFlush::computeCompressedSize
...
...
@@ -170,31 +171,31 @@ std::vector<tapegateway::FileMigratedNotificationStruct*>::iterator end
//------------------------------------------------------------------------------
//Report::reportFileErrors
//------------------------------------------------------------------------------
void
MigrationReportPacker
::
Report
::
reportFileErrors
(
MigrationReportPacker
&
reportPacker
)
{
// Some errors still have to be transmitted to the client, but not the
// successful writes which were not validated by a flush (they will be
// discarded)
if
(
reportPacker
.
m_listReports
->
failedMigrations
().
size
())
{
tapeserver
::
client
::
ClientInterface
::
RequestReport
chrono
;
// First, cleanup the report of existing successes
for
(
size_t
i
=
0
;
i
<
reportPacker
.
m_listReports
->
successfulMigrations
().
size
();
i
++
)
{
delete
reportPacker
.
m_listReports
->
successfulMigrations
()[
i
];
}
reportPacker
.
m_listReports
->
successfulMigrations
().
resize
(
0
);
// Report those errors to the client
reportPacker
.
logReportWithError
(
reportPacker
.
m_listReports
->
failedMigrations
(),
"Will report failed file to the client before end of session"
);
reportPacker
.
m_client
.
reportMigrationResults
(
*
(
reportPacker
.
m_listReports
),
chrono
);
log
::
ScopedParamContainer
sp
(
reportPacker
.
m_lc
);
sp
.
add
(
"connectDuration"
,
chrono
.
connectDuration
)
.
add
(
"sendRecvDuration"
,
chrono
.
sendRecvDuration
)
.
add
(
"transactionId"
,
chrono
.
transactionId
);
reportPacker
.
m_lc
.
log
(
LOG_INFO
,
"Reported failed file(s) to the client before end of session"
);
// Reset the report lists.
reportPacker
.
m_listReports
.
reset
(
new
tapegateway
::
FileMigrationReportList
);
}
}
//
void MigrationReportPacker::Report::reportFileErrors(MigrationReportPacker& reportPacker)
//
{
//
// Some errors still have to be transmitted to the client, but not the
//
// successful writes which were not validated by a flush (they will be
//
// discarded)
//
if(reportPacker.m_listReports->failedMigrations().size()) {
//
tapeserver::client::ClientInterface::RequestReport chrono;
//
// First, cleanup the report of existing successes
//
for (size_t i=0; i<reportPacker.m_listReports->successfulMigrations().size(); i++) {
//
delete reportPacker.m_listReports->successfulMigrations()[i];
//
}
//
reportPacker.m_listReports->successfulMigrations().resize(0);
//
// Report those errors to the client
//
reportPacker.logReportWithError(reportPacker.m_listReports->failedMigrations(),
//
"Will report failed file to the client before end of session");
//
reportPacker.m_client.reportMigrationResults(*(reportPacker.m_listReports),chrono);
//
log::ScopedParamContainer sp(reportPacker.m_lc);
//
sp.add("connectDuration", chrono.connectDuration)
//
.add("sendRecvDuration", chrono.sendRecvDuration)
//
.add("transactionId", chrono.transactionId);
//
reportPacker.m_lc.log(LOG_INFO, "Reported failed file(s) to the client before end of session");
//
// Reset the report lists.
//
reportPacker.m_listReports.reset(new tapegateway::FileMigrationReportList);
//
}
//
}
//------------------------------------------------------------------------------
//ReportFlush::execute
//------------------------------------------------------------------------------
...
...
@@ -214,7 +215,12 @@ void MigrationReportPacker::ReportFlush::execute(MigrationReportPacker& reportPa
computeCompressedSize
(
reportPacker
.
m_listReports
->
successfulMigrations
().
begin
(),
reportPacker
.
m_listReports
->
successfulMigrations
().
end
());
reportPacker
.
m_client
.
reportMigrationResults
(
*
(
reportPacker
.
m_listReports
),
chrono
);
//reportPacker.m_client.reportMigrationResults(*(reportPacker.m_listReports),chrono);
while
(
reportPacker
.
m_successfulArchiveJobs
.
size
())
{
std
::
unique_ptr
<
cta
::
ArchiveJob
>
successfulArchiveJob
=
std
::
move
(
reportPacker
.
m_successfulArchiveJobs
.
front
());
reportPacker
.
m_successfulArchiveJobs
.
pop
();
successfulArchiveJob
->
complete
();
}
reportPacker
.
logReport
(
reportPacker
.
m_listReports
->
successfulMigrations
(),
"A file was successfully written on the tape"
);
log
::
ScopedParamContainer
container
(
reportPacker
.
m_lc
);
container
.
add
(
"batch size"
,
reportPacker
.
m_listReports
->
successfulMigrations
().
size
())
...
...
@@ -237,7 +243,7 @@ void MigrationReportPacker::ReportFlush::execute(MigrationReportPacker& reportPa
}
else
{
// This is an abnormal situation: we should never flush after an error!
reportPacker
.
m_lc
.
log
(
LOG_ALERT
,
"Received a flush after an error: sending file errors to client"
);
reportFileErrors
(
reportPacker
);
//
reportFileErrors(reportPacker);
}
//reset (ie delete and replace) the current m_listReports.
//Thus all current reports are deleted otherwise they would have been sent again at the next flush
...
...
@@ -250,7 +256,8 @@ void MigrationReportPacker::ReportFlush::execute(MigrationReportPacker& reportPa
void
MigrationReportPacker
::
ReportEndofSession
::
execute
(
MigrationReportPacker
&
reportPacker
){
client
::
ClientInterface
::
RequestReport
chrono
;
if
(
!
reportPacker
.
m_errorHappened
){
reportPacker
.
m_client
.
reportEndOfSession
(
chrono
);
//reportPacker.m_client.reportEndOfSession(chrono);
reportPacker
.
m_archiveMount
->
complete
();
log
::
ScopedParamContainer
sp
(
reportPacker
.
m_lc
);
sp
.
add
(
"connectDuration"
,
chrono
.
connectDuration
)
.
add
(
"sendRecvDuration"
,
chrono
.
sendRecvDuration
)
...
...
@@ -265,9 +272,10 @@ void MigrationReportPacker::ReportEndofSession::execute(MigrationReportPacker& r
}
}
else
{
reportFileErrors
(
reportPacker
);
//
reportFileErrors(reportPacker);
// We have some errors: report end of session as such to the client
reportPacker
.
m_client
.
reportEndOfSessionWithError
(
"Previous file errors"
,
SEINTERNAL
,
chrono
);
//reportPacker.m_client.reportEndOfSessionWithError("Previous file errors",SEINTERNAL,chrono);
reportPacker
.
m_archiveMount
->
failed
(
cta
::
exception
::
Exception
(
"Previous file errors"
));
log
::
ScopedParamContainer
sp
(
reportPacker
.
m_lc
);
sp
.
add
(
"errorMessage"
,
"Previous file errors"
)
.
add
(
"errorCode"
,
SEINTERNAL
)
...
...
@@ -292,8 +300,9 @@ void MigrationReportPacker::ReportEndofSessionWithErrors::execute(MigrationRepor
client
::
ClientInterface
::
RequestReport
chrono
;
if
(
reportPacker
.
m_errorHappened
)
{
reportFileErrors
(
reportPacker
);
reportPacker
.
m_client
.
reportEndOfSessionWithError
(
m_message
,
m_errorCode
,
chrono
);
// reportFileErrors(reportPacker);
//reportPacker.m_client.reportEndOfSessionWithError(m_message,m_errorCode,chrono);
reportPacker
.
m_archiveMount
->
failed
(
cta
::
exception
::
Exception
(
m_message
));
log
::
ScopedParamContainer
sp
(
reportPacker
.
m_lc
);
sp
.
add
(
"errorMessage"
,
m_message
)
.
add
(
"errorCode"
,
m_errorCode
)
...
...
@@ -308,7 +317,8 @@ void MigrationReportPacker::ReportEndofSessionWithErrors::execute(MigrationRepor
if
(
ENOSPC
!=
m_errorCode
)
{
m_errorCode
=
SEINTERNAL
;
}
reportPacker
.
m_client
.
reportEndOfSessionWithError
(
msg
,
m_errorCode
,
chrono
);
//reportPacker.m_client.reportEndOfSessionWithError(msg,m_errorCode,chrono);
reportPacker
.
m_archiveMount
->
failed
(
cta
::
exception
::
Exception
(
msg
));
reportPacker
.
m_lc
.
log
(
LOG_INFO
,
msg
);
}
if
(
reportPacker
.
m_watchdog
)
{
...
...
@@ -326,17 +336,18 @@ void MigrationReportPacker::ReportEndofSessionWithErrors::execute(MigrationRepor
//------------------------------------------------------------------------------
void
MigrationReportPacker
::
ReportError
::
execute
(
MigrationReportPacker
&
reportPacker
){
std
::
unique_ptr
<
tapegateway
::
FileErrorReportStruct
>
failedMigration
(
new
tapegateway
::
FileErrorReportStruct
);
//failedMigration->setFileMigrationReportList(reportPacker.m_listReports.get());
failedMigration
->
setErrorCode
(
m_error_code
);
failedMigration
->
setErrorMessage
(
m_error_msg
);
failedMigration
->
setFseq
(
m_migratedFile
.
fseq
());
failedMigration
->
setFileTransactionId
(
m_migratedFile
.
fileTransactionId
());
failedMigration
->
setFileid
(
m_migratedFile
.
fileid
());
failedMigration
->
setNshost
(
m_migratedFile
.
nshost
());
failedMigration
->
setPositionCommandCode
(
m_migratedFile
.
positionCommandCode
());
reportPacker
.
m_listReports
->
addFailedMigrations
(
failedMigration
.
release
());
// std::unique_ptr<tapegateway::FileErrorReportStruct> failedMigration(new tapegateway::FileErrorReportStruct);
// //failedMigration->setFileMigrationReportList(reportPacker.m_listReports.get());
// failedMigration->setErrorCode(m_error_code);
// failedMigration->setErrorMessage(m_error_msg);
// failedMigration->setFseq(m_migratedFile.fseq());
// failedMigration->setFileTransactionId(m_migratedFile.fileTransactionId());
// failedMigration->setFileid(m_migratedFile.fileid());
// failedMigration->setNshost(m_migratedFile.nshost());
// failedMigration->setPositionCommandCode(m_migratedFile.positionCommandCode());
//
// reportPacker.m_listReports->addFailedMigrations(failedMigration.release());
m_failedArchiveJob
->
failed
();
reportPacker
.
m_errorHappened
=
true
;
}
...
...
@@ -362,7 +373,8 @@ void MigrationReportPacker::WorkerThread::run(){
catch
(
const
failedMigrationRecallResult
&
e
){
//here we catch a failed report MigrationResult. We try to close and it that fails too
//we end up in the catch below
m_parent
.
m_client
.
reportEndOfSessionWithError
(
e
.
getMessageValue
(),
SEINTERNAL
,
chrono
);
//m_parent.m_client.reportEndOfSessionWithError(e.getMessageValue(),SEINTERNAL,chrono);
m_parent
.
m_archiveMount
->
failed
(
e
);
m_parent
.
logRequestReport
(
chrono
,
"Successfully closed client's session after the failed report MigrationResult"
);
if
(
m_parent
.
m_watchdog
)
{
m_parent
.
m_watchdog
->
addToErrorCount
(
"Error_clientCommunication"
);
...
...
tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.hpp
View file @
4f6101c3
...
...
@@ -28,6 +28,7 @@
#include
"castor/tape/tapegateway/FileToMigrateStruct.hpp"
#include
"castor/tape/tapeserver/daemon/ReportPackerInterface.hpp"
#include
"castor/tape/tapeserver/drive/DriveInterface.hpp"
#include
"scheduler/ArchiveMount.hpp"
#include
<list>
#include
<memory>
...
...
@@ -42,7 +43,7 @@ public:
* @param tg The client who is asking for a migration of his files
* and to whom we have to report to the status of the operations.
*/
MigrationReportPacker
(
c
lient
::
ClientInterface
&
tg
,
log
::
LogContext
lc
);
MigrationReportPacker
(
c
ta
::
ArchiveMount
*
archiveMount
,
log
::
LogContext
lc
);
~
MigrationReportPacker
();
...
...
@@ -108,12 +109,17 @@ private:
* recorded, we will transmit it to the client before signaling the
* end of the session.
*/
virtual
void
reportFileErrors
(
MigrationReportPacker
&
reportPacker
);
//
virtual void reportFileErrors(MigrationReportPacker& reportPacker);
};
class
ReportSuccessful
:
public
Report
{
const
FileStruct
m_migratedFile
;
const
unsigned
long
m_checksum
;
const
uint32_t
m_blockId
;
/**
* The successful archive job to be pushed in the report packer queue and reported later
*/
std
::
unique_ptr
<
cta
::
ArchiveJob
>
m_successfulArchiveJob
;
public:
ReportSuccessful
(
const
FileStruct
&
file
,
unsigned
long
checksum
,
u_int32_t
blockId
)
:
...
...
@@ -151,6 +157,11 @@ private:
const
FileStruct
m_migratedFile
;
const
std
::
string
m_error_msg
;
const
int
m_error_code
;
/**
* The failed archive job to be reported immediately
*/
std
::
unique_ptr
<
cta
::
ArchiveJob
>
m_failedArchiveJob
;
public:
ReportError
(
const
FileStruct
&
file
,
std
::
string
msg
,
int
error_code
)
:
m_migratedFile
(
file
),
m_error_msg
(
msg
),
m_error_code
(
error_code
){}
...
...
@@ -196,6 +207,16 @@ private:
* when a end of session (with error) is called
*/
bool
m_continue
;
/**
* The mount object used to send reports
*/
cta
::
ArchiveMount
*
m_archiveMount
;
/**
* The successful archive jobs to be reported when flushing
*/
std
::
queue
<
std
::
unique_ptr
<
cta
::
ArchiveJob
>
>
m_successfulArchiveJobs
;
};
}}}}
tapeserver/castor/tape/tapeserver/daemon/MigrationReportPackerTest.cpp
View file @
4f6101c3
...
...
@@ -26,6 +26,7 @@
#include
"castor/tape/tapeserver/drive/DriveInterface.hpp"
#include
"castor/tape/tapeserver/client/FakeClient.hpp"
#include
"serrno.h"
#include
"scheduler/mockDB/MockSchedulerDatabase.hpp"
#include
<gtest/gtest.h>
...
...
@@ -43,7 +44,8 @@ TEST(castor_tape_tapeserver_daemon, MigrationReportPackerNominal) {
castor
::
log
::
StringLogger
log
(
"castor_tape_tapeserver_daemon_MigrationReportPackerNominal"
);
castor
::
log
::
LogContext
lc
(
log
);
tapeserver
::
daemon
::
MigrationReportPacker
mrp
(
client
,
lc
);
std
::
unique_ptr
<
cta
::
MockSchedulerDatabase
>
mdb
(
new
cta
::
MockSchedulerDatabase
);
tapeserver
::
daemon
::
MigrationReportPacker
mrp
(
dynamic_cast
<
cta
::
ArchiveMount
*>
((
mdb
->
getNextMount
(
"ll"
,
"drive"
)).
get
()),
lc
);
mrp
.
startThreads
();
tapegateway
::
FileToMigrateStruct
migratedFile
;
...
...
@@ -67,7 +69,9 @@ TEST(castor_tape_tapeserver_daemon, MigrationReportPackerFaillure) {
castor
::
log
::
StringLogger
log
(
"castor_tape_tapeserver_daemon_MigrationReportPackerFaillure"
);
castor
::
log
::
LogContext
lc
(
log
);
tapeserver
::
daemon
::
MigrationReportPacker
mrp
(
client
,
lc
);
std
::
unique_ptr
<
cta
::
MockSchedulerDatabase
>
mdb
(
new
cta
::
MockSchedulerDatabase
);
tapeserver
::
daemon
::
MigrationReportPacker
mrp
(
dynamic_cast
<
cta
::
ArchiveMount
*>
((
mdb
->
getNextMount
(
"ll"
,
"drive"
)).
get
()),
lc
);
mrp
.
startThreads
();
tapegateway
::
FileToMigrateStruct
migratedFile
;
...
...
@@ -92,7 +96,9 @@ TEST(castor_tape_tapeserver_daemon, MigrationReportPackerFaillureGoodEnd) {
castor
::
log
::
StringLogger
log
(
"castor_tape_tapeserver_daemon_MigrationReportPackerFaillureGoodEnd"
);
castor
::
log
::
LogContext
lc
(
log
);
tapeserver
::
daemon
::
MigrationReportPacker
mrp
(
client
,
lc
);
std
::
unique_ptr
<
cta
::
MockSchedulerDatabase
>
mdb
(
new
cta
::
MockSchedulerDatabase
);
tapeserver
::
daemon
::
MigrationReportPacker
mrp
(
dynamic_cast
<
cta
::
ArchiveMount
*>
((
mdb
->
getNextMount
(
"ll"
,
"drive"
)).
get
()),
lc
);
mrp
.
startThreads
();
tapegateway
::
FileToMigrateStruct
migratedFile
;
...
...
@@ -119,7 +125,9 @@ TEST(castor_tape_tapeserver_daemon, MigrationReportPackerGoodBadEnd) {
castor
::
log
::
StringLogger
log
(
"castor_tape_tapeserver_daemon_MigrationReportPackerGoodBadEnd"
);
castor
::
log
::
LogContext
lc
(
log
);
tapeserver
::
daemon
::
MigrationReportPacker
mrp
(
client
,
lc
);
std
::
unique_ptr
<
cta
::
MockSchedulerDatabase
>
mdb
(
new
cta
::
MockSchedulerDatabase
);
tapeserver
::
daemon
::
MigrationReportPacker
mrp
(
dynamic_cast
<
cta
::
ArchiveMount
*>
((
mdb
->
getNextMount
(
"ll"
,
"drive"
)).
get
()),
lc
);
mrp
.
startThreads
();
tapegateway
::
FileToMigrateStruct
migratedFile
;
...
...
@@ -161,7 +169,9 @@ TEST(castor_tape_tapeserver_daemon, MigrationReportPackerOneByteFile) {
castor
::
log
::
StringLogger
log
(
"castor_tape_tapeserver_daemon_MigrationReportPackerGoodBadEnd"
);
castor
::
log
::
LogContext
lc
(
log
);
tapeserver
::
daemon
::
MigrationReportPacker
mrp
(
client
,
lc
);
std
::
unique_ptr
<
cta
::
MockSchedulerDatabase
>
mdb
(
new
cta
::
MockSchedulerDatabase
);
tapeserver
::
daemon
::
MigrationReportPacker
mrp
(
dynamic_cast
<
cta
::
ArchiveMount
*>
((
mdb
->
getNextMount
(
"ll"
,
"drive"
)).
get
()),
lc
);
mrp
.
startThreads
();
tapegateway
::
FileToMigrateStruct
migratedFileSmall
;
...
...
tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.cpp
View file @
4f6101c3
...
...
@@ -47,10 +47,10 @@ namespace daemon {
//------------------------------------------------------------------------------
//Constructor
//------------------------------------------------------------------------------
RecallReportPacker
::
RecallReportPacker
(
c
lient
::
ClientInterface
&
tg
,
RecallReportPacker
::
RecallReportPacker
(
c
ta
::
RetrieveMount
*
retrieveMount
,
unsigned
int
reportFilePeriod
,
log
::
LogContext
lc
)
:
ReportPackerInterface
<
detail
::
Recall
>
(
tg
,
lc
),
m_workerThread
(
*
this
),
m_reportFilePeriod
(
reportFilePeriod
),
m_errorHappened
(
false
){
ReportPackerInterface
<
detail
::
Recall
>
(
lc
),
m_workerThread
(
*
this
),
m_reportFilePeriod
(
reportFilePeriod
),
m_errorHappened
(
false
)
,
m_retrieveMount
(
retrieveMount
)
{
}
//------------------------------------------------------------------------------
...
...
@@ -97,21 +97,22 @@ void RecallReportPacker::reportEndOfSessionWithErrors(const std::string msg,int
//ReportSuccessful::execute
//------------------------------------------------------------------------------
void
RecallReportPacker
::
ReportSuccessful
::
execute
(
RecallReportPacker
&
parent
){
std
::
unique_ptr
<
FileSuccessStruct
>
successRecall
(
new
FileSuccessStruct
);
successRecall
->
setFseq
(
m_recalledFile
.
fseq
());
successRecall
->
setFileTransactionId
(
m_recalledFile
.
fileTransactionId
());
successRecall
->
setId
(
m_recalledFile
.
id
());
successRecall
->
setNshost
(
m_recalledFile
.
nshost
());
successRecall
->
setFileid
(
m_recalledFile
.
fileid
());
successRecall
->
setPath
(
m_recalledFile
.
path
());
successRecall
->
setFileSize
(
m_size
);
//WARNING : ad hoc name of checksum algorithm
successRecall
->
setChecksumName
(
"adler32"
);
successRecall
->
setChecksum
(
m_checksum
);
parent
.
m_listReports
->
addSuccessfulRecalls
(
successRecall
.
release
());
// std::unique_ptr<FileSuccessStruct> successRecall(new FileSuccessStruct);
//
// successRecall->setFseq(m_recalledFile.fseq());
// successRecall->setFileTransactionId(m_recalledFile.fileTransactionId());
// successRecall->setId(m_recalledFile.id());
// successRecall->setNshost(m_recalledFile.nshost());
// successRecall->setFileid(m_recalledFile.fileid());
// successRecall->setPath(m_recalledFile.path());
// successRecall->setFileSize(m_size);
//
// //WARNING : ad hoc name of checksum algorithm
// successRecall->setChecksumName("adler32");
// successRecall->setChecksum(m_checksum);
//
// parent.m_listReports->addSuccessfulRecalls(successRecall.release());
parent
.
m_successfulRetrieveJobs
.
push
(
std
::
move
(
m_successfulRetrieveJob
));
}
//------------------------------------------------------------------------------
//flush
...
...
@@ -126,7 +127,12 @@ void RecallReportPacker::flush(){
client
::
ClientInterface
::
RequestReport
chrono
;
try
{
m_client
.
reportRecallResults
(
*
m_listReports
,
chrono
);
// m_client.reportRecallResults(*m_listReports,chrono);
while
(
m_successfulRetrieveJobs
.
size
())
{
std
::
unique_ptr
<
cta
::
RetrieveJob
>
successfulRetrieveJob
=
std
::
move
(
m_successfulRetrieveJobs
.
front
());
m_successfulRetrieveJobs
.
pop
();
successfulRetrieveJob
->
complete
(
0
,
0
);
//TODO: put size and checksum
}
{
log
::
ScopedParamContainer
params
(
m_lc
);
params
.
add
(
"successCount"
,
m_listReports
->
successfulRecalls
().
size
())
...
...
@@ -153,7 +159,8 @@ void RecallReportPacker::flush(){
void
RecallReportPacker
::
ReportEndofSession
::
execute
(
RecallReportPacker
&
parent
){
client
::
ClientInterface
::
RequestReport
chrono
;
if
(
!
parent
.
errorHappened
()){
parent
.
m_client
.
reportEndOfSession
(
chrono
);
// parent.m_client.reportEndOfSession(chrono);
parent
.
m_retrieveMount
->
complete
();
parent
.
logRequestReport
(
chrono
,
"Nominal RecallReportPacker::EndofSession has been reported"
,
LOG_INFO
);
if
(
parent
.
m_watchdog
)
{
parent
.
m_watchdog
->
addParameter
(
log
::
Param
(
"status"
,
"success"
));
...
...
@@ -166,7 +173,8 @@ void RecallReportPacker::ReportEndofSession::execute(RecallReportPacker& parent)
else
{
const
std
::
string
&
msg
=
"RecallReportPacker::EndofSession has been reported but an error happened somewhere in the process"
;
parent
.
m_lc
.
log
(
LOG_ERR
,
msg
);
parent
.
m_client
.
reportEndOfSessionWithError
(
msg
,
SEINTERNAL
,
chrono
);
// parent.m_client.reportEndOfSessionWithError(msg,SEINTERNAL,chrono);
parent
.
m_retrieveMount
->
failed
(
cta
::
exception
::
Exception
(
msg
));