dCache / cta / Commits / f2cf80df

Commit f2cf80df
authored Feb 05, 2019 by Steven Murray

    The database autocommit mode is now controlled by rdbms::Conn()

parent 48d8ac7b
Changes: 37
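In short, this commit removes the per-statement autocommit argument: callers no longer pass an rdbms::AutocommitMode to executeQuery()/executeNonQuery(), and instead set the mode once on the rdbms::Conn obtained from the pool. The sketch below illustrates the new calling pattern using only calls that appear in the diffs that follow; the header paths, the helper name and the column alias are assumptions for illustration and are not part of the commit.

#include <cstdint>
#include <string>

#include "common/exception/Exception.hpp"  // assumed header locations
#include "rdbms/ConnPool.hpp"

// Illustrative helper: run a single SELECT with autocommit disabled on the
// connection, instead of passing rdbms::AutocommitMode::AUTOCOMMIT_OFF to
// Stmt::executeQuery() as the pre-commit code did.
std::uint64_t fetchSingleId(cta::rdbms::ConnPool &connPool, const char *const sql) {
  auto conn = connPool.getConn();
  conn.setAutocommitMode(cta::rdbms::AutocommitMode::AUTOCOMMIT_OFF);
  auto stmt = conn.createStmt(sql);
  auto rset = stmt.executeQuery();  // no autocommit argument any more
  if (!rset.next()) {
    throw cta::exception::Exception(std::string("Result set is unexpectedly empty"));
  }
  return rset.columnUint64("ID");  // hypothetical column alias
}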
catalogue/CMakeLists.txt

@@ -142,19 +142,19 @@ target_link_libraries (ctainmemorycatalogueunittests
 install (TARGETS ctainmemorycatalogueunittests DESTINATION usr/${CMAKE_INSTALL_LIBDIR})
 
-set (CATALOGUE_UNIT_TESTS_LIB_SRC_FILES
+set (DBCONFIG_CATALOGUE_UNIT_TESTS_LIB_SRC_FILES
   CatalogueTest.cpp
   DbConfigVersionOfCatalogueTest.cpp)
 
-add_library (ctacatalogueunittests SHARED ${CATALOGUE_UNIT_TESTS_LIB_SRC_FILES})
-set_property (TARGET ctacatalogueunittests PROPERTY SOVERSION "${CTA_SOVERSION}")
-set_property (TARGET ctacatalogueunittests PROPERTY VERSION "${CTA_LIBVERSION}")
+add_library (ctadbconfigcatalogueunittests SHARED ${DBCONFIG_CATALOGUE_UNIT_TESTS_LIB_SRC_FILES})
+set_property (TARGET ctadbconfigcatalogueunittests PROPERTY SOVERSION "${CTA_SOVERSION}")
+set_property (TARGET ctadbconfigcatalogueunittests PROPERTY VERSION "${CTA_LIBVERSION}")
 
-target_link_libraries (ctacatalogueunittests
+target_link_libraries (ctadbconfigcatalogueunittests
   ctacatalogue)
 
-install (TARGETS ctacatalogueunittests DESTINATION usr/${CMAKE_INSTALL_LIBDIR})
+install (TARGETS ctadbconfigcatalogueunittests DESTINATION usr/${CMAKE_INSTALL_LIBDIR})
 
 install (FILES cta-catalogue.conf.example
   DESTINATION ${CMAKE_INSTALL_SYSCONFDIR}/cta
catalogue/OracleCatalogue.cpp

@@ -167,7 +167,7 @@ uint64_t OracleCatalogue::getNextArchiveFileId(rdbms::Conn &conn) {
       "FROM "
         "DUAL";
     auto stmt = conn.createStmt(sql);
-    auto rset = stmt.executeQuery(rdbms::AutocommitMode::AUTOCOMMIT_OFF);
+    auto rset = stmt.executeQuery();
     if (!rset.next()) {
       throw exception::Exception(std::string("Result set is unexpectedly empty"));
     }
@@ -192,7 +192,7 @@ uint64_t OracleCatalogue::getNextStorageClassId(rdbms::Conn &conn) {
       "FROM "
         "DUAL";
     auto stmt = conn.createStmt(sql);
-    auto rset = stmt.executeQuery(rdbms::AutocommitMode::AUTOCOMMIT_OFF);
+    auto rset = stmt.executeQuery();
     if (!rset.next()) {
       throw exception::Exception(std::string("Result set is unexpectedly empty"));
     }
@@ -249,7 +249,7 @@ common::dataStructures::Tape OracleCatalogue::selectTapeForUpdate(rdbms::Conn &c
       "FOR UPDATE";
     auto stmt = conn.createStmt(sql);
     stmt.bindString(":VID", vid);
-    auto rset = stmt.executeQuery(rdbms::AutocommitMode::AUTOCOMMIT_OFF);
+    auto rset = stmt.executeQuery();
     if (!rset.next()) {
       throw exception::Exception(std::string("The tape with VID " + vid + " does not exist"));
     }
@@ -319,6 +319,8 @@ void OracleCatalogue::filesWrittenToTape(const std::set<TapeItemWrittenPointer>
     auto conn = m_connPool.getConn();
     rdbms::AutoRollback autoRollback(conn);
+    conn.setAutocommitMode(rdbms::AutocommitMode::AUTOCOMMIT_OFF);
+
     const auto tape = selectTapeForUpdate(conn, firstEvent.vid);
     uint64_t expectedFSeq = tape.lastFSeq + 1;
     uint64_t totalCompressedBytesWritten = 0;
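The hunk above shows how filesWrittenToTape() now handles transactions: the autocommit mode is applied once to the pooled connection, an rdbms::AutoRollback guard is placed on it, and the batch is committed explicitly at the end. A rough sketch of that transaction shape, assuming the header paths and with the actual batch-insert work elided:

#include "rdbms/AutoRollback.hpp"  // assumed header locations
#include "rdbms/ConnPool.hpp"

// Illustrative transaction shape only; the real work between the guard and the
// commit is the batch-insert logic shown in the following hunks.
void runBatchInOneTransaction(cta::rdbms::ConnPool &connPool) {
  auto conn = connPool.getConn();
  cta::rdbms::AutoRollback autoRollback(conn);  // guard used in the diff above
  conn.setAutocommitMode(cta::rdbms::AutocommitMode::AUTOCOMMIT_OFF);

  // ... create statements with conn.createStmt(...) and execute them here ...

  conn.commit();  // publish the whole batch at once
}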
@@ -383,13 +385,13 @@ void OracleCatalogue::filesWrittenToTape(const std::set<TapeItemWrittenPointer>
     if (fileEvents.empty()) return;
 
     // Create the archive file entries, skipping those that already exist
-    idempotentBatchInsertArchiveFiles(conn, rdbms::AutocommitMode::AUTOCOMMIT_OFF, fileEvents);
+    idempotentBatchInsertArchiveFiles(conn, fileEvents);
 
-    insertTapeFileBatchIntoTempTable(conn, rdbms::AutocommitMode::AUTOCOMMIT_OFF, fileEvents);
+    insertTapeFileBatchIntoTempTable(conn, fileEvents);
 
     // Verify that the archive file entries in the catalogue database agree with
     // the tape file written events
-    const auto fileSizesAndChecksums = selectArchiveFileSizesAndChecksums(conn, rdbms::AutocommitMode::AUTOCOMMIT_OFF, fileEvents);
+    const auto fileSizesAndChecksums = selectArchiveFileSizesAndChecksums(conn, fileEvents);
 
     for (const auto &event: fileEvents) {
       const auto fileSizeAndChecksumItor = fileSizesAndChecksums.find(event.archiveFileId);
@@ -508,8 +510,7 @@ void OracleCatalogue::filesWrittenToTape(const std::set<TapeItemWrittenPointer>
 //------------------------------------------------------------------------------
 // idempotentBatchInsertArchiveFiles
 //------------------------------------------------------------------------------
-void OracleCatalogue::idempotentBatchInsertArchiveFiles(rdbms::Conn &conn, const rdbms::AutocommitMode autocommitMode,
-  const std::set<TapeFileWritten> &events) {
+void OracleCatalogue::idempotentBatchInsertArchiveFiles(rdbms::Conn &conn, const std::set<TapeFileWritten> &events) {
   try {
     ArchiveFileBatch archiveFileBatch(events.size());
     const time_t now = time(nullptr);
@@ -664,7 +665,7 @@ void OracleCatalogue::idempotentBatchInsertArchiveFiles(rdbms::Conn &conn,
 // selectArchiveFileSizeAndChecksum
 //------------------------------------------------------------------------------
 std::map<uint64_t, OracleCatalogue::FileSizeAndChecksum> OracleCatalogue::selectArchiveFileSizesAndChecksums(
-  rdbms::Conn &conn, const rdbms::AutocommitMode autocommitMode, const std::set<TapeFileWritten> &events) {
+  rdbms::Conn &conn, const std::set<TapeFileWritten> &events) {
   try {
     std::vector<oracle::occi::Number> archiveFileIdList(events.size());
     for (const auto &event: events) {
@@ -683,7 +684,7 @@ std::map<uint64_t, OracleCatalogue::FileSizeAndChecksum> OracleCatalogue::select
         "ARCHIVE_FILE.ARCHIVE_FILE_ID = TEMP_TAPE_FILE_BATCH.ARCHIVE_FILE_ID";
     auto stmt = conn.createStmt(sql);
-    auto rset = stmt.executeQuery(autocommitMode);
+    auto rset = stmt.executeQuery();
     std::map<uint64_t, FileSizeAndChecksum> fileSizesAndChecksums;
     while (rset.next()) {
@@ -716,8 +717,7 @@ std::map<uint64_t, OracleCatalogue::FileSizeAndChecksum> OracleCatalogue::select
 //------------------------------------------------------------------------------
 // insertArchiveFilesIntoTempTable
 //------------------------------------------------------------------------------
-void OracleCatalogue::insertTapeFileBatchIntoTempTable(rdbms::Conn &conn, const rdbms::AutocommitMode autocommitMode,
-  const std::set<TapeFileWritten> &events) {
+void OracleCatalogue::insertTapeFileBatchIntoTempTable(rdbms::Conn &conn, const std::set<TapeFileWritten> &events) {
   try {
     TempTapeFileBatch tempTapeFileBatch(events.size());
@@ -756,6 +756,202 @@ void OracleCatalogue::insertTapeFileBatchIntoTempTable(rdbms::Conn &conn,
   }
 }
 
+//------------------------------------------------------------------------------
+// deleteArchiveFile
+//------------------------------------------------------------------------------
+void OracleCatalogue::deleteArchiveFile(const std::string &diskInstanceName, const uint64_t archiveFileId,
+  log::LogContext &lc) {
+  try {
+    const char *selectSql =
+      "SELECT "
+        "ARCHIVE_FILE.ARCHIVE_FILE_ID AS ARCHIVE_FILE_ID,"
+        "ARCHIVE_FILE.DISK_INSTANCE_NAME AS DISK_INSTANCE_NAME,"
+        "ARCHIVE_FILE.DISK_FILE_ID AS DISK_FILE_ID,"
+        "ARCHIVE_FILE.DISK_FILE_PATH AS DISK_FILE_PATH,"
+        "ARCHIVE_FILE.DISK_FILE_USER AS DISK_FILE_USER,"
+        "ARCHIVE_FILE.DISK_FILE_GROUP AS DISK_FILE_GROUP,"
+        "ARCHIVE_FILE.DISK_FILE_RECOVERY_BLOB AS DISK_FILE_RECOVERY_BLOB,"
+        "ARCHIVE_FILE.SIZE_IN_BYTES AS SIZE_IN_BYTES,"
+        "ARCHIVE_FILE.CHECKSUM_TYPE AS CHECKSUM_TYPE,"
+        "ARCHIVE_FILE.CHECKSUM_VALUE AS CHECKSUM_VALUE,"
+        "STORAGE_CLASS.STORAGE_CLASS_NAME AS STORAGE_CLASS_NAME,"
+        "ARCHIVE_FILE.CREATION_TIME AS ARCHIVE_FILE_CREATION_TIME,"
+        "ARCHIVE_FILE.RECONCILIATION_TIME AS RECONCILIATION_TIME,"
+        "TAPE_FILE.VID AS VID,"
+        "TAPE_FILE.FSEQ AS FSEQ,"
+        "TAPE_FILE.BLOCK_ID AS BLOCK_ID,"
+        "TAPE_FILE.COMPRESSED_SIZE_IN_BYTES AS COMPRESSED_SIZE_IN_BYTES,"
+        "TAPE_FILE.COPY_NB AS COPY_NB,"
+        "TAPE_FILE.CREATION_TIME AS TAPE_FILE_CREATION_TIME "
+      "FROM "
+        "ARCHIVE_FILE "
+      "INNER JOIN STORAGE_CLASS ON "
+        "ARCHIVE_FILE.STORAGE_CLASS_ID = STORAGE_CLASS.STORAGE_CLASS_ID "
+      "LEFT OUTER JOIN TAPE_FILE ON "
+        "ARCHIVE_FILE.ARCHIVE_FILE_ID = TAPE_FILE.ARCHIVE_FILE_ID "
+      "WHERE "
+        "ARCHIVE_FILE.ARCHIVE_FILE_ID = :ARCHIVE_FILE_ID "
+      "FOR UPDATE";
+    utils::Timer t;
+    auto conn = m_connPool.getConn();
+    rdbms::AutoRollback autoRollback(conn);
+    conn.setAutocommitMode(rdbms::AutocommitMode::AUTOCOMMIT_OFF);
+    const auto getConnTime = t.secs(utils::Timer::resetCounter);
+    auto selectStmt = conn.createStmt(selectSql);
+    const auto createStmtTime = t.secs();
+    selectStmt.bindUint64(":ARCHIVE_FILE_ID", archiveFileId);
+    t.reset();
+    rdbms::Rset selectRset = selectStmt.executeQuery();
+    const auto selectFromArchiveFileTime = t.secs();
+    std::unique_ptr<common::dataStructures::ArchiveFile> archiveFile;
+    while (selectRset.next()) {
+      if (nullptr == archiveFile.get()) {
+        archiveFile = cta::make_unique<common::dataStructures::ArchiveFile>();
+        archiveFile->archiveFileID = selectRset.columnUint64("ARCHIVE_FILE_ID");
+        archiveFile->diskInstance = selectRset.columnString("DISK_INSTANCE_NAME");
+        archiveFile->diskFileId = selectRset.columnString("DISK_FILE_ID");
+        archiveFile->diskFileInfo.path = selectRset.columnString("DISK_FILE_PATH");
+        archiveFile->diskFileInfo.owner = selectRset.columnString("DISK_FILE_USER");
+        archiveFile->diskFileInfo.group = selectRset.columnString("DISK_FILE_GROUP");
+        archiveFile->diskFileInfo.recoveryBlob = selectRset.columnString("DISK_FILE_RECOVERY_BLOB");
+        archiveFile->fileSize = selectRset.columnUint64("SIZE_IN_BYTES");
+        archiveFile->checksumType = selectRset.columnString("CHECKSUM_TYPE");
+        archiveFile->checksumValue = selectRset.columnString("CHECKSUM_VALUE");
+        archiveFile->storageClass = selectRset.columnString("STORAGE_CLASS_NAME");
+        archiveFile->creationTime = selectRset.columnUint64("ARCHIVE_FILE_CREATION_TIME");
+        archiveFile->reconciliationTime = selectRset.columnUint64("RECONCILIATION_TIME");
+      }
+
+      // If there is a tape file
+      if (!selectRset.columnIsNull("VID")) {
+        // Add the tape file to the archive file's in-memory structure
+        common::dataStructures::TapeFile tapeFile;
+        tapeFile.vid = selectRset.columnString("VID");
+        tapeFile.fSeq = selectRset.columnUint64("FSEQ");
+        tapeFile.blockId = selectRset.columnUint64("BLOCK_ID");
+        tapeFile.compressedSize = selectRset.columnUint64("COMPRESSED_SIZE_IN_BYTES");
+        tapeFile.copyNb = selectRset.columnUint64("COPY_NB");
+        tapeFile.creationTime = selectRset.columnUint64("TAPE_FILE_CREATION_TIME");
+        tapeFile.checksumType = archiveFile->checksumType; // Duplicated for convenience
+        tapeFile.checksumValue = archiveFile->checksumValue; // Duplicated for convenience
+
+        archiveFile->tapeFiles[selectRset.columnUint64("COPY_NB")] = tapeFile;
+      }
+    }
+
+    if (nullptr == archiveFile.get()) {
+      log::ScopedParamContainer spc(lc);
+      spc.add("fileId", archiveFileId);
+      lc.log(log::WARNING, "Ignoring request to delete archive file because it does not exist in the catalogue");
+      return;
+    }
+
+    if (diskInstanceName != archiveFile->diskInstance) {
+      log::ScopedParamContainer spc(lc);
+      spc.add("fileId", std::to_string(archiveFile->archiveFileID))
+         .add("diskInstance", archiveFile->diskInstance)
+         .add("requestDiskInstance", diskInstanceName)
+         .add("diskFileId", archiveFile->diskFileId)
+         .add("diskFileInfo.path", archiveFile->diskFileInfo.path)
+         .add("diskFileInfo.owner", archiveFile->diskFileInfo.owner)
+         .add("diskFileInfo.group", archiveFile->diskFileInfo.group)
+         .add("fileSize", std::to_string(archiveFile->fileSize))
+         .add("checksumType", archiveFile->checksumType)
+         .add("checksumValue", archiveFile->checksumValue)
+         .add("creationTime", std::to_string(archiveFile->creationTime))
+         .add("reconciliationTime", std::to_string(archiveFile->reconciliationTime))
+         .add("storageClass", archiveFile->storageClass)
+         .add("getConnTime", getConnTime)
+         .add("createStmtTime", createStmtTime)
+         .add("selectFromArchiveFileTime", selectFromArchiveFileTime);
+      for (auto it = archiveFile->tapeFiles.begin(); it != archiveFile->tapeFiles.end(); it++) {
+        std::stringstream tapeCopyLogStream;
+        tapeCopyLogStream << "copy number: " << it->first
+          << " vid: " << it->second.vid
+          << " fSeq: " << it->second.fSeq
+          << " blockId: " << it->second.blockId
+          << " creationTime: " << it->second.creationTime
+          << " compressedSize: " << it->second.compressedSize
+          << " checksumType: " << it->second.checksumType //this shouldn't be here: repeated field
+          << " checksumValue: " << it->second.checksumValue //this shouldn't be here: repeated field
+          << " copyNb: " << it->second.copyNb; //this shouldn't be here: repeated field
+        spc.add("TAPE FILE", tapeCopyLogStream.str());
+      }
+      lc.log(log::WARNING, "Failed to delete archive file because the disk instance of the request does not match that "
+        "of the archived file");
+
+      exception::UserError ue;
+      ue.getMessage() << "Failed to delete archive file with ID " << archiveFileId << " because the disk instance of "
+        "the request does not match that of the archived file: archiveFileId=" << archiveFileId <<
+        " path=" << archiveFile->diskFileInfo.path << " requestDiskInstance=" << diskInstanceName <<
+        " archiveFileDiskInstance=" << archiveFile->diskInstance;
+      throw ue;
+    }
+
+    t.reset();
+    {
+      const char *const sql = "DELETE FROM TAPE_FILE WHERE ARCHIVE_FILE_ID = :ARCHIVE_FILE_ID";
+      auto stmt = conn.createStmt(sql);
+      stmt.bindUint64(":ARCHIVE_FILE_ID", archiveFileId);
+      stmt.executeNonQuery();
+    }
+    const auto deleteFromTapeFileTime = t.secs(utils::Timer::resetCounter);
+
+    {
+      const char *const sql = "DELETE FROM ARCHIVE_FILE WHERE ARCHIVE_FILE_ID = :ARCHIVE_FILE_ID";
+      auto stmt = conn.createStmt(sql);
+      stmt.bindUint64(":ARCHIVE_FILE_ID", archiveFileId);
+      stmt.executeNonQuery();
+    }
+    const auto deleteFromArchiveFileTime = t.secs(utils::Timer::resetCounter);
+
+    conn.commit();
+    const auto commitTime = t.secs();
+
+    log::ScopedParamContainer spc(lc);
+    spc.add("fileId", std::to_string(archiveFile->archiveFileID))
+       .add("diskInstance", archiveFile->diskInstance)
+       .add("diskFileId", archiveFile->diskFileId)
+       .add("diskFileInfo.path", archiveFile->diskFileInfo.path)
+       .add("diskFileInfo.owner", archiveFile->diskFileInfo.owner)
+       .add("diskFileInfo.group", archiveFile->diskFileInfo.group)
+       .add("fileSize", std::to_string(archiveFile->fileSize))
+       .add("checksumType", archiveFile->checksumType)
+       .add("checksumValue", archiveFile->checksumValue)
+       .add("creationTime", std::to_string(archiveFile->creationTime))
+       .add("reconciliationTime", std::to_string(archiveFile->reconciliationTime))
+       .add("storageClass", archiveFile->storageClass)
+       .add("getConnTime", getConnTime)
+       .add("createStmtTime", createStmtTime)
+       .add("selectFromArchiveFileTime", selectFromArchiveFileTime)
+       .add("deleteFromTapeFileTime", deleteFromTapeFileTime)
+       .add("deleteFromArchiveFileTime", deleteFromArchiveFileTime)
+       .add("commitTime", commitTime);
+    for (auto it = archiveFile->tapeFiles.begin(); it != archiveFile->tapeFiles.end(); it++) {
+      std::stringstream tapeCopyLogStream;
+      tapeCopyLogStream << "copy number: " << it->first
+        << " vid: " << it->second.vid
+        << " fSeq: " << it->second.fSeq
+        << " blockId: " << it->second.blockId
+        << " creationTime: " << it->second.creationTime
+        << " compressedSize: " << it->second.compressedSize
+        << " checksumType: " << it->second.checksumType //this shouldn't be here: repeated field
+        << " checksumValue: " << it->second.checksumValue //this shouldn't be here: repeated field
+        << " copyNb: " << it->second.copyNb; //this shouldn't be here: repeated field
+      spc.add("TAPE FILE", tapeCopyLogStream.str());
+    }
+    lc.log(log::INFO, "Archive file deleted from CTA catalogue");
+  } catch(exception::UserError &) {
+    throw;
+  } catch(exception::Exception &ex) {
+    ex.getMessage().str(std::string(__FUNCTION__) + ": " + ex.getMessage().str());
+    throw;
+  }
+}
+
 } // namespace catalogue
 } // namespace cta
catalogue/OracleCatalogue.hpp

@@ -92,6 +92,27 @@ public:
    */
   void filesWrittenToTape(const std::set<TapeItemWrittenPointer> &events) override;
 
+  /**
+   * Deletes the specified archive file and its associated tape copies from the
+   * catalogue.
+   *
+   * Please note that the name of the disk instance is specified in order to
+   * prevent a disk instance deleting an archive file that belongs to another
+   * disk instance.
+   *
+   * Please note that this method is idempotent.  If the file to be deleted does
+   * not exist in the CTA catalogue then this method returns without error.
+   *
+   * @param instanceName The name of the instance from where the deletion request
+   * originated
+   * @param archiveFileId The unique identifier of the archive file.
+   * @param lc The log context.
+   * @return The metadata of the deleted archive file including the metadata of
+   * the associated and also deleted tape copies.
+   */
+  void deleteArchiveFile(const std::string &instanceName, const uint64_t archiveFileId,
+    log::LogContext &lc) override;
+
 private:
 
   /**
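The comment block above documents two behaviours of the new deleteArchiveFile() override: the call is idempotent (deleting a file that is not in the catalogue just logs a warning), and the requesting disk instance must match the one recorded for the file, otherwise an exception::UserError is thrown. A small usage sketch under those assumptions; the helper and the header path for LogContext are hypothetical, and `catalogue` and `lc` are presumed to have been constructed elsewhere:

#include <cstdint>
#include <string>

#include "catalogue/OracleCatalogue.hpp"  // path as shown in this commit
#include "common/log/LogContext.hpp"      // assumed header location

// Hypothetical helper: delete an archive file on behalf of one disk instance.
void deleteOnBehalfOf(cta::catalogue::OracleCatalogue &catalogue, const std::string &instanceName,
  const std::uint64_t archiveFileId, cta::log::LogContext &lc) {
  // Safe to call for an already-deleted file; a disk-instance mismatch throws
  // cta::exception::UserError instead of deleting anything.
  catalogue.deleteArchiveFile(instanceName, archiveFileId, lc);
}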
@@ -116,11 +137,9 @@ private:
    * rows will be unique.
    *
    * @param conn The database connection.
-   * @param autocommitMode The autocommit mode of the SQL insert statement.
    * @param events The tape file written events.
    */
-  void idempotentBatchInsertArchiveFiles(rdbms::Conn &conn, const rdbms::AutocommitMode autocommitMode,
-    const std::set<TapeFileWritten> &events);
+  void idempotentBatchInsertArchiveFiles(rdbms::Conn &conn, const std::set<TapeFileWritten> &events);
 
   /**
    * The size and checksum of a file.
@@ -135,23 +154,20 @@ private:
    * Returns the sizes and checksums of the specified archive files.
    *
    * @param conn The database connection.
-   * @param autocommitMode The autocommit mode of the SQL select statement.
    * @param events The tape file written events that identify the archive files.
    * @return A map from the identifier of each archive file to its size and checksum.
    */
   std::map<uint64_t, FileSizeAndChecksum> selectArchiveFileSizesAndChecksums(rdbms::Conn &conn,
-    const rdbms::AutocommitMode autocommitMode, const std::set<TapeFileWritten> &events);
+    const std::set<TapeFileWritten> &events);
 
   /**
    * Batch inserts rows into the TAPE_FILE_BATCH temporary table that correspond
    * to the specified TapeFileWritten events.
    *
    * @param conn The database connection.
-   * @param autocommitMode The autocommit mode of the SQL insert statement.
    * @param events The tape file written events.
    */
-  void insertTapeFileBatchIntoTempTable(rdbms::Conn &conn, const rdbms::AutocommitMode autocommitMode,
-    const std::set<TapeFileWritten> &events);
+  void insertTapeFileBatchIntoTempTable(rdbms::Conn &conn, const std::set<TapeFileWritten> &events);
 
 }; // class OracleCatalogue
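selectArchiveFileSizesAndChecksums() above returns a std::map keyed by archive file identifier, which filesWrittenToTape() probes with find() for every written event (see the .cpp hunks earlier in this commit). A generic sketch of that lookup, with FileSizeAndChecksum left as a template parameter because the real type is a private nested struct, and with placeholder error handling chosen purely for illustration:

#include <cstdint>
#include <map>
#include <stdexcept>

// Illustrative consumer of a map from archive file id to size-and-checksum data.
template <typename FileSizeAndChecksum>
const FileSizeAndChecksum &lookupWrittenFile(
  const std::map<std::uint64_t, FileSizeAndChecksum> &sizesAndChecksums, const std::uint64_t archiveFileId) {
  const auto itor = sizesAndChecksums.find(archiveFileId);
  if (itor == sizesAndChecksums.end()) {
    throw std::runtime_error("No size/checksum entry for the given archive file id");  // placeholder
  }
  return itor->second;
}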
catalogue/RdbmsCatalogue.cpp

@@ -4712,199 +4712,6 @@ RequesterAndGroupMountPolicies RdbmsCatalogue::getMountPolicies(
   }
 }
 
-//------------------------------------------------------------------------------
-// deleteArchiveFile
-//------------------------------------------------------------------------------
-void RdbmsCatalogue::deleteArchiveFile(const std::string &diskInstanceName, const uint64_t archiveFileId,
-  log::LogContext &lc) {
-  try {
-    const char *selectSql =
-      "SELECT "
-        "ARCHIVE_FILE.ARCHIVE_FILE_ID AS ARCHIVE_FILE_ID,"
-        "ARCHIVE_FILE.DISK_INSTANCE_NAME AS DISK_INSTANCE_NAME,"
-        "ARCHIVE_FILE.DISK_FILE_ID AS DISK_FILE_ID,"
-        "ARCHIVE_FILE.DISK_FILE_PATH AS DISK_FILE_PATH,"
-        "ARCHIVE_FILE.DISK_FILE_USER AS DISK_FILE_USER,"
-        "ARCHIVE_FILE.DISK_FILE_GROUP AS DISK_FILE_GROUP,"
-        "ARCHIVE_FILE.DISK_FILE_RECOVERY_BLOB AS DISK_FILE_RECOVERY_BLOB,"
-        "ARCHIVE_FILE.SIZE_IN_BYTES AS SIZE_IN_BYTES,"
-        "ARCHIVE_FILE.CHECKSUM_TYPE AS CHECKSUM_TYPE,"
-        "ARCHIVE_FILE.CHECKSUM_VALUE AS CHECKSUM_VALUE,"
-        "STORAGE_CLASS.STORAGE_CLASS_NAME AS STORAGE_CLASS_NAME,"
-        "ARCHIVE_FILE.CREATION_TIME AS ARCHIVE_FILE_CREATION_TIME,"
-        "ARCHIVE_FILE.RECONCILIATION_TIME AS RECONCILIATION_TIME,"
-        "TAPE_FILE.VID AS VID,"
-        "TAPE_FILE.FSEQ AS FSEQ,"
-        "TAPE_FILE.BLOCK_ID AS BLOCK_ID,"
-        "TAPE_FILE.COMPRESSED_SIZE_IN_BYTES AS COMPRESSED_SIZE_IN_BYTES,"
-        "TAPE_FILE.COPY_NB AS COPY_NB,"
-        "TAPE_FILE.CREATION_TIME AS TAPE_FILE_CREATION_TIME "
-      "FROM "
-        "ARCHIVE_FILE "
-      "INNER JOIN STORAGE_CLASS ON "
-        "ARCHIVE_FILE.STORAGE_CLASS_ID = STORAGE_CLASS.STORAGE_CLASS_ID "
-      "LEFT OUTER JOIN TAPE_FILE ON "
-        "ARCHIVE_FILE.ARCHIVE_FILE_ID = TAPE_FILE.ARCHIVE_FILE_ID "
-      "WHERE "
-        "ARCHIVE_FILE.ARCHIVE_FILE_ID = :ARCHIVE_FILE_ID "
-      "FOR UPDATE";
-    utils::Timer t;
-    auto conn = m_connPool.getConn();
-    rdbms::AutoRollback autoRollback(conn);
-    const auto getConnTime = t.secs(utils::Timer::resetCounter);
-    auto selectStmt = conn.createStmt(selectSql);
-    const auto createStmtTime = t.secs();
-    selectStmt.bindUint64(":ARCHIVE_FILE_ID", archiveFileId);
-    t.reset();
-    rdbms::Rset selectRset = selectStmt.executeQuery(rdbms::AutocommitMode::AUTOCOMMIT_OFF);
-    const auto selectFromArchiveFileTime = t.secs();
-    std::unique_ptr<common::dataStructures::ArchiveFile> archiveFile;
-    while (selectRset.next()) {
-      if (nullptr == archiveFile.get()) {
-        archiveFile = cta::make_unique<common::dataStructures::ArchiveFile>();
-        archiveFile->archiveFileID = selectRset.columnUint64("ARCHIVE_FILE_ID");
-        archiveFile->diskInstance = selectRset.columnString("DISK_INSTANCE_NAME");
-        archiveFile->diskFileId = selectRset.columnString("DISK_FILE_ID");
-        archiveFile->diskFileInfo.path = selectRset.columnString("DISK_FILE_PATH");
-        archiveFile->diskFileInfo.owner = selectRset.columnString("DISK_FILE_USER");
-        archiveFile->diskFileInfo.group = selectRset.columnString("DISK_FILE_GROUP");
-        archiveFile->diskFileInfo.recoveryBlob = selectRset.columnString("DISK_FILE_RECOVERY_BLOB");
-        archiveFile->fileSize = selectRset.columnUint64("SIZE_IN_BYTES");
-        archiveFile->checksumType = selectRset.columnString("CHECKSUM_TYPE");
-        archiveFile->checksumValue = selectRset.columnString("CHECKSUM_VALUE");
-        archiveFile->storageClass = selectRset.columnString("STORAGE_CLASS_NAME");
-        archiveFile->creationTime = selectRset.columnUint64("ARCHIVE_FILE_CREATION_TIME");
-        archiveFile->reconciliationTime = selectRset.columnUint64("RECONCILIATION_TIME");
-      }
-
-      // If there is a tape file
-      if (!selectRset.columnIsNull("VID")) {
-        // Add the tape file to the archive file's in-memory structure
-        common::dataStructures::TapeFile tapeFile;
-        tapeFile.vid = selectRset.columnString("VID");
-        tapeFile.fSeq = selectRset.columnUint64("FSEQ");
-        tapeFile.blockId = selectRset.columnUint64("BLOCK_ID");
-        tapeFile.compressedSize = selectRset.columnUint64("COMPRESSED_SIZE_IN_BYTES");
-        tapeFile.copyNb = selectRset.columnUint64("COPY_NB");
-        tapeFile.creationTime = selectRset.columnUint64("TAPE_FILE_CREATION_TIME");
-        tapeFile.checksumType = archiveFile->checksumType; // Duplicated for convenience
-        tapeFile.checksumValue = archiveFile->checksumValue; // Duplicated for convenience
-
-        archiveFile->tapeFiles[selectRset.columnUint64("COPY_NB")] = tapeFile;
-      }
-    }
-
-    if (nullptr == archiveFile.get()) {
-      log::ScopedParamContainer spc(lc);
-      spc.add("fileId", archiveFileId);
-      lc.log(log::WARNING, "Ignoring request to delete archive file because it does not exist in the catalogue");
-      return;
-    }
-
-    if (diskInstanceName != archiveFile->diskInstance) {
-      log::ScopedParamContainer spc(lc);
-      spc.add("fileId", std::to_string(archiveFile->archiveFileID))
-         .add("diskInstance", archiveFile->diskInstance)
-         .add("requestDiskInstance", diskInstanceName)
-         .add("diskFileId", archiveFile->diskFileId)
-         .add("diskFileInfo.path", archiveFile->diskFileInfo.path)
-         .add("diskFileInfo.owner", archiveFile->diskFileInfo.owner)
-         .add("diskFileInfo.group", archiveFile->diskFileInfo.group)
-         .add("fileSize", std::to_string(archiveFile->fileSize))
-         .add("checksumType", archiveFile->checksumType)
-         .add("checksumValue", archiveFile->checksumValue)
-         .add("creationTime", std::to_string(archiveFile->creationTime))
-         .add("reconciliationTime", std::to_string(archiveFile->reconciliationTime))
-         .add("storageClass", archiveFile->storageClass)
-         .add("getConnTime", getConnTime)
-         .add("createStmtTime", createStmtTime)
-         .add("selectFromArchiveFileTime", selectFromArchiveFileTime);
-      for (auto it = archiveFile->tapeFiles.begin(); it != archiveFile->tapeFiles.end(); it++) {
-        std::stringstream tapeCopyLogStream;
-        tapeCopyLogStream << "copy number: " << it->first
-          << " vid: " << it->second.vid
-          << " fSeq: " << it->second.fSeq
-          << " blockId: " << it->second.blockId
-          << " creationTime: " << it->second.creationTime
-          << " compressedSize: " << it->second.compressedSize
-          << " checksumType: " << it->second.checksumType //this shouldn't be here: repeated field
-          << " checksumValue: " << it->second.checksumValue //this shouldn't be here: repeated field
-          << " copyNb: " << it->second.copyNb; //this shouldn't be here: repeated field
-        spc.add("TAPE FILE", tapeCopyLogStream.str());
-      }
-      lc.log(log::WARNING, "Failed to delete archive file because the disk instance of the request does not match that "
-        "of the archived file");
-
-      exception::UserError ue;
-      ue.getMessage() << "Failed to delete archive file with ID " << archiveFileId << " because the disk instance of "
-        "the request does not match that of the archived file: archiveFileId=" << archiveFileId <<
-        " path=" << archiveFile->diskFileInfo.path << " requestDiskInstance=" << diskInstanceName <<
-        " archiveFileDiskInstance=" << archiveFile->diskInstance;
-      throw ue;
-    }
-
-    t.reset();
-    {
-      const char *const sql = "DELETE FROM TAPE_FILE WHERE ARCHIVE_FILE_ID = :ARCHIVE_FILE_ID";
-      auto stmt = conn.createStmt(sql);
-      stmt.bindUint64(":ARCHIVE_FILE_ID", archiveFileId);
-      stmt.executeNonQuery(rdbms::AutocommitMode::AUTOCOMMIT_OFF);
-    }
-    const auto deleteFromTapeFileTime = t.secs(utils::Timer::resetCounter);
-
-    {
-      const char *const sql = "DELETE FROM ARCHIVE_FILE WHERE ARCHIVE_FILE_ID = :ARCHIVE_FILE_ID";
-      auto stmt = conn.createStmt(sql);
-      stmt.bindUint64(":ARCHIVE_FILE_ID", archiveFileId);
-      stmt.executeNonQuery(rdbms::AutocommitMode::AUTOCOMMIT_OFF);
-    }
-    const auto deleteFromArchiveFileTime = t.secs(utils::Timer::resetCounter);
-
-    conn.commit();
-    const auto commitTime = t.secs();
-
-    log::ScopedParamContainer spc(lc);
-    spc.add("fileId", std::to_string(archiveFile->archiveFileID))
-       .add("diskInstance", archiveFile->diskInstance)
-       .add("diskFileId", archiveFile->diskFileId)
-       .add("diskFileInfo.path", archiveFile->diskFileInfo.path)
-       .add("diskFileInfo.owner", archiveFile->diskFileInfo.owner)
-       .add("diskFileInfo.group", archiveFile->diskFileInfo.group)
-       .add("fileSize", std::to_string(archiveFile->fileSize))
-       .add("checksumType", archiveFile->checksumType)
-       .add("checksumValue", archiveFile->checksumValue)
-       .add("creationTime", std::to_string(archiveFile->creationTime))
-       .add("reconciliationTime", std::to_string(archiveFile->reconciliationTime))
-       .add("storageClass", archiveFile->storageClass)
-       .add("getConnTime", getConnTime)
-       .add("createStmtTime", createStmtTime)
-       .add("selectFromArchiveFileTime", selectFromArchiveFileTime)
-       .add("deleteFromTapeFileTime", deleteFromTapeFileTime)
-       .add("deleteFromArchiveFileTime", deleteFromArchiveFileTime)
-       .add("commitTime", commitTime);
-    for (auto it = archiveFile->tapeFiles.begin(); it != archiveFile->tapeFiles.end(); it++) {
-      std::stringstream tapeCopyLogStream;
-      tapeCopyLogStream << "copy number: " << it->first
-        << " vid: " << it->second.vid
-        << " fSeq: " << it->second.fSeq
-        << " blockId: " << it->second.blockId
-        << " creationTime: " << it->second.creationTime
-        << " compressedSize: " << it->second.compressedSize
-        << " checksumType: " << it->second.checksumType //this shouldn't be here: repeated field
-        << " checksumValue: " << it->second.checksumValue //this shouldn't be here: repeated field
-        << " copyNb: " << it->second.copyNb; //this shouldn't be here: repeated field
-      spc.add("TAPE FILE", tapeCopyLogStream.str());
-    }
-    lc.log(log::INFO, "Archive file deleted from CTA catalogue");
-  } catch(exception::UserError &) {
-    throw;
-  } catch(exception::Exception &ex) {
-    ex.getMessage().str(std::string(__FUNCTION__) + ": " + ex.getMessage().str());
-    throw;
-  }
-}
-
 //------------------------------------------------------------------------------
 // isAdmin
 //------------------------------------------------------------------------------
catalogue/RdbmsCatalogue.hpp