Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
dCache
cta
Commits
f60b166e
Commit
f60b166e
authored
Oct 01, 2018
by
Eric Cano
Browse files
Renamed QueueType to JobQueueType. Created RepackQueueType.
parent
697a4f21
Changes
30
Hide whitespace changes
Inline
Side-by-side
objectstore/Algorithms.hpp
View file @
f60b166e
...
...
@@ -45,7 +45,7 @@ public:
* are provided existing and owned by algorithm's agent.
*/
void
referenceAndSwitchOwnership
(
const
typename
ContainerTraits
<
Q
,
C
>::
ContainerIdentifier
&
contId
,
QueueType
queueType
,
const
typename
ContainerTraits
<
Q
,
C
>::
Container
Identifier
&
prevCont
Id
,
const
typename
ContainerTraits
<
Q
,
C
>::
QueueType
queueType
,
const
typename
ContainerTraits
<
Q
,
C
>::
Container
Address
&
prevCont
Address
,
typename
ContainerTraits
<
Q
,
C
>::
InsertedElement
::
list
&
elements
,
log
::
LogContext
&
lc
)
{
C
cont
(
m_backend
);
ScopedExclusiveLock
contLock
;
...
...
@@ -92,7 +92,7 @@ public:
* Addition of jobs to container. Convenience overload for cases when current agent is the previous owner
* (most cases except garbage collection).
*/
void
referenceAndSwitchOwnership
(
const
typename
ContainerTraits
<
Q
,
C
>::
ContainerIdentifier
&
contId
,
QueueType
queueType
,
void
referenceAndSwitchOwnership
(
const
typename
ContainerTraits
<
Q
,
C
>::
ContainerIdentifier
&
contId
,
Job
QueueType
queueType
,
typename
ContainerTraits
<
Q
,
C
>::
InsertedElement
::
list
&
elements
,
log
::
LogContext
&
lc
)
{
referenceAndSwitchOwnership
(
contId
,
queueType
,
m_agentReference
.
getAgentAddress
(),
elements
,
lc
);
}
...
...
@@ -104,7 +104,7 @@ public:
* might vary. This function is typically used by the garbage collector. We do not take care of
* dereferencing the object from the caller.
*/
void
referenceAndSwitchOwnershipIfNecessary
(
const
typename
ContainerTraits
<
Q
,
C
>::
ContainerIdentifier
&
contId
,
QueueType
queueType
,
void
referenceAndSwitchOwnershipIfNecessary
(
const
typename
ContainerTraits
<
Q
,
C
>::
ContainerIdentifier
&
contId
,
Job
QueueType
queueType
,
typename
ContainerTraits
<
Q
,
C
>::
ContainerAddress
&
previousOwnerAddress
,
typename
ContainerTraits
<
Q
,
C
>::
ContainerAddress
&
contAddress
,
typename
ContainerTraits
<
Q
,
C
>::
InsertedElement
::
list
&
elements
,
log
::
LogContext
&
lc
)
{
...
...
@@ -154,7 +154,7 @@ public:
typename
ContainerTraits
<
Q
,
C
>::
PoppedElementsBatch
popNextBatch
(
const
typename
ContainerTraits
<
Q
,
C
>::
ContainerIdentifier
&
contId
,
QueueType
queueType
,
Job
QueueType
queueType
,
typename
ContainerTraits
<
Q
,
C
>::
PopCriteria
&
popCriteria
,
log
::
LogContext
&
lc
)
{
...
...
objectstore/AlgorithmsTest.cpp
View file @
f60b166e
...
...
@@ -150,12 +150,12 @@ TEST(ObjectStore, ArchiveQueueAlgorithms) {
ar
.
insert
();
}
ContainerAlgorithms
<
ArchiveQueue
,
ArchiveQueueToTransfer
>
archiveAlgos
(
be
,
agentRef
);
archiveAlgos
.
referenceAndSwitchOwnership
(
"Tapepool"
,
QueueType
::
JobsToTransfer
,
requests
,
lc
);
archiveAlgos
.
referenceAndSwitchOwnership
(
"Tapepool"
,
Job
QueueType
::
JobsToTransfer
,
requests
,
lc
);
// Now get the requests back
ContainerTraits
<
ArchiveQueue
,
ArchiveQueueToTransfer
>::
PopCriteria
popCriteria
;
popCriteria
.
bytes
=
std
::
numeric_limits
<
decltype
(
popCriteria
.
bytes
)
>::
max
();
popCriteria
.
files
=
100
;
auto
poppedJobs
=
archiveAlgos
.
popNextBatch
(
"Tapepool"
,
QueueType
::
JobsToTransfer
,
popCriteria
,
lc
);
auto
poppedJobs
=
archiveAlgos
.
popNextBatch
(
"Tapepool"
,
Job
QueueType
::
JobsToTransfer
,
popCriteria
,
lc
);
ASSERT_EQ
(
poppedJobs
.
summary
.
files
,
10
);
}
...
...
@@ -210,7 +210,7 @@ TEST(ObjectStore, RetrieveQueueAlgorithms) {
auto
a1
=
agentRef2
.
getAgentAddress
();
auto
a2
=
agentRef2
.
getAgentAddress
();
ContainerAlgorithms
<
RetrieveQueue
,
RetrieveQueueToTransfer
>
retrieveAlgos2
(
be2
,
agentRef2
);
retrieveAlgos2
.
referenceAndSwitchOwnershipIfNecessary
(
"VID"
,
QueueType
::
JobsToTransfer
,
retrieveAlgos2
.
referenceAndSwitchOwnershipIfNecessary
(
"VID"
,
Job
QueueType
::
JobsToTransfer
,
a2
,
a1
,
requests2
,
lc
);
}
...
...
@@ -218,14 +218,14 @@ TEST(ObjectStore, RetrieveQueueAlgorithms) {
try
{
ASSERT_EQ
(
requests
.
size
(),
10
);
retrieveAlgos
.
referenceAndSwitchOwnership
(
"VID"
,
QueueType
::
JobsToTransfer
,
retrieveAlgos
.
referenceAndSwitchOwnership
(
"VID"
,
Job
QueueType
::
JobsToTransfer
,
agentRef
.
getAgentAddress
(),
requests
,
lc
);
// Now get the requests back
ContainerTraits
<
RetrieveQueue
,
RetrieveQueueToTransfer
>::
PopCriteria
popCriteria
;
popCriteria
.
bytes
=
std
::
numeric_limits
<
decltype
(
popCriteria
.
bytes
)
>::
max
();
popCriteria
.
files
=
100
;
auto
poppedJobs
=
retrieveAlgos
.
popNextBatch
(
"VID"
,
QueueType
::
JobsToTransfer
,
popCriteria
,
lc
);
auto
poppedJobs
=
retrieveAlgos
.
popNextBatch
(
"VID"
,
Job
QueueType
::
JobsToTransfer
,
popCriteria
,
lc
);
ASSERT_EQ
(
poppedJobs
.
summary
.
files
,
10
);
// Validate that the summary has the same information as the popped elements
...
...
objectstore/ArchiveQueue.cpp
View file @
f60b166e
...
...
@@ -221,7 +221,7 @@ void ArchiveQueue::garbageCollect(const std::string &presumedOwner, AgentReferen
RootEntry
re
(
m_objectStore
);
ScopedSharedLock
rel
(
re
);
re
.
fetch
();
auto
tpd
=
re
.
dumpArchiveQueues
(
QueueType
::
JobsToTransfer
);
auto
tpd
=
re
.
dumpArchiveQueues
(
Job
QueueType
::
JobsToTransfer
);
for
(
auto
tp
=
tpd
.
begin
();
tp
!=
tpd
.
end
();
tp
++
)
{
if
(
tp
->
address
==
getAddressIfSet
())
{
setOwner
(
re
.
getAddressIfSet
());
...
...
objectstore/ArchiveQueueAlgorithms.hpp
View file @
f60b166e
...
...
@@ -32,6 +32,8 @@ struct ContainerTraits<ArchiveQueue,C>
struct
ContainerSummary
:
public
ArchiveQueue
::
JobsSummary
{
void
addDeltaToLog
(
ContainerSummary
&
,
log
::
ScopedParamContainer
&
);
};
typedef
cta
::
objectstore
::
JobQueueType
QueueType
;
struct
InsertedElement
{
ArchiveRequest
*
archiveRequest
;
...
...
@@ -231,7 +233,7 @@ void ContainerTraits<ArchiveQueue,C>::
getLockedAndFetched
(
Container
&
cont
,
ScopedExclusiveLock
&
aqL
,
AgentReference
&
agRef
,
const
ContainerIdentifier
&
contId
,
QueueType
queueType
,
log
::
LogContext
&
lc
)
{
Helpers
::
getLockedAndFetchedQueue
<
Container
>
(
cont
,
aqL
,
agRef
,
contId
,
queueType
,
lc
);
Helpers
::
getLockedAndFetched
Job
Queue
<
Container
>
(
cont
,
aqL
,
agRef
,
contId
,
queueType
,
lc
);
}
template
<
typename
C
>
...
...
objectstore/ArchiveQueueFailedAlgorithms.cpp
View file @
f60b166e
...
...
@@ -35,7 +35,7 @@ void ContainerTraits<ArchiveQueue,ArchiveQueueFailed>::
trimContainerIfNeeded
(
Container
&
cont
,
ScopedExclusiveLock
&
contLock
,
const
ContainerIdentifier
&
cId
,
log
::
LogContext
&
lc
)
{
trimContainerIfNeeded
(
cont
,
QueueType
::
FailedJobs
,
contLock
,
cId
,
lc
);
trimContainerIfNeeded
(
cont
,
Job
QueueType
::
FailedJobs
,
contLock
,
cId
,
lc
);
}
}}
// namespace cta::objectstore
objectstore/ArchiveQueueToReportAlgorithms.cpp
View file @
f60b166e
...
...
@@ -80,7 +80,7 @@ void ContainerTraits<ArchiveQueue,ArchiveQueueToReport>::
trimContainerIfNeeded
(
Container
&
cont
,
ScopedExclusiveLock
&
contLock
,
const
ContainerIdentifier
&
cId
,
log
::
LogContext
&
lc
)
{
trimContainerIfNeeded
(
cont
,
QueueType
::
JobsToReport
,
contLock
,
cId
,
lc
);
trimContainerIfNeeded
(
cont
,
Job
QueueType
::
JobsToReport
,
contLock
,
cId
,
lc
);
}
}}
// namespace cta::objectstore
objectstore/ArchiveQueueToTransferAlgorithms.cpp
View file @
f60b166e
...
...
@@ -74,7 +74,7 @@ void ContainerTraits<ArchiveQueue,ArchiveQueueToTransfer>::
trimContainerIfNeeded
(
Container
&
cont
,
ScopedExclusiveLock
&
contLock
,
const
ContainerIdentifier
&
cId
,
log
::
LogContext
&
lc
)
{
trimContainerIfNeeded
(
cont
,
QueueType
::
JobsToTransfer
,
contLock
,
cId
,
lc
);
trimContainerIfNeeded
(
cont
,
Job
QueueType
::
JobsToTransfer
,
contLock
,
cId
,
lc
);
}
}}
// namespace cta::objectstore
objectstore/ArchiveRequest.cpp
View file @
f60b166e
...
...
@@ -71,23 +71,26 @@ void cta::objectstore::ArchiveRequest::addJob(uint16_t copyNumber,
j
->
set_maxreportretries
(
maxReportRetries
);
}
QueueType
ArchiveRequest
::
getJobQueueType
(
uint16_t
copyNumber
)
{
//------------------------------------------------------------------------------
// ArchiveRequest::getJobQueueType()
//------------------------------------------------------------------------------
JobQueueType
ArchiveRequest
::
getJobQueueType
(
uint16_t
copyNumber
)
{
checkPayloadReadable
();
for
(
auto
&
j
:
m_payload
.
jobs
())
{
if
(
j
.
copynb
()
==
copyNumber
)
{
switch
(
j
.
status
())
{
case
serializers
::
ArchiveJobStatus
::
AJS_ToTransfer
:
return
QueueType
::
JobsToTransfer
;
return
Job
QueueType
::
JobsToTransfer
;
case
serializers
::
ArchiveJobStatus
::
AJS_Complete
:
throw
JobNotQueueable
(
"In ArchiveRequest::getJobQueueType(): Complete jobs are not queueable. They are finished and pend siblings completion."
);
case
serializers
::
ArchiveJobStatus
::
AJS_ToReportForTransfer
:
// We should report a success...
return
QueueType
::
JobsToReport
;
return
Job
QueueType
::
JobsToReport
;
case
serializers
::
ArchiveJobStatus
::
AJS_ToReportForFailure
:
// We should report a failure. The report queue can be shared.
return
QueueType
::
JobsToReport
;
return
Job
QueueType
::
JobsToReport
;
case
serializers
::
ArchiveJobStatus
::
AJS_Failed
:
return
QueueType
::
FailedJobs
;
return
Job
QueueType
::
FailedJobs
;
case
serializers
::
ArchiveJobStatus
::
AJS_Abandoned
:
throw
JobNotQueueable
(
"In ArchiveRequest::getJobQueueType(): Abandoned jobs are not queueable. They are finished and pend siblings completion."
);
}
...
...
@@ -357,7 +360,7 @@ void ArchiveRequest::garbageCollect(const std::string &presumedOwner, AgentRefer
// recreated (this will be done by helper).
ArchiveQueue
aq
(
m_objectStore
);
ScopedExclusiveLock
aql
;
Helpers
::
getLockedAndFetchedQueue
<
ArchiveQueue
>
(
aq
,
aql
,
agentReference
,
j
->
tapepool
(),
getQueueType
(
status
),
lc
);
Helpers
::
getLockedAndFetched
Job
Queue
<
ArchiveQueue
>
(
aq
,
aql
,
agentReference
,
j
->
tapepool
(),
getQueueType
(
status
),
lc
);
queueObject
=
aq
.
getAddressIfSet
();
ArchiveRequest
::
JobDump
jd
;
jd
.
copyNb
=
j
->
copynb
();
...
...
@@ -629,21 +632,27 @@ std::string ArchiveRequest::getJobOwner(uint16_t copyNumber) {
return
j
->
owner
();
}
QueueType
ArchiveRequest
::
getQueueType
(
const
serializers
::
ArchiveJobStatus
&
status
)
{
//------------------------------------------------------------------------------
// ArchiveRequest::getQueueType()
//------------------------------------------------------------------------------
JobQueueType
ArchiveRequest
::
getQueueType
(
const
serializers
::
ArchiveJobStatus
&
status
)
{
using
serializers
::
ArchiveJobStatus
;
switch
(
status
)
{
case
ArchiveJobStatus
::
AJS_ToTransfer
:
return
QueueType
::
JobsToTransfer
;
return
Job
QueueType
::
JobsToTransfer
;
case
ArchiveJobStatus
::
AJS_ToReportForTransfer
:
case
ArchiveJobStatus
::
AJS_ToReportForFailure
:
return
QueueType
::
JobsToReport
;
return
Job
QueueType
::
JobsToReport
;
case
ArchiveJobStatus
::
AJS_Failed
:
return
QueueType
::
FailedJobs
;
return
Job
QueueType
::
FailedJobs
;
default:
throw
cta
::
exception
::
Exception
(
"In ArchiveRequest::getQueueType(): invalid status for queueing."
);
}
}
//------------------------------------------------------------------------------
// ArchiveRequest::statusToString()
//------------------------------------------------------------------------------
std
::
string
ArchiveRequest
::
statusToString
(
const
serializers
::
ArchiveJobStatus
&
status
)
{
switch
(
status
)
{
case
serializers
::
ArchiveJobStatus
::
AJS_ToTransfer
:
...
...
objectstore/ArchiveRequest.hpp
View file @
f60b166e
...
...
@@ -24,7 +24,7 @@
#include
"common/dataStructures/MountPolicy.hpp"
#include
"common/dataStructures/UserIdentity.hpp"
#include
"common/dataStructures/ArchiveFile.hpp"
#include
"QueueType.hpp"
#include
"
Job
QueueType.hpp"
#include
"common/Timer.hpp"
#include
"common/optional.hpp"
#include
"ObjectOps.hpp"
...
...
@@ -98,7 +98,7 @@ public:
EnqueueingNextStep
addReportFailure
(
uint16_t
copyNumber
,
uint64_t
sessionId
,
const
std
::
string
&
failureReason
,
log
::
LogContext
&
lc
);
//< returns next step to take with the job
CTA_GENERATE_EXCEPTION_CLASS
(
JobNotQueueable
);
QueueType
getJobQueueType
(
uint16_t
copyNumber
);
Job
QueueType
getJobQueueType
(
uint16_t
copyNumber
);
CTA_GENERATE_EXCEPTION_CLASS
(
NoSuchJob
);
// Set a job ownership
void
setJobOwner
(
uint16_t
copyNumber
,
const
std
::
string
&
owner
);
...
...
@@ -162,7 +162,7 @@ public:
std
::
string
getJobOwner
(
uint16_t
copyNumber
);
// Utility to convert status to queue type
static
QueueType
getQueueType
(
const
serializers
::
ArchiveJobStatus
&
status
);
static
Job
QueueType
getQueueType
(
const
serializers
::
ArchiveJobStatus
&
status
);
// ===========================================================================
// TODO: ArchiveFile comes with extraneous information.
...
...
objectstore/CMakeLists.txt
View file @
f60b166e
...
...
@@ -70,13 +70,14 @@ add_library (ctaobjectstore SHARED
RetrieveQueueToTransferAlgorithms.cpp
RetrieveQueueToReportAlgorithms.cpp
RetrieveQueueFailedAlgorithms.cpp
QueueType.cpp
Job
QueueType.cpp
ArchiveRequest.cpp
RetrieveRequest.cpp
DriveRegister.cpp
DriveState.cpp
RepackIndex.cpp
RepackRequest.cpp
RepackQueueType.cpp
BackendVFS.cpp
BackendRados.cpp
BackendPopulator.cpp
...
...
objectstore/GarbageCollector.cpp
View file @
f60b166e
...
...
@@ -351,7 +351,7 @@ void GarbageCollector::OwnedObjectSorter::sortFetchedObjects(Agent& agent, std::
otherObjects
.
emplace_back
(
new
GenericObject
(
rr
->
getAddressIfSet
(),
objectStore
));
break
;
}
retrieveQueuesAndRequests
[
std
::
make_tuple
(
vid
,
QueueType
::
JobsToTransfer
)].
emplace_back
(
rr
);
retrieveQueuesAndRequests
[
std
::
make_tuple
(
vid
,
Job
QueueType
::
JobsToTransfer
)].
emplace_back
(
rr
);
log
::
ScopedParamContainer
params3
(
lc
);
// Find copyNb for logging
size_t
copyNb
=
std
::
numeric_limits
<
size_t
>::
max
();
...
...
@@ -389,7 +389,7 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(Agent& a
// individual requeue operations, we limit the number of concurrently requeued objects to an
// arbitrary 500.
std
::
string
tapepool
;
QueueType
queueType
;
Job
QueueType
queueType
;
std
::
tie
(
tapepool
,
queueType
)
=
archiveQueueIdAndReqs
.
first
;
auto
&
requestsList
=
archiveQueueIdAndReqs
.
second
;
while
(
requestsList
.
size
())
{
...
...
@@ -508,7 +508,7 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(Agent&
// Then should hence not have changes since we pre-fetched them.
for
(
auto
&
retriveQueueIdAndReqs
:
retrieveQueuesAndRequests
)
{
std
::
string
vid
;
QueueType
queueType
;
Job
QueueType
queueType
;
std
::
tie
(
vid
,
queueType
)
=
retriveQueueIdAndReqs
.
first
;
auto
&
requestsList
=
retriveQueueIdAndReqs
.
second
;
while
(
requestsList
.
size
())
{
...
...
@@ -532,7 +532,7 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(Agent&
// Get the retrieve queue and add references to the jobs to it.
RetrieveQueue
rq
(
objectStore
);
ScopedExclusiveLock
rql
;
Helpers
::
getLockedAndFetchedQueue
<
RetrieveQueue
>
(
rq
,
rql
,
agentReference
,
vid
,
queueType
,
lc
);
Helpers
::
getLockedAndFetched
Job
Queue
<
RetrieveQueue
>
(
rq
,
rql
,
agentReference
,
vid
,
queueType
,
lc
);
queueLockFetchTime
=
t
.
secs
(
utils
::
Timer
::
resetCounter
);
auto
jobsSummary
=
rq
.
getJobsSummary
();
filesBefore
=
jobsSummary
.
files
;
...
...
objectstore/GarbageCollector.hpp
View file @
f60b166e
...
...
@@ -22,7 +22,7 @@
#include
"Agent.hpp"
#include
"AgentWatchdog.hpp"
#include
"AgentRegister.hpp"
#include
"QueueType.hpp"
#include
"
Job
QueueType.hpp"
#include
"common/log/LogContext.hpp"
/**
...
...
@@ -55,8 +55,8 @@ public:
/** Structure allowing the sorting of owned objects, so they can be requeued in batches,
* one batch per queue. */
struct
OwnedObjectSorter
{
std
::
map
<
std
::
tuple
<
std
::
string
,
QueueType
>
,
std
::
list
<
std
::
shared_ptr
<
ArchiveRequest
>>>
archiveQueuesAndRequests
;
std
::
map
<
std
::
tuple
<
std
::
string
,
QueueType
>
,
std
::
list
<
std
::
shared_ptr
<
RetrieveRequest
>>>
retrieveQueuesAndRequests
;
std
::
map
<
std
::
tuple
<
std
::
string
,
Job
QueueType
>
,
std
::
list
<
std
::
shared_ptr
<
ArchiveRequest
>>>
archiveQueuesAndRequests
;
std
::
map
<
std
::
tuple
<
std
::
string
,
Job
QueueType
>
,
std
::
list
<
std
::
shared_ptr
<
RetrieveRequest
>>>
retrieveQueuesAndRequests
;
std
::
list
<
std
::
shared_ptr
<
GenericObject
>>
otherObjects
;
/// Fill up the fetchedObjects with objects of interest.
void
fetchOwnedObjects
(
Agent
&
agent
,
std
::
list
<
std
::
shared_ptr
<
GenericObject
>>
&
fetchedObjects
,
Backend
&
objectStore
,
...
...
objectstore/GarbageCollectorTest.cpp
View file @
f60b166e
...
...
@@ -341,7 +341,7 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) {
re
.
fetch
();
std
::
stringstream
tapePoolName
;
tapePoolName
<<
"TapePool"
<<
i
;
tpAddr
[
i
]
=
re
.
addOrGetArchiveQueueAndCommit
(
tapePoolName
.
str
(),
agentRef
,
cta
::
objectstore
::
QueueType
::
JobsToTransfer
,
lc
);
tpAddr
[
i
]
=
re
.
addOrGetArchiveQueueAndCommit
(
tapePoolName
.
str
(),
agentRef
,
cta
::
objectstore
::
Job
QueueType
::
JobsToTransfer
,
lc
);
cta
::
objectstore
::
ArchiveQueue
aq
(
tpAddr
[
i
],
be
);
}
// Create the various ATFR's, stopping one step further each time.
...
...
@@ -463,7 +463,7 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) {
std
::
list
<
std
::
string
>
tapePools
=
{
"TapePool0"
,
"TapePool1"
};
for
(
auto
&
tp
:
tapePools
)
{
// Empty queue
cta
::
objectstore
::
ArchiveQueue
aq
(
re
.
getArchiveQueueAddress
(
tp
,
cta
::
objectstore
::
QueueType
::
JobsToTransfer
),
be
);
cta
::
objectstore
::
ArchiveQueue
aq
(
re
.
getArchiveQueueAddress
(
tp
,
cta
::
objectstore
::
Job
QueueType
::
JobsToTransfer
),
be
);
cta
::
objectstore
::
ScopedExclusiveLock
aql
(
aq
);
aq
.
fetch
();
std
::
list
<
std
::
string
>
ajtr
;
...
...
@@ -473,7 +473,7 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) {
aq
.
removeJobsAndCommit
(
ajtr
);
aql
.
release
();
// Remove queues from root
re
.
removeArchiveQueueAndCommit
(
tp
,
cta
::
objectstore
::
QueueType
::
JobsToTransfer
,
lc
);
re
.
removeArchiveQueueAndCommit
(
tp
,
cta
::
objectstore
::
Job
QueueType
::
JobsToTransfer
,
lc
);
}
ASSERT_NO_THROW
(
re
.
removeAgentRegisterAndCommit
(
lc
));
...
...
@@ -533,7 +533,7 @@ TEST(ObjectStore, GarbageCollectorRetrieveRequest) {
re
.
fetch
();
std
::
stringstream
vid
;
vid
<<
"Tape"
<<
i
;
tAddr
[
i
]
=
re
.
addOrGetRetrieveQueueAndCommit
(
vid
.
str
(),
agentRef
,
cta
::
objectstore
::
QueueType
::
JobsToTransfer
,
lc
);
tAddr
[
i
]
=
re
.
addOrGetRetrieveQueueAndCommit
(
vid
.
str
(),
agentRef
,
cta
::
objectstore
::
Job
QueueType
::
JobsToTransfer
,
lc
);
cta
::
objectstore
::
RetrieveQueue
rq
(
tAddr
[
i
],
be
);
}
// Create the various ATFR's, stopping one step further each time.
...
...
@@ -646,7 +646,7 @@ TEST(ObjectStore, GarbageCollectorRetrieveRequest) {
std
::
list
<
std
::
string
>
retrieveQueues
=
{
"Tape0"
,
"Tape1"
};
for
(
auto
&
vid
:
retrieveQueues
)
{
// Empty queue
cta
::
objectstore
::
RetrieveQueue
rq
(
re
.
getRetrieveQueueAddress
(
vid
,
cta
::
objectstore
::
QueueType
::
JobsToTransfer
),
be
);
cta
::
objectstore
::
RetrieveQueue
rq
(
re
.
getRetrieveQueueAddress
(
vid
,
cta
::
objectstore
::
Job
QueueType
::
JobsToTransfer
),
be
);
cta
::
objectstore
::
ScopedExclusiveLock
rql
(
rq
);
rq
.
fetch
();
std
::
list
<
std
::
string
>
jtrl
;
...
...
@@ -656,7 +656,7 @@ TEST(ObjectStore, GarbageCollectorRetrieveRequest) {
rq
.
removeJobsAndCommit
(
jtrl
);
rql
.
release
();
// Remove queues from root
re
.
removeRetrieveQueueAndCommit
(
vid
,
cta
::
objectstore
::
QueueType
::
JobsToTransfer
,
lc
);
re
.
removeRetrieveQueueAndCommit
(
vid
,
cta
::
objectstore
::
Job
QueueType
::
JobsToTransfer
,
lc
);
}
ASSERT_NO_THROW
(
re
.
removeAgentRegisterAndCommit
(
lc
));
...
...
objectstore/Helpers.cpp
View file @
f60b166e
...
...
@@ -35,9 +35,9 @@ namespace cta { namespace objectstore {
// Helpers::getLockedAndFetchedQueue <ArchiveQueue> ()
//------------------------------------------------------------------------------
template
<
>
void
Helpers
::
getLockedAndFetchedQueue
<
ArchiveQueue
>
(
ArchiveQueue
&
archiveQueue
,
void
Helpers
::
getLockedAndFetched
Job
Queue
<
ArchiveQueue
>
(
ArchiveQueue
&
archiveQueue
,
ScopedExclusiveLock
&
archiveQueueLock
,
AgentReference
&
agentReference
,
const
std
::
string
&
tapePool
,
QueueType
queueType
,
log
::
LogContext
&
lc
)
{
const
cta
::
optional
<
std
::
string
>
&
tapePool
,
Job
QueueType
queueType
,
log
::
LogContext
&
lc
)
{
// TODO: if necessary, we could use a singleton caching object here to accelerate
// lookups.
// Getting a locked AQ is the name of the game.
...
...
@@ -58,13 +58,13 @@ void Helpers::getLockedAndFetchedQueue<ArchiveQueue>(ArchiveQueue& archiveQueue,
re
.
fetchNoLock
();
rootFetchNoLockTime
=
t
.
secs
(
utils
::
Timer
::
resetCounter
);
try
{
archiveQueue
.
setAddress
(
re
.
getArchiveQueueAddress
(
tapePool
,
queueType
));
archiveQueue
.
setAddress
(
re
.
getArchiveQueueAddress
(
tapePool
.
value
()
,
queueType
));
}
catch
(
cta
::
exception
::
Exception
&
ex
)
{
ScopedExclusiveLock
rexl
(
re
);
rootRelockExclusiveTime
=
t
.
secs
(
utils
::
Timer
::
resetCounter
);
re
.
fetch
();
rootRefetchTime
=
t
.
secs
(
utils
::
Timer
::
resetCounter
);
archiveQueue
.
setAddress
(
re
.
addOrGetArchiveQueueAndCommit
(
tapePool
,
agentReference
,
queueType
,
lc
));
archiveQueue
.
setAddress
(
re
.
addOrGetArchiveQueueAndCommit
(
tapePool
.
value
()
,
agentReference
,
queueType
,
lc
));
addOrGetQueueandCommitTime
=
t
.
secs
(
utils
::
Timer
::
resetCounter
);
}
}
...
...
@@ -108,10 +108,10 @@ void Helpers::getLockedAndFetchedQueue<ArchiveQueue>(ArchiveQueue& archiveQueue,
re
.
fetch
();
rootRefetchTime
+=
t
.
secs
(
utils
::
Timer
::
resetCounter
);
try
{
re
.
removeArchiveQueueAndCommit
(
tapePool
,
queueType
,
lc
);
re
.
removeArchiveQueueAndCommit
(
tapePool
.
value
()
,
queueType
,
lc
);
rootQueueDereferenceTime
+=
t
.
secs
(
utils
::
Timer
::
resetCounter
);
log
::
ScopedParamContainer
params
(
lc
);
params
.
add
(
"tapepool"
,
tapePool
)
params
.
add
(
"tapepool"
,
tapePool
.
value
()
)
.
add
(
"queueObject"
,
archiveQueue
.
getAddressIfSet
())
.
add
(
"exceptionMsg"
,
ex
.
getMessageValue
());
lc
.
log
(
log
::
INFO
,
"In Helpers::getLockedAndFetchedQueue<ArchiveQueue>(): removed reference to gone archive queue from root entry."
);
...
...
@@ -144,7 +144,7 @@ void Helpers::getLockedAndFetchedQueue<ArchiveQueue>(ArchiveQueue& archiveQueue,
archiveQueue
.
resetAddress
();
throw
cta
::
exception
::
Exception
(
std
::
string
(
"In OStoreDB::getLockedAndFetchedArchiveQueue(): failed to find or create and lock archive queue after 5 retries for tapepool: "
)
+
tapePool
);
+
tapePool
.
value
()
);
}
...
...
@@ -152,9 +152,9 @@ void Helpers::getLockedAndFetchedQueue<ArchiveQueue>(ArchiveQueue& archiveQueue,
// Helpers::getLockedAndFetchedQueue <RetrieveQueue> ()
//------------------------------------------------------------------------------
template
<
>
void
Helpers
::
getLockedAndFetchedQueue
<
RetrieveQueue
>
(
RetrieveQueue
&
retrieveQueue
,
void
Helpers
::
getLockedAndFetched
Job
Queue
<
RetrieveQueue
>
(
RetrieveQueue
&
retrieveQueue
,
ScopedExclusiveLock
&
retrieveQueueLock
,
AgentReference
&
agentReference
,
const
std
::
string
&
vid
,
QueueType
queueType
,
log
::
LogContext
&
lc
)
{
const
cta
::
optional
<
std
::
string
>
&
vid
,
Job
QueueType
queueType
,
log
::
LogContext
&
lc
)
{
// TODO: if necessary, we could use a singleton caching object here to accelerate
// lookups.
// Getting a locked AQ is the name of the game.
...
...
@@ -175,13 +175,13 @@ void Helpers::getLockedAndFetchedQueue<RetrieveQueue>(RetrieveQueue& retrieveQue
re
.
fetchNoLock
();
rootFetchNoLockTime
=
t
.
secs
(
utils
::
Timer
::
resetCounter
);
try
{
retrieveQueue
.
setAddress
(
re
.
getRetrieveQueueAddress
(
vid
,
queueType
));
retrieveQueue
.
setAddress
(
re
.
getRetrieveQueueAddress
(
vid
.
value
()
,
queueType
));
}
catch
(
cta
::
exception
::
Exception
&
ex
)
{
ScopedExclusiveLock
rexl
(
re
);
rootRelockExclusiveTime
=
t
.
secs
(
utils
::
Timer
::
resetCounter
);
re
.
fetch
();
rootRefetchTime
=
t
.
secs
(
utils
::
Timer
::
resetCounter
);
retrieveQueue
.
setAddress
(
re
.
addOrGetRetrieveQueueAndCommit
(
vid
,
agentReference
,
queueType
,
lc
));
retrieveQueue
.
setAddress
(
re
.
addOrGetRetrieveQueueAndCommit
(
vid
.
value
()
,
agentReference
,
queueType
,
lc
));
addOrGetQueueandCommitTime
=
t
.
secs
(
utils
::
Timer
::
resetCounter
);
}
}
...
...
@@ -225,10 +225,10 @@ void Helpers::getLockedAndFetchedQueue<RetrieveQueue>(RetrieveQueue& retrieveQue
re
.
fetch
();
rootRefetchTime
+=
t
.
secs
(
utils
::
Timer
::
resetCounter
);
try
{
re
.
removeRetrieveQueueAndCommit
(
vid
,
queueType
,
lc
);
re
.
removeRetrieveQueueAndCommit
(
vid
.
value
()
,
queueType
,
lc
);
rootQueueDereferenceTime
+=
t
.
secs
(
utils
::
Timer
::
resetCounter
);
log
::
ScopedParamContainer
params
(
lc
);
params
.
add
(
"vid"
,
vid
)
params
.
add
(
"vid"
,
vid
.
value
()
)
.
add
(
"queueObject"
,
retrieveQueue
.
getAddressIfSet
())
.
add
(
"exceptionMsg"
,
ex
.
getMessageValue
());
lc
.
log
(
log
::
INFO
,
"In Helpers::getLockedAndFetchedQueue<RetrieveQueue>(): removed reference to gone retrieve queue from root entry."
);
...
...
@@ -261,7 +261,7 @@ void Helpers::getLockedAndFetchedQueue<RetrieveQueue>(RetrieveQueue& retrieveQue
retrieveQueue
.
resetAddress
();
throw
cta
::
exception
::
Exception
(
std
::
string
(
"In OStoreDB::getLockedAndFetchedRetrieveQueue(): failed to find or create and lock archive queue after 5 retries for vid: "
)
+
vid
);
+
vid
.
value
()
);
}
//------------------------------------------------------------------------------
...
...
@@ -412,7 +412,7 @@ std::list<SchedulerDatabase::RetrieveQueueStatistics> Helpers::getRetrieveQueueS
continue
;
std
::
string
rqAddr
;
try
{
std
::
string
rqAddr
=
re
.
getRetrieveQueueAddress
(
tf
.
second
.
vid
,
QueueType
::
JobsToTransfer
);
std
::
string
rqAddr
=
re
.
getRetrieveQueueAddress
(
tf
.
second
.
vid
,
Job
QueueType
::
JobsToTransfer
);
}
catch
(
cta
::
exception
::
Exception
&
)
{
ret
.
push_back
(
SchedulerDatabase
::
RetrieveQueueStatistics
());
ret
.
back
().
vid
=
tf
.
second
.
vid
;
...
...
objectstore/Helpers.hpp
View file @
f60b166e
...
...
@@ -23,7 +23,7 @@
#include
"common/threading/MutexLocker.hpp"
#include
"catalogue/Catalogue.hpp"
#include
"scheduler/OStoreDB/OStoreDB.hpp"
#include
"QueueType.hpp"
#include
"
Job
QueueType.hpp"
#include
<string>
#include
<set>
#include
<future>
...
...
@@ -52,9 +52,10 @@ public:
* @param tapePool or vid the name of the needed tape pool
*/
template
<
class
Queue
>
static
void
getLockedAndFetchedQueue
(
Queue
&
queue
,
static
void
getLockedAndFetched
Job
Queue
(
Queue
&
queue
,
ScopedExclusiveLock
&
queueLock
,
AgentReference
&
agentReference
,
const
std
::
string
&
tapePoolOrVid
,
QueueType
queueType
,
log
::
LogContext
&
lc
);
const
cta
::
optional
<
std
::
string
>
&
tapePoolOrVid
,
JobQueueType
queueType
,
log
::
LogContext
&
lc
);
CTA_GENERATE_EXCEPTION_CLASS
(
NoTapeAvailableForRetrieve
);
/**
...
...
objectstore/QueueType.cpp
→
objectstore/
Job
QueueType.cpp
View file @
f60b166e
...
...
@@ -16,17 +16,17 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include
"QueueType.hpp"
#include
"
Job
QueueType.hpp"
namespace
cta
{
namespace
objectstore
{
std
::
string
toString
(
QueueType
queueType
)
{
std
::
string
toString
(
Job
QueueType
queueType
)
{
switch
(
queueType
)
{
case
QueueType
::
FailedJobs
:
case
Job
QueueType
::
FailedJobs
:
return
"failedJobs"
;
case
QueueType
::
JobsToReport
:
case
Job
QueueType
::
JobsToReport
:
return
"jobsToReport"
;
case
QueueType
::
JobsToTransfer
:
case
Job
QueueType
::
JobsToTransfer
:
return
"jobsToTranfer"
;
default:
return
"Unknown queue type."
;
...
...
objectstore/JobQueueType.hpp
0 → 100644
View file @
f60b166e
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include
<string>
namespace
cta
{
namespace
objectstore
{
enum
class
JobQueueType
{
JobsToTransfer
,
FailedJobs
,
JobsToReport
};
std
::
string
toString
(
JobQueueType
queueType
);
}}
// namespace cta::objectstore
\ No newline at end of file
objectstore/RepackQueueType.cpp
0 → 100644
View file @
f60b166e
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include
"RepackQueueType.hpp"
#include
"common/exception/Exception.hpp"
namespace
cta
{
namespace
objectstore
{
std
::
string
toString
(
RepackQueueType
queueType
)
{
switch
(
queueType
)
{
case
RepackQueueType
::
Pending
:
return
"Pending"
;
case
RepackQueueType
::
ToExpand
:
return
"ToExpand"
;
}
throw
exception
::
Exception
(
"In toString(RepackQueueType): unexpected queue type."
);
}
}}
// namespace cta::objectstore
\ No newline at end of file
objectstore/QueueType.hpp
→
objectstore/
Repack
QueueType.hpp
View file @
f60b166e
...
...
@@ -21,6 +21,6 @@
#include
<string>
namespace
cta
{
namespace
objectstore
{
enum
class
QueueType
{
JobsToTransfer
,
FailedJobs
,
JobsToReport
};
std
::
string
toString
(
QueueType
queueType
);
}}
// namespace cta::objectstore
\ No newline at end of file
enum
class
RepackQueueType
{
Pending
,
ToExpand
};
std
::
string
toString
(
RepackQueueType
queueType
);
}}
// namespace cta::objectstore
objectstore/RetrieveQueueAlgorithms.hpp
View file @
f60b166e
...
...
@@ -32,6 +32,8 @@ struct ContainerTraits<RetrieveQueue,C>
ContainerSummary
(
const
RetrieveQueue
::
JobsSummary
&
c
)
:
RetrieveQueue
::
JobsSummary
()
{}
void
addDeltaToLog
(
const
ContainerSummary
&
,
log
::
ScopedParamContainer
&
)
const
;
};
typedef
cta
::
objectstore
::
JobQueueType
QueueType
;
struct
InsertedElement
{
std
::
unique_ptr
<
RetrieveRequest
>
retrieveRequest
;
...
...
@@ -206,7 +208,7 @@ void ContainerTraits<RetrieveQueue,C>::