diff --git a/catalogue/Catalogue.hpp b/catalogue/Catalogue.hpp index 391f717de53226f356a5dfe41087c61a3f20765f..b5e911cdb15decc95a9e08e0da6fbfc423a503ed 100644 --- a/catalogue/Catalogue.hpp +++ b/catalogue/Catalogue.hpp @@ -128,7 +128,7 @@ public: * * @param events The tape file written events. */ - virtual void filesWrittenToTape(const std::list<TapeFileWritten> &event) = 0; + virtual void filesWrittenToTape(const std::set<TapeFileWritten> &event) = 0; /** * Notifies the CTA catalogue that the specified tape has been mounted in diff --git a/catalogue/CatalogueTest.cpp b/catalogue/CatalogueTest.cpp index d8e70e61c4deb8ccd9b196066d6e049699c17944..9a00884d6211381b34af28e14652ec0d62ff61d6 100644 --- a/catalogue/CatalogueTest.cpp +++ b/catalogue/CatalogueTest.cpp @@ -2112,7 +2112,7 @@ TEST_P(cta_catalogue_CatalogueTest, createTape_1_tape_with_write_log_1_tape_with file1Written.compressedSize = 1; file1Written.copyNb = 1; file1Written.tapeDrive = "tape_drive"; - m_catalogue->filesWrittenToTape(std::list<catalogue::TapeFileWritten>{file1Written}); + m_catalogue->filesWrittenToTape(std::set<catalogue::TapeFileWritten>{file1Written}); } { @@ -4842,7 +4842,7 @@ TEST_P(cta_catalogue_CatalogueTest, prepareToRetrieveFile) { file1Written.compressedSize = 1; file1Written.copyNb = 1; file1Written.tapeDrive = tapeDrive; - m_catalogue->filesWrittenToTape(std::list<catalogue::TapeFileWritten>{file1Written}); + m_catalogue->filesWrittenToTape(std::set<catalogue::TapeFileWritten>{file1Written}); { const common::dataStructures::ArchiveFile archiveFile = m_catalogue->getArchiveFileById(archiveFileId); @@ -4891,7 +4891,7 @@ TEST_P(cta_catalogue_CatalogueTest, prepareToRetrieveFile) { file2Written.compressedSize = 1; file2Written.copyNb = 2; file2Written.tapeDrive = tapeDrive; - m_catalogue->filesWrittenToTape(std::list<catalogue::TapeFileWritten>{file2Written}); + m_catalogue->filesWrittenToTape(std::set<catalogue::TapeFileWritten>{file2Written}); { const common::dataStructures::ArchiveFile archiveFile = m_catalogue->getArchiveFileById(archiveFileId); @@ -5255,7 +5255,7 @@ TEST_P(cta_catalogue_CatalogueTest, fileWrittenToTape_many_archive_files) { const uint64_t archiveFileSize = 1000; const uint64_t compressedFileSize = 800; - std::list<catalogue::TapeFileWritten> tapeFilesWrittenCopy1; + std::set<catalogue::TapeFileWritten> tapeFilesWrittenCopy1; for(uint64_t i = 1; i <= nbArchiveFiles; i++) { std::ostringstream diskFileId; diskFileId << (12345677 + i); @@ -5281,7 +5281,7 @@ TEST_P(cta_catalogue_CatalogueTest, fileWrittenToTape_many_archive_files) { fileWritten.compressedSize = compressedFileSize; fileWritten.copyNb = 1; fileWritten.tapeDrive = tapeDrive; - tapeFilesWrittenCopy1.push_back(fileWritten); + tapeFilesWrittenCopy1.emplace(fileWritten); } m_catalogue->filesWrittenToTape(tapeFilesWrittenCopy1); @@ -5303,7 +5303,7 @@ TEST_P(cta_catalogue_CatalogueTest, fileWrittenToTape_many_archive_files) { } } - std::list<catalogue::TapeFileWritten> tapeFilesWrittenCopy2; + std::set<catalogue::TapeFileWritten> tapeFilesWrittenCopy2; for(uint64_t i = 1; i <= nbArchiveFiles; i++) { std::ostringstream diskFileId; diskFileId << (12345677 + i); @@ -5329,9 +5329,9 @@ TEST_P(cta_catalogue_CatalogueTest, fileWrittenToTape_many_archive_files) { fileWritten.compressedSize = compressedFileSize; fileWritten.copyNb = 2; fileWritten.tapeDrive = tapeDrive; - tapeFilesWrittenCopy2.push_back(fileWritten); + tapeFilesWrittenCopy2.emplace(fileWritten); } - m_catalogue->filesWrittenToTape(std::list<catalogue::TapeFileWritten>{tapeFilesWrittenCopy2}); + m_catalogue->filesWrittenToTape(tapeFilesWrittenCopy2); { const std::list<common::dataStructures::Tape> tapes = m_catalogue->getTapes(); @@ -5742,7 +5742,7 @@ TEST_P(cta_catalogue_CatalogueTest, fileWrittenToTape_2_tape_files_different_tap file1Written.compressedSize = 1; file1Written.copyNb = 1; file1Written.tapeDrive = tapeDrive; - m_catalogue->filesWrittenToTape(std::list<catalogue::TapeFileWritten>{file1Written}); + m_catalogue->filesWrittenToTape(std::set<catalogue::TapeFileWritten>{file1Written}); { catalogue::TapeSearchCriteria searchCriteria; @@ -5800,7 +5800,7 @@ TEST_P(cta_catalogue_CatalogueTest, fileWrittenToTape_2_tape_files_different_tap file2Written.compressedSize = 1; file2Written.copyNb = 2; file2Written.tapeDrive = tapeDrive; - m_catalogue->filesWrittenToTape(std::list<catalogue::TapeFileWritten>{file2Written}); + m_catalogue->filesWrittenToTape(std::set<catalogue::TapeFileWritten>{file2Written}); { ASSERT_EQ(2, m_catalogue->getTapes().size()); @@ -5940,7 +5940,7 @@ TEST_P(cta_catalogue_CatalogueTest, fileWrittenToTape_2_tape_files_same_archive_ file1Written.compressedSize = 1; file1Written.copyNb = 1; file1Written.tapeDrive = tapeDrive; - m_catalogue->filesWrittenToTape(std::list<catalogue::TapeFileWritten>{file1Written}); + m_catalogue->filesWrittenToTape(std::set<catalogue::TapeFileWritten>{file1Written}); { catalogue::TapeSearchCriteria searchCriteria; @@ -5998,7 +5998,7 @@ TEST_P(cta_catalogue_CatalogueTest, fileWrittenToTape_2_tape_files_same_archive_ file2Written.compressedSize = 1; file2Written.copyNb = 2; file2Written.tapeDrive = tapeDrive; - ASSERT_THROW(m_catalogue->filesWrittenToTape(std::list<catalogue::TapeFileWritten>{file2Written}), exception::Exception); + ASSERT_THROW(m_catalogue->filesWrittenToTape(std::set<catalogue::TapeFileWritten>{file2Written}), exception::Exception); } TEST_P(cta_catalogue_CatalogueTest, fileWrittenToTape_2_tape_files_corrupted_diskFilePath) { @@ -6082,7 +6082,7 @@ TEST_P(cta_catalogue_CatalogueTest, fileWrittenToTape_2_tape_files_corrupted_dis file1Written.compressedSize = 1; file1Written.copyNb = 1; file1Written.tapeDrive = tapeDrive; - m_catalogue->filesWrittenToTape(std::list<catalogue::TapeFileWritten>{file1Written}); + m_catalogue->filesWrittenToTape(std::set<catalogue::TapeFileWritten>{file1Written}); { const common::dataStructures::ArchiveFile archiveFile = m_catalogue->getArchiveFileById(archiveFileId); @@ -6132,7 +6132,7 @@ TEST_P(cta_catalogue_CatalogueTest, fileWrittenToTape_2_tape_files_corrupted_dis file2Written.copyNb = 2; file2Written.tapeDrive = tapeDrive; - ASSERT_THROW(m_catalogue->filesWrittenToTape(std::list<catalogue::TapeFileWritten>{file2Written}), exception::Exception); + ASSERT_THROW(m_catalogue->filesWrittenToTape(std::set<catalogue::TapeFileWritten>{file2Written}), exception::Exception); } TEST_P(cta_catalogue_CatalogueTest, deleteArchiveFile) { @@ -6249,7 +6249,7 @@ TEST_P(cta_catalogue_CatalogueTest, deleteArchiveFile) { file1Written.compressedSize = 1; file1Written.copyNb = 1; file1Written.tapeDrive = tapeDrive; - m_catalogue->filesWrittenToTape(std::list<catalogue::TapeFileWritten>{file1Written}); + m_catalogue->filesWrittenToTape(std::set<catalogue::TapeFileWritten>{file1Written}); { catalogue::TapeSearchCriteria searchCriteria; @@ -6342,7 +6342,7 @@ TEST_P(cta_catalogue_CatalogueTest, deleteArchiveFile) { file2Written.compressedSize = 1; file2Written.copyNb = 2; file2Written.tapeDrive = tapeDrive; - m_catalogue->filesWrittenToTape(std::list<catalogue::TapeFileWritten>{file2Written}); + m_catalogue->filesWrittenToTape(std::set<catalogue::TapeFileWritten>{file2Written}); { ASSERT_EQ(2, m_catalogue->getTapes().size()); @@ -6603,7 +6603,7 @@ TEST_P(cta_catalogue_CatalogueTest, deleteArchiveFile_of_another_disk_instance) file1Written.compressedSize = 1; file1Written.copyNb = 1; file1Written.tapeDrive = tapeDrive; - m_catalogue->filesWrittenToTape(std::list<catalogue::TapeFileWritten>{file1Written}); + m_catalogue->filesWrittenToTape(std::set<catalogue::TapeFileWritten>{file1Written}); { catalogue::TapeSearchCriteria searchCriteria; @@ -6696,7 +6696,7 @@ TEST_P(cta_catalogue_CatalogueTest, deleteArchiveFile_of_another_disk_instance) file2Written.compressedSize = 1; file2Written.copyNb = 2; file2Written.tapeDrive = tapeDrive; - m_catalogue->filesWrittenToTape(std::list<catalogue::TapeFileWritten>{file2Written}); + m_catalogue->filesWrittenToTape(std::set<catalogue::TapeFileWritten>{file2Written}); { ASSERT_EQ(2, m_catalogue->getTapes().size()); @@ -7039,7 +7039,7 @@ TEST_P(cta_catalogue_CatalogueTest, reclaimTape_full_lastFSeq_1_no_tape_files) { file1Written.compressedSize = 1; file1Written.copyNb = 1; file1Written.tapeDrive = tapeDrive; - m_catalogue->filesWrittenToTape(std::list<catalogue::TapeFileWritten>{file1Written}); + m_catalogue->filesWrittenToTape(std::set<catalogue::TapeFileWritten>{file1Written}); { const common::dataStructures::ArchiveFile archiveFile = m_catalogue->getArchiveFileById(archiveFileId); @@ -7250,7 +7250,7 @@ TEST_P(cta_catalogue_CatalogueTest, reclaimTape_full_lastFSeq_1_one_tape_file) { file1Written.compressedSize = 1; file1Written.copyNb = 1; file1Written.tapeDrive = tapeDrive; - m_catalogue->filesWrittenToTape(std::list<catalogue::TapeFileWritten>{file1Written}); + m_catalogue->filesWrittenToTape(std::set<catalogue::TapeFileWritten>{file1Written}); { const common::dataStructures::ArchiveFile archiveFile = m_catalogue->getArchiveFileById(archiveFileId); diff --git a/catalogue/OracleCatalogue.cpp b/catalogue/OracleCatalogue.cpp index 1d01bd6fb85357133471700947d9d6065d8734fd..90a2e4fee8ee554fe99a9dd132fa3e96d0c85436 100644 --- a/catalogue/OracleCatalogue.cpp +++ b/catalogue/OracleCatalogue.cpp @@ -283,17 +283,16 @@ common::dataStructures::Tape OracleCatalogue::selectTapeForUpdate(rdbms::PooledC //------------------------------------------------------------------------------ // filesWrittenToTape //------------------------------------------------------------------------------ -void OracleCatalogue::filesWrittenToTape(const std::list<TapeFileWritten> &events) { +void OracleCatalogue::filesWrittenToTape(const std::set<TapeFileWritten> &events) { try { if (events.empty()) { return; } - const auto &firstEvent = events.front(); + auto firstEventItor = events.begin(); + const auto &firstEvent = *firstEventItor; checkTapeFileWrittenFieldsAreSet(firstEvent); const time_t now = time(nullptr); - const std::string nowStr = std::to_string(now); - const uint32_t creationTimeMaxFieldSize = nowStr.length() + 1; std::lock_guard<std::mutex> m_lock(m_mutex); auto conn = m_connPool.getConn(); rdbms::AutoRollback autoRollback(conn); @@ -322,55 +321,37 @@ void OracleCatalogue::filesWrittenToTape(const std::list<TapeFileWritten> &event expectedFSeq++; totalCompressedBytesWritten += event.compressedSize; + // Store the length of each field and implicitly calculate the maximum field + // length of each column tapeFileBatch.vid.setFieldLenToValueLen(i, event.vid); tapeFileBatch.fSeq.setFieldLenToValueLen(i, event.fSeq); tapeFileBatch.blockId.setFieldLenToValueLen(i, event.blockId); tapeFileBatch.compressedSize.setFieldLenToValueLen(i, event.compressedSize); tapeFileBatch.copyNb.setFieldLenToValueLen(i, event.copyNb); - tapeFileBatch.creationTime.setFieldLen(i, creationTimeMaxFieldSize); + tapeFileBatch.creationTime.setFieldLenToValueLen(i, now); tapeFileBatch.archiveFileId.setFieldLenToValueLen(i, event.archiveFileId); i++; } - const TapeFileWritten &lastEvent = events.back(); + auto lastEventItor = events.cend(); + lastEventItor--; + const TapeFileWritten &lastEvent = *lastEventItor; updateTape(conn, rdbms::Stmt::AutocommitMode::OFF, lastEvent.vid, lastEvent.fSeq, totalCompressedBytesWritten, lastEvent.tapeDrive); - // To be moved to the queueing of the archive request - for (const auto &event: events) { - std::unique_ptr<common::dataStructures::ArchiveFile> archiveFile = getArchiveFile(conn, event.archiveFileId); - - // If the archive file does not already exist - if (nullptr == archiveFile.get()) { - // Create one - ArchiveFileRow row; - row.archiveFileId = event.archiveFileId; - row.diskFileId = event.diskFileId; - row.diskInstance = event.diskInstance; - row.size = event.size; - row.checksumType = event.checksumType; - row.checksumValue = event.checksumValue; - row.storageClassName = event.storageClassName; - row.diskFilePath = event.diskFilePath; - row.diskFileUser = event.diskFileUser; - row.diskFileGroup = event.diskFileGroup; - row.diskFileRecoveryBlob = event.diskFileRecoveryBlob; - insertArchiveFile(conn, rdbms::Stmt::AutocommitMode::OFF, row); - } else { - throwIfCommonEventDataMismatch(*archiveFile, event); - } - } + idempotentBatchInsertArchiveFiles(conn, rdbms::Stmt::AutocommitMode::OFF, events); + // Store the value of each field i = 0; for (const auto &event: events) { - tapeFileBatch.vid.copyStrIntoField(i, event.vid.c_str()); - tapeFileBatch.fSeq.copyStrIntoField(i, std::to_string(event.fSeq)); - tapeFileBatch.blockId.copyStrIntoField(i, std::to_string(event.blockId)); - tapeFileBatch.compressedSize.copyStrIntoField(i, std::to_string(event.compressedSize)); - tapeFileBatch.copyNb.copyStrIntoField(i, std::to_string(event.copyNb)); - tapeFileBatch.creationTime.copyStrIntoField(i, nowStr); - tapeFileBatch.archiveFileId.copyStrIntoField(i, std::to_string(event.archiveFileId)); + tapeFileBatch.vid.setFieldValue(i, event.vid); + tapeFileBatch.fSeq.setFieldValue(i, event.fSeq); + tapeFileBatch.blockId.setFieldValue(i, event.blockId); + tapeFileBatch.compressedSize.setFieldValue(i, event.compressedSize); + tapeFileBatch.copyNb.setFieldValue(i, event.copyNb); + tapeFileBatch.creationTime.setFieldValue(i, now); + tapeFileBatch.archiveFileId.setFieldValue(i, event.archiveFileId); i++; } @@ -411,5 +392,129 @@ void OracleCatalogue::filesWrittenToTape(const std::list<TapeFileWritten> &event } } +//------------------------------------------------------------------------------ +// idempotentBatchInsertArchiveFiles +//------------------------------------------------------------------------------ +void OracleCatalogue::idempotentBatchInsertArchiveFiles(rdbms::PooledConn &conn, + const rdbms::Stmt::AutocommitMode autocommitMode, const std::set<TapeFileWritten> &events) { + try { + ArchiveFileBatch archiveFileBatch(events.size()); + const time_t now = time(nullptr); + + // Store the length of each field and implicitly calculate the maximum field + // length of each column + uint32_t i = 0; + for (const auto &event: events) { + archiveFileBatch.archiveFileId.setFieldLenToValueLen(i, event.archiveFileId); + archiveFileBatch.diskInstance.setFieldLenToValueLen(i, event.diskInstance); + archiveFileBatch.diskFileId.setFieldLenToValueLen(i, event.diskFileId); + archiveFileBatch.diskFilePath.setFieldLenToValueLen(i, event.diskFilePath); + archiveFileBatch.diskFileUser.setFieldLenToValueLen(i, event.diskFileUser); + archiveFileBatch.diskFileGroup.setFieldLenToValueLen(i, event.diskFileGroup); + archiveFileBatch.diskFileRecoveryBlob.setFieldLenToValueLen(i, event.diskFileRecoveryBlob); + archiveFileBatch.size.setFieldLenToValueLen(i, event.size); + archiveFileBatch.checksumType.setFieldLenToValueLen(i, event.checksumType); + archiveFileBatch.checksumValue.setFieldLenToValueLen(i, event.checksumValue); + archiveFileBatch.storageClassName.setFieldLenToValueLen(i, event.storageClassName); + archiveFileBatch.creationTime.setFieldLenToValueLen(i, now); + archiveFileBatch.reconciliationTime.setFieldLenToValueLen(i, now); + i++; + } + + // Store the value of each field + i = 0; + for (const auto &event: events) { + archiveFileBatch.archiveFileId.setFieldValue(i, event.archiveFileId); + archiveFileBatch.diskInstance.setFieldValue(i, event.diskInstance); + archiveFileBatch.diskFileId.setFieldValue(i, event.diskFileId); + archiveFileBatch.diskFilePath.setFieldValue(i, event.diskFilePath); + archiveFileBatch.diskFileUser.setFieldValue(i, event.diskFileUser); + archiveFileBatch.diskFileGroup.setFieldValue(i, event.diskFileGroup); + archiveFileBatch.diskFileRecoveryBlob.setFieldValue(i, event.diskFileRecoveryBlob); + archiveFileBatch.size.setFieldValue(i, event.size); + archiveFileBatch.checksumType.setFieldValue(i, event.checksumType); + archiveFileBatch.checksumValue.setFieldValue(i, event.checksumValue); + archiveFileBatch.storageClassName.setFieldValue(i, event.storageClassName); + archiveFileBatch.creationTime.setFieldValue(i, now); + archiveFileBatch.reconciliationTime.setFieldValue(i, now); + i++; + } + + const char *const sql = + "INSERT INTO ARCHIVE_FILE(" + "ARCHIVE_FILE_ID," + "DISK_INSTANCE_NAME," + "DISK_FILE_ID," + "DISK_FILE_PATH," + "DISK_FILE_USER," + "DISK_FILE_GROUP," + "DISK_FILE_RECOVERY_BLOB," + "SIZE_IN_BYTES," + "CHECKSUM_TYPE," + "CHECKSUM_VALUE," + "STORAGE_CLASS_NAME," + "CREATION_TIME," + "RECONCILIATION_TIME)" + "VALUES(" + ":ARCHIVE_FILE_ID," + ":DISK_INSTANCE_NAME," + ":DISK_FILE_ID," + ":DISK_FILE_PATH," + ":DISK_FILE_USER," + ":DISK_FILE_GROUP," + ":DISK_FILE_RECOVERY_BLOB," + ":SIZE_IN_BYTES," + ":CHECKSUM_TYPE," + ":CHECKSUM_VALUE," + ":STORAGE_CLASS_NAME," + ":CREATION_TIME," + ":RECONCILIATION_TIME)"; + auto stmt = conn.createStmt(sql, autocommitMode); + rdbms::OcciStmt &occiStmt = dynamic_cast<rdbms::OcciStmt &>(*stmt); + occiStmt->setBatchErrorMode(true); + + occiStmt.setColumn(archiveFileBatch.archiveFileId); + occiStmt.setColumn(archiveFileBatch.diskInstance); + occiStmt.setColumn(archiveFileBatch.diskFileId); + occiStmt.setColumn(archiveFileBatch.diskFilePath); + occiStmt.setColumn(archiveFileBatch.diskFileUser); + occiStmt.setColumn(archiveFileBatch.diskFileGroup); + occiStmt.setColumn(archiveFileBatch.diskFileRecoveryBlob); + occiStmt.setColumn(archiveFileBatch.size); + occiStmt.setColumn(archiveFileBatch.checksumType); + occiStmt.setColumn(archiveFileBatch.checksumValue); + occiStmt.setColumn(archiveFileBatch.storageClassName); + occiStmt.setColumn(archiveFileBatch.creationTime); + occiStmt.setColumn(archiveFileBatch.reconciliationTime); + + try { + occiStmt->executeArrayUpdate(archiveFileBatch.nbRows); + } catch(oracle::occi::BatchSQLException &be) { + const int nbFailedRows = be.getFailedRowCount(); + exception::Exception ex; + ex.getMessage() << "Caught a BatchSQLException" << nbFailedRows; + bool foundErrorOtherThanUniqueConstraint = false; + for (int row = 0; row < nbFailedRows; row++ ) { + oracle::occi::SQLException err = be.getException(row); + const unsigned int rowIndex = be.getRowNum(row); + const int errorCode = err.getErrorCode(); + + // If the error is anything other than a unique constraint error + if(1 != errorCode) { + foundErrorOtherThanUniqueConstraint = true; + ex.getMessage() << ": Row " << rowIndex << " generated ORA error " << errorCode; + } + } + if (foundErrorOtherThanUniqueConstraint) { + throw ex; + } + } catch(std::exception &se) { + throw exception::Exception(std::string("executeArrayUpdate failed: ") + se.what()); + } + } catch(exception::Exception &ex) { + throw exception::Exception(std::string(__FUNCTION__) + " failed: " + ex.getMessage().str()); + } +} + } // namespace catalogue } // namespace cta diff --git a/catalogue/OracleCatalogue.hpp b/catalogue/OracleCatalogue.hpp index f2a5089223f6d7b9d7172e27cfaedfb3a0ed672f..0811b63acfbdb8d326305dbe8d4f583866e1e5e4 100644 --- a/catalogue/OracleCatalogue.hpp +++ b/catalogue/OracleCatalogue.hpp @@ -20,6 +20,7 @@ #include "catalogue/RdbmsCatalogue.hpp" #include "rdbms/OcciColumn.hpp" +#include "rdbms/PooledConn.hpp" #include <occi.h> #include <string.h> @@ -90,7 +91,7 @@ public: * * @param events The tape file written events. */ - void filesWrittenToTape(const std::list<TapeFileWritten> &events) override; + void filesWrittenToTape(const std::set<TapeFileWritten> &events) override; private: @@ -102,6 +103,10 @@ private: */ common::dataStructures::Tape selectTapeForUpdate(rdbms::PooledConn &conn, const std::string &vid); + /** + * Structure used to assemble a batch of rows to insert into the TAPE_FILE + * table. + */ struct TapeFileBatch { size_t nbRows; rdbms::OcciColumn vid; @@ -112,6 +117,11 @@ private: rdbms::OcciColumn creationTime; rdbms::OcciColumn archiveFileId; + /** + * Constructor. + * + * @param nbRowsValue The Number of rows to be inserted. + */ TapeFileBatch(const size_t nbRowsValue): nbRows(nbRowsValue), vid("VID", nbRows), @@ -122,7 +132,70 @@ private: creationTime("CREATION_TIME", nbRows), archiveFileId("ARCHIVE_FILE_ID", nbRows) { } - }; + }; // struct TapeFileBatch + + /** + * Structure used to assemble a batch of rows to insert into the ARCHIVE_FILE + * table. + */ + struct ArchiveFileBatch { + size_t nbRows; + rdbms::OcciColumn archiveFileId; + rdbms::OcciColumn diskInstance; + rdbms::OcciColumn diskFileId; + rdbms::OcciColumn diskFilePath; + rdbms::OcciColumn diskFileUser; + rdbms::OcciColumn diskFileGroup; + rdbms::OcciColumn diskFileRecoveryBlob; + rdbms::OcciColumn size; + rdbms::OcciColumn checksumType; + rdbms::OcciColumn checksumValue; + rdbms::OcciColumn storageClassName; + rdbms::OcciColumn creationTime; + rdbms::OcciColumn reconciliationTime; + + /** + * Constructor. + * + * @param nbRowsValue The Number of rows to be inserted. + */ + ArchiveFileBatch(const size_t nbRowsValue): + nbRows(nbRowsValue), + archiveFileId("ARCHIVE_FILE_ID", nbRows), + diskInstance("DISK_INSTANCE_NAME", nbRows), + diskFileId("DISK_FILE_ID", nbRows), + diskFilePath("DISK_FILE_PATH", nbRows), + diskFileUser("DISK_FILE_USER", nbRows), + diskFileGroup("DISK_FILE_GROUP", nbRows), + diskFileRecoveryBlob("DISK_FILE_RECOVERY_BLOB", nbRows), + size("SIZE_IN_BYTES", nbRows), + checksumType("CHECKSUM_TYPE", nbRows), + checksumValue("CHECKSUM_VALUE", nbRows), + storageClassName("STORAGE_CLASS_NAME", nbRows), + creationTime("CREATION_TIME", nbRows), + reconciliationTime("RECONCILIATION_TIME", nbRows) { + } + }; // struct ArchiveFileBatch + + /** + * Batch inserts rows into the ARCHIVE_FILE table that correspond to the + * specified TapeFileWritten events. + * + * This method has idempotent behaviour in the case where an ARCHIVE_FILE + * already exists. Such a situation will occur when a file has more than one + * copy on tape. The first tape copy will cause two successful inserts, one + * into the ARCHIVE_FILE table and one into the TAPE_FILE table. The second + * tape copy will try to do the same, but the insert into the ARCHIVE_FILE + * table will fail or simply bounce as the row will already exists. The + * insert into the TABLE_FILE table will succeed because the two TAPE_FILE + * rows will be unique. + * + * @param conn The database connection. + * @param autocommitMode The autocommit mode of the SQL insert statement. + * @param events The tape file written events. + */ + void idempotentBatchInsertArchiveFiles(rdbms::PooledConn &conn, const rdbms::Stmt::AutocommitMode autocommitMode, + const std::set<TapeFileWritten> &events); }; // class OracleCatalogue diff --git a/catalogue/SqliteCatalogue.cpp b/catalogue/SqliteCatalogue.cpp index 6f6baa4d14dea4fc7d8de10dd0932dfc04361451..f47b89deec7d1288ebcb69b7b4a144c5c750f391 100644 --- a/catalogue/SqliteCatalogue.cpp +++ b/catalogue/SqliteCatalogue.cpp @@ -222,13 +222,14 @@ common::dataStructures::Tape SqliteCatalogue::selectTape(rdbms::PooledConn &conn //------------------------------------------------------------------------------ // filesWrittenToTape //------------------------------------------------------------------------------ -void SqliteCatalogue::filesWrittenToTape(const std::list<TapeFileWritten> &events) { +void SqliteCatalogue::filesWrittenToTape(const std::set<TapeFileWritten> &events) { try { if(events.empty()) { return; } - const auto &firstEvent = events.front(); + auto firstEventItor = events.cbegin(); + const auto &firstEvent = *firstEventItor;; checkTapeFileWrittenFieldsAreSet(firstEvent); std::lock_guard<std::mutex> m_lock(m_mutex); @@ -257,7 +258,9 @@ void SqliteCatalogue::filesWrittenToTape(const std::list<TapeFileWritten> &event totalCompressedBytesWritten += event.compressedSize; } - const TapeFileWritten &lastEvent = events.back(); + auto lastEventItor = events.cend(); + lastEventItor--; + const TapeFileWritten &lastEvent = *lastEventItor; updateTape(conn, rdbms::Stmt::AutocommitMode::OFF, lastEvent.vid, lastEvent.fSeq, totalCompressedBytesWritten, lastEvent.tapeDrive); diff --git a/catalogue/SqliteCatalogue.hpp b/catalogue/SqliteCatalogue.hpp index b91339f95eb53ba8a98d3e99e3a881ac8bed7944..c0553c34baa9c1edd7beaf5d20d8cedcd787088b 100644 --- a/catalogue/SqliteCatalogue.hpp +++ b/catalogue/SqliteCatalogue.hpp @@ -83,7 +83,7 @@ protected: * * @param events The tape file written events. */ - void filesWrittenToTape(const std::list<TapeFileWritten> &events) override; + void filesWrittenToTape(const std::set<TapeFileWritten> &events) override; private: diff --git a/catalogue/TapeFileWritten.cpp b/catalogue/TapeFileWritten.cpp index 8de49896f87c80e721beff82f7b58e78e3462649..bbfd77637de730069b8d64d4b8fdcdd984b45c7d 100644 --- a/catalogue/TapeFileWritten.cpp +++ b/catalogue/TapeFileWritten.cpp @@ -57,6 +57,13 @@ bool TapeFileWritten::operator==(const TapeFileWritten &rhs) const { tapeDrive == rhs.tapeDrive; } +//------------------------------------------------------------------------------ +// operator< +//------------------------------------------------------------------------------ +bool TapeFileWritten::operator<(const TapeFileWritten &rhs) const { + return archiveFileId < rhs.archiveFileId; +} + //------------------------------------------------------------------------------ // operator<< //------------------------------------------------------------------------------ diff --git a/catalogue/TapeFileWritten.hpp b/catalogue/TapeFileWritten.hpp index 2c6e0ca061bc216cbf66555d3b0b6b6c0ae294ac..3ddfbcd6cadf28e3c6fa48277cc80e24f09d3466 100644 --- a/catalogue/TapeFileWritten.hpp +++ b/catalogue/TapeFileWritten.hpp @@ -41,10 +41,25 @@ struct TapeFileWritten { /** * Equality operator. * - * @param ths The right hand side of the operator. + * @param rhs The right hand side of the operator. */ bool operator==(const TapeFileWritten &rhs) const; + /** + * Less than operator. + * + * TapeFileWritten events are ordered by their archiveFileId member variable. + * + * TapeFileWritten events are written to the catalogue database in batches in + * order to improve performance by reducing the number of network round trips + * to the database. Thse batches are implemented using sorted containers in + * order to avoid deadlocks when two or more batches of overlapping events are + * concurrently written to the database. + * + * @param rhs The right hand side of the operator. + */ + bool operator<(const TapeFileWritten &rhs) const; + /** * The unique identifier of the file being archived. */ diff --git a/rdbms/OcciColumn.hpp b/rdbms/OcciColumn.hpp index 934892694ebcbf4ef69c6ef95536b127591fa6ea..4fa8f2059591172e3497c8c22b41d71ccdd7e478 100644 --- a/rdbms/OcciColumn.hpp +++ b/rdbms/OcciColumn.hpp @@ -62,14 +62,59 @@ public: * Please not that this method does not record the specified value. This * method only stores the length of the value. * + * This method tag dispatches using std::is_integral. + * * @param index The index of the field. * @param value The value whose length is to be recorded. */ template<typename T> void setFieldLenToValueLen(const size_t index, const T &value) { + // Tag dispatch using std::is_integral setFieldLenToValueLen(index, value, std::is_integral<T>()); } + /** + * Returns the array of field lengths. + * + * @return The array of field lengths. + */ + ub2 *getFieldLengths(); + + /** + * Returns the buffer of column field values. + * + * @return The buffer of column field values. + */ + char *getBuffer(); + + /** + * Returns the maximum of the field lengths. + * + * @return the maximum of the field lengths. + */ + ub2 getMaxFieldLength() const; + + /** + * Sets the field at the specified index to the specified value. + * + * This method tag dispatches using std::is_integral. + * + * @param index The index of the field. + * @param value The value of the field. + */ + template<typename T> void setFieldValue(const size_t index, const T &value) { + setFieldValue(index, value, std::is_integral<T>()); + } + private: + + /** + * Sets the length of the field at the specified index. + * + * @param index The index of the field. + * @param length The length of the field. + */ + void setFieldLen(const size_t index, const ub2 length); + /** * Sets the length of the field at the specified index to the length of the * specified value. @@ -77,6 +122,9 @@ private: * Please not that this method does not record the specified value. This * method only stores the length of the value. * + * The third unnamed parameter of this method is used to implement tag + * dispatching with std::is_integral. + * * @param index The index of the field. * @param value The value whose length is to be recorded. */ @@ -91,6 +139,9 @@ private: * Please not that this method does not record the specified value. This * method only stores the length of the value. * + * The third unnamed parameter of this method is used to implement tag + * dispatching with std::is_integral. + * * @param index The index of the field. * @param value The value whose length is to be recorded. */ @@ -98,35 +149,31 @@ private: setFieldLen(index, std::to_string(value).length() + 1); } -public: /** - * Sets the length of the field at the specified index. + * Sets the field at the specified index to the specified value. * - * @param index The index of the field. - * @param length The length of the field. - */ - void setFieldLen(const size_t index, const ub2 length); - - /** - * Returns the array of field lengths. + * The third unnamed parameter of this method is used to implement tag + * dispatching with std::is_integral. * - * @return The array of field lengths. + * @param index The index of the field. + * @param value The value of the field. */ - ub2 *getFieldLengths(); + template<typename T> void setFieldValue(const size_t index, const T &value, std::true_type) { + copyStrIntoField(index, std::to_string(value)); + } /** - * Returns the buffer of column field values. + * Sets the field at the specified index to the specified value. * - * @return The buffer of column field values. - */ - char *getBuffer(); - - /** - * Returns the maximum of the field lengths. + * The third unnamed parameter of this method is used to implement tag + * dispatching with std::is_integral. * - * @return the maximum of the field lengths. + * @param index The index of the field. + * @param value The value of the field. */ - ub2 getMaxFieldLength() const; + template<typename T> void setFieldValue(const size_t index, const T &value, std::false_type) { + copyStrIntoField(index, value); + } /** * Copies the specified string value into the field at the specified index. @@ -136,8 +183,6 @@ public: */ void copyStrIntoField(const size_t index, const std::string &str); -private: - /** * The name of the column. */ diff --git a/rdbms/OcciColumnTest.cpp b/rdbms/OcciColumnTest.cpp index c9ea054c3334bb0b0baaac69db9c4134d5fdbdea..af834309fb1103148db4d73d3ddc8274d3a53fe2 100644 --- a/rdbms/OcciColumnTest.cpp +++ b/rdbms/OcciColumnTest.cpp @@ -64,9 +64,9 @@ TEST_F(cta_rdbms_OcciColumnTest, setFieldLen) { ASSERT_EQ(0, col.getMaxFieldLength()); - const ub2 field0Len = 1234; - col.setFieldLen(0, field0Len); - ASSERT_EQ(field0Len, col.getMaxFieldLength()); + const ub2 field0Value = 1234; + col.setFieldLenToValueLen(0, field0Value); + ASSERT_EQ(5, col.getMaxFieldLength()); } TEST_F(cta_rdbms_OcciColumnTest, setFieldLenToValueLen_stringValue) { @@ -107,13 +107,13 @@ TEST_F(cta_rdbms_OcciColumnTest, setFieldLen_tooLate) { const size_t nbRows = 2; OcciColumn col(colName, nbRows); - const ub2 field0Len = 1234; - const ub2 field1Len = 5678; - col.setFieldLen(0, field0Len); + const ub2 field0Value = 1234; + const ub2 field1Value = 5678; + col.setFieldLenToValueLen(0, field0Value); col.getBuffer(); - ASSERT_THROW(col.setFieldLen(1, field1Len), exception::Exception); + ASSERT_THROW(col.setFieldLenToValueLen(1, field1Value), exception::Exception); } TEST_F(cta_rdbms_OcciColumnTest, setFieldLen_invalidIndex) { @@ -124,8 +124,8 @@ TEST_F(cta_rdbms_OcciColumnTest, setFieldLen_invalidIndex) { const size_t nbRows = 1; OcciColumn col(colName, nbRows); - const ub2 field1Len = 5678; - ASSERT_THROW(col.setFieldLen(1, field1Len), exception::Exception); + const ub2 field1Value = 5678; + ASSERT_THROW(col.setFieldLenToValueLen(1, field1Value), exception::Exception); } TEST_F(cta_rdbms_OcciColumnTest, getFieldLengths) { @@ -136,18 +136,18 @@ TEST_F(cta_rdbms_OcciColumnTest, getFieldLengths) { const size_t nbRows = 3; OcciColumn col(colName, nbRows); - const ub2 field0Len = 1234; - const ub2 field1Len = 5678; - const ub2 field2Len = 9012; - col.setFieldLen(0, field0Len); - col.setFieldLen(1, field1Len); - col.setFieldLen(2, field2Len); + const ub2 field0Value = 1; + const ub2 field1Value = 22; + const ub2 field2Value = 333; + col.setFieldLenToValueLen(0, field0Value); // Field Length is 1 + 1 + col.setFieldLenToValueLen(1, field1Value); // Field length is 2 + 1 + col.setFieldLenToValueLen(2, field2Value); // Field length is 3 + 1 ub2 *const fieldLens = col.getFieldLengths(); - ASSERT_EQ(field0Len, fieldLens[0]); - ASSERT_EQ(field1Len, fieldLens[1]); - ASSERT_EQ(field2Len, fieldLens[2]); + ASSERT_EQ(2, fieldLens[0]); + ASSERT_EQ(3, fieldLens[1]); + ASSERT_EQ(4, fieldLens[2]); } TEST_F(cta_rdbms_OcciColumnTest, getBuffer) { @@ -158,8 +158,8 @@ TEST_F(cta_rdbms_OcciColumnTest, getBuffer) { const size_t nbRows = 1; OcciColumn col(colName, nbRows); - const ub2 field0Len = 1234; - col.setFieldLen(0, field0Len); + const ub2 field0Value = 1234; + col.setFieldLenToValueLen(0, field0Value); char *const buf = col.getBuffer(); ASSERT_NE(nullptr, buf); @@ -184,16 +184,16 @@ TEST_F(cta_rdbms_OcciColumnTest, getMaxFieldLength) { const size_t nbRows = 4; OcciColumn col(colName, nbRows); - const ub2 field0Len = 1234; - const ub2 field1Len = 5678; - const ub2 field2Len = 9012; - const ub2 field3Len = 3456; - col.setFieldLen(0, field0Len); - col.setFieldLen(1, field1Len); - col.setFieldLen(2, field2Len); - col.setFieldLen(3, field3Len); + const ub2 field0Value = 1; + const ub2 field1Value = 22; + const ub2 field2Value = 333; // Max field length is 3 + 1 + const ub2 field3Value = 1; + col.setFieldLenToValueLen(0, field0Value); + col.setFieldLenToValueLen(1, field1Value); + col.setFieldLenToValueLen(2, field2Value); + col.setFieldLenToValueLen(3, field3Value); - ASSERT_EQ(field2Len, col.getMaxFieldLength()); + ASSERT_EQ(4, col.getMaxFieldLength()); } TEST_F(cta_rdbms_OcciColumnTest, copyStrIntoField_1_oneField) { @@ -205,14 +205,14 @@ TEST_F(cta_rdbms_OcciColumnTest, copyStrIntoField_1_oneField) { OcciColumn col(colName, nbRows); const std::string field0Value = "FIELD 0 VALUE"; - col.setFieldLen(0, field0Value.length() + 1); - col.copyStrIntoField(0, field0Value); + col.setFieldLenToValueLen(0, field0Value); + col.setFieldValue(0, field0Value); char *const buf = col.getBuffer(); ASSERT_EQ(field0Value, std::string(buf)); } -TEST_F(cta_rdbms_OcciColumnTest, copyStrIntoField_twoFields) { +TEST_F(cta_rdbms_OcciColumnTest, setFieldValue_twoFields) { using namespace cta; using namespace cta::rdbms; @@ -222,10 +222,10 @@ TEST_F(cta_rdbms_OcciColumnTest, copyStrIntoField_twoFields) { const std::string field0Value = "FIELD 0 VALUE"; const std::string field1Value = "FIELD 1 VALUE"; - col.setFieldLen(0, field0Value.length() + 1); - col.setFieldLen(1, field1Value.length() + 1); - col.copyStrIntoField(0, field0Value); - col.copyStrIntoField(1, field1Value); + col.setFieldLenToValueLen(0, field0Value); + col.setFieldLenToValueLen(1, field1Value); + col.setFieldValue(0, field0Value); + col.setFieldValue(1, field1Value); char *const buf = col.getBuffer(); const char *const bufField0 = buf; @@ -234,7 +234,7 @@ TEST_F(cta_rdbms_OcciColumnTest, copyStrIntoField_twoFields) { ASSERT_EQ(field1Value, std::string(bufField1)); } -TEST_F(cta_rdbms_OcciColumnTest, copyStrIntoField_tooLong) { +TEST_F(cta_rdbms_OcciColumnTest, setFieldValue_tooLong) { using namespace cta; using namespace cta::rdbms; @@ -242,9 +242,10 @@ TEST_F(cta_rdbms_OcciColumnTest, copyStrIntoField_tooLong) { const size_t nbRows = 1; OcciColumn col(colName, nbRows); + const std::string tooShortValue = "SHORT"; const std::string field0Value = "FIELD 0 VALUE"; - col.setFieldLen(0, 1); - ASSERT_THROW(col.copyStrIntoField(0, field0Value), exception::Exception); + col.setFieldLenToValueLen(0, tooShortValue); + ASSERT_THROW(col.setFieldValue(0, field0Value), exception::Exception); } } // namespace unitTests diff --git a/scheduler/ArchiveJob.cpp b/scheduler/ArchiveJob.cpp index 47e44df24b78f51190cd35fd05effe43d1133451..4839b2ac3c68e164ba85003bb5b38cb5d7e16ddf 100644 --- a/scheduler/ArchiveJob.cpp +++ b/scheduler/ArchiveJob.cpp @@ -81,7 +81,7 @@ bool cta::ArchiveJob::complete() { fileReport.storageClassName = archiveFile.storageClass; fileReport.tapeDrive = m_mount.getDrive(); fileReport.vid = tapeFile.vid; - m_catalogue.filesWrittenToTape (std::list<catalogue::TapeFileWritten>{fileReport}); + m_catalogue.filesWrittenToTape (std::set<catalogue::TapeFileWritten>{fileReport}); //m_ns.addTapeFile(SecurityIdentity(UserIdentity(std::numeric_limits<uint32_t>::max(), // std::numeric_limits<uint32_t>::max()), ""), archiveFile.fileId, nameServerTapeFile); // We will now report the successful archival to the EOS instance. diff --git a/tapeserver/castor/tape/tapeserver/daemon/DataTransferSessionTest.cpp b/tapeserver/castor/tape/tapeserver/daemon/DataTransferSessionTest.cpp index efb015b8bc1d7e3555a8b6ea2e0de5ebe8db214e..0f06b227dd9ef24d5ab79304f97612a4de6c25e1 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/DataTransferSessionTest.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/DataTransferSessionTest.cpp @@ -405,7 +405,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionGooddayRecall) { tapeFileWritten.diskFileRecoveryBlob = "B106"; tapeFileWritten.storageClassName = s_storageClassName; tapeFileWritten.tapeDrive = "drive0"; - catalogue.filesWrittenToTape(std::list<cta::catalogue::TapeFileWritten>{tapeFileWritten}); + catalogue.filesWrittenToTape(std::set<cta::catalogue::TapeFileWritten>{tapeFileWritten}); // Schedule the retrieval of the file std::string diskInstance="disk_instance"; @@ -577,7 +577,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionWrongRecall) { tapeFileWritten.diskFileRecoveryBlob = "B106"; tapeFileWritten.storageClassName = s_storageClassName; tapeFileWritten.tapeDrive = "drive0"; - catalogue.filesWrittenToTape(std::list<cta::catalogue::TapeFileWritten>{tapeFileWritten}); + catalogue.filesWrittenToTape(std::set<cta::catalogue::TapeFileWritten>{tapeFileWritten}); // Create an archive file entry in the archive catalogue tapeFileWritten.archiveFileId=1000 + fseq; @@ -597,7 +597,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionWrongRecall) { tapeFileWritten.diskFileRecoveryBlob = "B106"; tapeFileWritten.storageClassName = s_storageClassName; tapeFileWritten.tapeDrive = "drive0"; - catalogue.filesWrittenToTape(std::list<cta::catalogue::TapeFileWritten>{tapeFileWritten}); + catalogue.filesWrittenToTape(std::set<cta::catalogue::TapeFileWritten>{tapeFileWritten}); // Schedule the retrieval of the file std::string diskInstance="disk_instance"; @@ -761,7 +761,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionNoSuchDrive) { tapeFileWritten.diskFileRecoveryBlob = "B106"; tapeFileWritten.storageClassName = s_storageClassName; tapeFileWritten.tapeDrive = "drive0"; - catalogue.filesWrittenToTape(std::list<cta::catalogue::TapeFileWritten>{tapeFileWritten}); + catalogue.filesWrittenToTape(std::set<cta::catalogue::TapeFileWritten>{tapeFileWritten}); // Schedule the retrieval of the file std::string diskInstance="disk_instance"; @@ -904,7 +904,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionFailtoMount) { tapeFileWritten.diskFileRecoveryBlob = "B106"; tapeFileWritten.storageClassName = s_storageClassName; tapeFileWritten.tapeDrive = "drive0"; - catalogue.filesWrittenToTape(std::list<cta::catalogue::TapeFileWritten>{tapeFileWritten}); + catalogue.filesWrittenToTape(std::set<cta::catalogue::TapeFileWritten>{tapeFileWritten}); // Schedule the retrieval of the file std::string diskInstance="disk_instance";