Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
dCache
cta
Commits
7facf272
Commit
7facf272
authored
Feb 06, 2018
by
Eric Cano
Browse files
Implemented sharded retrieve queues.
parent
e510a1df
Changes
17
Expand all
Hide whitespace changes
Inline
Side-by-side
continuousintegration/buildtree_runner/vmBootstrap/bootstrapCTA.sh
View file @
7facf272
...
...
@@ -3,7 +3,7 @@
set
-x
echo
Getting CTA sources...
(
cd
~
;
git clone https://:@gitlab.cern.ch:8443/cta/CTA.git
)
(
cd
~
;
git clone https://:@gitlab.cern.ch:8443/cta/CTA.git
;
cd
CTA
;
git submodule update
--init
--recursive
)
echo
Creating
source
rpm
mkdir
-p
~/CTA-build-srpm
...
...
objectstore/ArchiveQueue.cpp
View file @
7facf272
...
...
@@ -80,7 +80,7 @@ bool ArchiveQueue::checkMapsAndShardsCoherency() {
uint64_t
bytesFromShardPointers
=
0
;
uint64_t
jobsExpectedFromShardsPointers
=
0
;
// Add up shard summaries
for
(
auto
&
aqs
:
m_payload
.
archivequeues
s
hards
())
{
for
(
auto
&
aqs
:
m_payload
.
archivequeueshards
())
{
bytesFromShardPointers
+=
aqs
.
shardbytescount
();
jobsExpectedFromShardsPointers
+=
aqs
.
shardjobscount
();
}
...
...
@@ -120,7 +120,7 @@ void ArchiveQueue::rebuild() {
priorityMap
.
clear
();
ValueCountMap
minArchiveRequestAgeMap
(
m_payload
.
mutable_minarchiverequestagemap
());
minArchiveRequestAgeMap
.
clear
();
for
(
auto
&
sa
:
m_payload
.
archivequeues
s
hards
())
{
for
(
auto
&
sa
:
m_payload
.
archivequeueshards
())
{
shards
.
emplace_back
(
ArchiveQueueShard
(
sa
.
address
(),
m_objectStore
));
shardsFetchers
.
emplace_back
(
shards
.
back
().
asyncLockfreeFetch
());
}
...
...
@@ -135,10 +135,10 @@ void ArchiveQueue::rebuild() {
(
*
sf
)
->
wait
();
}
catch
(
Backend
::
NoSuchObject
&
ex
)
{
// Remove the shard from the list
auto
aqs
=
m_payload
.
mutable_archivequeues
s
hards
()
->
begin
();
while
(
aqs
!=
m_payload
.
mutable_archivequeues
s
hards
()
->
end
())
{
auto
aqs
=
m_payload
.
mutable_archivequeueshards
()
->
begin
();
while
(
aqs
!=
m_payload
.
mutable_archivequeueshards
()
->
end
())
{
if
(
aqs
->
address
()
==
s
->
getAddressIfSet
())
{
aqs
=
m_payload
.
mutable_archivequeues
s
hards
()
->
erase
(
aqs
);
aqs
=
m_payload
.
mutable_archivequeueshards
()
->
erase
(
aqs
);
}
else
{
aqs
++
;
}
...
...
@@ -161,11 +161,11 @@ void ArchiveQueue::rebuild() {
totalJobs
+=
jobs
;
totalBytes
+=
size
;
// And store the value in the shard pointers.
auto
maqs
=
m_payload
.
mutable_archivequeues
s
hards
();
for
(
auto
&
aqs
:
*
maqs
)
{
if
(
aqs
.
address
()
==
s
->
getAddressIfSet
())
{
aqs
.
set_shardjobscount
(
jobs
);
aqs
.
set_shardbytescount
(
size
);
auto
maqs
=
m_payload
.
mutable_archivequeueshards
();
for
(
auto
&
aqs
p
:
*
maqs
)
{
if
(
aqs
p
.
address
()
==
s
->
getAddressIfSet
())
{
aqs
p
.
set_shardjobscount
(
jobs
);
aqs
p
.
set_shardbytescount
(
size
);
goto
shardUpdated
;
}
}
...
...
@@ -199,7 +199,7 @@ void ArchiveQueue::rebuild() {
bool
ArchiveQueue
::
isEmpty
()
{
checkPayloadReadable
();
// Check we have no archive jobs pending
if
(
m_payload
.
archivequeues
s
hards_size
())
if
(
m_payload
.
archivequeueshards_size
())
return
false
;
// If we made it to here, it seems the pool is indeed empty.
return
true
;
...
...
@@ -272,9 +272,9 @@ void ArchiveQueue::addJobsAndCommit(std::list<JobToAdd> & jobsToAdd, AgentRefere
ArchiveQueueShard
aqs
(
m_objectStore
);
serializers
::
ArchiveQueueShardPointer
*
aqsp
=
nullptr
;
bool
newShard
=
false
;
uint64_t
shardCount
=
m_payload
.
archivequeues
s
hards_size
();
if
(
shardCount
&&
m_payload
.
archivequeues
s
hards
(
shardCount
-
1
).
shardjobscount
()
<
c_maxShardSize
)
{
auto
&
shardPointer
=
m_payload
.
archivequeues
s
hards
(
shardCount
-
1
);
uint64_t
shardCount
=
m_payload
.
archivequeueshards_size
();
if
(
shardCount
&&
m_payload
.
archivequeueshards
(
shardCount
-
1
).
shardjobscount
()
<
c_maxShardSize
)
{
auto
&
shardPointer
=
m_payload
.
archivequeueshards
(
shardCount
-
1
);
aqs
.
setAddress
(
shardPointer
.
address
());
// include-locking does not check existence of the object in the object store.
// we will find out on fetch. If we fail, we have to rebuild.
...
...
@@ -310,11 +310,11 @@ void ArchiveQueue::addJobsAndCommit(std::list<JobToAdd> & jobsToAdd, AgentRefere
continue
;
}
// The shard looks good. We will now proceed with the addition of individual jobs.
aqsp
=
m_payload
.
mutable_archivequeues
s
hards
(
shardCount
-
1
);
aqsp
=
m_payload
.
mutable_archivequeueshards
(
shardCount
-
1
);
}
else
{
// We need a new shard. Just add it (in memory).
newShard
=
true
;
aqsp
=
m_payload
.
mutable
_archivequeues
s
hards
()
->
Add
()
;
aqsp
=
m_payload
.
add
_archivequeueshards
();
// Create the shard in memory.
std
::
stringstream
shardName
;
shardName
<<
"ArchiveQueueShard-"
<<
m_payload
.
tapepool
();
...
...
@@ -393,7 +393,7 @@ ArchiveQueue::AdditionSummary ArchiveQueue::addJobsIfNecessaryAndCommit(std::lis
std
::
list
<
ArchiveQueueShard
>
shards
;
std
::
list
<
std
::
unique_ptr
<
ArchiveQueueShard
::
AsyncLockfreeFetcher
>>
shardsFetchers
;
for
(
auto
&
sa
:
m_payload
.
archivequeues
s
hards
())
{
for
(
auto
&
sa
:
m_payload
.
archivequeueshards
())
{
shards
.
emplace_back
(
ArchiveQueueShard
(
sa
.
address
(),
m_objectStore
));
shardsFetchers
.
emplace_back
(
shards
.
back
().
asyncLockfreeFetch
());
}
...
...
@@ -447,7 +447,7 @@ void ArchiveQueue::removeJobsAndCommit(const std::list<std::string>& jobsToRemov
// The jobs are expected to be removed from the front shards first.
// Remove jobs until there are no more jobs or no more shards.
ssize_t
shardIndex
=
0
;
auto
*
mutableArchiveQueueShards
=
m_payload
.
mutable_archivequeues
s
hards
();
auto
*
mutableArchiveQueueShards
=
m_payload
.
mutable_archivequeueshards
();
while
(
localJobsToRemove
.
size
()
&&
shardIndex
<
mutableArchiveQueueShards
->
size
())
{
auto
*
shardPointer
=
mutableArchiveQueueShards
->
Mutable
(
shardIndex
);
// Get hold of the shard
...
...
@@ -489,7 +489,7 @@ void ArchiveQueue::removeJobsAndCommit(const std::list<std::string>& jobsToRemov
for
(
auto
i
=
shardIndex
;
i
<
mutableArchiveQueueShards
->
size
()
-
1
;
i
++
)
{
mutableArchiveQueueShards
->
SwapElements
(
i
,
i
+
1
);
}
m_payload
.
mutable
_a
rchive
q
ueue
ss
hards
()
->
RemoveLast
();
mutable
A
rchive
Q
ueue
S
hards
->
RemoveLast
();
}
// We should also trim the removed jobs from our list.
localJobsToRemove
.
remove_if
(
...
...
@@ -512,7 +512,7 @@ auto ArchiveQueue::dumpJobs() -> std::list<JobDump> {
std
::
list
<
JobDump
>
ret
;
std
::
list
<
ArchiveQueueShard
>
shards
;
std
::
list
<
std
::
unique_ptr
<
ArchiveQueueShard
::
AsyncLockfreeFetcher
>>
shardsFetchers
;
for
(
auto
&
sa
:
m_payload
.
archivequeues
s
hards
())
{
for
(
auto
&
sa
:
m_payload
.
archivequeueshards
())
{
shards
.
emplace_back
(
ArchiveQueueShard
(
sa
.
address
(),
m_objectStore
));
shardsFetchers
.
emplace_back
(
shards
.
back
().
asyncLockfreeFetch
());
}
...
...
@@ -538,7 +538,7 @@ auto ArchiveQueue::dumpJobs() -> std::list<JobDump> {
auto
ArchiveQueue
::
getCandidateList
(
uint64_t
maxBytes
,
uint64_t
maxFiles
,
std
::
set
<
std
::
string
>
archiveRequestsToSkip
)
->
CandidateJobList
{
checkPayloadReadable
();
CandidateJobList
ret
;
for
(
auto
&
aqsp
:
m_payload
.
archivequeues
s
hards
())
{
for
(
auto
&
aqsp
:
m_payload
.
archivequeueshards
())
{
// We need to go through all shard pointers unconditionally to count what is left (see else part)
if
(
ret
.
candidateBytes
<
maxBytes
&&
ret
.
candidateFiles
<
maxFiles
)
{
// Fetch the shard
...
...
objectstore/ArchiveQueueShard.cpp
View file @
7facf272
...
...
@@ -151,7 +151,7 @@ auto ArchiveQueueShard::getJobsSummary() -> JobsSummary {
uint64_t
ArchiveQueueShard
::
addJob
(
ArchiveQueue
::
JobToAdd
&
jobToAdd
)
{
checkPayloadWritable
();
auto
*
j
=
m_payload
.
mutable
_archivejobs
()
->
Add
()
;
auto
*
j
=
m_payload
.
add
_archivejobs
();
j
->
set_address
(
jobToAdd
.
archiveRequestAddress
);
j
->
set_size
(
jobToAdd
.
fileSize
);
j
->
set_fileid
(
jobToAdd
.
archiveFileId
);
...
...
objectstore/CMakeLists.txt
View file @
7facf272
...
...
@@ -61,6 +61,7 @@ add_library (ctaobjectstore SHARED
ArchiveQueue.cpp
ArchiveQueueShard.cpp
RetrieveQueue.cpp
RetrieveQueueShard.cpp
ArchiveRequest.cpp
RetrieveRequest.cpp
DriveRegister.cpp
...
...
objectstore/GarbageCollector.cpp
View file @
7facf272
...
...
@@ -555,7 +555,7 @@ void GarbageCollector::cleanupDeadAgent(const std::string & address, log::LogCon
}
}
}
auto
addedJobs
=
rq
.
addJobsIfNecessaryAndCommit
(
jta
);
auto
addedJobs
=
rq
.
addJobsIfNecessaryAndCommit
(
jta
,
m_ourAgentReference
,
lc
);
queueProcessAndCommitTime
=
t
.
secs
(
utils
::
Timer
::
resetCounter
);
// If we have an unexpected failure, we will re-run the individual garbage collection. Before that,
// we will NOT remove the object from agent's ownership. This variable is declared a bit ahead so
...
...
objectstore/GarbageCollectorTest.cpp
View file @
7facf272
...
...
@@ -589,7 +589,7 @@ TEST(ObjectStore, GarbageCollectorRetrieveRequest) {
rq
.
fetch
();
std
::
list
<
cta
::
objectstore
::
RetrieveQueue
::
JobToAdd
>
jta
;
jta
.
push_back
({
1
,
rqc
.
archiveFile
.
tapeFiles
[
1
].
fSeq
,
rr
.
getAddressIfSet
(),
rqc
.
archiveFile
.
fileSize
,
rqc
.
mountPolicy
,
sReq
.
creationLog
.
time
});
rq
.
addJobsAndCommit
(
jta
);
rq
.
addJobsAndCommit
(
jta
,
agentRef
,
lc
);
}
if
(
pass
<
5
)
{
pass
++
;
continue
;
}
// - Still marked as not owned but referenced in the agent
...
...
objectstore/ObjectOps.hpp
View file @
7facf272
...
...
@@ -91,8 +91,9 @@ protected:
}
void
checkReadable
()
{
if
(
!
m_locksCount
&&
!
m_noLock
)
throw
NotLocked
(
"In ObjectOps::checkReadable: object not locked"
);
// We could still read from a fresh, not yet inserted object.
if
(
m_existingObject
&&
(
!
m_locksCount
&&
!
m_noLock
))
throw
NotLocked
(
"In ObjectOps::checkReadable: object not locked"
);
}
public:
...
...
objectstore/RetrieveQueue.cpp
View file @
7facf272
This diff is collapsed.
Click to expand it.
objectstore/RetrieveQueue.hpp
View file @
7facf272
...
...
@@ -37,6 +37,15 @@ public:
RetrieveQueue
(
GenericObject
&
go
);
void
initialize
(
const
std
::
string
&
vid
);
void
commit
();
private:
// Validates all summaries are in accordance with each other.
bool
checkMapsAndShardsCoherency
();
// Rebuild from shards if something goes wrong.
void
rebuild
();
public:
void
garbageCollect
(
const
std
::
string
&
presumedOwner
,
AgentReference
&
agentReference
,
log
::
LogContext
&
lc
,
cta
::
catalogue
::
Catalogue
&
catalogue
)
override
;
bool
isEmpty
();
...
...
@@ -48,29 +57,29 @@ public:
struct
JobToAdd
{
uint64_t
copyNb
;
uint64_t
fSeq
;
const
std
::
string
retrieveRequestAddress
;
uint64_t
s
ize
;
const
cta
::
common
::
dataStructures
::
MountPolicy
policy
;
std
::
string
retrieveRequestAddress
;
uint64_t
fileS
ize
;
cta
::
common
::
dataStructures
::
MountPolicy
policy
;
time_t
startTime
;
};
void
addJobsAndCommit
(
std
::
list
<
JobToAdd
>
&
jobsToAdd
);
void
addJobsAndCommit
(
std
::
list
<
JobToAdd
>
&
jobsToAdd
,
AgentReference
&
agentReference
,
log
::
LogContext
&
lc
);
/// This version will check for existence of the job in the queue before adding it,
/// and returns the count and sizes of actually added jobs (if any).
struct
AdditionSummary
{
uint64_t
files
=
0
;
uint64_t
bytes
=
0
;
};
AdditionSummary
addJobsIfNecessaryAndCommit
(
std
::
list
<
JobToAdd
>
&
jobsToAdd
);
AdditionSummary
addJobsIfNecessaryAndCommit
(
std
::
list
<
JobToAdd
>
&
jobsToAdd
,
AgentReference
&
agentReference
,
log
::
LogContext
&
lc
);
struct
JobsSummary
{
uint64_t
files
;
uint64_t
bytes
;
time_t
oldestJobStartTime
;
uint64_t
priority
;
uint64_t
min
Archi
veRequestAge
;
uint64_t
min
Retrie
veRequestAge
;
uint64_t
maxDrivesAllowed
;
};
JobsSummary
getJobsSummary
();
std
::
list
<
RetrieveRequestDump
>
dumpAndFetchRetrieveRequests
();
struct
JobDump
{
std
::
string
address
;
uint16_t
copyNb
;
...
...
@@ -88,9 +97,17 @@ public:
// which still should be removed from the queue. They will be disregarded from listing.
CandidateJobList
getCandidateList
(
uint64_t
maxBytes
,
uint64_t
maxFiles
,
std
::
set
<
std
::
string
>
retrieveRequestsToSkip
);
void
removeJobsAndCommit
(
const
std
::
list
<
std
::
string
>
&
request
sToRemove
);
void
removeJobsAndCommit
(
const
std
::
list
<
std
::
string
>
&
job
sToRemove
);
// -- Generic parameters
std
::
string
getVid
();
// The shard size. From experience, 100k is where we start to see performance difference,
// but nothing prevents us from using a smaller size.
// The performance will be roughly flat until the queue size reaches the square of this limit
// (meaning the queue object updates start to take too much time).
// with this current value of 25k, the performance should be roughly flat until 25k^2=625M.
static
const
uint64_t
c_maxShardSize
=
25000
;
};
}}
objectstore/RetrieveQueueShard.cpp
0 → 100644
View file @
7facf272
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include
"RetrieveQueueShard.hpp"
#include
"GenericObject.hpp"
#include
<google/protobuf/util/json_util.h>
namespace
cta
{
namespace
objectstore
{
/**
 * Constructor with undefined address: only the object store backend is
 * forwarded to the ObjectOps base.
 */
RetrieveQueueShard::RetrieveQueueShard(Backend& os)
  : ObjectOps<serializers::RetrieveQueueShard, serializers::RetrieveQueueShard_t>(os) {
}
/**
 * Constructor for a shard with a known address: the backend and the address
 * are both forwarded to the ObjectOps base.
 */
RetrieveQueueShard::RetrieveQueueShard(const std::string& address, Backend& os)
  : ObjectOps<serializers::RetrieveQueueShard, serializers::RetrieveQueueShard_t>(os, address) {
}
/**
 * Upgrade constructor from a generic object. The new object uses the same
 * object store as the generic one, takes over its header and then interprets
 * the payload from that header.
 */
RetrieveQueueShard::RetrieveQueueShard(GenericObject& go)
  : ObjectOps<serializers::RetrieveQueueShard, serializers::RetrieveQueueShard_t>(go.objectStore()) {
  // Move the header of the generic object into this object...
  go.transplantHeader(*this);
  // ...and decode our payload from it.
  getPayloadFromHeader();
}
/**
 * Recomputes the total size summary of the shard from the jobs it contains
 * and stores it in the payload. The payload must be writable.
 */
void RetrieveQueueShard::rebuild() {
  checkPayloadWritable();
  uint64_t totalSize = 0;
  // Iterate by const reference: copying each job message is needless work.
  for (const auto& j : m_payload.retrievejobs()) {
    totalSize += j.size();
  }
  m_payload.set_retrievejobstotalsize(totalSize);
}
std
::
string
RetrieveQueueShard
::
dump
()
{
checkPayloadReadable
();
google
::
protobuf
::
util
::
JsonPrintOptions
options
;
options
.
add_whitespace
=
true
;
options
.
always_print_primitive_fields
=
true
;
std
::
string
headerDump
;
google
::
protobuf
::
util
::
MessageToJsonString
(
m_payload
,
&
headerDump
,
options
);
return
headerDump
;
}
/**
 * Garbage collection entry point. It is not supported for this object type:
 * the function unconditionally throws and none of the parameters is used.
 *
 * @throws exception::Exception always.
 */
void RetrieveQueueShard::garbageCollect(const std::string& presumedOwner, AgentReference& agentReference,
    log::LogContext& lc, cta::catalogue::Catalogue& catalogue) {
  const std::string errorMessage =
    "In RetrieveQueueShard::garbageCollect(): garbage collection should not be necessary for this type of object.";
  throw exception::Exception(errorMessage);
}
/**
 * Builds a list of candidate jobs from this shard. The payload must be readable.
 *
 * Jobs are visited in stored order. A job whose address is listed in
 * retrieveRequestsToSkip is not added to the candidates, but every visited job
 * (skipped or not) is subtracted from the "remaining after candidates" counters.
 * The walk stops as soon as the accumulated candidates reach maxBytes or maxFiles.
 *
 * @param maxBytes byte threshold at which the walk stops.
 * @param maxFiles file count threshold at which the walk stops.
 * @param retrieveRequestsToSkip addresses of requests to leave out of the candidates.
 * @return the candidates with their byte/file totals and the remaining counters.
 */
RetrieveQueue::CandidateJobList RetrieveQueueShard::getCandidateJobList(uint64_t maxBytes, uint64_t maxFiles,
    std::set<std::string> retrieveRequestsToSkip) {
  checkPayloadReadable();
  RetrieveQueue::CandidateJobList result;
  // Start from the full content of the shard and count down as jobs are visited.
  result.remainingBytesAfterCandidates = m_payload.retrievejobstotalsize();
  result.remainingFilesAfterCandidates = m_payload.retrievejobs_size();
  const int jobsInShard = m_payload.retrievejobs_size();
  for (int idx = 0; idx < jobsInShard; ++idx) {
    const auto& job = m_payload.retrievejobs(idx);
    const bool toBeSkipped = retrieveRequestsToSkip.find(job.address()) != retrieveRequestsToSkip.end();
    if (!toBeSkipped) {
      result.candidates.push_back({job.address(), (uint16_t)job.copynb(), job.size()});
      result.candidateBytes += job.size();
      result.candidateFiles++;
    }
    result.remainingBytesAfterCandidates -= job.size();
    result.remainingFilesAfterCandidates--;
    const bool limitReached = result.candidateBytes >= maxBytes || result.candidateFiles >= maxFiles;
    if (limitReached) {
      break;
    }
  }
  return result;
}
/**
 * Removes from the shard every job whose address appears in jobsToRemove.
 * The payload must be writable.
 *
 * For each address, the shard is scanned repeatedly until no matching job is
 * left, so duplicate entries for one address are all removed. A matching job
 * is recorded in the result, bubbled to the end of the repeated field with
 * successive swaps (which keeps the relative order of the other jobs) and then
 * dropped with RemoveLast(). The total size stored in the payload is updated
 * for each removal.
 *
 * @param jobsToRemove addresses of the jobs to remove.
 * @return counts and bytes removed, counts and bytes left in the shard, and
 *         the details of each removed job.
 */
auto RetrieveQueueShard::removeJobs(const std::list<std::string>& jobsToRemove) -> RemovalResult {
  checkPayloadWritable();
  RemovalResult ret;
  uint64_t totalSize = m_payload.retrievejobstotalsize();
  auto* jl = m_payload.mutable_retrievejobs();
  for (auto& rrt : jobsToRemove) {
    bool found = false;
    do {
      found = false;
      // Push the found entry all the way to the end.
      for (size_t i = 0; i < (size_t)jl->size(); i++) {
        if (jl->Get(i).address() == rrt) {
          found = true;
          const auto& j = jl->Get(i);
          ret.removedJobs.emplace_back(JobInfo());
          auto& removed = ret.removedJobs.back();
          removed.address = j.address();
          removed.copyNb = j.copynb();
          removed.maxDrivesAllowed = j.maxdrivesallowed();
          removed.minRetrieveRequestAge = j.minretrieverequestage();
          removed.priority = j.priority();
          removed.size = j.size();
          removed.startTime = j.starttime();
          // The fSeq was previously left at its value-initialized 0; report the real one.
          removed.fSeq = j.fseq();
          ret.bytesRemoved += j.size();
          totalSize -= j.size();
          ret.jobsRemoved++;
          m_payload.set_retrievejobstotalsize(m_payload.retrievejobstotalsize() - j.size());
          while (i + 1 < (size_t)jl->size()) {
            jl->SwapElements(i, i + 1);
            i++;
          }
          break;
        }
      }
      // and remove it
      if (found)
        jl->RemoveLast();
    } while (found);
  }
  ret.bytesAfter = totalSize;
  ret.jobsAfter = m_payload.retrievejobs_size();
  return ret;
}
void
RetrieveQueueShard
::
initialize
(
const
std
::
string
&
owner
)
{
ObjectOps
<
serializers
::
RetrieveQueueShard
,
serializers
::
RetrieveQueueShard_t
>::
initialize
();
setOwner
(
owner
);
setBackupOwner
(
owner
);
m_payload
.
set_retrievejobstotalsize
(
0
);
m_payloadInterpreted
=
true
;
}
/**
 * Lists all the jobs held in the shard, in stored order.
 * The payload must be readable.
 *
 * @return one JobInfo per job, filled from the job's serialized fields.
 */
auto RetrieveQueueShard::dumpJobs() -> std::list<JobInfo> {
  checkPayloadReadable();
  std::list<JobInfo> jobs;
  const int jobsInShard = m_payload.retrievejobs_size();
  for (int idx = 0; idx < jobsInShard; ++idx) {
    const auto& job = m_payload.retrievejobs(idx);
    jobs.emplace_back();
    JobInfo& info = jobs.back();
    info.size = job.size();
    info.address = job.address();
    info.copyNb = (uint16_t)job.copynb();
    info.priority = job.priority();
    info.minRetrieveRequestAge = job.minretrieverequestage();
    info.maxDrivesAllowed = job.maxdrivesallowed();
    info.startTime = (time_t)job.starttime();
    info.fSeq = job.fseq();
  }
  return jobs;
}
std
::
list
<
RetrieveQueue
::
JobToAdd
>
RetrieveQueueShard
::
dumpJobsToAdd
()
{
checkPayloadReadable
();
std
::
list
<
RetrieveQueue
::
JobToAdd
>
ret
;
for
(
auto
&
j
:
m_payload
.
retrievejobs
())
{
ret
.
emplace_back
(
RetrieveQueue
::
JobToAdd
());
ret
.
back
().
copyNb
=
j
.
copynb
();
ret
.
back
().
fSeq
=
j
.
fseq
();
ret
.
back
().
fileSize
=
j
.
size
();
ret
.
back
().
policy
.
retrieveMinRequestAge
=
j
.
minretrieverequestage
();
ret
.
back
().
policy
.
maxDrivesAllowed
=
j
.
maxdrivesallowed
();
ret
.
back
().
policy
.
retrievePriority
=
j
.
priority
();
ret
.
back
().
startTime
=
j
.
starttime
();
ret
.
back
().
retrieveRequestAddress
=
j
.
address
();
}
return
ret
;
}
/**
 * Summarizes the content of the shard. The payload must be readable.
 *
 * bytes comes from the total size stored in the payload and jobs from the
 * number of stored jobs. minFseq and maxFseq are read from the first and the
 * last stored job respectively. For an empty shard both are reported as 0,
 * instead of indexing the (empty) job list out of range.
 *
 * @return the summary of the shard.
 */
auto RetrieveQueueShard::getJobsSummary() -> JobsSummary {
  checkPayloadReadable();
  JobsSummary ret;
  const int jobsCount = m_payload.retrievejobs_size();
  ret.bytes = m_payload.retrievejobstotalsize();
  ret.jobs = jobsCount;
  if (jobsCount > 0) {
    ret.minFseq = m_payload.retrievejobs(0).fseq();
    ret.maxFseq = m_payload.retrievejobs(jobsCount - 1).fseq();
  } else {
    // No job to read the fSeq from: accessing index 0 or -1 would be out of range.
    ret.minFseq = 0;
    ret.maxFseq = 0;
  }
  return ret;
}
/**
 * Appends a job to the shard and adds its file size to the total size stored
 * in the payload. The payload must be writable.
 *
 * @param jobToAdd description of the job to append.
 * @return the number of jobs in the shard after the addition.
 */
uint64_t RetrieveQueueShard::addJob(RetrieveQueue::JobToAdd& jobToAdd) {
  checkPayloadWritable();
  auto* j = m_payload.add_retrievejobs();
  j->set_address(jobToAdd.retrieveRequestAddress);
  j->set_size(jobToAdd.fileSize);
  j->set_copynb(jobToAdd.copyNb);
  j->set_fseq(jobToAdd.fSeq);
  // The start time is set once (a second, identical call was redundant).
  j->set_starttime(jobToAdd.startTime);
  j->set_maxdrivesallowed(jobToAdd.policy.maxDrivesAllowed);
  j->set_priority(jobToAdd.policy.retrievePriority);
  j->set_minretrieverequestage(jobToAdd.policy.retrieveMinRequestAge);
  m_payload.set_retrievejobstotalsize(m_payload.retrievejobstotalsize() + jobToAdd.fileSize);
  return m_payload.retrievejobs_size();
}
}}
\ No newline at end of file
objectstore/RetrieveQueueShard.hpp
0 → 100644
View file @
7facf272
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include
"RetrieveQueue.hpp"
namespace
cta
{
namespace
objectstore
{
/**
 * Object store representation of one shard of a retrieve queue, built on top
 * of ObjectOps for the serializers::RetrieveQueueShard payload.
 */
class RetrieveQueueShard: public ObjectOps<serializers::RetrieveQueueShard, serializers::RetrieveQueueShard_t> {
public:
  /** Constructor for a shard whose address is not defined yet. */
  RetrieveQueueShard(Backend & os);

  /** Constructor for a shard with a given address. */
  RetrieveQueueShard(const std::string & address, Backend & os);

  /** Upgrader from a generic object. */
  RetrieveQueueShard(GenericObject & go);

  /** The parameterless base initializer is hidden: use initialize(owner). */
  void initialize() = delete;

  /** Initializer, taking the owner of the new shard. */
  void initialize(const std::string & owner);

  /** Dumper: returns a textual representation of the shard. */
  std::string dump();

  void garbageCollect(const std::string& presumedOwner, AgentReference& agentReference, log::LogContext& lc,
      cta::catalogue::Catalogue& catalogue) override;

  /** Description of one job, as returned by dumpJobs() and removeJobs(). */
  struct JobInfo {
    uint64_t size;
    std::string address;
    uint16_t copyNb;
    uint64_t priority;
    uint64_t minRetrieveRequestAge;
    uint64_t maxDrivesAllowed;
    time_t startTime;
    uint64_t fSeq;
  };

  /** Returns the list of the jobs held in the shard. */
  std::list<JobInfo> dumpJobs();

  /**
   * Variant of dumpJobs() returning the jobs in the RetrieveQueue::JobToAdd
   * form, allowing shard to shard job transfer.
   */
  std::list<RetrieveQueue::JobToAdd> dumpJobsToAdd();

  /** Summary of the content of a shard: job and byte counts, and fSeq bounds. */
  struct JobsSummary {
    uint64_t jobs;
    uint64_t bytes;
    uint64_t minFseq;
    uint64_t maxFseq;
  };

  /** Returns the summary of the shard. */
  JobsSummary getJobsSummary();

  /**
   * Adds a job to the shard.
   * @return the new size of the shard.
   */
  uint64_t addJob(RetrieveQueue::JobToAdd & jobToAdd);

  /** Outcome of removeJobs(): removed and remaining counts, plus the removed jobs. */
  struct RemovalResult {
    uint64_t jobsRemoved = 0;
    uint64_t jobsAfter = 0;
    uint64_t bytesRemoved = 0;
    uint64_t bytesAfter = 0;
    std::list<JobInfo> removedJobs;
  };

  /**
   * Removes the jobs with the given addresses from the shard.
   * @return the removal counters and the list of removed jobs.
   */
  RemovalResult removeJobs(const std::list<std::string> & jobsToRemove);

  /**
   * Returns a list of candidate jobs from this shard, bounded by maxBytes and
   * maxFiles and leaving out the requests listed in retrieveRequestsToSkip.
   */
  RetrieveQueue::CandidateJobList getCandidateJobList(uint64_t maxBytes, uint64_t maxFiles,
      std::set<std::string> retrieveRequestsToSkip);

  /** Recomputes the summaries in case they do not match the array content. */
  void rebuild();
};
}}
// namespace cta::objectstore
\ No newline at end of file
objectstore/RetrieveRequest.cpp
View file @
7facf272
...
...
@@ -142,7 +142,7 @@ jobFound:;
std
::
list
<
RetrieveQueue
::
JobToAdd
>
jta
;
jta
.
push_back
({
bestTapeFile
->
copynb
(),
bestTapeFile
->
fseq
(),
getAddressIfSet
(),
m_payload
.
archivefile
().
filesize
(),
mp
,
(
signed
)
m_payload
.
schedulerrequest
().
entrylog
().
time
()});
rq
.
addJobsIfNecessaryAndCommit
(
jta
);
rq
.
addJobsIfNecessaryAndCommit
(
jta
,
agentReference
,
lc
);
auto
jobsSummary
=
rq
.
getJobsSummary
();
auto
queueUpdateTime
=
t
.
secs
(
utils
::
Timer
::
resetCounter
);
// We can now make the transition official
...
...
objectstore/cta.proto
View file @
7facf272
...
...
@@ -391,7 +391,7 @@ message ArchiveQueueShard {
message
ArchiveQueue
{
required
string
tapepool
=
10000
;
repeated
ArchiveQueueShardPointer
archivequeues
s
hards
=
10010
;
repeated
ArchiveQueueShardPointer
archivequeueshards
=
10010
;
repeated
ValueCountPair
prioritymap
=
10031
;
repeated
ValueCountPair
minarchiverequestagemap
=
10032
;
repeated
ValueCountPair
maxdrivesallowedmap
=
10033
;
...
...
@@ -409,15 +409,30 @@ message RetrieveJobPointer {
required
uint64
priority
=
3104
;
required
uint64
minretrieverequestage
=
3105
;
required
uint64
maxdrivesallowed
=
3106
;
required
uint64
starttime
=
3108
;
}
// Entry of the repeated retrievequeueshards field of RetrieveQueue: the
// address of one shard together with per-shard counters and fSeq bounds.
message RetrieveQueueShardPointer {
  required string address = 10400;
  required uint64 shardjobscount = 10401;
  required uint64 shardbytescount = 10402;
  required uint64 minfseq = 10403;
  required uint64 maxfseq = 10404;
}
// Payload of a retrieve queue shard: the list of retrieve job pointers it
// holds and the stored total of their sizes.
message RetrieveQueueShard {
  repeated RetrieveJobPointer retrievejobs = 10500;
  required uint64 retrievejobstotalsize = 10501;
}
message
RetrieveQueue
{
required
string
vid
=
10100
;
repeated
Retrieve
Job
Pointer
retrieve
job
s
=
1011
0
;
repeated
Retrieve
QueueShard
Pointer
retrieve
queueshard
s
=
1011
1
;
repeated
ValueCountPair
prioritymap
=
10131
;
repeated
ValueCountPair
minretrieverequestagemap
=
10132
;
repeated
ValueCountPair
maxdrivesallowedmap
=
10133
;
required
uint64
retrievejobstotalsize
=
10140
;
required
uint64
retrievejobscount
=
10145
;
required
uint64
oldestjobcreationtime
=
10150
;