Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
dCache
cta
Commits
2517e326
Commit
2517e326
authored
Jan 26, 2021
by
Michael Davis
Browse files
Updates SSI protobuf to v0.93
Adds fields for tape verification to Workflow message
parent
94155bea
Pipeline
#2811
passed with stages
in 16 minutes and 54 seconds
Changes
21
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
cmdline/CMakeLists.txt
View file @
2517e326
...
...
@@ -43,6 +43,14 @@ add_executable(cta-send-event CtaSendEvent.cpp)
target_link_libraries
(
cta-send-event ctacommon XrdSsiPbEosCta XrdSsiLib XrdUtils
)
set_property
(
TARGET cta-send-event APPEND PROPERTY INSTALL_RPATH
${
PROTOBUF3_RPATH
}
)
#
# verify-file <archiveFileId> <vid>
# recalls a file from tape without writing a disk copy
#
add_executable
(
cta-verify-file CtaVerifyFile.cpp
)
target_link_libraries
(
cta-verify-file ctacommon XrdSsiPbEosCta XrdSsiLib XrdUtils
)
set_property
(
TARGET cta-verify-file APPEND PROPERTY INSTALL_RPATH
${
PROTOBUF3_RPATH
}
)
#
# cta-wfe-test archive|retrieve|delete <options> allows testing of the SSI WorkFlow Engine hooks
# without invoking EOS.
...
...
cmdline/CtaSendEvent.cpp
View file @
2517e326
...
...
@@ -25,6 +25,7 @@
#include <common/checksum/ChecksumBlobSerDeser.hpp>
#include "CtaFrontendApi.hpp"
#include "version.h"
const
std
::
string
config_file
=
"/etc/cta/cta-cli.conf"
;
...
...
@@ -205,6 +206,10 @@ int exceptionThrowingMain(int argc, const char *const *const argv)
cta
::
xrd
::
Request
request
;
cta
::
eos
::
Notification
&
notification
=
*
(
request
.
mutable_notification
());
// Set client version
request
.
set_client_cta_version
(
CTA_VERSION
);
request
.
set_client_xrootd_ssi_protobuf_interface_version
(
XROOTD_SSI_PROTOBUF_INTERFACE_VERSION
);
// Set configuration options
XrdSsiPb
::
Config
config
(
config_file
,
"cta"
);
config
.
set
(
"resource"
,
"/ctafrontend"
);
...
...
cmdline/CtaVerifyFile.cpp
0 → 100644
View file @
2517e326
/*!
* @project The CERN Tape Archive (CTA)
* @brief Command-line tool to inject a special verify-only PREPARE request
* @copyright Copyright 2021 CERN
* @license This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdexcept>
#include <iostream>
#include <map>
#include "CtaFrontendApi.hpp"
#include "version.h"
const
std
::
string
config_file
=
"/etc/cta/cta-cli.conf"
;
// Define XRootD SSI Alert message callback
namespace
XrdSsiPb
{
/*
* Alert callback.
*
* Defines how Alert messages should be logged by EOS (or directed to the User)
*/
template
<
>
void
RequestCallback
<
cta
::
xrd
::
Alert
>::
operator
()(
const
cta
::
xrd
::
Alert
&
alert
)
{
std
::
cout
<<
"AlertCallback():"
<<
std
::
endl
;
XrdSsiPb
::
Log
::
DumpProtobuf
(
XrdSsiPb
::
Log
::
PROTOBUF
,
&
alert
);
}
}
// namespace XrdSsiPb
// Attribute map type
typedef
std
::
map
<
std
::
string
,
std
::
string
>
AttrMap
;
// Usage exception
const
std
::
runtime_error
Usage
(
"Usage: cta-verify-file archiveFileID [vid]"
);
/*
* Fill a Notification message from the command-line parameters and stdin
*
* @param[out] notification The protobuf to fill
* @param[in] config The XrdSsiPb object containing the configuration parameters
* @param[in] archiveFileId The ID of the file to retrieve
* @param[in] vid If non-empty, restrict PREPARE requests to this tape only
*/
void
fillNotification
(
cta
::
eos
::
Notification
&
notification
,
const
std
::
string
&
archiveFileId
,
const
std
::
string
&
vid
)
{
XrdSsiPb
::
Config
config
(
config_file
,
"eos"
);
// WF
notification
.
mutable_wf
()
->
set_event
(
cta
::
eos
::
Workflow
::
PREPARE
);
for
(
auto
&
conf_option
:
std
::
vector
<
std
::
string
>
({
"instance"
,
"requester.user"
,
"requester.group"
}))
{
if
(
!
config
.
getOptionValueStr
(
conf_option
).
first
)
{
throw
std
::
runtime_error
(
conf_option
+
" must be specified in "
+
config_file
);
}
}
notification
.
mutable_wf
()
->
mutable_instance
()
->
set_name
(
config
.
getOptionValueStr
(
"instance"
).
second
);
notification
.
mutable_wf
()
->
set_requester_instance
(
"cta-verify-file"
);
notification
.
mutable_wf
()
->
set_verify_only
(
true
);
notification
.
mutable_wf
()
->
set_vid
(
vid
);
// CLI
notification
.
mutable_cli
()
->
mutable_user
()
->
set_username
(
config
.
getOptionValueStr
(
"requester.user"
).
second
);
notification
.
mutable_cli
()
->
mutable_user
()
->
set_groupname
(
config
.
getOptionValueStr
(
"requester.group"
).
second
);
// Transport
notification
.
mutable_transport
()
->
set_dst_url
(
"file://dummy"
);
// File
notification
.
mutable_file
()
->
set_lpath
(
"dummy"
);
// eXtended attributes
AttrMap
xattrs
;
xattrs
[
"sys.archive.file_id"
]
=
archiveFileId
;
for
(
auto
&
xattr
:
xattrs
)
{
google
::
protobuf
::
MapPair
<
std
::
string
,
std
::
string
>
mp
(
xattr
.
first
,
xattr
.
second
);
notification
.
mutable_file
()
->
mutable_xattr
()
->
insert
(
mp
);
}
}
/*
* Sends a Notification to the CTA XRootD SSI server
*/
int
exceptionThrowingMain
(
int
argc
,
const
char
*
const
*
const
argv
)
{
std
::
string
vid
;
if
(
argc
==
3
)
{
vid
=
argv
[
2
];
}
else
if
(
argc
!=
2
)
{
throw
Usage
;
}
// Verify that the Google Protocol Buffer header and linked library versions are compatible
GOOGLE_PROTOBUF_VERIFY_VERSION
;
cta
::
xrd
::
Request
request
;
cta
::
eos
::
Notification
&
notification
=
*
(
request
.
mutable_notification
());
// Set client version
request
.
set_client_cta_version
(
CTA_VERSION
);
request
.
set_client_xrootd_ssi_protobuf_interface_version
(
XROOTD_SSI_PROTOBUF_INTERFACE_VERSION
);
// Set configuration options
XrdSsiPb
::
Config
config
(
config_file
,
"cta"
);
config
.
set
(
"resource"
,
"/ctafrontend"
);
// Allow environment variables to override config file
config
.
getEnv
(
"request_timeout"
,
"XRD_REQUESTTIMEOUT"
);
// If XRDDEBUG=1, switch on all logging
if
(
getenv
(
"XRDDEBUG"
))
{
config
.
set
(
"log"
,
"all"
);
}
// If fine-grained control over log level is required, use XrdSsiPbLogLevel
config
.
getEnv
(
"log"
,
"XrdSsiPbLogLevel"
);
// Parse the command line arguments: fill the Notification fields
fillNotification
(
notification
,
argv
[
1
],
vid
);
// Obtain a Service Provider
XrdSsiPbServiceType
cta_service
(
config
);
// Send the Request to the Service and get a Response
cta
::
xrd
::
Response
response
;
cta_service
.
Send
(
request
,
response
);
// Handle responses
switch
(
response
.
type
())
{
using
namespace
cta
::
xrd
;
case
Response
::
RSP_SUCCESS
:
std
::
cout
<<
response
.
xattr
().
at
(
"sys.cta.objectstore.id"
)
<<
std
::
endl
;
break
;
case
Response
::
RSP_ERR_PROTOBUF
:
throw
XrdSsiPb
::
PbException
(
response
.
message_txt
());
case
Response
::
RSP_ERR_CTA
:
throw
std
::
runtime_error
(
response
.
message_txt
());
case
Response
::
RSP_ERR_USER
:
throw
std
::
runtime_error
(
response
.
message_txt
());
default:
throw
XrdSsiPb
::
PbException
(
"Invalid response type."
);
}
// Delete all global objects allocated by libprotobuf
google
::
protobuf
::
ShutdownProtobufLibrary
();
return
0
;
}
/*
* Start here
*/
int
main
(
int
argc
,
const
char
**
argv
)
{
try
{
return
exceptionThrowingMain
(
argc
,
argv
);
}
catch
(
XrdSsiPb
::
PbException
&
ex
)
{
std
::
cerr
<<
"Error in Google Protocol Buffers: "
<<
ex
.
what
()
<<
std
::
endl
;
}
catch
(
XrdSsiPb
::
XrdSsiException
&
ex
)
{
std
::
cerr
<<
"Error from XRootD SSI Framework: "
<<
ex
.
what
()
<<
std
::
endl
;
}
catch
(
std
::
exception
&
ex
)
{
std
::
cerr
<<
ex
.
what
()
<<
std
::
endl
;
}
catch
(...)
{
std
::
cerr
<<
"Caught an unknown exception"
<<
std
::
endl
;
}
return
0
;
}
common/dataStructures/ArchiveFile.cpp
View file @
2517e326
...
...
@@ -89,7 +89,12 @@ ArchiveFile::TapeFilesList::const_iterator ArchiveFile::TapeFilesList::find(uint
return
std
::
find_if
(
cbegin
(),
cend
(),
[
=
](
const
TapeFile
&
tf
){
return
tf
.
isActiveCopyNb
(
copyNb
);});
}
//------------------------------------------------------------------------------
// ArchiveFile::TapeFilesList::removeAllVidsExcept()
//------------------------------------------------------------------------------
void
ArchiveFile
::
TapeFilesList
::
removeAllVidsExcept
(
const
std
::
string
&
vid
)
{
remove_if
([
=
](
TapeFile
&
tf
){
return
tf
.
vid
!=
vid
;
});
}
...
...
common/dataStructures/ArchiveFile.hpp
View file @
2517e326
...
...
@@ -68,6 +68,7 @@ struct ArchiveFile {
const
TapeFile
&
at
(
uint8_t
copyNb
)
const
;
TapeFilesList
::
iterator
find
(
uint8_t
copyNb
);
TapeFilesList
::
const_iterator
find
(
uint8_t
copyNb
)
const
;
void
removeAllVidsExcept
(
const
std
::
string
&
vid
);
};
TapeFilesList
tapeFiles
;
time_t
creationTime
;
...
...
common/dataStructures/RetrieveRequest.cpp
View file @
2517e326
...
...
@@ -28,7 +28,7 @@ namespace dataStructures {
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
RetrieveRequest
::
RetrieveRequest
()
:
archiveFileID
(
0
)
{}
RetrieveRequest
::
RetrieveRequest
()
:
archiveFileID
(
0
)
,
isVerifyOnly
(
false
)
{}
//------------------------------------------------------------------------------
// operator==
...
...
@@ -38,7 +38,9 @@ bool RetrieveRequest::operator==(const RetrieveRequest &rhs) const {
&&
archiveFileID
==
rhs
.
archiveFileID
&&
dstURL
==
rhs
.
dstURL
&&
diskFileInfo
==
rhs
.
diskFileInfo
&&
creationLog
==
rhs
.
creationLog
;
&&
creationLog
==
rhs
.
creationLog
&&
isVerifyOnly
==
rhs
.
isVerifyOnly
&&
vid
==
rhs
.
vid
;
}
//------------------------------------------------------------------------------
...
...
@@ -56,7 +58,10 @@ std::ostream &operator<<(std::ostream &os, const RetrieveRequest &obj) {
<<
" archiveFileID="
<<
obj
.
archiveFileID
<<
" dstURL="
<<
obj
.
dstURL
<<
" diskFileInfo="
<<
obj
.
diskFileInfo
<<
" creationLog="
<<
obj
.
creationLog
<<
")"
;
<<
" creationLog="
<<
obj
.
creationLog
<<
" isVerifyOnly="
<<
obj
.
isVerifyOnly
;
if
(
obj
.
vid
)
os
<<
" vid="
<<
*
(
obj
.
vid
);
os
<<
")"
;
return
os
;
}
...
...
common/dataStructures/RetrieveRequest.hpp
View file @
2517e326
...
...
@@ -58,6 +58,9 @@ struct RetrieveRequest {
std
::
string
errorReportURL
;
DiskFileInfo
diskFileInfo
;
EntryLog
creationLog
;
bool
isVerifyOnly
;
// request to retrieve file from tape but do not write a disk copy
optional
<
std
::
string
>
vid
;
// limit retrieve requests to the specified vid (in the case of dual-copy files)
LifecycleTimings
lifecycleTimings
;
optional
<
std
::
string
>
activity
;
...
...
common/exception/ChecksumValueMismatch.hpp
View file @
2517e326
...
...
@@ -36,7 +36,7 @@ public:
* @param embedBacktrace whether to embed a backtrace of where the
* exception was throw in the message
*/
ChecksumValueMismatch
(
const
std
::
string
&
context
=
""
,
const
bool
embedBacktrace
=
tru
e
)
:
ChecksumValueMismatch
(
const
std
::
string
&
context
=
""
,
const
bool
embedBacktrace
=
fals
e
)
:
Exception
(
context
,
embedBacktrace
)
{}
/**
...
...
objectstore/RetrieveRequest.cpp
View file @
2517e326
...
...
@@ -61,6 +61,7 @@ void RetrieveRequest::initialize() {
m_payload
.
set_failurereportlog
(
""
);
m_payload
.
set_failurereporturl
(
""
);
m_payload
.
set_isrepack
(
false
);
m_payload
.
set_isverifyonly
(
false
);
// This object is good to go (to storage)
m_payloadInterpreted
=
true
;
}
...
...
@@ -373,9 +374,13 @@ auto RetrieveRequest::addTransferFailure(uint32_t copyNumber, uint64_t mountId,
j
.
set_totalretries
(
j
.
totalretries
()
+
1
);
*
j
.
mutable_failurelogs
()
->
Add
()
=
failureReason
;
}
if
(
m_payload
.
isverifyonly
())
{
// Don't retry verification jobs, they should fail immediately
return
determineNextStep
(
copyNumber
,
JobEvent
::
TransferFailed
,
lc
);
}
if
(
j
.
totalretries
()
<
j
.
maxtotalretries
())
{
EnqueueingNextStep
ret
;
ret
.
nextStatus
=
serializers
::
RetrieveJobStatus
::
RJS_ToTransfer
;
ret
.
nextStatus
=
serializers
::
RetrieveJobStatus
::
RJS_ToTransfer
;
if
(
j
.
retrieswithinmount
()
<
j
.
maxretrieswithinmount
())
// Job can try again within this mount
ret
.
nextStep
=
EnqueueingNextStep
::
NextStep
::
EnqueueForTransferForUser
;
...
...
@@ -474,6 +479,7 @@ void RetrieveRequest::setSchedulerRequest(const cta::common::dataStructures::Ret
sr
->
set_archivefileid
(
retrieveRequest
.
archiveFileID
);
sr
->
set_dsturl
(
retrieveRequest
.
dstURL
);
sr
->
set_retrieveerrorreporturl
(
retrieveRequest
.
errorReportURL
);
sr
->
set_isverifyonly
(
retrieveRequest
.
isVerifyOnly
);
DiskFileInfoSerDeser
dfisd
(
retrieveRequest
.
diskFileInfo
);
dfisd
.
serialize
(
*
sr
->
mutable_diskfileinfo
());
objectstore
::
EntryLogSerDeser
el
(
retrieveRequest
.
creationLog
);
...
...
@@ -494,6 +500,7 @@ cta::common::dataStructures::RetrieveRequest RetrieveRequest::getSchedulerReques
ret
.
creationLog
=
el
;
ret
.
dstURL
=
m_payload
.
schedulerrequest
().
dsturl
();
ret
.
errorReportURL
=
m_payload
.
schedulerrequest
().
retrieveerrorreporturl
();
ret
.
isVerifyOnly
=
m_payload
.
schedulerrequest
().
isverifyonly
();
objectstore
::
DiskFileInfoSerDeser
dfisd
;
dfisd
.
deserialize
(
m_payload
.
schedulerrequest
().
diskfileinfo
());
ret
.
diskFileInfo
=
dfisd
;
...
...
@@ -950,6 +957,7 @@ auto RetrieveRequest::asyncUpdateJobOwner(uint32_t copyNumber, const std::string
retRef
.
m_retrieveRequest
.
diskFileInfo
=
dfi
;
retRef
.
m_retrieveRequest
.
dstURL
=
payload
.
schedulerrequest
().
dsturl
();
retRef
.
m_retrieveRequest
.
errorReportURL
=
payload
.
schedulerrequest
().
retrieveerrorreporturl
();
retRef
.
m_retrieveRequest
.
isVerifyOnly
=
payload
.
schedulerrequest
().
isverifyonly
();
retRef
.
m_retrieveRequest
.
requester
.
name
=
payload
.
schedulerrequest
().
requester
().
name
();
retRef
.
m_retrieveRequest
.
requester
.
group
=
payload
.
schedulerrequest
().
requester
().
group
();
objectstore
::
ArchiveFileSerDeser
af
;
...
...
objectstore/RetrieveRequest.hpp
View file @
2517e326
...
...
@@ -268,6 +268,7 @@ public:
void
setReportedTime
(
const
uint64_t
reportedTime
);
void
setActiveCopyNumber
(
uint32_t
activeCopyNb
);
uint32_t
getActiveCopyNumber
();
void
setIsVerifyOnly
(
bool
isVerifyOnly
)
{
m_payload
.
set_isverifyonly
(
isVerifyOnly
);
}
// ===========================================================================
std
::
list
<
JobDump
>
dumpJobs
();
std
::
string
dump
();
...
...
objectstore/cta.proto
View file @
2517e326
...
...
@@ -397,6 +397,7 @@ message SchedulerRetrieveRequest {
required
DiskFileInfo
diskfileinfo
=
9103
;
required
EntryLog
entrylog
=
9106
;
required
string
retrieveerrorreporturl
=
9110
;
required
bool
isverifyonly
=
9111
;
}
message
RetrieveJob
{
...
...
@@ -456,6 +457,7 @@ message RetrieveRequest {
optional
RetrieveRequestRepackInfo
repack_info
=
9158
;
optional
LifecycleTimings
lifecycle_timings
=
9159
;
optional
string
disk_system_name
=
9161
;
required
bool
isverifyonly
=
9162
;
}
message
ValueCountPair
{
...
...
scheduler/OStoreDB/OStoreDB.cpp
View file @
2517e326
...
...
@@ -1284,6 +1284,7 @@ SchedulerDatabase::RetrieveRequestInfo OStoreDB::queueRetrieve(cta::common::data
rReq
->
setRetrieveFileQueueCriteria
(
criteria
);
rReq
->
setActivityIfNeeded
(
rqst
,
criteria
);
rReq
->
setCreationTime
(
rqst
.
creationLog
.
time
);
rReq
->
setIsVerifyOnly
(
rqst
.
isVerifyOnly
);
if
(
diskSystemName
)
rReq
->
setDiskSystemName
(
diskSystemName
.
value
());
// Find the job corresponding to the vid (and check we indeed have one).
auto
jobs
=
rReq
->
getJobs
();
...
...
scheduler/Scheduler.cpp
View file @
2517e326
...
...
@@ -196,7 +196,18 @@ std::string Scheduler::queueRetrieve(
queueCriteria
.
archiveFile
.
diskFileInfo
=
request
.
diskFileInfo
;
auto
diskSystemList
=
m_catalogue
.
getAllDiskSystems
();
auto
catalogueTime
=
t
.
secs
(
cta
::
utils
::
Timer
::
resetCounter
);
// Determine disk system for this request, if any.
// By default, the scheduler makes its decision based on all available vids. But if a vid is specified in the protobuf,
// ignore all the others.
if
(
request
.
vid
)
{
queueCriteria
.
archiveFile
.
tapeFiles
.
removeAllVidsExcept
(
*
request
.
vid
);
if
(
queueCriteria
.
archiveFile
.
tapeFiles
.
empty
())
{
exception
::
UserError
ex
;
ex
.
getMessage
()
<<
"VID "
<<
*
request
.
vid
<<
" does not contain a tape copy of file with archive file ID "
<<
request
.
archiveFileID
;
throw
ex
;
}
}
// Determine disk system for this request, if any
optional
<
std
::
string
>
diskSystemName
;
try
{
diskSystemName
=
diskSystemList
.
getDSName
(
request
.
dstURL
);
...
...
@@ -232,6 +243,7 @@ std::string Scheduler::queueRetrieve(
spc
.
add
(
tc
.
str
(),
tf
);
}
spc
.
add
(
"selectedVid"
,
requestInfo
.
selectedVid
)
.
add
(
"verifyOnly"
,
request
.
isVerifyOnly
)
.
add
(
"catalogueTime"
,
catalogueTime
)
.
add
(
"schedulerDbTime"
,
schedulerDbTime
)
.
add
(
"policyName"
,
queueCriteria
.
mountPolicy
.
name
)
...
...
tapeserver/castor/tape/tapeserver/daemon/DataTransferSessionTest.cpp
View file @
2517e326
...
...
@@ -511,6 +511,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionGooddayRecall) {
rReq
.
requester
.
group
=
"someGroup"
;
rReq
.
dstURL
=
remoteFilePaths
.
back
();
rReq
.
diskFileInfo
.
path
=
"path/to/file"
;
rReq
.
isVerifyOnly
=
false
;
std
::
list
<
std
::
string
>
archiveFilePaths
;
scheduler
.
queueRetrieve
(
diskInstance
,
rReq
,
logContext
);
}
...
...
@@ -918,6 +919,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionRAORecall) {
rReq
.
requester
.
name
=
s_userName
;
rReq
.
requester
.
group
=
"someGroup"
;
rReq
.
dstURL
=
remoteFilePaths
.
back
();
rReq
.
isVerifyOnly
=
false
;
std
::
list
<
std
::
string
>
archiveFilePaths
;
scheduler
.
queueRetrieve
(
diskInstance
,
rReq
,
logContext
);
...
...
@@ -1119,6 +1121,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionRAORecallLinearAlgorithm) {
rReq
.
requester
.
name
=
s_userName
;
rReq
.
requester
.
group
=
"someGroup"
;
rReq
.
dstURL
=
remoteFilePaths
.
back
();
rReq
.
isVerifyOnly
=
false
;
std
::
list
<
std
::
string
>
archiveFilePaths
;
scheduler
.
queueRetrieve
(
diskInstance
,
rReq
,
logContext
);
}
...
...
@@ -1316,6 +1319,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionRAORecallRAOAlgoDoesNotExistS
rReq
.
requester
.
name
=
s_userName
;
rReq
.
requester
.
group
=
"someGroup"
;
rReq
.
dstURL
=
remoteFilePaths
.
back
();
rReq
.
isVerifyOnly
=
false
;
std
::
list
<
std
::
string
>
archiveFilePaths
;
scheduler
.
queueRetrieve
(
diskInstance
,
rReq
,
logContext
);
}
...
...
@@ -1515,6 +1519,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionRAORecallSLTFRAOAlgorithm) {
rReq
.
requester
.
name
=
s_userName
;
rReq
.
requester
.
group
=
"someGroup"
;
rReq
.
dstURL
=
remoteFilePaths
.
back
();
rReq
.
isVerifyOnly
=
false
;
std
::
list
<
std
::
string
>
archiveFilePaths
;
scheduler
.
queueRetrieve
(
diskInstance
,
rReq
,
logContext
);
}
...
...
tapeserver/castor/tape/tapeserver/daemon/DiskWriteTask.cpp
View file @
2517e326
...
...
@@ -60,6 +60,7 @@ bool DiskWriteTask::execute(RecallReportPacker& reporter,cta::log::LogContext&
// We will not record errors for an empty string. This will allow us to
// prevent counting where error happened upstream.
std
::
string
currentErrorToCount
=
""
;
bool
isVerifyOnly
(
false
);
try
{
currentErrorToCount
=
""
;
// Placeholder for the disk file. We will open it only
...
...
@@ -72,7 +73,11 @@ bool DiskWriteTask::execute(RecallReportPacker& reporter,cta::log::LogContext&
if
(
MemBlock
*
const
mb
=
m_fifo
.
pop
())
{
m_stats
.
waitDataTime
+=
localTime
.
secs
(
cta
::
utils
::
Timer
::
resetCounter
);
AutoReleaseBlock
<
RecallMemoryManager
>
releaser
(
mb
,
m_memManager
);
if
(
mb
->
isCanceled
())
{
if
(
mb
->
isVerifyOnly
())
{
// For verifyOnly, there is no disk file to write. Ignore the memory block and continue.
isVerifyOnly
=
true
;
continue
;
}
else
if
(
mb
->
isCanceled
())
{
// If the tape side got canceled, we report nothing and count
// it as a success.
lc
.
log
(
cta
::
log
::
DEBUG
,
"File transfer canceled"
);
...
...
@@ -109,8 +114,11 @@ bool DiskWriteTask::execute(RecallReportPacker& reporter,cta::log::LogContext&
currentErrorToCount
=
""
;
blockId
++
;
}
//end if block non NULL
else
{
//end if block non NULL
}
else
if
(
isVerifyOnly
)
{
// No file to close, we are done
break
;
}
else
{
//close has to be explicit, because it may throw.
//A close is done in WriteFile's destructor, but it may lead to some
//silent data loss
...
...
@@ -136,10 +144,8 @@ bool DiskWriteTask::execute(RecallReportPacker& reporter,cta::log::LogContext&
m_stats
.
waitReportingTime
+=
localTime
.
secs
(
cta
::
utils
::
Timer
::
resetCounter
);
m_stats
.
transferTime
=
transferTime
.
secs
();
m_stats
.
totalTime
=
totalTime
.
secs
();
logWithStat
(
cta
::
log
::
INFO
,
"File successfully transfered to disk"
,
lc
);
watchdog
.
deleteParameter
(
"stillOpenFileForThread"
+
std
::
to_string
((
long
long
)
threadID
));
logWithStat
(
cta
::
log
::
INFO
,
isVerifyOnly
?
"File successfully verified"
:
"File successfully transfered to disk"
,
lc
);
watchdog
.
deleteParameter
(
"stillOpenFileForThread"
+
std
::
to_string
((
long
long
)
threadID
));
//everything went well, return true
return
true
;
}
//end of try
...
...
@@ -164,7 +170,7 @@ bool DiskWriteTask::execute(RecallReportPacker& reporter,cta::log::LogContext&
m_stats
.
waitReportingTime
+=
localTime
.
secs
(
cta
::
utils
::
Timer
::
resetCounter
);
cta
::
log
::
ScopedParamContainer
params
(
lc
);
params
.
add
(
"errorMessage"
,
e
.
getMessageValue
());
logWithStat
(
cta
::
log
::
ERR
,
"File writing to disk failed
.
"
,
lc
);
logWithStat
(
cta
::
log
::
ERR
,
isVerifyOnly
?
"File verification failed"
:
"File writing to disk failed"
,
lc
);
lc
.
logBacktrace
(
cta
::
log
::
ERR
,
e
.
backtrace
());
reporter
.
reportFailedJob
(
std
::
move
(
m_retrieveJob
),
e
);
...
...
tapeserver/castor/tape/tapeserver/daemon/MemBlock.hpp
View file @
2517e326
...
...
@@ -46,14 +46,21 @@ class MemBlock {
struct
Cancelled_t
{};
static
Cancelled_t
Cancelled
;
struct
VerifyOnly_t
{};
static
VerifyOnly_t
VerifyOnly
;
/** Flag indicating to the receiver that the file read failed */
bool
m_failed
;
/** Flag indicating that the transfer was cancelled, usually due to a
previous failure. */
bool
m_cancelled
;
/** Flag indicating that the transfer is verify only, no disk file
should be written. */
bool
m_verifyonly
;
/**
* in case of error, the error message
*/
...
...
@@ -65,16 +72,19 @@ class MemBlock {
int
m_errorCode
;
AlterationContext
(
const
std
::
string
&
msg
,
int
errorCode
,
Failed_t
)
:
m_failed
(
true
),
m_cancelled
(
false
),
m_errorMsg
(
msg
),
m_errorCode
(
errorCode
){}
m_failed
(
true
),
m_cancelled
(
false
),
m_
verifyonly
(
false
),
m_
errorMsg
(
msg
),
m_errorCode
(
errorCode
){}
AlterationContext
(
Cancelled_t
)
:
m_failed
(
false
),
m_cancelled
(
true
),
m_errorMsg
(
""
),
m_errorCode
(
0
){}
m_failed
(
false
),
m_cancelled
(
true
),
m_verifyonly
(
false
),
m_errorMsg
(
""
),
m_errorCode
(
0
){}
AlterationContext
(
VerifyOnly_t
)
:
m_failed
(
false
),
m_cancelled
(
false
),
m_verifyonly
(
true
),
m_errorMsg
(
""
),
m_errorCode
(
0
){}
};
std
::
unique_ptr
<
AlterationContext
>
m_context
;
public:
/**
* C
O
nstrucor
* C
o
nstruc
t
or
* @param id the block ID for its whole life
* @param capacity the capacity (in byte) of the embed payload
*/
...
...
@@ -127,6 +137,14 @@ public:
return
m_context
.
get
()
&&
m_context
->
m_cancelled
;
}
/**
* Return true if the block has been marked as verify only
* @return
*/
bool
isVerifyOnly
()
const
{
return
m_context
.
get
()
&&
m_context
->
m_verifyonly
;
}
/**
* Mark this block as failed ie
* m_failed is true, m_fileBlock and m_tapeFileBlock are set at -1
...
...
@@ -149,6 +167,13 @@ public:
m_fileBlock
=
-
1
;
m_tapeFileBlock
=
-
1
;
}
/**
* Mark the block as verify only: no disk file will be written but the
* file should otherwise be processed normally
*/
void
markAsVerifyOnly
(){
m_context
.
reset
(
new
AlterationContext
(
AlterationContext
::
VerifyOnly
));
}
/**
* Reset all the members.
* Numerical ones are set at -1.and m_failed to false.
...
...
tapeserver/castor/tape/tapeserver/daemon/TapeReadTask.hpp
View file @
2517e326
...
...
@@ -44,7 +44,7 @@ namespace daemon {
class
TapeReadTask
{
public:
/**
* C
O
nstructor
* C
o
nstructor
* @param ftr The file being recalled. We acquire the ownership on the pointer
* @param destination the task that will consume the memory blocks
* @param mm The memory manager to get free block
...
...
@@ -65,15 +65,17 @@ public:
TapeSessionStats
&
stats
,
cta
::
utils
::
Timer
&
timer
)
{
using
cta
::
log
::
Param
;
bool
isRepack
=
m_retrieveJob
->
m_dbJob
->
isRepack
;
const
bool
isRepack
=
m_retrieveJob
->
m_dbJob
->
isRepack
;
const
bool
isVerifyOnly
=
m_retrieveJob
->
retrieveRequest
.
isVerifyOnly
;
// Set the common context for all the coming logs (file info)
cta
::
log
::
ScopedParamContainer
params
(
lc
);
params
.
add
(
"fileId"
,
m_retrieveJob
->
archiveFile
.
archiveFileID
)
.
add
(
"BlockId"
,
m_retrieveJob
->
selectedTapeFile
().
blockId
)
.
add
(
"fSeq"
,
m_retrieveJob
->
selectedTapeFile
().
fSeq
)
.
add
(
"dstURL"
,
m_retrieveJob
->
retrieveRequest
.
dstURL
)
.
add
(
"isRepack"
,
isRepack
);
.
add
(
"isRepack"
,
isRepack
)
.
add
(
"isVerifyOnly"
,
isVerifyOnly
);