Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
dCache
cta
Commits
9c2dcde7
Commit
9c2dcde7
authored
Jul 31, 2019
by
Eric Cano
Browse files
Removed remainder of distinction between RetriveToTransferForUser and ForRepack at the queue level.
parent
337a4a92
Changes
14
Hide whitespace changes
Inline
Side-by-side
objectstore/AlgorithmsTest.cpp
View file @
9c2dcde7
...
...
@@ -35,7 +35,7 @@
namespace
unitTests
{
void
fillRetrieveRequests
(
typename
cta
::
objectstore
::
ContainerAlgorithms
<
cta
::
objectstore
::
RetrieveQueue
,
cta
::
objectstore
::
RetrieveQueueToTransfer
ForUser
>::
InsertedElement
::
list
&
requests
,
typename
cta
::
objectstore
::
ContainerAlgorithms
<
cta
::
objectstore
::
RetrieveQueue
,
cta
::
objectstore
::
RetrieveQueueToTransfer
>::
InsertedElement
::
list
&
requests
,
std
::
list
<
std
::
unique_ptr
<
cta
::
objectstore
::
RetrieveRequest
>
>&
requestPtrs
,
//List to avoid memory leak on ArchiveQueueAlgorithms test
cta
::
objectstore
::
BackendVFS
&
be
,
cta
::
objectstore
::
AgentReference
&
agentRef
)
...
...
@@ -77,8 +77,8 @@ void fillRetrieveRequests(
rqc
.
mountPolicy
.
retrieveMinRequestAge
=
1
;
rqc
.
mountPolicy
.
retrievePriority
=
1
;
requestPtrs
.
emplace_back
(
new
cta
::
objectstore
::
RetrieveRequest
(
rrAddr
,
be
));
requests
.
emplace_back
(
ContainerAlgorithms
<
RetrieveQueue
,
RetrieveQueueToTransfer
ForUser
>::
InsertedElement
{
requestPtrs
.
back
().
get
(),
1
,
i
,
667
,
mp
,
serializers
::
RetrieveJobStatus
::
RJS_ToTransfer
ForUser
,
cta
::
nullopt
,
cta
::
nullopt
requests
.
emplace_back
(
ContainerAlgorithms
<
RetrieveQueue
,
RetrieveQueueToTransfer
>::
InsertedElement
{
requestPtrs
.
back
().
get
(),
1
,
i
,
667
,
mp
,
serializers
::
RetrieveJobStatus
::
RJS_ToTransfer
,
cta
::
nullopt
,
cta
::
nullopt
});
auto
&
rr
=
*
requests
.
back
().
retrieveRequest
;
rr
.
initialize
();
...
...
@@ -192,7 +192,7 @@ TEST(ObjectStore, RetrieveQueueAlgorithms) {
agent
.
initialize
();
agent
.
insertAndRegisterSelf
(
lc
);
std
::
list
<
std
::
unique_ptr
<
RetrieveRequest
>
>
requestsPtrs
;
ContainerAlgorithms
<
RetrieveQueue
,
RetrieveQueueToTransfer
ForUser
>::
InsertedElement
::
list
requests
;
ContainerAlgorithms
<
RetrieveQueue
,
RetrieveQueueToTransfer
>::
InsertedElement
::
list
requests
;
fillRetrieveRequests
(
requests
,
requestsPtrs
,
be
,
agentRef
);
//memory leak here
{
...
...
@@ -211,18 +211,18 @@ TEST(ObjectStore, RetrieveQueueAlgorithms) {
rel2
.
release
();
agent2
.
initialize
();
agent2
.
insertAndRegisterSelf
(
lc
);
ContainerAlgorithms
<
RetrieveQueue
,
RetrieveQueueToTransfer
ForUser
>::
InsertedElement
::
list
requests2
;
ContainerAlgorithms
<
RetrieveQueue
,
RetrieveQueueToTransfer
>::
InsertedElement
::
list
requests2
;
std
::
list
<
std
::
unique_ptr
<
RetrieveRequest
>
>
requestsPtrs2
;
fillRetrieveRequests
(
requests2
,
requestsPtrs2
,
be2
,
agentRef2
);
auto
a1
=
agentRef2
.
getAgentAddress
();
auto
a2
=
agentRef2
.
getAgentAddress
();
ContainerAlgorithms
<
RetrieveQueue
,
RetrieveQueueToTransfer
ForUser
>
retrieveAlgos2
(
be2
,
agentRef2
);
ContainerAlgorithms
<
RetrieveQueue
,
RetrieveQueueToTransfer
>
retrieveAlgos2
(
be2
,
agentRef2
);
retrieveAlgos2
.
referenceAndSwitchOwnershipIfNecessary
(
"VID"
,
a2
,
a1
,
requests2
,
lc
);
}
ContainerAlgorithms
<
RetrieveQueue
,
RetrieveQueueToTransfer
ForUser
>
retrieveAlgos
(
be
,
agentRef
);
ContainerAlgorithms
<
RetrieveQueue
,
RetrieveQueueToTransfer
>
retrieveAlgos
(
be
,
agentRef
);
try
{
ASSERT_EQ
(
requests
.
size
(),
10
);
...
...
@@ -230,19 +230,19 @@ TEST(ObjectStore, RetrieveQueueAlgorithms) {
agentRef
.
getAgentAddress
(),
requests
,
lc
);
// Now get the requests back
ContainerTraits
<
RetrieveQueue
,
RetrieveQueueToTransfer
ForUser
>::
PopCriteria
popCriteria
;
ContainerTraits
<
RetrieveQueue
,
RetrieveQueueToTransfer
>::
PopCriteria
popCriteria
;
popCriteria
.
bytes
=
std
::
numeric_limits
<
decltype
(
popCriteria
.
bytes
)
>::
max
();
popCriteria
.
files
=
100
;
auto
poppedJobs
=
retrieveAlgos
.
popNextBatch
(
"VID"
,
popCriteria
,
lc
);
ASSERT_EQ
(
poppedJobs
.
summary
.
files
,
10
);
// Validate that the summary has the same information as the popped elements
ContainerTraits
<
RetrieveQueue
,
RetrieveQueueToTransfer
ForUser
>::
PoppedElementsSummary
s
;
ContainerTraits
<
RetrieveQueue
,
RetrieveQueueToTransfer
>::
PoppedElementsSummary
s
;
for
(
auto
&
e
:
poppedJobs
.
elements
)
{
s
+=
ContainerTraits
<
RetrieveQueue
,
RetrieveQueueToTransfer
ForUser
>::
getElementSummary
(
e
);
s
+=
ContainerTraits
<
RetrieveQueue
,
RetrieveQueueToTransfer
>::
getElementSummary
(
e
);
}
ASSERT_EQ
(
s
,
poppedJobs
.
summary
);
}
catch
(
ContainerTraits
<
RetrieveQueue
,
RetrieveQueueToTransfer
ForUser
>::
OwnershipSwitchFailure
&
ex
)
{
}
catch
(
ContainerTraits
<
RetrieveQueue
,
RetrieveQueueToTransfer
>::
OwnershipSwitchFailure
&
ex
)
{
for
(
auto
&
e
:
ex
.
failedElements
)
{
try
{
throw
e
.
failure
;
...
...
objectstore/CMakeLists.txt
View file @
9c2dcde7
...
...
@@ -75,8 +75,7 @@ add_library (ctaobjectstore SHARED
ArchiveQueueToTransferForRepackAlgorithms.cpp
RetrieveQueue.cpp
RetrieveQueueShard.cpp
RetrieveQueueToTransferForUserAlgorithms.cpp
RetrieveQueueToTransferForRepackAlgorithms.cpp
RetrieveQueueToTransferAlgorithms.cpp
RetrieveQueueToReportAlgorithms.cpp
RetrieveQueueFailedAlgorithms.cpp
RetrieveQueueToReportToRepackForSuccessAlgorithms.cpp
...
...
objectstore/GarbageCollector.cpp
View file @
9c2dcde7
...
...
@@ -330,7 +330,7 @@ void GarbageCollector::OwnedObjectSorter::sortFetchedObjects(Agent& agent, std::
// Get the list of vids for non failed tape files.
std
::
set
<
std
::
string
>
candidateVids
;
for
(
auto
&
j
:
rr
->
dumpJobs
())
{
if
(
j
.
status
==
RetrieveJobStatus
::
RJS_ToTransfer
ForUser
)
{
if
(
j
.
status
==
RetrieveJobStatus
::
RJS_ToTransfer
)
{
for
(
auto
&
tf
:
rr
->
getArchiveFile
().
tapeFiles
)
{
if
((
tf
.
copyNb
==
j
.
copyNb
)
&&
(
tf
.
supersededByVid
.
empty
()))
candidateVids
.
insert
(
tf
.
vid
);
...
...
objectstore/JobQueueType.hpp
View file @
9c2dcde7
...
...
@@ -21,6 +21,13 @@
#include <string>
namespace
cta
{
namespace
objectstore
{
enum
class
JobQueueType
{
JobsToTransferForUser
,
FailedJobs
,
JobsToReportToUser
,
JobsToReportToRepackForSuccess
,
JobsToReportToRepackForFailure
,
JobsToTransferForRepack
};
enum
class
JobQueueType
{
JobsToTransferForUser
,
FailedJobs
,
JobsToReportToUser
,
JobsToReportToRepackForSuccess
,
JobsToReportToRepackForFailure
,
JobsToTransferForRepack
};
std
::
string
toString
(
JobQueueType
queueType
);
}}
// namespace cta::objectstore
\ No newline at end of file
objectstore/ObjectOps.hpp
View file @
9c2dcde7
...
...
@@ -43,7 +43,7 @@ struct ArchiveQueueToReportToRepackForSuccess;
struct
ArchiveQueueFailed
;
struct
ArchiveQueueToTransferForRepack
;
struct
RetrieveQueue
;
struct
RetrieveQueueToTransfer
ForUser
;
struct
RetrieveQueueToTransfer
;
struct
RetrieveQueueToReportForUser
;
struct
RetrieveQueueFailed
;
struct
RetrieveQueueToReportToRepackForSuccess
;
...
...
@@ -64,7 +64,7 @@ class ObjectOpsBase {
friend
ContainerTraits
<
ArchiveQueue
,
ArchiveQueueToTransferForRepack
>
;
friend
ContainerTraits
<
ArchiveQueue
,
ArchiveQueueToReportToRepackForFailure
>
;
friend
ContainerTraits
<
ArchiveQueue
,
ArchiveQueueToReportToRepackForSuccess
>
;
friend
ContainerTraits
<
RetrieveQueue
,
RetrieveQueueToTransfer
ForUser
>
;
friend
ContainerTraits
<
RetrieveQueue
,
RetrieveQueueToTransfer
>
;
friend
ContainerTraits
<
RetrieveQueue
,
RetrieveQueueToReportForUser
>
;
friend
ContainerTraits
<
RetrieveQueue
,
RetrieveQueueFailed
>
;
friend
ContainerTraits
<
RetrieveQueue
,
RetrieveQueueToReportToRepackForSuccess
>
;
...
...
objectstore/RetrieveQueue.hpp
View file @
9c2dcde7
...
...
@@ -165,12 +165,11 @@ private:
uint64_t
m_maxShardSize
=
c_defaultMaxShardSize
;
};
class
RetrieveQueueToTransfer
ForUser
:
public
RetrieveQueue
{
using
RetrieveQueue
::
RetrieveQueue
;
};
class
RetrieveQueueToTransfer
:
public
RetrieveQueue
{
using
RetrieveQueue
::
RetrieveQueue
;
};
class
RetrieveQueueToReportForUser
:
public
RetrieveQueue
{
using
RetrieveQueue
::
RetrieveQueue
;
};
class
RetrieveQueueFailed
:
public
RetrieveQueue
{
using
RetrieveQueue
::
RetrieveQueue
;
};
class
RetrieveQueueToReportToRepackForSuccess
:
public
RetrieveQueue
{
using
RetrieveQueue
::
RetrieveQueue
;
};
class
RetrieveQueueToReportToRepackForFailure
:
public
RetrieveQueue
{
using
RetrieveQueue
::
RetrieveQueue
;
};
class
RetrieveQueueToTransferForRepack
:
public
RetrieveQueue
{
using
RetrieveQueue
::
RetrieveQueue
;
};
}}
objectstore/RetrieveQueueAlgorithms.hpp
View file @
9c2dcde7
...
...
@@ -445,7 +445,7 @@ trimContainerIfNeeded(Container &cont, ScopedExclusiveLock &contLock,
// RetrieveQueue full specialisations for ContainerTraits.
template
<
>
struct
ContainerTraits
<
RetrieveQueue
,
RetrieveQueueToTransfer
ForUser
>::
PopCriteria
{
struct
ContainerTraits
<
RetrieveQueue
,
RetrieveQueueToTransfer
>::
PopCriteria
{
uint64_t
files
;
uint64_t
bytes
;
PopCriteria
(
uint64_t
f
=
0
,
uint64_t
b
=
0
)
:
files
(
f
),
bytes
(
b
)
{}
...
...
@@ -459,7 +459,7 @@ struct ContainerTraits<RetrieveQueue,RetrieveQueueToTransferForUser>::PopCriteri
};
template
<
>
struct
ContainerTraits
<
RetrieveQueue
,
RetrieveQueueToTransfer
ForUser
>::
PoppedElementsSummary
{
struct
ContainerTraits
<
RetrieveQueue
,
RetrieveQueueToTransfer
>::
PoppedElementsSummary
{
uint64_t
files
;
uint64_t
bytes
;
PoppedElementsSummary
(
uint64_t
f
=
0
,
uint64_t
b
=
0
)
:
files
(
f
),
bytes
(
b
)
{}
...
...
@@ -518,7 +518,7 @@ template<typename C>
const
std
::
string
ContainerTraits
<
RetrieveQueue
,
C
>::
c_identifierType
=
"tapeVid"
;
template
<
>
struct
ContainerTraits
<
RetrieveQueue
,
RetrieveQueueToTransfer
ForUser
>::
QueueType
{
struct
ContainerTraits
<
RetrieveQueue
,
RetrieveQueueToTransfer
>::
QueueType
{
objectstore
::
JobQueueType
value
=
objectstore
::
JobQueueType
::
JobsToTransferForUser
;
};
...
...
objectstore/RetrieveQueueToTransfer
ForUser
Algorithms.cpp
→
objectstore/RetrieveQueueToTransferAlgorithms.cpp
View file @
9c2dcde7
...
...
@@ -21,10 +21,10 @@
namespace
cta
{
namespace
objectstore
{
template
<
>
const
std
::
string
ContainerTraits
<
RetrieveQueue
,
RetrieveQueueToTransfer
ForUser
>::
c_containerTypeName
=
"RetrieveQueueToTransfer
ForUser
"
;
const
std
::
string
ContainerTraits
<
RetrieveQueue
,
RetrieveQueueToTransfer
>::
c_containerTypeName
=
"RetrieveQueueToTransfer"
;
template
<
>
auto
ContainerTraits
<
RetrieveQueue
,
RetrieveQueueToTransfer
ForUser
>::
auto
ContainerTraits
<
RetrieveQueue
,
RetrieveQueueToTransfer
>::
getElementSummary
(
const
PoppedElement
&
poppedElement
)
->
PoppedElementsSummary
{
PoppedElementsSummary
ret
;
ret
.
bytes
=
poppedElement
.
bytes
;
...
...
@@ -33,14 +33,14 @@ getElementSummary(const PoppedElement &poppedElement) -> PoppedElementsSummary {
}
template
<
>
void
ContainerTraits
<
RetrieveQueue
,
RetrieveQueueToTransfer
ForUser
>::
PoppedElementsBatch
::
void
ContainerTraits
<
RetrieveQueue
,
RetrieveQueueToTransfer
>::
PoppedElementsBatch
::
addToLog
(
log
::
ScopedParamContainer
&
params
)
const
{
params
.
add
(
"bytes"
,
summary
.
bytes
)
.
add
(
"files"
,
summary
.
files
);
}
template
<
>
auto
ContainerTraits
<
RetrieveQueue
,
RetrieveQueueToTransfer
ForUser
>::
auto
ContainerTraits
<
RetrieveQueue
,
RetrieveQueueToTransfer
>::
getPoppingElementsCandidates
(
Container
&
cont
,
PopCriteria
&
unfulfilledCriteria
,
ElementsToSkipSet
&
elementsToSkip
,
log
::
LogContext
&
lc
)
->
PoppedElementsBatch
{
...
...
objectstore/RetrieveQueueToTransferForRepackAlgorithms.cpp
deleted
100644 → 0
View file @
337a4a92
/**
* The CERN Tape Archive (CTA) project
* Copyright © 2018 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "RetrieveQueueAlgorithms.hpp"
namespace
cta
{
namespace
objectstore
{
template
<
>
const
std
::
string
ContainerTraits
<
RetrieveQueue
,
RetrieveQueueToTransferForRepack
>::
c_containerTypeName
=
"RetrieveQueueToTransferForRepack"
;
}}
\ No newline at end of file
objectstore/RetrieveRequest.cpp
View file @
9c2dcde7
...
...
@@ -84,7 +84,7 @@ void RetrieveRequest::garbageCollect(const std::string& presumedOwner, AgentRefe
using
serializers
::
RetrieveJobStatus
;
std
::
set
<
std
::
string
>
candidateVids
;
for
(
auto
&
j
:
m_payload
.
jobs
())
{
if
(
j
.
status
()
==
RetrieveJobStatus
::
RJS_ToTransfer
ForUser
)
{
if
(
j
.
status
()
==
RetrieveJobStatus
::
RJS_ToTransfer
)
{
// Find the job details in tape file
for
(
auto
&
tf
:
m_payload
.
archivefile
().
tapefiles
())
{
if
(
tf
.
copynb
()
==
j
.
copynb
())
{
...
...
@@ -114,7 +114,7 @@ queueForFailure:;
{
// If there is no candidate, we fail the jobs that are not yet, and queue the request as failed (on any VID).
for
(
auto
&
j
:
*
m_payload
.
mutable_jobs
())
{
if
(
j
.
status
()
==
RetrieveJobStatus
::
RJS_ToTransfer
ForUser
)
{
if
(
j
.
status
()
==
RetrieveJobStatus
::
RJS_ToTransfer
)
{
j
.
set_status
(
RetrieveJobStatus
::
RJS_Failed
);
log
::
ScopedParamContainer
params
(
lc
);
params
.
add
(
"fileId"
,
m_payload
.
archivefile
().
archivefileid
())
...
...
@@ -302,7 +302,7 @@ void RetrieveRequest::addJob(uint32_t copyNb, uint16_t maxRetriesWithinMount, ui
tf
->
set_totalretries
(
0
);
tf
->
set_maxreportretries
(
maxReportRetries
);
tf
->
set_totalreportretries
(
0
);
tf
->
set_status
(
serializers
::
RetrieveJobStatus
::
RJS_ToTransfer
ForUser
);
tf
->
set_status
(
serializers
::
RetrieveJobStatus
::
RJS_ToTransfer
);
}
//------------------------------------------------------------------------------
...
...
@@ -329,7 +329,7 @@ auto RetrieveRequest::addTransferFailure(uint32_t copyNumber, uint64_t mountId,
}
if
(
j
.
totalretries
()
<
j
.
maxtotalretries
())
{
EnqueueingNextStep
ret
;
ret
.
nextStatus
=
serializers
::
RetrieveJobStatus
::
RJS_ToTransfer
ForUser
;
ret
.
nextStatus
=
serializers
::
RetrieveJobStatus
::
RJS_ToTransfer
;
if
(
j
.
retrieswithinmount
()
<
j
.
maxretrieswithinmount
())
// Job can try again within this mount
ret
.
nextStep
=
EnqueueingNextStep
::
NextStep
::
EnqueueForTransferForUser
;
...
...
@@ -573,10 +573,10 @@ bool RetrieveRequest::addJobFailure(uint32_t copyNumber, uint64_t mountId,
if
(
j
.
totalretries
()
>=
j
.
maxtotalretries
())
{
j
.
set_status
(
serializers
::
RetrieveJobStatus
::
RJS_ToReportToUserForFailure
);
for
(
auto
&
j2
:
m_payload
.
jobs
())
if
(
j2
.
status
()
==
serializers
::
RetrieveJobStatus
::
RJS_ToTransfer
ForUser
)
return
false
;
if
(
j2
.
status
()
==
serializers
::
RetrieveJobStatus
::
RJS_ToTransfer
)
return
false
;
return
true
;
}
else
{
j
.
set_status
(
serializers
::
RetrieveJobStatus
::
RJS_ToTransfer
ForUser
);
j
.
set_status
(
serializers
::
RetrieveJobStatus
::
RJS_ToTransfer
);
return
false
;
}
}
...
...
@@ -644,7 +644,7 @@ JobQueueType RetrieveRequest::getQueueType() {
for
(
auto
&
j
:
m_payload
.
jobs
())
{
// Any job is to be transfered => To transfer
switch
(
j
.
status
())
{
case
serializers
::
RetrieveJobStatus
::
RJS_ToTransfer
ForUser
:
case
serializers
::
RetrieveJobStatus
::
RJS_ToTransfer
:
return
JobQueueType
::
JobsToTransferForUser
;
break
;
case
serializers
::
RetrieveJobStatus
::
RJS_ToReportToRepackForSuccess
:
...
...
@@ -656,8 +656,6 @@ JobQueueType RetrieveRequest::getQueueType() {
break
;
case
serializers
::
RetrieveJobStatus
::
RJS_ToReportToRepackForFailure
:
return
JobQueueType
::
JobsToReportToRepackForFailure
;
case
serializers
::
RetrieveJobStatus
::
RJS_ToTransferForRepack
:
return
JobQueueType
::
JobsToTransferForRepack
;
default:
break
;
}
}
...
...
@@ -665,12 +663,15 @@ JobQueueType RetrieveRequest::getQueueType() {
return
JobQueueType
::
FailedJobs
;
}
//------------------------------------------------------------------------------
// RetrieveRequest::getQueueType()
//------------------------------------------------------------------------------
JobQueueType
RetrieveRequest
::
getQueueType
(
uint32_t
copyNb
){
checkPayloadReadable
();
for
(
auto
&
j
:
m_payload
.
jobs
()){
if
(
j
.
copynb
()
==
copyNb
){
switch
(
j
.
status
()){
case
serializers
::
RetrieveJobStatus
::
RJS_ToTransfer
ForUser
:
case
serializers
::
RetrieveJobStatus
::
RJS_ToTransfer
:
return
JobQueueType
::
JobsToTransferForUser
;
case
serializers
::
RetrieveJobStatus
::
RJS_ToReportToRepackForSuccess
:
return
JobQueueType
::
JobsToReportToRepackForSuccess
;
...
...
@@ -680,8 +681,6 @@ JobQueueType RetrieveRequest::getQueueType(uint32_t copyNb){
return
JobQueueType
::
FailedJobs
;
case
serializers
::
RetrieveJobStatus
::
RJS_ToReportToRepackForFailure
:
return
JobQueueType
::
JobsToReportToRepackForFailure
;
case
serializers
::
RetrieveJobStatus
::
RJS_ToTransferForRepack
:
return
JobQueueType
::
JobsToTransferForRepack
;
default:
return
JobQueueType
::
FailedJobs
;
}
...
...
@@ -695,7 +694,7 @@ JobQueueType RetrieveRequest::getQueueType(uint32_t copyNb){
//------------------------------------------------------------------------------
std
::
string
RetrieveRequest
::
statusToString
(
const
serializers
::
RetrieveJobStatus
&
status
)
{
switch
(
status
)
{
case
serializers
::
RetrieveJobStatus
::
RJS_ToTransfer
ForUser
:
case
serializers
::
RetrieveJobStatus
::
RJS_ToTransfer
:
return
"ToTransfer"
;
case
serializers
::
RetrieveJobStatus
::
RJS_Failed
:
return
"Failed"
;
...
...
@@ -743,7 +742,7 @@ auto RetrieveRequest::determineNextStep(uint32_t copyNumberUpdated, JobEvent job
switch
(
jobEvent
)
{
case
JobEvent
::
TransferFailed
:
if
(
*
currentStatus
!=
RetrieveJobStatus
::
RJS_ToTransfer
ForUser
)
{
if
(
*
currentStatus
!=
RetrieveJobStatus
::
RJS_ToTransfer
)
{
// Wrong status, but the context leaves no ambiguity. Just warn.
log
::
ScopedParamContainer
params
(
lc
);
params
.
add
(
"event"
,
eventToString
(
jobEvent
))
...
...
@@ -801,7 +800,7 @@ void RetrieveRequest::updateLifecycleTiming(serializers::RetrieveRequest& payloa
LifecycleTimingsSerDeser
lifeCycleSerDeser
;
lifeCycleSerDeser
.
deserialize
(
payload
.
lifecycle_timings
());
switch
(
retrieveJob
.
status
()){
case
RetrieveJobStatus
::
RJS_ToTransfer
ForUser
:
case
RetrieveJobStatus
::
RJS_ToTransfer
:
if
(
retrieveJob
.
totalretries
()
==
0
){
//totalretries = 0 then this is the first selection of the request
lifeCycleSerDeser
.
first_selected_time
=
time
(
nullptr
);
...
...
objectstore/Sorter.cpp
View file @
9c2dcde7
...
...
@@ -185,7 +185,7 @@ void Sorter::dispatchRetrieveAlgorithm(const std::string vid, const JobQueueType
this
->
executeRetrieveAlgorithm
<
RetrieveQueueToReportForUser
>
(
vid
,
queueAddress
,
jobs
,
lc
);
break
;
case
JobQueueType
::
JobsToTransferForUser
:
this
->
executeRetrieveAlgorithm
<
RetrieveQueueToTransfer
ForUser
>
(
vid
,
queueAddress
,
jobs
,
lc
);
this
->
executeRetrieveAlgorithm
<
RetrieveQueueToTransfer
>
(
vid
,
queueAddress
,
jobs
,
lc
);
break
;
case
JobQueueType
::
JobsToReportToRepackForSuccess
:
this
->
executeRetrieveAlgorithm
<
RetrieveQueueToReportToRepackForSuccess
>
(
vid
,
queueAddress
,
jobs
,
lc
);
...
...
@@ -193,8 +193,6 @@ void Sorter::dispatchRetrieveAlgorithm(const std::string vid, const JobQueueType
case
JobQueueType
::
JobsToReportToRepackForFailure
:
this
->
executeRetrieveAlgorithm
<
RetrieveQueueToReportToRepackForFailure
>
(
vid
,
queueAddress
,
jobs
,
lc
);
break
;
case
JobQueueType
::
JobsToTransferForRepack
:
this
->
executeRetrieveAlgorithm
<
RetrieveQueueToTransferForRepack
>
(
vid
,
queueAddress
,
jobs
,
lc
);
break
;
case
JobQueueType
::
FailedJobs
:
break
;
...
...
@@ -308,7 +306,7 @@ std::set<std::string> Sorter::getCandidateVidsToTransfer(RetrieveRequestInfosAcc
using
serializers
::
RetrieveJobStatus
;
std
::
set
<
std
::
string
>
candidateVids
;
for
(
auto
&
j
:
requestAccessor
.
getJobs
()){
if
(
j
.
status
==
RetrieveJobStatus
::
RJS_ToTransfer
ForUser
){
if
(
j
.
status
==
RetrieveJobStatus
::
RJS_ToTransfer
){
candidateVids
.
insert
(
requestAccessor
.
getArchiveFile
().
tapeFiles
.
at
(
j
.
copyNb
).
vid
);
}
}
...
...
objectstore/SorterTest.cpp
View file @
9c2dcde7
...
...
@@ -270,7 +270,8 @@ TEST(ObjectStore,SorterInsertRetrieveRequest){
rqc
.
mountPolicy
.
retrievePriority
=
1
;
rr
.
setRetrieveFileQueueCriteria
(
rqc
);
rr
.
setJobStatus
(
2
,
cta
::
objectstore
::
serializers
::
RetrieveJobStatus
::
RJS_ToTransferForRepack
);
// Make sure job 1 will get queued by failing the other one.
rr
.
setJobStatus
(
2
,
cta
::
objectstore
::
serializers
::
RetrieveJobStatus
::
RJS_Failed
);
cta
::
common
::
dataStructures
::
RetrieveRequest
sReq
;
sReq
.
archiveFileID
=
rqc
.
archiveFile
.
archiveFileID
;
...
...
@@ -304,7 +305,7 @@ TEST(ObjectStore,SorterInsertRetrieveRequest){
allFutures
.
clear
();
typedef
ContainerAlgorithms
<
RetrieveQueue
,
RetrieveQueueToTransfer
ForUser
>
Algo
;
typedef
ContainerAlgorithms
<
RetrieveQueue
,
RetrieveQueueToTransfer
>
Algo
;
Algo
algo
(
be
,
agentRef
);
typename
Algo
::
PopCriteria
criteria
;
...
...
@@ -333,6 +334,10 @@ TEST(ObjectStore,SorterInsertRetrieveRequest){
{
ScopedExclusiveLock
sel
(
*
retrieveRequest
);
retrieveRequest
->
fetch
();
// Make sure now copy 2 will get queued.
retrieveRequest
->
setJobStatus
(
1
,
cta
::
objectstore
::
serializers
::
RetrieveJobStatus
::
RJS_Failed
);
retrieveRequest
->
setJobStatus
(
2
,
cta
::
objectstore
::
serializers
::
RetrieveJobStatus
::
RJS_ToTransfer
);
retrieveRequest
->
commit
();
ASSERT_NO_THROW
(
sorter
.
insertRetrieveRequest
(
retrieveRequest
,
agentRef
,
cta
::
optional
<
uint32_t
>
(
2
),
lc
));
...
...
@@ -352,11 +357,12 @@ TEST(ObjectStore,SorterInsertRetrieveRequest){
}
ASSERT_EQ
(
sorter
.
getAllRetrieve
().
size
(),
0
);
typedef
ContainerAlgorithms
<
RetrieveQueue
,
RetrieveQueueToTransfer
ForRepack
>
Algo
;
typedef
ContainerAlgorithms
<
RetrieveQueue
,
RetrieveQueueToTransfer
>
Algo
;
Algo
algo
(
be
,
agentRef
);
typename
Algo
::
PopCriteria
criteria
;
criteria
.
files
=
1
;
criteria
.
bytes
=
1000
;
typename
Algo
::
PoppedElementsBatch
elements
=
algo
.
popNextBatch
(
"Tape1"
,
criteria
,
lc
);
ASSERT_EQ
(
elements
.
elements
.
size
(),
1
);
auto
&
elt
=
elements
.
elements
.
front
();
...
...
@@ -379,10 +385,14 @@ TEST(ObjectStore,SorterInsertRetrieveRequest){
{
ScopedExclusiveLock
sel
(
*
retrieveRequest
);
retrieveRequest
->
fetch
();
// We should be forbidden to force queueing a non-exsiting copy number.
ASSERT_THROW
(
sorter
.
insertRetrieveRequest
(
retrieveRequest
,
agentRef
,
cta
::
optional
<
uint32_t
>
(
4
),
lc
),
cta
::
exception
::
Exception
);
retrieveRequest
->
setJobStatus
(
1
,
serializers
::
RetrieveJobStatus
::
RJS_ToReportToRepackForSuccess
);
retrieveRequest
->
setJobStatus
(
2
,
serializers
::
RetrieveJobStatus
::
RJS_Failed
);
retrieveRequest
->
commit
();
// We should be forbidden to requeue a request if no copy is in status ToTranfer.
ASSERT_THROW
(
sorter
.
insertRetrieveRequest
(
retrieveRequest
,
agentRef
,
cta
::
nullopt
,
lc
),
cta
::
exception
::
Exception
);
sel
.
release
();
...
...
@@ -644,7 +654,7 @@ TEST(ObjectStore,SorterInsertDifferentTypesOfRequests){
{
//Test the Retrieve Jobs
typedef
ContainerAlgorithms
<
RetrieveQueue
,
RetrieveQueueToTransfer
ForUser
>
Algo
;
typedef
ContainerAlgorithms
<
RetrieveQueue
,
RetrieveQueueToTransfer
>
Algo
;
Algo
algo
(
be
,
agentRef
);
typename
Algo
::
PopCriteria
criteria
;
criteria
.
files
=
2
;
...
...
@@ -938,7 +948,7 @@ TEST(ObjectStore,SorterInsertRetrieveRequestNotFetched){
job
.
fSeq
=
tf
.
fSeq
;
job
.
fileSize
=
rqc
.
archiveFile
.
fileSize
;
job
.
jobDump
.
copyNb
=
tf
.
copyNb
;
job
.
jobDump
.
status
=
serializers
::
RetrieveJobStatus
::
RJS_ToTransfer
ForUser
;
job
.
jobDump
.
status
=
serializers
::
RetrieveJobStatus
::
RJS_ToTransfer
;
job
.
jobQueueType
=
JobQueueType
::
JobsToTransferForUser
;
job
.
mountPolicy
=
rqc
.
mountPolicy
;
job
.
previousOwner
=
&
agentRef
;
...
...
@@ -965,7 +975,7 @@ TEST(ObjectStore,SorterInsertRetrieveRequestNotFetched){
{
//Test the Retrieve Jobs
typedef
ContainerAlgorithms
<
RetrieveQueue
,
RetrieveQueueToTransfer
ForUser
>
Algo
;
typedef
ContainerAlgorithms
<
RetrieveQueue
,
RetrieveQueueToTransfer
>
Algo
;
Algo
algo
(
be
,
agentRef
);
typename
Algo
::
PopCriteria
criteria
;
criteria
.
files
=
2
;
...
...
objectstore/cta.proto
View file @
9c2dcde7
...
...
@@ -359,12 +359,11 @@ message ArchiveRequest {
// of the request.
enum
RetrieveJobStatus
{
RJS_ToTransfer
ForUser
=
1
;
RJS_ToTransfer
=
1
;
RJS_ToReportToUserForFailure
=
997
;
RJS_Failed
=
998
;
RJS_ToReportToRepackForSuccess
=
1002
;
//For Retrieve request created by a Repack request
RJS_ToReportToRepackForFailure
=
1003
;
RJS_ToTransferForRepack
=
1004
;
}
message
SchedulerRetrieveRequest
{
...
...
scheduler/OStoreDB/OStoreDB.cpp
View file @
9c2dcde7
...
...
@@ -3471,7 +3471,7 @@ std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob>> OStoreDB::RetrieveMou
// full and stop popping from it, after requeueing the jobs.
bool
failedAllocation
=
false
;
SchedulerDatabase
::
DiskSpaceReservationRequest
diskSpaceReservationRequest
;
typedef
objectstore
::
ContainerAlgorithms
<
RetrieveQueue
,
RetrieveQueueToTransfer
ForUser
>
RQAlgos
;
typedef
objectstore
::
ContainerAlgorithms
<
RetrieveQueue
,
RetrieveQueueToTransfer
>
RQAlgos
;
RQAlgos
rqAlgos
(
m_oStoreDB
.
m_objectStore
,
*
m_oStoreDB
.
m_agentReference
);
RQAlgos
::
PoppedElementsBatch
jobs
;
retryPop:
...
...
@@ -4680,7 +4680,7 @@ void OStoreDB::RetrieveJob::failTransfer(const std::string &failureReason, log::
}
case
NextStep
::
EnqueueForTransferForUser
:
{
typedef
objectstore
::
ContainerAlgorithms
<
RetrieveQueue
,
RetrieveQueueToTransfer
ForUser
>
CaRqtr
;
typedef
objectstore
::
ContainerAlgorithms
<
RetrieveQueue
,
RetrieveQueueToTransfer
>
CaRqtr
;
// Algorithms suppose the objects are not locked
auto
retryStatus
=
m_retrieveRequest
.
getRetryStatus
(
selectedCopyNb
);
...
...
@@ -4689,7 +4689,7 @@ void OStoreDB::RetrieveJob::failTransfer(const std::string &failureReason, log::
std
::
set
<
std
::
string
>
candidateVids
;
for
(
auto
&
tf
:
af
.
tapeFiles
)
{
if
(
m_retrieveRequest
.
getJobStatus
(
tf
.
copyNb
)
==
serializers
::
RetrieveJobStatus
::
RJS_ToTransfer
ForUser
)
if
(
m_retrieveRequest
.
getJobStatus
(
tf
.
copyNb
)
==
serializers
::
RetrieveJobStatus
::
RJS_ToTransfer
)
candidateVids
.
insert
(
tf
.
vid
);
}
if
(
candidateVids
.
empty
())
{
...
...
@@ -4716,7 +4716,7 @@ void OStoreDB::RetrieveJob::failTransfer(const std::string &failureReason, log::
CaRqtr
::
InsertedElement
::
list
insertedElements
;
insertedElements
.
push_back
(
CaRqtr
::
InsertedElement
{
&
m_retrieveRequest
,
tf
.
copyNb
,
tf
.
fSeq
,
af
.
fileSize
,
rfqc
.
mountPolicy
,
serializers
::
RetrieveJobStatus
::
RJS_ToTransfer
ForUser
,
&
m_retrieveRequest
,
tf
.
copyNb
,
tf
.
fSeq
,
af
.
fileSize
,
rfqc
.
mountPolicy
,
serializers
::
RetrieveJobStatus
::
RJS_ToTransfer
,
m_activityDescription
,
m_diskSystemName
});
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment