Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
dCache
cta
Commits
203bc586
Commit
203bc586
authored
Oct 30, 2018
by
Tao Lin
Committed by
Steven Murray
Nov 16, 2018
Browse files
Added support for MySQL
parent
8dea7e97
Changes
31
Hide whitespace changes
Inline
Side-by-side
catalogue/CMakeLists.txt
View file @
203bc586
...
...
@@ -40,6 +40,8 @@ set (CATALOGUE_LIB_SRC_FILES
RdbmsCatalogue.cpp
SchemaCreatingSqliteCatalogue.cpp
SqliteCatalogue.cpp
MysqlCatalogue.cpp
MysqlCatalogueSchema.cpp
TapeForWriting.cpp
UserSpecifiedANonEmptyTape.cpp
UserSpecifiedANonExistentTape.cpp
...
...
@@ -67,12 +69,17 @@ target_link_libraries (ctacatalogue
ctacommon
ctardbms
)
add_custom_command
(
OUTPUT sqlite_catalogue_schema.sql oracle_catalogue_schema.sql
add_custom_command
(
OUTPUT sqlite_catalogue_schema.sql
mysql_catalogue_schema.sql
oracle_catalogue_schema.sql
COMMAND cat
${
CMAKE_CURRENT_SOURCE_DIR
}
/sqlite_catalogue_schema_header.sql
${
CMAKE_CURRENT_SOURCE_DIR
}
/common_catalogue_schema.sql
${
CMAKE_CURRENT_SOURCE_DIR
}
/sqlite_catalogue_schema_trailer.sql
> sqlite_catalogue_schema.sql
COMMAND cat
${
CMAKE_CURRENT_SOURCE_DIR
}
/mysql_catalogue_schema_header.sql
${
CMAKE_CURRENT_SOURCE_DIR
}
/common_catalogue_schema.sql
${
CMAKE_CURRENT_SOURCE_DIR
}
/mysql_catalogue_schema_trailer.sql
> mysql_catalogue_schema.sql
COMMAND cat
${
CMAKE_CURRENT_SOURCE_DIR
}
/oracle_catalogue_schema_header.sql
${
CMAKE_CURRENT_SOURCE_DIR
}
/common_catalogue_schema.sql
...
...
@@ -83,6 +90,8 @@ add_custom_command (OUTPUT sqlite_catalogue_schema.sql oracle_catalogue_schema.s
${
CMAKE_CURRENT_SOURCE_DIR
}
/common_catalogue_schema.sql
${
CMAKE_CURRENT_SOURCE_DIR
}
/sqlite_catalogue_schema_header.sql
${
CMAKE_CURRENT_SOURCE_DIR
}
/sqlite_catalogue_schema_trailer.sql
${
CMAKE_CURRENT_SOURCE_DIR
}
/mysql_catalogue_schema_header.sql
${
CMAKE_CURRENT_SOURCE_DIR
}
/mysql_catalogue_schema_trailer.sql
${
CMAKE_CURRENT_SOURCE_DIR
}
/oracle_catalogue_schema_header.sql
${
CMAKE_CURRENT_SOURCE_DIR
}
/oracle_catalogue_schema_trailer.sql
)
...
...
@@ -94,6 +103,21 @@ add_custom_command(OUTPUT SqliteCatalogueSchema.cpp
COMMAND sed '/CTA_SQL_SCHEMA/r sqlite_catalogue_schema.cpp'
${
CMAKE_CURRENT_SOURCE_DIR
}
/SqliteCatalogueSchema.before_SQL.cpp > SqliteCatalogueSchema.cpp
DEPENDS
${
CMAKE_CURRENT_SOURCE_DIR
}
/SqliteCatalogueSchema.before_SQL.cpp sqlite_catalogue_schema.cpp
)
# For Mysql
add_custom_command
(
OUTPUT mysql_catalogue_schema.cpp
COMMAND sed 's/^/\ \ \"/' mysql_catalogue_schema.sql | sed 's/$$/\"/' > mysql_catalogue_schema.cpp
DEPENDS mysql_catalogue_schema.sql
)
add_custom_command
(
OUTPUT mysql_catalogue_schema_trigger.cpp
COMMAND sed 's/^/\ \ \"/'
${
CMAKE_CURRENT_SOURCE_DIR
}
/mysql_catalogue_schema_trigger.sql | sed 's/$$/\"/' > mysql_catalogue_schema_trigger.cpp
DEPENDS
${
CMAKE_CURRENT_SOURCE_DIR
}
/mysql_catalogue_schema_trigger.sql
)
add_custom_command
(
OUTPUT MysqlCatalogueSchema.cpp
COMMAND sed -e '/CTA_SQL_SCHEMA/r mysql_catalogue_schema.cpp' -e '/CTA_SQL_TRIGGER/r mysql_catalogue_schema_trigger.cpp'
${
CMAKE_CURRENT_SOURCE_DIR
}
/MysqlCatalogueSchema.before_SQL.cpp > MysqlCatalogueSchema.cpp
DEPENDS
${
CMAKE_CURRENT_SOURCE_DIR
}
/MysqlCatalogueSchema.before_SQL.cpp mysql_catalogue_schema.cpp mysql_catalogue_schema_trigger.cpp
)
set
(
IN_MEMORY_CATALOGUE_UNIT_TESTS_LIB_SRC_FILES
CatalogueTest.cpp
CatalogueFactoryTest.cpp
...
...
@@ -141,7 +165,8 @@ add_executable(cta-catalogue-schema-create
CreateSchemaCmdLineArgs.cpp
CreateSchemaCmdMain.cpp
OracleCatalogueSchema.cpp
SqliteCatalogueSchema.cpp
)
SqliteCatalogueSchema.cpp
MysqlCatalogueSchema.cpp
)
target_link_libraries
(
cta-catalogue-schema-create
ctacatalogue
)
...
...
catalogue/CatalogueFactory.cpp
View file @
203bc586
...
...
@@ -21,6 +21,7 @@
#include
"catalogue/InMemoryCatalogue.hpp"
#include
"catalogue/OracleCatalogue.hpp"
#include
"catalogue/SqliteCatalogue.hpp"
#include
"catalogue/MysqlCatalogue.hpp"
#include
"common/exception/Exception.hpp"
#include
"common/make_unique.hpp"
...
...
@@ -55,6 +56,12 @@ std::unique_ptr<Catalogue> CatalogueFactory::create(
maxTriesToConnect
);
return
cta
::
make_unique
<
CatalogueRetryWrapper
>
(
log
,
std
::
move
(
c
),
maxTriesToConnect
);
}
case
rdbms
::
Login
::
DBTYPE_MYSQL
:
{
auto
c
=
cta
::
make_unique
<
MysqlCatalogue
>
(
log
,
login
,
nbConns
,
nbArchiveFileListingConns
,
maxTriesToConnect
);
return
cta
::
make_unique
<
CatalogueRetryWrapper
>
(
log
,
std
::
move
(
c
),
maxTriesToConnect
);
}
case
rdbms
::
Login
::
DBTYPE_NONE
:
throw
exception
::
Exception
(
"Cannot create a catalogue without a database type"
);
default:
...
...
catalogue/CatalogueTest.cpp
View file @
203bc586
...
...
@@ -5993,6 +5993,7 @@ TEST_P(cta_catalogue_CatalogueTest, prepareToRetrieveFileUsingArchiveFileId) {
const
uint64_t
archiveFileId
=
1234
;
ASSERT_FALSE
(
m_catalogue
->
getArchiveFiles
().
hasMore
());
ASSERT_THROW
(
m_catalogue
->
getArchiveFileById
(
archiveFileId
),
exception
::
Exception
);
...
...
catalogue/CreateSchemaCmd.cpp
View file @
203bc586
...
...
@@ -18,11 +18,13 @@
#include
"catalogue/CreateSchemaCmd.hpp"
#include
"catalogue/CreateSchemaCmdLineArgs.hpp"
#include
"catalogue/MysqlCatalogueSchema.hpp"
#include
"catalogue/OracleCatalogueSchema.hpp"
#include
"catalogue/SqliteCatalogueSchema.hpp"
#include
"common/exception/Exception.hpp"
#include
"rdbms/ConnPool.hpp"
#include
"rdbms/Login.hpp"
#include
"rdbms/AutocommitMode.hpp"
namespace
cta
{
namespace
catalogue
{
...
...
@@ -71,6 +73,18 @@ int CreateSchemaCmd::exceptionThrowingMain(const int argc, char *const *const ar
conn
.
executeNonQueries
(
schema
.
sql
,
rdbms
::
AutocommitMode
::
AUTOCOMMIT_ON
);
}
break
;
case
rdbms
::
Login
::
DBTYPE_MYSQL
:
{
MysqlCatalogueSchema
schema
;
conn
.
executeNonQueries
(
schema
.
sql
,
rdbms
::
AutocommitMode
::
AUTOCOMMIT_ON
);
// execute triggers
auto
triggers
=
schema
.
triggers
();
for
(
auto
trigger
:
triggers
)
{
conn
.
executeNonQuery
(
trigger
,
rdbms
::
AutocommitMode
::
AUTOCOMMIT_ON
);
}
}
break
;
case
rdbms
::
Login
::
DBTYPE_ORACLE
:
{
OracleCatalogueSchema
schema
;
...
...
catalogue/DropSchemaCmd.cpp
View file @
203bc586
...
...
@@ -104,6 +104,9 @@ void DropSchemaCmd::dropCatalogueSchema(const rdbms::Login::DbType &dbType, rdbm
case
rdbms
::
Login
::
DBTYPE_SQLITE
:
dropSqliteCatalogueSchema
(
conn
);
break
;
case
rdbms
::
Login
::
DBTYPE_MYSQL
:
dropMysqlCatalogueSchema
(
conn
);
break
;
case
rdbms
::
Login
::
DBTYPE_ORACLE
:
dropOracleCatalogueSchema
(
conn
);
break
;
...
...
@@ -148,6 +151,49 @@ void DropSchemaCmd::dropSqliteCatalogueSchema(rdbms::Conn &conn) {
}
}
//------------------------------------------------------------------------------
// dropSqliteCatalogueSchema
//------------------------------------------------------------------------------
void
DropSchemaCmd
::
dropMysqlCatalogueSchema
(
rdbms
::
Conn
&
conn
)
{
try
{
std
::
list
<
std
::
string
>
tablesInDb
=
conn
.
getTableNames
();
std
::
list
<
std
::
string
>
tablesToDrop
=
{
"CTA_CATALOGUE"
,
"ARCHIVE_ROUTE"
,
"TAPE_FILE"
,
"ARCHIVE_FILE"
,
"ARCHIVE_FILE_ID"
,
"TAPE"
,
"REQUESTER_MOUNT_RULE"
,
"REQUESTER_GROUP_MOUNT_RULE"
,
"ADMIN_USER"
,
"STORAGE_CLASS"
,
"STORAGE_CLASS_ID"
,
"TAPE_POOL"
,
"LOGICAL_LIBRARY"
,
"MOUNT_POLICY"
};
dropDatabaseTables
(
conn
,
tablesToDrop
);
std
::
list
<
std
::
string
>
triggersToDrop
=
{
"TAPE_POOL_IS_ENCRYPTED_BOOL_CK_BEFORE_INSERT"
,
"TAPE_POOL_IS_ENCRYPTED_BOOL_CK_BEFORE_UPDATE"
,
"ARCHIVE_ROUTE_COPY_NB_GT_ZERO_BEFORE_INSERT"
,
"ARCHIVE_ROUTE_COPY_NB_GT_ZERO_BEFORE_UPDATE"
,
"CHECK_TAPE_BEFORE_INSERT"
,
"CHECK_TAPE_BEFORE_UPDATE"
,
"TAPE_FILE_COPY_NB_GT_ZERO_BEFORE_INSERT"
,
"TAPE_FILE_COPY_NB_GT_ZERO_BEFORE_UPDATE"
};
for
(
auto
triggerToDrop
:
triggersToDrop
)
{
conn
.
executeNonQuery
(
std
::
string
(
"DROP TRIGGER IF EXISTS "
)
+
triggerToDrop
,
rdbms
::
AutocommitMode
::
AUTOCOMMIT_ON
);
m_out
<<
"Dropped Trigger "
<<
triggerToDrop
<<
std
::
endl
;
}
}
catch
(
exception
::
Exception
&
ex
)
{
throw
exception
::
Exception
(
std
::
string
(
__FUNCTION__
)
+
" failed: "
+
ex
.
getMessage
().
str
());
}
}
//------------------------------------------------------------------------------
// dropDatabaseTables
//------------------------------------------------------------------------------
...
...
catalogue/DropSchemaCmd.hpp
View file @
203bc586
...
...
@@ -90,6 +90,14 @@ private:
*/
void
dropSqliteCatalogueSchema
(
rdbms
::
Conn
&
conn
);
/**
* Unconditionally drops the schema of the catalogue database associated with
* the specified database connection.
*
* @param conn The database connection.
*/
void
dropMysqlCatalogueSchema
(
rdbms
::
Conn
&
conn
);
/**
* Drops the database tables with the specified names.
*
...
...
catalogue/MysqlCatalogue.cpp
0 → 100644
View file @
203bc586
/*
* The CERN Tape Archive(CTA) project
* Copyright(C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
*(at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include
"catalogue/ArchiveFileRow.hpp"
#include
"catalogue/ChecksumTypeMismatch.hpp"
#include
"catalogue/ChecksumValueMismatch.hpp"
#include
"catalogue/FileSizeMismatch.hpp"
#include
"catalogue/MysqlCatalogueSchema.hpp"
#include
"catalogue/MysqlCatalogue.hpp"
#include
"common/exception/DatabaseConstraintError.hpp"
#include
"common/exception/DatabasePrimaryKeyError.hpp"
#include
"common/exception/Exception.hpp"
#include
"common/exception/UserError.hpp"
#include
"common/make_unique.hpp"
#include
"common/threading/MutexLocker.hpp"
#include
"common/Timer.hpp"
#include
"common/utils/utils.hpp"
#include
"rdbms/AutoRollback.hpp"
namespace
cta
{
namespace
catalogue
{
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
MysqlCatalogue
::
MysqlCatalogue
(
log
::
Logger
&
log
,
const
rdbms
::
Login
&
login
,
const
uint64_t
nbConns
,
const
uint64_t
nbArchiveFileListingConns
,
const
uint32_t
maxTriesToConnect
)
:
RdbmsCatalogue
(
log
,
rdbms
::
Login
(
rdbms
::
Login
::
DBTYPE_MYSQL
,
login
.
username
,
login
.
password
,
login
.
database
,
login
.
hostname
,
login
.
port
),
nbConns
,
nbArchiveFileListingConns
,
maxTriesToConnect
)
{
}
//------------------------------------------------------------------------------
// destructor
//------------------------------------------------------------------------------
MysqlCatalogue
::~
MysqlCatalogue
()
{
}
//------------------------------------------------------------------------------
// getNextArchiveFileId
//------------------------------------------------------------------------------
uint64_t
MysqlCatalogue
::
getNextArchiveFileId
(
rdbms
::
Conn
&
conn
)
{
try
{
rdbms
::
AutoRollback
autoRollback
(
conn
);
{
const
char
*
const
sql
=
"UPDATE ARCHIVE_FILE_ID SET ID = LAST_INSERT_ID(ID + 1)"
;
auto
stmt
=
conn
.
createStmt
(
sql
);
stmt
.
executeNonQuery
(
rdbms
::
AutocommitMode
::
AUTOCOMMIT_OFF
);
}
uint64_t
archiveFileId
=
0
;
{
const
char
*
const
sql
=
"SELECT LAST_INSERT_ID() AS ID "
;
auto
stmt
=
conn
.
createStmt
(
sql
);
auto
rset
=
stmt
.
executeQuery
(
rdbms
::
AutocommitMode
::
AUTOCOMMIT_OFF
);
if
(
!
rset
.
next
())
{
throw
exception
::
Exception
(
"ARCHIVE_FILE_ID table is empty"
);
}
archiveFileId
=
rset
.
columnUint64
(
"ID"
);
if
(
rset
.
next
())
{
throw
exception
::
Exception
(
"Found more than one ID counter in the ARCHIVE_FILE_ID table"
);
}
}
conn
.
commit
();
return
archiveFileId
;
}
catch
(
exception
::
UserError
&
)
{
throw
;
}
catch
(
exception
::
Exception
&
ex
)
{
ex
.
getMessage
().
str
(
std
::
string
(
__FUNCTION__
)
+
": "
+
ex
.
getMessage
().
str
());
throw
;
}
}
//------------------------------------------------------------------------------
// getNextStorageClassId
//------------------------------------------------------------------------------
uint64_t
MysqlCatalogue
::
getNextStorageClassId
(
rdbms
::
Conn
&
conn
)
{
try
{
rdbms
::
AutoRollback
autoRollback
(
conn
);
{
const
char
*
const
sql
=
"UPDATE STORAGE_CLASS_ID SET ID = LAST_INSERT_ID(ID + 1)"
;
auto
stmt
=
conn
.
createStmt
(
sql
);
stmt
.
executeNonQuery
(
rdbms
::
AutocommitMode
::
AUTOCOMMIT_OFF
);
}
uint64_t
storageClassId
=
0
;
{
const
char
*
const
sql
=
"SELECT LAST_INSERT_ID() AS ID "
;
auto
stmt
=
conn
.
createStmt
(
sql
);
auto
rset
=
stmt
.
executeQuery
(
rdbms
::
AutocommitMode
::
AUTOCOMMIT_OFF
);
if
(
!
rset
.
next
())
{
throw
exception
::
Exception
(
"STORAGE_CLASS_ID table is empty"
);
}
storageClassId
=
rset
.
columnUint64
(
"ID"
);
if
(
rset
.
next
())
{
throw
exception
::
Exception
(
"Found more than one ID counter in the STORAGE_CLASS_ID table"
);
}
}
conn
.
commit
();
return
storageClassId
;
}
catch
(
exception
::
UserError
&
)
{
throw
;
}
catch
(
exception
::
Exception
&
ex
)
{
ex
.
getMessage
().
str
(
std
::
string
(
__FUNCTION__
)
+
": "
+
ex
.
getMessage
().
str
());
throw
;
}
}
//------------------------------------------------------------------------------
// selectTapeForUpdate
//------------------------------------------------------------------------------
common
::
dataStructures
::
Tape
MysqlCatalogue
::
selectTapeForUpdate
(
rdbms
::
Conn
&
conn
,
const
std
::
string
&
vid
)
{
try
{
const
char
*
const
sql
=
"SELECT "
"VID AS VID,"
"LOGICAL_LIBRARY_NAME AS LOGICAL_LIBRARY_NAME,"
"TAPE_POOL_NAME AS TAPE_POOL_NAME,"
"ENCRYPTION_KEY AS ENCRYPTION_KEY,"
"CAPACITY_IN_BYTES AS CAPACITY_IN_BYTES,"
"DATA_IN_BYTES AS DATA_IN_BYTES,"
"LAST_FSEQ AS LAST_FSEQ,"
"IS_DISABLED AS IS_DISABLED,"
"IS_FULL AS IS_FULL,"
"LBP_IS_ON AS LBP_IS_ON,"
"LABEL_DRIVE AS LABEL_DRIVE,"
"LABEL_TIME AS LABEL_TIME,"
"LAST_READ_DRIVE AS LAST_READ_DRIVE,"
"LAST_READ_TIME AS LAST_READ_TIME,"
"LAST_WRITE_DRIVE AS LAST_WRITE_DRIVE,"
"LAST_WRITE_TIME AS LAST_WRITE_TIME,"
"USER_COMMENT AS USER_COMMENT,"
"CREATION_LOG_USER_NAME AS CREATION_LOG_USER_NAME,"
"CREATION_LOG_HOST_NAME AS CREATION_LOG_HOST_NAME,"
"CREATION_LOG_TIME AS CREATION_LOG_TIME,"
"LAST_UPDATE_USER_NAME AS LAST_UPDATE_USER_NAME,"
"LAST_UPDATE_HOST_NAME AS LAST_UPDATE_HOST_NAME,"
"LAST_UPDATE_TIME AS LAST_UPDATE_TIME "
"FROM "
"TAPE "
"WHERE "
"VID = :VID "
"FOR UPDATE"
;
auto
stmt
=
conn
.
createStmt
(
sql
);
stmt
.
bindString
(
":VID"
,
vid
);
auto
rset
=
stmt
.
executeQuery
(
rdbms
::
AutocommitMode
::
AUTOCOMMIT_OFF
);
if
(
!
rset
.
next
())
{
throw
exception
::
Exception
(
std
::
string
(
"The tape with VID "
+
vid
+
" does not exist"
));
}
common
::
dataStructures
::
Tape
tape
;
tape
.
vid
=
rset
.
columnString
(
"VID"
);
tape
.
logicalLibraryName
=
rset
.
columnString
(
"LOGICAL_LIBRARY_NAME"
);
tape
.
tapePoolName
=
rset
.
columnString
(
"TAPE_POOL_NAME"
);
tape
.
encryptionKey
=
rset
.
columnOptionalString
(
"ENCRYPTION_KEY"
);
tape
.
capacityInBytes
=
rset
.
columnUint64
(
"CAPACITY_IN_BYTES"
);
tape
.
dataOnTapeInBytes
=
rset
.
columnUint64
(
"DATA_IN_BYTES"
);
tape
.
lastFSeq
=
rset
.
columnUint64
(
"LAST_FSEQ"
);
tape
.
disabled
=
rset
.
columnBool
(
"IS_DISABLED"
);
tape
.
full
=
rset
.
columnBool
(
"IS_FULL"
);
tape
.
lbp
=
rset
.
columnOptionalBool
(
"LBP_IS_ON"
);
tape
.
labelLog
=
getTapeLogFromRset
(
rset
,
"LABEL_DRIVE"
,
"LABEL_TIME"
);
tape
.
lastReadLog
=
getTapeLogFromRset
(
rset
,
"LAST_READ_DRIVE"
,
"LAST_READ_TIME"
);
tape
.
lastWriteLog
=
getTapeLogFromRset
(
rset
,
"LAST_WRITE_DRIVE"
,
"LAST_WRITE_TIME"
);
tape
.
comment
=
rset
.
columnString
(
"USER_COMMENT"
);
common
::
dataStructures
::
UserIdentity
creatorUI
;
creatorUI
.
name
=
rset
.
columnString
(
"CREATION_LOG_USER_NAME"
);
common
::
dataStructures
::
EntryLog
creationLog
;
creationLog
.
username
=
rset
.
columnString
(
"CREATION_LOG_USER_NAME"
);
creationLog
.
host
=
rset
.
columnString
(
"CREATION_LOG_HOST_NAME"
);
creationLog
.
time
=
rset
.
columnUint64
(
"CREATION_LOG_TIME"
);
tape
.
creationLog
=
creationLog
;
common
::
dataStructures
::
UserIdentity
updaterUI
;
updaterUI
.
name
=
rset
.
columnString
(
"LAST_UPDATE_USER_NAME"
);
common
::
dataStructures
::
EntryLog
updateLog
;
updateLog
.
username
=
rset
.
columnString
(
"LAST_UPDATE_USER_NAME"
);
updateLog
.
host
=
rset
.
columnString
(
"LAST_UPDATE_HOST_NAME"
);
updateLog
.
time
=
rset
.
columnUint64
(
"LAST_UPDATE_TIME"
);
tape
.
lastModificationLog
=
updateLog
;
return
tape
;
}
catch
(
exception
::
UserError
&
)
{
throw
;
}
catch
(
exception
::
Exception
&
ex
)
{
ex
.
getMessage
().
str
(
std
::
string
(
__FUNCTION__
)
+
": "
+
ex
.
getMessage
().
str
());
throw
;
}
}
//------------------------------------------------------------------------------
// filesWrittenToTape
//------------------------------------------------------------------------------
void
MysqlCatalogue
::
filesWrittenToTape
(
const
std
::
set
<
TapeItemWrittenPointer
>
&
events
)
{
try
{
if
(
events
.
empty
())
{
return
;
}
auto
firstEventItor
=
events
.
cbegin
();
const
auto
&
firstEvent
=
**
firstEventItor
;;
checkTapeItemWrittenFieldsAreSet
(
__FUNCTION__
,
firstEvent
);
auto
conn
=
m_connPool
.
getConn
();
rdbms
::
AutoRollback
autoRollback
(
conn
);
const
auto
tape
=
selectTapeForUpdate
(
conn
,
firstEvent
.
vid
);
uint64_t
expectedFSeq
=
tape
.
lastFSeq
+
1
;
uint64_t
totalCompressedBytesWritten
=
0
;
for
(
const
auto
&
eventP
:
events
)
{
const
auto
&
event
=
*
eventP
;
checkTapeItemWrittenFieldsAreSet
(
__FUNCTION__
,
event
);
if
(
event
.
vid
!=
firstEvent
.
vid
)
{
throw
exception
::
Exception
(
std
::
string
(
"VID mismatch: expected="
)
+
firstEvent
.
vid
+
" actual="
+
event
.
vid
);
}
if
(
expectedFSeq
!=
event
.
fSeq
)
{
exception
::
Exception
ex
;
ex
.
getMessage
()
<<
"FSeq mismatch for tape "
<<
firstEvent
.
vid
<<
": expected="
<<
expectedFSeq
<<
" actual="
<<
firstEvent
.
fSeq
;
throw
ex
;
}
expectedFSeq
++
;
try
{
// If this is a file (as opposed to a placeholder), do the full processing.
const
auto
&
fileEvent
=
dynamic_cast
<
const
TapeFileWritten
&>
(
event
);
totalCompressedBytesWritten
+=
fileEvent
.
compressedSize
;
}
catch
(
std
::
bad_cast
&
)
{}
}
auto
lastEventItor
=
events
.
cend
();
lastEventItor
--
;
const
TapeItemWritten
&
lastEvent
=
**
lastEventItor
;
updateTape
(
conn
,
rdbms
::
AutocommitMode
::
AUTOCOMMIT_ON
,
lastEvent
.
vid
,
lastEvent
.
fSeq
,
totalCompressedBytesWritten
,
lastEvent
.
tapeDrive
);
for
(
const
auto
&
event
:
events
)
{
try
{
// If this is a file (as opposed to a placeholder), do the full processing.
const
auto
&
fileEvent
=
dynamic_cast
<
const
TapeFileWritten
&>
(
*
event
);
fileWrittenToTape
(
rdbms
::
AutocommitMode
::
AUTOCOMMIT_ON
,
conn
,
fileEvent
);
}
catch
(
std
::
bad_cast
&
)
{}
}
conn
.
commit
();
}
catch
(
exception
::
UserError
&
)
{
throw
;
}
catch
(
exception
::
Exception
&
ex
)
{
ex
.
getMessage
().
str
(
std
::
string
(
__FUNCTION__
)
+
": "
+
ex
.
getMessage
().
str
());
throw
;
}
}
//------------------------------------------------------------------------------
// fileWrittenToTape
//------------------------------------------------------------------------------
void
MysqlCatalogue
::
fileWrittenToTape
(
const
rdbms
::
AutocommitMode
autocommitMode
,
rdbms
::
Conn
&
conn
,
const
TapeFileWritten
&
event
)
{
try
{
checkTapeFileWrittenFieldsAreSet
(
__FUNCTION__
,
event
);
// Try to insert a row into the ARCHIVE_FILE table - it is normal this will
// fail if another tape copy has already been written to tape
try
{
ArchiveFileRow
row
;
row
.
archiveFileId
=
event
.
archiveFileId
;
row
.
diskFileId
=
event
.
diskFileId
;
row
.
diskInstance
=
event
.
diskInstance
;
row
.
size
=
event
.
size
;
row
.
checksumType
=
event
.
checksumType
;
row
.
checksumValue
=
event
.
checksumValue
;
row
.
storageClassName
=
event
.
storageClassName
;
row
.
diskFilePath
=
event
.
diskFilePath
;
row
.
diskFileUser
=
event
.
diskFileUser
;
row
.
diskFileGroup
=
event
.
diskFileGroup
;
row
.
diskFileRecoveryBlob
=
event
.
diskFileRecoveryBlob
;
insertArchiveFile
(
conn
,
autocommitMode
,
row
);
}
catch
(
exception
::
DatabasePrimaryKeyError
&
)
{
// Ignore this error
}
catch
(...)
{
throw
;
}
const
time_t
now
=
time
(
nullptr
);
const
auto
archiveFile
=
getArchiveFileByArchiveFileId
(
conn
,
event
.
archiveFileId
);
if
(
nullptr
==
archiveFile
)
{
// This should never happen
exception
::
Exception
ex
;
ex
.
getMessage
()
<<
"Failed to find archive file: archiveFileId="
<<
event
.
archiveFileId
;
throw
ex
;
}
std
::
ostringstream
fileContext
;
fileContext
<<
"archiveFileId="
<<
event
.
archiveFileId
<<
", diskInstanceName="
<<
event
.
diskInstance
<<
", diskFileId="
<<
event
.
diskFileId
<<
", diskFilePath="
<<
event
.
diskFilePath
;
if
(
archiveFile
->
fileSize
!=
event
.
size
)
{
catalogue
::
FileSizeMismatch
ex
;
ex
.
getMessage
()
<<
"File size mismatch: expected="
<<
archiveFile
->
fileSize
<<
", actual="
<<
event
.
size
<<
": "
<<
fileContext
.
str
();
throw
ex
;
}
if
(
archiveFile
->
checksumType
!=
event
.
checksumType
)
{
catalogue
::
ChecksumTypeMismatch
ex
;
ex
.
getMessage
()
<<
"Checksum type mismatch: expected="
<<
archiveFile
->
checksumType
<<
", actual="
<<
event
.
checksumType
<<
": "
<<
fileContext
.
str
();
throw
ex
;
}
if
(
archiveFile
->
checksumValue
!=
event
.
checksumValue
)
{
catalogue
::
ChecksumValueMismatch
ex
;
ex
.
getMessage
()
<<
"Checksum value mismatch: expected="
<<
archiveFile
->
checksumValue
<<
", actual="
<<
event
.
checksumValue
<<
": "
<<
fileContext
.
str
();
throw
ex
;
}
// Insert the tape file
common
::
dataStructures
::
TapeFile
tapeFile
;
tapeFile
.
vid
=
event
.
vid
;
tapeFile
.
fSeq
=
event
.
fSeq
;
tapeFile
.
blockId
=
event
.
blockId
;
tapeFile
.
compressedSize
=
event
.
compressedSize
;
tapeFile
.
copyNb
=
event
.
copyNb
;
tapeFile
.
creationTime
=
now
;
insertTapeFile
(
conn
,
autocommitMode
,
tapeFile
,
event
.
archiveFileId
);
}
catch
(
exception
::
UserError
&
)
{
throw
;
}
catch
(
exception
::
Exception
&
ex
)
{
ex
.
getMessage
().
str
(
std
::
string
(
__FUNCTION__
)
+
": "
+
ex
.
getMessage
().
str
());
throw
;
}
}
}
// namespace catalogue
}
// namespace cta
catalogue/MysqlCatalogue.hpp
0 → 100644
View file @
203bc586
/*
* The CERN Tape Archive(CTA) project
* Copyright(C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
*(at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once