Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
dCache
cta
Commits
1b4e77f1
Commit
1b4e77f1
authored
Jul 07, 2016
by
Eric Cano
Browse files
Partial implementation of the retrieve queue handling in object store.
Implementation of the top level retrieve request queueing in Scheduler.
parent
db12d877
Changes
33
Hide whitespace changes
Inline
Side-by-side
common/CMakeLists.txt
View file @
1b4e77f1
...
...
@@ -76,6 +76,7 @@ set (COMMON_LIB_SRC_FILES
exception/Errnum.cpp
exception/Exception.cpp
exception/UserError.cpp
exception/NonRetryableError.cpp
log/DummyLogger.cpp
log/LogContext.cpp
log/Logger.cpp
...
...
common/dataStructures/RetrieveRequest.cpp
View file @
1b4e77f1
...
...
@@ -41,7 +41,7 @@ bool RetrieveRequest::operator==(const RetrieveRequest &rhs) const {
&&
diskFileInfo
==
rhs
.
diskFileInfo
&&
diskpoolName
==
rhs
.
diskpoolName
&&
diskpoolThroughput
==
rhs
.
diskpoolThroughput
&&
creation
Log
==
rhs
.
creation
Log
;
&&
entry
Log
==
rhs
.
entry
Log
;
}
//------------------------------------------------------------------------------
...
...
@@ -61,7 +61,7 @@ std::ostream &operator<<(std::ostream &os, const RetrieveRequest &obj) {
<<
" diskFileInfo="
<<
obj
.
diskFileInfo
<<
" diskpoolName="
<<
obj
.
diskpoolName
<<
" diskpoolThroughput="
<<
obj
.
diskpoolThroughput
<<
" creationLog="
<<
obj
.
creation
Log
<<
")"
;
<<
" creationLog="
<<
obj
.
entry
Log
<<
")"
;
return
os
;
}
...
...
common/dataStructures/RetrieveRequest.hpp
View file @
1b4e77f1
...
...
@@ -48,7 +48,7 @@ struct RetrieveRequest {
DiskFileInfo
diskFileInfo
;
std
::
string
diskpoolName
;
uint64_t
diskpoolThroughput
;
EntryLog
creation
Log
;
EntryLog
entry
Log
;
};
// struct RetrieveRequest
...
...
common/dataStructures/TapeFile.hpp
View file @
1b4e77f1
...
...
@@ -47,11 +47,13 @@ struct TapeFile {
* The position of the file on tape in the form of its file sequence
* number.
*/
// TODO: could be modified to match SCSI nomenclature (Logical file identifier), delta factor 3 between our files and tape's
uint64_t
fSeq
;
/**
* The position of the file on tape in the form of its logical block
* identifier.
*/
// TODO: change denomination to match SCSI nomenclature (logical object identifier).
uint64_t
blockId
;
/**
* The compressed size of the tape file in bytes. In other words the
...
...
common/exception/NonRetryableError.cpp
0 → 100644
View file @
1b4e77f1
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include
"common/exception/NonRetryableError.hpp"
namespace
cta
{
namespace
exception
{
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
NonRetryableError
::
NonRetryableError
(
const
std
::
string
&
context
,
const
bool
embedBacktrace
)
:
Exception
(
context
,
embedBacktrace
)
{
}
}
// namespace catalogue
}
// namespace cta
common/exception/NonRetryableError.hpp
0 → 100644
View file @
1b4e77f1
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include
"common/exception/Exception.hpp"
namespace
cta
{
namespace
exception
{
/**
* An exception class representing an application error that should not be retried.
*/
class
NonRetryableError
:
public
exception
::
Exception
{
public:
/**
* Constructor.
*
* @param context optional context string added to the message
* at initialisation time.
* @param embedBacktrace whether to embed a backtrace of where the
* exception was throw in the message
*/
NonRetryableError
(
const
std
::
string
&
context
=
""
,
const
bool
embedBacktrace
=
true
);
};
// class UserError
}
// namespace exception
}
// namespace cta
objectstore/ArchiveQueue.cpp
View file @
1b4e77f1
...
...
@@ -19,7 +19,7 @@
#include
"ArchiveQueue.hpp"
#include
"GenericObject.hpp"
#include
"ProtocolBuffersAlgorithms.hpp"
#include
"EntryLog.hpp"
#include
"EntryLog
SerDeser
.hpp"
#include
"RootEntry.hpp"
#include
"ValueCountMap.hpp"
#include
<json-c/json.h>
...
...
objectstore/ArchiveQueue.hpp
View file @
1b4e77f1
...
...
@@ -26,7 +26,7 @@
#include
"common/MountControl.hpp"
#include
"ArchiveRequest.hpp"
#include
"ArchiveRequest.hpp"
#include
"EntryLog.hpp"
#include
"EntryLog
SerDeser
.hpp"
#include
"Agent.hpp"
namespace
cta
{
namespace
objectstore
{
...
...
objectstore/ArchiveRequest.cpp
View file @
1b4e77f1
...
...
@@ -20,6 +20,7 @@
#include
"GenericObject.hpp"
#include
"ArchiveQueue.hpp"
#include
"common/dataStructures/EntryLog.hpp"
#include
"MountPolicySerDeser.hpp"
#include
<json-c/json.h>
namespace
cta
{
namespace
objectstore
{
...
...
@@ -207,11 +208,7 @@ uint64_t cta::objectstore::ArchiveRequest::getDiskpoolThroughput() {
//------------------------------------------------------------------------------
void
cta
::
objectstore
::
ArchiveRequest
::
setMountPolicy
(
const
cta
::
common
::
dataStructures
::
MountPolicy
&
mountPolicy
)
{
checkPayloadWritable
();
auto
payloadMountPolicy
=
m_payload
.
mutable_mountpolicy
();
payloadMountPolicy
->
set_name
(
mountPolicy
.
name
);
payloadMountPolicy
->
set_maxdrives
(
mountPolicy
.
maxDrivesAllowed
);
payloadMountPolicy
->
set_minrequestage
(
mountPolicy
.
archiveMinRequestAge
);
payloadMountPolicy
->
set_priority
(
mountPolicy
.
archivePriority
);
MountPolicySerDeser
(
mountPolicy
).
serialize
(
*
m_payload
.
mutable_mountpolicy
());
}
//------------------------------------------------------------------------------
...
...
@@ -219,13 +216,9 @@ void cta::objectstore::ArchiveRequest::setMountPolicy(const cta::common::dataStr
//------------------------------------------------------------------------------
cta
::
common
::
dataStructures
::
MountPolicy
cta
::
objectstore
::
ArchiveRequest
::
getMountPolicy
()
{
checkPayloadReadable
();
cta
::
common
::
dataStructures
::
MountPolicy
mountPolicy
;
auto
payloadMountPolicy
=
m_payload
.
mountpolicy
();
mountPolicy
.
name
=
payloadMountPolicy
.
name
();
mountPolicy
.
maxDrivesAllowed
=
payloadMountPolicy
.
maxdrives
();
mountPolicy
.
archiveMinRequestAge
=
payloadMountPolicy
.
minrequestage
();
mountPolicy
.
archivePriority
=
payloadMountPolicy
.
priority
();
return
mountPolicy
;
MountPolicySerDeser
mp
;
mp
.
deserialize
(
m_payload
.
mountpolicy
());
return
mp
;
}
//------------------------------------------------------------------------------
...
...
objectstore/ArchiveRequest.hpp
View file @
1b4e77f1
...
...
@@ -33,7 +33,7 @@ namespace cta { namespace objectstore {
class
Backend
;
class
Agent
;
class
GenericObject
;
class
EntryLog
;
class
EntryLog
SerDeser
;
class
ArchiveRequest
:
public
ObjectOps
<
serializers
::
ArchiveRequest
,
serializers
::
ArchiveRequest_t
>
{
public:
...
...
objectstore/BackendPopulator.cpp
View file @
1b4e77f1
...
...
@@ -31,7 +31,7 @@ BackendPopulator::BackendPopulator(cta::objectstore::Backend & be): m_backend(be
re
.
fetch
();
m_agent
.
generateName
(
"OStoreDBFactory"
);
m_agent
.
initialize
();
cta
::
objectstore
::
EntryLog
cl
(
"user0"
,
"systemhost"
,
time
(
NULL
));
cta
::
objectstore
::
EntryLog
SerDeser
cl
(
"user0"
,
"systemhost"
,
time
(
NULL
));
re
.
addOrGetAgentRegisterPointerAndCommit
(
m_agent
,
cl
);
rel
.
release
();
m_agent
.
insertAndRegisterSelf
();
...
...
objectstore/DiskFileInfoSerDeser.hpp
0 → 100644
View file @
1b4e77f1
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include
"common/UserIdentity.hpp"
#include
"objectstore/cta.pb.h"
#include
"common/dataStructures/DiskFileInfo.hpp"
#include
<string>
#include
<stdint.h>
#include
<limits>
namespace
cta
{
namespace
objectstore
{
/**
* A decorator class of scheduler's creation log adding serialization.
*/
class
DiskFileInfoSerDeser
:
public
cta
::
common
::
dataStructures
::
DiskFileInfo
{
public:
DiskFileInfoSerDeser
()
:
cta
::
common
::
dataStructures
::
DiskFileInfo
()
{}
DiskFileInfoSerDeser
(
const
cta
::
common
::
dataStructures
::
DiskFileInfo
&
dfi
)
:
cta
::
common
::
dataStructures
::
DiskFileInfo
(
dfi
)
{}
DiskFileInfoSerDeser
(
const
std
::
string
&
path
,
const
std
::
string
&
owner
,
const
std
::
string
&
group
,
const
std
::
string
&
recoveryBlob
)
:
cta
::
common
::
dataStructures
::
DiskFileInfo
()
{
this
->
path
=
path
;
this
->
owner
=
owner
;
this
->
group
=
group
;
this
->
recoveryBlob
=
recoveryBlob
;
}
operator
cta
::
common
::
dataStructures
::
DiskFileInfo
()
{
return
cta
::
common
::
dataStructures
::
DiskFileInfo
(
*
this
);
}
void
serialize
(
cta
::
objectstore
::
serializers
::
DiskFileInfo
&
osdfi
)
const
{
osdfi
.
set_path
(
path
);
osdfi
.
set_owner
(
owner
);
osdfi
.
set_group
(
group
);
osdfi
.
set_recoveryblob
(
recoveryBlob
);
}
void
deserialize
(
const
cta
::
objectstore
::
serializers
::
DiskFileInfo
&
osdfi
)
{
path
=
osdfi
.
path
();
owner
=
osdfi
.
owner
();
group
=
osdfi
.
group
();
recoveryBlob
=
osdfi
.
recoveryblob
();
}
};
}}
objectstore/DriveRegister.cpp
View file @
1b4e77f1
...
...
@@ -67,7 +67,6 @@ std::string cta::objectstore::DriveRegister::dump() {
json_object
*
creationlog_jot
=
json_object_new_object
();
json_object_object_add
(
creationlog_jot
,
"host"
,
json_object_new_string
(
i
->
creationlog
().
host
().
c_str
()));
json_object_object_add
(
creationlog_jot
,
"time"
,
json_object_new_int64
(
i
->
creationlog
().
time
()));
json_object_object_add
(
creationlog_jot
,
"comment"
,
json_object_new_string
(
i
->
creationlog
().
comment
().
c_str
()));
json_object_object_add
(
jot
,
"creationlog"
,
creationlog_jot
);
json_object
*
mounttype_jot
=
json_object_new_object
();
...
...
@@ -131,7 +130,7 @@ namespace {
}
void
cta
::
objectstore
::
DriveRegister
::
addDrive
(
const
std
::
string
&
driveName
,
const
std
::
string
&
logicalLibrary
,
const
EntryLog
&
creationLog
)
{
const
std
::
string
&
logicalLibrary
,
const
EntryLog
SerDeser
&
creationLog
)
{
//add logical library to the parameters
checkPayloadWritable
();
// Check that we are not trying to duplicate a drive
...
...
@@ -279,7 +278,7 @@ void cta::objectstore::DriveRegister::reportDriveStatus(const std::string& drive
if
(
!
alreadyInSessionStatuses
.
count
((
int
)
status
))
{
// TODO: the creation should be moved to another place to better logging
// Explicitely by scheduler? To be decided.
EntryLog
el
(
"name0"
,
""
,
reportTime
);
EntryLog
SerDeser
el
(
"name0"
,
""
,
reportTime
);
addDrive
(
driveName
,
logicalLibary
,
el
);
}
else
{
throw
NoSuchDrive
(
"In DriveRegister::reportDriveStatus(): No such drive"
);
...
...
objectstore/DriveRegister.hpp
View file @
1b4e77f1
...
...
@@ -28,7 +28,7 @@ namespace cta { namespace objectstore {
class
Backend
;
class
Agent
;
class
GenericObject
;
class
EntryLog
;
class
EntryLog
SerDeser
;
class
DriveRegister
:
public
ObjectOps
<
serializers
::
DriveRegister
,
serializers
::
DriveRegister_t
>
{
CTA_GENERATE_EXCEPTION_CLASS
(
DuplicateEntry
);
...
...
@@ -42,7 +42,7 @@ public:
// Drives management =========================================================
void
addDrive
(
const
std
::
string
&
driveName
,
const
std
::
string
&
logicalLibrary
,
const
EntryLog
&
creationLog
);
const
EntryLog
SerDeser
&
creationLog
);
void
removeDrive
(
const
std
::
string
&
name
);
private:
cta
::
MountType
::
Enum
deserializeMountType
(
serializers
::
MountType
);
...
...
objectstore/EntryLog.hpp
→
objectstore/EntryLog
SerDeser
.hpp
View file @
1b4e77f1
...
...
@@ -30,11 +30,12 @@ namespace cta { namespace objectstore {
/**
* A decorator class of scheduler's creation log adding serialization.
*/
class
EntryLog
:
public
cta
::
common
::
dataStructures
::
EntryLog
{
// TODO: generalize the mechanism to other structures.
class
EntryLogSerDeser
:
public
cta
::
common
::
dataStructures
::
EntryLog
{
public:
EntryLog
()
:
cta
::
common
::
dataStructures
::
EntryLog
()
{}
EntryLog
(
const
cta
::
common
::
dataStructures
::
EntryLog
&
el
)
:
cta
::
common
::
dataStructures
::
EntryLog
(
el
)
{}
EntryLog
(
const
std
::
string
&
un
,
const
std
::
string
&
hn
,
uint64_t
t
)
:
cta
::
common
::
dataStructures
::
EntryLog
()
{
EntryLog
SerDeser
()
:
cta
::
common
::
dataStructures
::
EntryLog
()
{}
EntryLog
SerDeser
(
const
cta
::
common
::
dataStructures
::
EntryLog
&
el
)
:
cta
::
common
::
dataStructures
::
EntryLog
(
el
)
{}
EntryLog
SerDeser
(
const
std
::
string
&
un
,
const
std
::
string
&
hn
,
uint64_t
t
)
:
cta
::
common
::
dataStructures
::
EntryLog
()
{
username
=
un
;
host
=
hn
;
time
=
t
;
...
...
@@ -42,12 +43,12 @@ public:
operator
cta
::
common
::
dataStructures
::
EntryLog
()
{
return
cta
::
common
::
dataStructures
::
EntryLog
(
*
this
);
}
void
serialize
(
cta
::
objectstore
::
serializers
::
Creation
Log
&
log
)
const
{
void
serialize
(
cta
::
objectstore
::
serializers
::
Entry
Log
&
log
)
const
{
log
.
set_username
(
username
);
log
.
set_host
(
host
);
log
.
set_time
(
time
);
}
void
deserialize
(
const
cta
::
objectstore
::
serializers
::
Creation
Log
&
log
)
{
void
deserialize
(
const
cta
::
objectstore
::
serializers
::
Entry
Log
&
log
)
{
username
=
log
.
username
();
host
=
log
.
host
();
time
=
log
.
time
();
...
...
objectstore/GarbageCollectorTest.cpp
View file @
1b4e77f1
...
...
@@ -27,7 +27,7 @@
#include
"DriveRegister.hpp"
#include
"ArchiveRequest.hpp"
#include
"ArchiveQueue.hpp"
#include
"EntryLog.hpp"
#include
"EntryLog
SerDeser
.hpp"
namespace
unitTests
{
...
...
@@ -42,7 +42,7 @@ TEST(ObjectStore, GarbageCollectorBasicFuctionnality) {
re
.
initialize
();
re
.
insert
();
// Create the agent register
cta
::
objectstore
::
EntryLog
el
(
"user0"
,
cta
::
objectstore
::
EntryLog
SerDeser
el
(
"user0"
,
"unittesthost"
,
time
(
NULL
));
cta
::
objectstore
::
ScopedExclusiveLock
rel
(
re
);
re
.
addOrGetAgentRegisterPointerAndCommit
(
agent
,
el
);
...
...
@@ -90,7 +90,7 @@ TEST(ObjectStore, GarbageCollectorRegister) {
re
.
initialize
();
re
.
insert
();
// Create the agent register
cta
::
objectstore
::
EntryLog
el
(
"user0"
,
cta
::
objectstore
::
EntryLog
SerDeser
el
(
"user0"
,
"unittesthost"
,
time
(
NULL
));
cta
::
objectstore
::
ScopedExclusiveLock
rel
(
re
);
re
.
addOrGetAgentRegisterPointerAndCommit
(
agent
,
el
);
...
...
@@ -147,7 +147,7 @@ TEST(ObjectStore, GarbageCollectorArchiveQueue) {
re
.
initialize
();
re
.
insert
();
// Create the agent register
cta
::
objectstore
::
EntryLog
el
(
"user0"
,
cta
::
objectstore
::
EntryLog
SerDeser
el
(
"user0"
,
"unittesthost"
,
time
(
NULL
));
cta
::
objectstore
::
ScopedExclusiveLock
rel
(
re
);
re
.
addOrGetAgentRegisterPointerAndCommit
(
agent
,
el
);
...
...
@@ -204,7 +204,7 @@ TEST(ObjectStore, GarbageCollectorDriveRegister) {
re
.
initialize
();
re
.
insert
();
// Create the agent register
cta
::
objectstore
::
EntryLog
el
(
"user0"
,
cta
::
objectstore
::
EntryLog
SerDeser
el
(
"user0"
,
"unittesthost"
,
time
(
NULL
));
cta
::
objectstore
::
ScopedExclusiveLock
rel
(
re
);
re
.
addOrGetAgentRegisterPointerAndCommit
(
agent
,
el
);
...
...
@@ -261,7 +261,7 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) {
re
.
initialize
();
re
.
insert
();
// Create the agent register
cta
::
objectstore
::
EntryLog
el
(
"user0"
,
cta
::
objectstore
::
EntryLog
SerDeser
el
(
"user0"
,
"unittesthost"
,
time
(
NULL
));
cta
::
objectstore
::
ScopedExclusiveLock
rel
(
re
);
re
.
addOrGetAgentRegisterPointerAndCommit
(
agent
,
el
);
...
...
objectstore/MountPolicySerDeser.hpp
0 → 100644
View file @
1b4e77f1
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include
"common/UserIdentity.hpp"
#include
"objectstore/cta.pb.h"
#include
"common/dataStructures/MountPolicy.hpp"
#include
<string>
#include
<stdint.h>
#include
<limits>
namespace
cta
{
namespace
objectstore
{
/**
* A decorator class of scheduler's creation log adding serialization.
*/
class
MountPolicySerDeser
:
public
cta
::
common
::
dataStructures
::
MountPolicy
{
public:
MountPolicySerDeser
()
:
cta
::
common
::
dataStructures
::
MountPolicy
()
{}
MountPolicySerDeser
(
const
cta
::
common
::
dataStructures
::
MountPolicy
&
mp
)
:
cta
::
common
::
dataStructures
::
MountPolicy
(
mp
)
{}
operator
cta
::
common
::
dataStructures
::
MountPolicy
()
{
return
cta
::
common
::
dataStructures
::
MountPolicy
(
*
this
);
}
void
serialize
(
cta
::
objectstore
::
serializers
::
MountPolicy
&
osmp
)
const
{
osmp
.
set_name
(
name
);
osmp
.
set_archivepriority
(
archivePriority
);
osmp
.
set_archiveminrequestage
(
archiveMinRequestAge
);
osmp
.
set_retrievepriority
(
retrievePriority
);
osmp
.
set_retieveminrequestage
(
retrieveMinRequestAge
);
osmp
.
set_maxdrivesallowed
(
maxDrivesAllowed
);
EntryLogSerDeser
(
creationLog
).
serialize
(
*
osmp
.
mutable_creationlog
());
EntryLogSerDeser
(
lastModificationLog
).
serialize
(
*
osmp
.
mutable_lastmodificationlog
());
osmp
.
set_comment
(
comment
);
}
void
deserialize
(
const
cta
::
objectstore
::
serializers
::
MountPolicy
&
osmp
)
{
name
=
osmp
.
name
();
archivePriority
=
osmp
.
archivepriority
();
archiveMinRequestAge
=
osmp
.
archiveminrequestage
();
retrievePriority
=
osmp
.
retrievepriority
();
retrieveMinRequestAge
=
osmp
.
retieveminrequestage
();
maxDrivesAllowed
=
osmp
.
maxdrivesallowed
();
EntryLogSerDeser
el
;
el
.
deserialize
(
osmp
.
creationlog
());
creationLog
=
el
;
el
.
deserialize
(
osmp
.
lastmodificationlog
());
lastModificationLog
=
el
;
comment
=
osmp
.
comment
();
}
};
}}
objectstore/RetrieveQueue.cpp
View file @
1b4e77f1
...
...
@@ -18,7 +18,7 @@
#include
"RetrieveQueue.hpp"
#include
"GenericObject.hpp"
#include
"EntryLog.hpp"
#include
"EntryLog
SerDeser
.hpp"
#include
<json-c/json.h>
cta
::
objectstore
::
RetrieveQueue
::
RetrieveQueue
(
const
std
::
string
&
address
,
Backend
&
os
)
:
...
...
@@ -105,7 +105,7 @@ void cta::objectstore::RetrieveQueue::addJob(const RetrieveRequest::JobDump& job
auto
*
j
=
m_payload
.
add_retrievejobs
();
j
->
set_address
(
retrieveToFileAddress
);
j
->
set_size
(
size
);
j
->
set_copynb
(
job
.
copyNb
);
j
->
set_copynb
(
job
.
tapeFile
.
copyNb
);
}
cta
::
objectstore
::
RetrieveQueue
::
JobsSummary
cta
::
objectstore
::
RetrieveQueue
::
getJobsSummary
()
{
...
...
@@ -129,20 +129,9 @@ auto cta::objectstore::RetrieveQueue::dumpAndFetchRetrieveRequests()
retrieveRequest
.
fetch
();
ret
.
push_back
(
RetrieveRequestDump
());
auto
&
retReq
=
ret
.
back
();
retReq
.
archiveFile
=
retrieveRequest
.
getArchiveFile
();
retReq
.
dstURL
=
retrieveRequest
.
getDstURL
();
retReq
.
entryLog
=
retrieveRequest
.
getEntryLog
();
// Find the copy number from the list of jobs
retReq
.
activeCopyNb
=
rj
->
copynb
();
auto
jl
=
retrieveRequest
.
dumpJobs
();
for
(
auto
j
=
jl
.
begin
();
j
!=
jl
.
end
();
j
++
)
{
retReq
.
tapeFiles
.
push_back
(
common
::
dataStructures
::
TapeFile
());
auto
&
retJob
=
retReq
.
tapeFiles
.
back
();
retJob
.
blockId
=
j
->
blockid
;
retJob
.
copyNb
=
j
->
copyNb
;
retJob
.
fSeq
=
j
->
fseq
;
retJob
.
vid
=
j
->
tape
;
}
retReq
.
retrieveRequest
=
retrieveRequest
.
getSchedulerRequest
();
retReq
.
criteria
=
retrieveRequest
.
getRetrieveFileQueueCriteria
();
retReq
.
activeCopyNb
=
retrieveRequest
.
getActiveCopyNumber
();
}
catch
(
cta
::
exception
::
Exception
&
)
{}
}
return
ret
;
...
...
objectstore/RetrieveRequest.cpp
View file @
1b4e77f1
...
...
@@ -18,7 +18,8 @@
#include
"RetrieveRequest.hpp"
#include
"GenericObject.hpp"
#include
"EntryLog.hpp"
#include
"EntryLogSerDeser.hpp"
#include
"DiskFileInfoSerDeser.hpp"
#include
"objectstore/cta.pb.h"
#include
<json-c/json.h>
...
...
@@ -43,25 +44,22 @@ void RetrieveRequest::initialize() {
m_payloadInterpreted
=
true
;
}
void
RetrieveRequest
::
addJob
(
const
cta
::
common
::
dataStructures
::
TapeFile
&
tapeFile
,
const
std
::
string
&
tapeaddress
)
{
void
RetrieveRequest
::
addJob
(
const
cta
::
common
::
dataStructures
::
TapeFile
&
tapeFile
,
uint16_t
maxRetiesWithinMount
,
uint16_t
maxTotalRetries
)
{
checkPayloadWritable
();
auto
*
j
=
m_payload
.
add_jobs
();
j
->
set_copynb
(
tapeFile
.
copyNb
);
j
->
set_status
(
serializers
::
RetrieveJobStatus
::
RJS_LinkingToTape
);
j
->
set_tape
(
tapeFile
.
vid
);
j
->
set_tapeaddress
(
tapeaddress
);
j
->
set_totalretries
(
0
);
j
->
set_retrieswithinmount
(
0
);
j
->
set_blockid
(
tapeFile
.
blockId
);
j
->
set_fseq
(
tapeFile
.
fSeq
);
auto
*
tf
=
m_payload
.
add_jobs
();
TapeFileSerDeser
(
tapeFile
).
serialize
(
*
tf
->
mutable_tapefile
());
tf
->
set_maxretrieswithinmount
(
maxRetiesWithinMount
);
tf
->
set_maxtotalretries
(
maxTotalRetries
);
tf
->
set_retrieswithinmount
(
0
);
tf
->
set_totalretries
(
0
);
tf
->
set_status
(
serializers
::
RetrieveJobStatus
::
RJS_Pending
);
}
bool
RetrieveRequest
::
setJobSuccessful
(
uint16_t
copyNumber
)
{
checkPayloadWritable
();
auto
*
jl
=
m_payload
.
mutable_jobs
();
for
(
auto
j
=
jl
->
begin
();
j
!=
jl
->
end
();
j
++
)
{
if
(
j
->
copynb
()
==
copyNumber
)
{
if
(
j
->
tapefile
().
copynb
()
==
copyNumber
)
{
j
->
set_status
(
serializers
::
RetrieveJobStatus
::
RJS_Complete
);
for
(
auto
j2
=
jl
->
begin
();
j2
!=
jl
->
end
();
j2
++
)
{
if
(
j2
->
status
()
!=
serializers
::
RetrieveJobStatus
::
RJS_Complete
&&
...
...
@@ -76,225 +74,141 @@ bool RetrieveRequest::setJobSuccessful(uint16_t copyNumber) {
//------------------------------------------------------------------------------
// set
ArchiveFile
// set
SchedulerRequest
//------------------------------------------------------------------------------
void
RetrieveRequest
::
set
ArchiveFile
(
const
cta
::
common
::
dataStructures
::
ArchiveFile
&
archiveFile
)
{
void
RetrieveRequest
::
set
SchedulerRequest
(
const
cta
::
common
::
dataStructures
::
RetrieveRequest
&
retrieveRequest
)
{
checkPayloadWritable
();
auto
*
af
=
m_payload
.
mutable_archivefile
();
af
->
set_checksumtype
(
archiveFile
.
checksumType
);
af
->
set_checksumvalue
(
archiveFile
.
checksumValue
);
af
->
set_fileid
(
archiveFile
.
archiveFileID
);
af
->
set_creationtime
(
archiveFile
.
creationTime
);
af
->
set_size
(
archiveFile
.
fileSize
);
auto
*
sr
=
m_payload
.
mutable_schedulerrequest
();
sr
->
mutable_requester
()
->
set_name
(
retrieveRequest
.
requester
.
name
);
sr
->
mutable_requester
()
->
set_group
(
retrieveRequest
.
requester
.
group
);
sr
->
set_archivefileid
(
retrieveRequest
.
archiveFileID
);
sr
->
set_dsturl
(
retrieveRequest
.
dstURL
);
DiskFileInfoSerDeser
dfisd
(
retrieveRequest
.
diskFileInfo
);
dfisd
.
serialize
(
*
sr
->
mutable_diskfileinfo
());
sr
->
set_diskpoolname
(
retrieveRequest
.
diskpoolName
);
sr
->
set_diskpoolthroughput
(
retrieveRequest
.
diskpoolThroughput
);
objectstore
::
EntryLogSerDeser
el
(
retrieveRequest
.
entryLog
);
el
.
serialize
(
*
sr
->
mutable_entrylog
());
}
//------------------------------------------------------------------------------
// get
ArchiveFileID
// get
SchedulerRequest
//------------------------------------------------------------------------------
cta
::
common
::
dataStructures
::
ArchiveFile
RetrieveRequest
::
get
ArchiveFile
()
{
cta
::
common
::
dataStructures
::
RetrieveRequest
RetrieveRequest
::
get
SchedulerRequest
()
{
checkPayloadReadable
();
common
::
dataStructures
::
ArchiveFile
ret
;
ret
.
archiveFileID
=
m_payload
.
archivefile
().
fileid
();
ret
.
checksumType
=
m_payload
.
archivefile
().
checksumtype
();
ret
.
checksumValue
=
m_payload
.
archivefile
().
checksumvalue
();
ret
.
creationTime
=
m_payload
.
archivefile
().
creationtime
();
ret
.
diskFileId
=
m_payload
.
archivefile
().
diskfileid
();
ret
.
diskInstance
=
m_payload
.
diskinstance
();
ret
.
diskFileInfo
.
recoveryBlob
=
m_payload
.
diskfileinfo
().
recoveryblob
();
ret
.
diskFileInfo
.
group
=
m_payload
.
diskfileinfo
().
group
();