/*
 * The CERN Tape Archive (CTA) project
 * Copyright (C) 2015 CERN
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
#include "catalogue/ArchiveFileRow.hpp"
#include "catalogue/OracleCatalogue.hpp"
#include "common/exception/UserError.hpp"
#include "common/exception/Exception.hpp"
#include "common/make_unique.hpp"
#include "common/threading/MutexLocker.hpp"
#include "common/utils/utils.hpp"
#include "rdbms/AutoRollback.hpp"
#include "rdbms/ConnFactoryFactory.hpp"
#include "rdbms/OcciStmt.hpp"
#include <list>
#include <sstream>
#include <string.h>

namespace cta {
namespace catalogue {
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
OracleCatalogue::OracleCatalogue(
log::Logger &log,
const std::string &username,
const std::string &password,
const std::string &database,
const uint64_t nbConns,
const uint64_t nbArchiveFileListingConns):
RdbmsCatalogue(
log,
rdbms::ConnFactoryFactory::create(rdbms::Login(rdbms::Login::DBTYPE_ORACLE, username, password, database)),
nbConns,
nbArchiveFileListingConns) {
}
//------------------------------------------------------------------------------
// destructor
//------------------------------------------------------------------------------
OracleCatalogue::~OracleCatalogue() {
}
//------------------------------------------------------------------------------
// deleteArchiveFile
//------------------------------------------------------------------------------
void OracleCatalogue::deleteArchiveFile(const std::string &diskInstanceName, const uint64_t archiveFileId) {
try {
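// Select the archive file and any associated tape files with FOR UPDATE so
// that the rows remain locked until the deletes below are committed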
const char *selectSql =
"SELECT "
"ARCHIVE_FILE.ARCHIVE_FILE_ID AS ARCHIVE_FILE_ID,"
"ARCHIVE_FILE.DISK_INSTANCE_NAME AS DISK_INSTANCE_NAME,"
"ARCHIVE_FILE.DISK_FILE_ID AS DISK_FILE_ID,"
"ARCHIVE_FILE.DISK_FILE_PATH AS DISK_FILE_PATH,"
"ARCHIVE_FILE.DISK_FILE_USER AS DISK_FILE_USER,"
"ARCHIVE_FILE.DISK_FILE_GROUP AS DISK_FILE_GROUP,"
"ARCHIVE_FILE.DISK_FILE_RECOVERY_BLOB AS DISK_FILE_RECOVERY_BLOB,"
"ARCHIVE_FILE.SIZE_IN_BYTES AS SIZE_IN_BYTES,"
"ARCHIVE_FILE.CHECKSUM_TYPE AS CHECKSUM_TYPE,"
"ARCHIVE_FILE.CHECKSUM_VALUE AS CHECKSUM_VALUE,"
"ARCHIVE_FILE.STORAGE_CLASS_NAME AS STORAGE_CLASS_NAME,"
"ARCHIVE_FILE.CREATION_TIME AS ARCHIVE_FILE_CREATION_TIME,"
"ARCHIVE_FILE.RECONCILIATION_TIME AS RECONCILIATION_TIME,"
"TAPE_FILE.VID AS VID,"
"TAPE_FILE.FSEQ AS FSEQ,"
"TAPE_FILE.BLOCK_ID AS BLOCK_ID,"
"TAPE_FILE.COMPRESSED_SIZE_IN_BYTES AS COMPRESSED_SIZE_IN_BYTES,"
"TAPE_FILE.COPY_NB AS COPY_NB,"
"TAPE_FILE.CREATION_TIME AS TAPE_FILE_CREATION_TIME "
"FROM "
"ARCHIVE_FILE "
"LEFT OUTER JOIN TAPE_FILE ON "
"ARCHIVE_FILE.ARCHIVE_FILE_ID = TAPE_FILE.ARCHIVE_FILE_ID "
"WHERE "
"ARCHIVE_FILE.ARCHIVE_FILE_ID = :ARCHIVE_FILE_ID "
"FOR UPDATE";
auto conn = m_connPool.getConn();
auto selectStmt = conn.createStmt(selectSql, rdbms::Stmt::AutocommitMode::OFF);
selectStmt->bindUint64(":ARCHIVE_FILE_ID", archiveFileId);
rdbms::Rset selectRset = selectStmt->executeQuery();
std::unique_ptr<common::dataStructures::ArchiveFile> archiveFile;
while(selectRset.next()) {
if(nullptr == archiveFile.get()) {
archiveFile = cta::make_unique<common::dataStructures::ArchiveFile>();
archiveFile->archiveFileID = selectRset.columnUint64("ARCHIVE_FILE_ID");
archiveFile->diskInstance = selectRset.columnString("DISK_INSTANCE_NAME");
archiveFile->diskFileId = selectRset.columnString("DISK_FILE_ID");
archiveFile->diskFileInfo.path = selectRset.columnString("DISK_FILE_PATH");
archiveFile->diskFileInfo.owner = selectRset.columnString("DISK_FILE_USER");
archiveFile->diskFileInfo.group = selectRset.columnString("DISK_FILE_GROUP");
archiveFile->diskFileInfo.recoveryBlob = selectRset.columnString("DISK_FILE_RECOVERY_BLOB");
archiveFile->fileSize = selectRset.columnUint64("SIZE_IN_BYTES");
archiveFile->checksumType = selectRset.columnString("CHECKSUM_TYPE");
archiveFile->checksumValue = selectRset.columnString("CHECKSUM_VALUE");
archiveFile->storageClass = selectRset.columnString("STORAGE_CLASS_NAME");
archiveFile->creationTime = selectRset.columnUint64("ARCHIVE_FILE_CREATION_TIME");
archiveFile->reconciliationTime = selectRset.columnUint64("RECONCILIATION_TIME");
}
// If there is a tape file
if(!selectRset.columnIsNull("VID")) {
// Add the tape file to the archive file's in-memory structure
common::dataStructures::TapeFile tapeFile;
tapeFile.vid = selectRset.columnString("VID");
tapeFile.fSeq = selectRset.columnUint64("FSEQ");
tapeFile.blockId = selectRset.columnUint64("BLOCK_ID");
tapeFile.compressedSize = selectRset.columnUint64("COMPRESSED_SIZE_IN_BYTES");
tapeFile.copyNb = selectRset.columnUint64("COPY_NB");
tapeFile.creationTime = selectRset.columnUint64("TAPE_FILE_CREATION_TIME");
tapeFile.checksumType = archiveFile->checksumType; // Duplicated for convenience
tapeFile.checksumValue = archiveFile->checksumValue; // Duplicated for convenience
archiveFile->tapeFiles[selectRset.columnUint64("COPY_NB")] = tapeFile;
}
}
if(nullptr == archiveFile.get()) {
std::list<cta::log::Param> params;
params.push_back(cta::log::Param("fileId", std::to_string(archiveFileId)));
m_log(log::WARNING, "Ignoring request to delete Archive File because it does not exist in the catalogue", params);
return;
}
if(diskInstanceName != archiveFile->diskInstance) {
exception::UserError ue;
ue.getMessage() << "Failed to delete archive file with ID " << archiveFileId << " because the disk instance of "
"the request does not match that of the archived file: archiveFileId=" << archiveFileId << " path=" <<
archiveFile->diskFileInfo.path << " requestDiskInstance=" << diskInstanceName << " archiveFileDiskInstance=" <<
archiveFile->diskInstance;
throw ue;
}
{
const char *const sql = "DELETE FROM TAPE_FILE WHERE ARCHIVE_FILE_ID = :ARCHIVE_FILE_ID";
auto stmt = conn.createStmt(sql, rdbms::Stmt::AutocommitMode::OFF);
stmt->bindUint64(":ARCHIVE_FILE_ID", archiveFileId);
stmt->executeNonQuery();
}
{
const char *const sql = "DELETE FROM ARCHIVE_FILE WHERE ARCHIVE_FILE_ID = :ARCHIVE_FILE_ID";
auto stmt = conn.createStmt(sql, rdbms::Stmt::AutocommitMode::OFF);
stmt->bindUint64(":ARCHIVE_FILE_ID", archiveFileId);
stmt->executeNonQuery();
}
conn.commit();
std::list<cta::log::Param> params;
params.push_back(cta::log::Param("fileId", std::to_string(archiveFile->archiveFileID)));
params.push_back(cta::log::Param("diskInstance", archiveFile->diskInstance));
params.push_back(cta::log::Param("diskFileId", archiveFile->diskFileId));
params.push_back(cta::log::Param("diskFileInfo.path", archiveFile->diskFileInfo.path));
params.push_back(cta::log::Param("diskFileInfo.owner", archiveFile->diskFileInfo.owner));
params.push_back(cta::log::Param("diskFileInfo.group", archiveFile->diskFileInfo.group));
params.push_back(cta::log::Param("diskFileInfo.recoveryBlob", archiveFile->diskFileInfo.recoveryBlob));
params.push_back(cta::log::Param("fileSize", std::to_string(archiveFile->fileSize)));
params.push_back(cta::log::Param("checksumType", archiveFile->checksumType));
params.push_back(cta::log::Param("checksumValue", archiveFile->checksumValue));
params.push_back(cta::log::Param("creationTime", std::to_string(archiveFile->creationTime)));
params.push_back(cta::log::Param("reconciliationTime", std::to_string(archiveFile->reconciliationTime)));
params.push_back(cta::log::Param("storageClass", archiveFile->storageClass));
for(auto it=archiveFile->tapeFiles.begin(); it!=archiveFile->tapeFiles.end(); it++) {
std::stringstream tapeCopyLogStream;
tapeCopyLogStream << "copy number: " << it->first
<< " vid: " << it->second.vid
<< " fSeq: " << it->second.fSeq
<< " blockId: " << it->second.blockId
<< " creationTime: " << it->second.creationTime
<< " compressedSize: " << it->second.compressedSize
<< " checksumType: " << it->second.checksumType //this shouldn't be here: repeated field
<< " checksumValue: " << it->second.checksumValue //this shouldn't be here: repeated field
<< " copyNb: " << it->second.copyNb; //this shouldn't be here: repeated field
params.push_back(cta::log::Param("TAPE FILE", tapeCopyLogStream.str()));
}
m_log(log::INFO, "Archive File Deleted", params);
} catch(exception::UserError &) {
throw;
} catch(exception::Exception &ex) {
throw exception::Exception(std::string(__FUNCTION__) + " failed: " + ex.getMessage().str());
}
}
//------------------------------------------------------------------------------
// getNextArchiveFileId
//------------------------------------------------------------------------------
uint64_t OracleCatalogue::getNextArchiveFileId(rdbms::PooledConn &conn) {
try {
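// Archive file identifiers are taken from the ARCHIVE_FILE_ID_SEQ Oracle
// sequence, which guarantees unique values across concurrent sessions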
const char *const sql =
"SELECT "
"ARCHIVE_FILE_ID_SEQ.NEXTVAL AS ARCHIVE_FILE_ID "
"FROM "
"DUAL";
auto stmt = conn.createStmt(sql, rdbms::Stmt::AutocommitMode::OFF);
auto rset = stmt->executeQuery();
if (!rset.next()) {
throw exception::Exception(std::string("Result set is unexpectedly empty"));
}
return rset.columnUint64("ARCHIVE_FILE_ID");
} catch(exception::Exception &ex) {
throw exception::Exception(std::string(__FUNCTION__) + " failed: " + ex.getMessage().str());
}
}
//------------------------------------------------------------------------------
// selectTapeForUpdate
//------------------------------------------------------------------------------
common::dataStructures::Tape OracleCatalogue::selectTapeForUpdate(rdbms::PooledConn &conn, const std::string &vid) {
try {
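// The FOR UPDATE clause locks the TAPE row so that concurrent writers to the
// same tape serialise at the database level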
const char *const sql =
"SELECT "
"VID AS VID,"
"LOGICAL_LIBRARY_NAME AS LOGICAL_LIBRARY_NAME,"
"TAPE_POOL_NAME AS TAPE_POOL_NAME,"
"ENCRYPTION_KEY AS ENCRYPTION_KEY,"
"CAPACITY_IN_BYTES AS CAPACITY_IN_BYTES,"
"DATA_IN_BYTES AS DATA_IN_BYTES,"
"LAST_FSEQ AS LAST_FSEQ,"
"IS_DISABLED AS IS_DISABLED,"
"IS_FULL AS IS_FULL,"
"LBP_IS_ON AS LBP_IS_ON,"
"LABEL_DRIVE AS LABEL_DRIVE,"
"LABEL_TIME AS LABEL_TIME,"
"LAST_READ_DRIVE AS LAST_READ_DRIVE,"
"LAST_READ_TIME AS LAST_READ_TIME,"
"LAST_WRITE_DRIVE AS LAST_WRITE_DRIVE,"
"LAST_WRITE_TIME AS LAST_WRITE_TIME,"
"USER_COMMENT AS USER_COMMENT,"
"CREATION_LOG_USER_NAME AS CREATION_LOG_USER_NAME,"
"CREATION_LOG_HOST_NAME AS CREATION_LOG_HOST_NAME,"
"CREATION_LOG_TIME AS CREATION_LOG_TIME,"
"LAST_UPDATE_USER_NAME AS LAST_UPDATE_USER_NAME,"
"LAST_UPDATE_HOST_NAME AS LAST_UPDATE_HOST_NAME,"
"LAST_UPDATE_TIME AS LAST_UPDATE_TIME "
"FROM "
"TAPE "
"WHERE "
"VID = :VID "
"FOR UPDATE";
auto stmt = conn.createStmt(sql, rdbms::Stmt::AutocommitMode::OFF);
stmt->bindString(":VID", vid);
auto rset = stmt->executeQuery();
if (!rset.next()) {
throw exception::Exception(std::string("The tape with VID " + vid + " does not exist"));
}
common::dataStructures::Tape tape;
tape.vid = rset.columnString("VID");
tape.logicalLibraryName = rset.columnString("LOGICAL_LIBRARY_NAME");
tape.tapePoolName = rset.columnString("TAPE_POOL_NAME");
tape.encryptionKey = rset.columnOptionalString("ENCRYPTION_KEY");
tape.capacityInBytes = rset.columnUint64("CAPACITY_IN_BYTES");
tape.dataOnTapeInBytes = rset.columnUint64("DATA_IN_BYTES");
tape.lastFSeq = rset.columnUint64("LAST_FSEQ");
tape.disabled = rset.columnBool("IS_DISABLED");
tape.full = rset.columnBool("IS_FULL");
tape.lbp = rset.columnOptionalBool("LBP_IS_ON");
tape.labelLog = getTapeLogFromRset(rset, "LABEL_DRIVE", "LABEL_TIME");
tape.lastReadLog = getTapeLogFromRset(rset, "LAST_READ_DRIVE", "LAST_READ_TIME");
tape.lastWriteLog = getTapeLogFromRset(rset, "LAST_WRITE_DRIVE", "LAST_WRITE_TIME");
tape.comment = rset.columnString("USER_COMMENT");
common::dataStructures::UserIdentity creatorUI;
creatorUI.name = rset.columnString("CREATION_LOG_USER_NAME");
common::dataStructures::EntryLog creationLog;
creationLog.username = rset.columnString("CREATION_LOG_USER_NAME");
creationLog.host = rset.columnString("CREATION_LOG_HOST_NAME");
creationLog.time = rset.columnUint64("CREATION_LOG_TIME");
tape.creationLog = creationLog;
common::dataStructures::UserIdentity updaterUI;
updaterUI.name = rset.columnString("LAST_UPDATE_USER_NAME");
common::dataStructures::EntryLog updateLog;
updateLog.username = rset.columnString("LAST_UPDATE_USER_NAME");
updateLog.host = rset.columnString("LAST_UPDATE_HOST_NAME");
updateLog.time = rset.columnUint64("LAST_UPDATE_TIME");
tape.lastModificationLog = updateLog;
return tape;
} catch (exception::Exception &ex) {
throw exception::Exception(std::string(__FUNCTION__) + " failed: " + ex.getMessage().str());
}
}
//------------------------------------------------------------------------------
// filesWrittenToTape
//------------------------------------------------------------------------------
void OracleCatalogue::filesWrittenToTape(const std::set<TapeFileWritten> &events) {
try {
if (events.empty()) {
return;
}
auto firstEventItor = events.begin();
const auto &firstEvent = *firstEventItor;
checkTapeFileWrittenFieldsAreSet(firstEvent);
const time_t now = time(nullptr);
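// Serialise concurrent calls to this method within the current process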
threading::MutexLocker locker(m_mutex);
auto conn = m_connPool.getConn();
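// Roll the transaction back if this scope is left before conn.commit() below
// is reached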
rdbms::AutoRollback autoRollback(conn);
const auto tape = selectTapeForUpdate(conn, firstEvent.vid);
uint64_t expectedFSeq = tape.lastFSeq + 1;
uint64_t totalCompressedBytesWritten = 0;
uint32_t i = 0;
TapeFileBatch tapeFileBatch(events.size());
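// The batch insert is built in two passes: this first pass records the length
// of every field so the OCCI column buffers can be sized, and a second pass
// further down copies the field values themselves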
for (const auto &event: events) {
checkTapeFileWrittenFieldsAreSet(event);
if (event.vid != firstEvent.vid) {
throw exception::Exception(std::string("VID mismatch: expected=") + firstEvent.vid + " actual=" + event.vid);
}
if (expectedFSeq != event.fSeq) {
exception::Exception ex;
ex.getMessage() << "FSeq mismatch for tape " << firstEvent.vid << ": expected=" << expectedFSeq << " actual=" <<
firstEvent.fSeq;
throw ex;
}
expectedFSeq++;
totalCompressedBytesWritten += event.compressedSize;
// Store the length of each field and implicitly calculate the maximum field
// length of each column
tapeFileBatch.vid.setFieldLenToValueLen(i, event.vid);
tapeFileBatch.fSeq.setFieldLenToValueLen(i, event.fSeq);
tapeFileBatch.blockId.setFieldLenToValueLen(i, event.blockId);
tapeFileBatch.compressedSize.setFieldLenToValueLen(i, event.compressedSize);
tapeFileBatch.copyNb.setFieldLenToValueLen(i, event.copyNb);
tapeFileBatch.creationTime.setFieldLenToValueLen(i, now);
tapeFileBatch.archiveFileId.setFieldLenToValueLen(i, event.archiveFileId);
i++;
}
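// The fSeq check above guarantees the events are iterated in increasing fSeq
// order, so the last event carries the tape's new last fSeq and the drive
// name used to update the TAPE row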
auto lastEventItor = events.cend();
lastEventItor--;
const TapeFileWritten &lastEvent = *lastEventItor;
updateTape(conn, rdbms::Stmt::AutocommitMode::OFF, lastEvent.vid, lastEvent.fSeq, totalCompressedBytesWritten,
lastEvent.tapeDrive);
idempotentBatchInsertArchiveFiles(conn, rdbms::Stmt::AutocommitMode::OFF, events);
// Store the value of each field
i = 0;
for (const auto &event: events) {
tapeFileBatch.vid.setFieldValue(i, event.vid);
tapeFileBatch.fSeq.setFieldValue(i, event.fSeq);
tapeFileBatch.blockId.setFieldValue(i, event.blockId);
tapeFileBatch.compressedSize.setFieldValue(i, event.compressedSize);
tapeFileBatch.copyNb.setFieldValue(i, event.copyNb);
tapeFileBatch.creationTime.setFieldValue(i, now);
tapeFileBatch.archiveFileId.setFieldValue(i, event.archiveFileId);
i++;
}
const char *const sql =
"INSERT INTO TAPE_FILE("
"VID,"
"FSEQ,"
"BLOCK_ID,"
"COMPRESSED_SIZE_IN_BYTES,"
"COPY_NB,"
"CREATION_TIME,"
"ARCHIVE_FILE_ID)"
"VALUES("
":VID,"
":FSEQ,"
":BLOCK_ID,"
":COMPRESSED_SIZE_IN_BYTES,"
":COPY_NB,"
":CREATION_TIME,"
":ARCHIVE_FILE_ID)";
auto stmt = conn.createStmt(sql, rdbms::Stmt::AutocommitMode::OFF);
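// Down-cast to the Oracle specific OcciStmt so that the per-column batch
// buffers can be attached for the OCCI array insert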
rdbms::OcciStmt &occiStmt = dynamic_cast<rdbms::OcciStmt &>(*stmt);
occiStmt.setColumn(tapeFileBatch.vid);
occiStmt.setColumn(tapeFileBatch.fSeq);
occiStmt.setColumn(tapeFileBatch.blockId);
occiStmt.setColumn(tapeFileBatch.compressedSize);
occiStmt.setColumn(tapeFileBatch.copyNb);
occiStmt.setColumn(tapeFileBatch.creationTime);
occiStmt.setColumn(tapeFileBatch.archiveFileId);
occiStmt->executeArrayUpdate(tapeFileBatch.nbRows);
conn.commit();
} catch(exception::Exception &ex) {
throw exception::Exception(std::string(__FUNCTION__) + " failed: " + ex.getMessage().str());
} catch(std::exception &se) {
throw exception::Exception(std::string(__FUNCTION__) + " failed: " + se.what());
}
}
//------------------------------------------------------------------------------
// idempotentBatchInsertArchiveFiles
//------------------------------------------------------------------------------
void OracleCatalogue::idempotentBatchInsertArchiveFiles(rdbms::PooledConn &conn,
const rdbms::Stmt::AutocommitMode autocommitMode, const std::set<TapeFileWritten> &events) {
try {
ArchiveFileBatch archiveFileBatch(events.size());
const time_t now = time(nullptr);
// Store the length of each field and implicitly calculate the maximum field
// length of each column
uint32_t i = 0;
for (const auto &event: events) {
archiveFileBatch.archiveFileId.setFieldLenToValueLen(i, event.archiveFileId);
archiveFileBatch.diskInstance.setFieldLenToValueLen(i, event.diskInstance);
archiveFileBatch.diskFileId.setFieldLenToValueLen(i, event.diskFileId);
archiveFileBatch.diskFilePath.setFieldLenToValueLen(i, event.diskFilePath);
archiveFileBatch.diskFileUser.setFieldLenToValueLen(i, event.diskFileUser);
archiveFileBatch.diskFileGroup.setFieldLenToValueLen(i, event.diskFileGroup);
archiveFileBatch.diskFileRecoveryBlob.setFieldLenToValueLen(i, event.diskFileRecoveryBlob);
archiveFileBatch.size.setFieldLenToValueLen(i, event.size);
archiveFileBatch.checksumType.setFieldLenToValueLen(i, event.checksumType);
archiveFileBatch.checksumValue.setFieldLenToValueLen(i, event.checksumValue);
archiveFileBatch.storageClassName.setFieldLenToValueLen(i, event.storageClassName);
archiveFileBatch.creationTime.setFieldLenToValueLen(i, now);
archiveFileBatch.reconciliationTime.setFieldLenToValueLen(i, now);
i++;
}
// Store the value of each field
i = 0;
for (const auto &event: events) {
archiveFileBatch.archiveFileId.setFieldValue(i, event.archiveFileId);
archiveFileBatch.diskInstance.setFieldValue(i, event.diskInstance);
archiveFileBatch.diskFileId.setFieldValue(i, event.diskFileId);
archiveFileBatch.diskFilePath.setFieldValue(i, event.diskFilePath);
archiveFileBatch.diskFileUser.setFieldValue(i, event.diskFileUser);
archiveFileBatch.diskFileGroup.setFieldValue(i, event.diskFileGroup);
archiveFileBatch.diskFileRecoveryBlob.setFieldValue(i, event.diskFileRecoveryBlob);
archiveFileBatch.size.setFieldValue(i, event.size);
archiveFileBatch.checksumType.setFieldValue(i, event.checksumType);
archiveFileBatch.checksumValue.setFieldValue(i, event.checksumValue);
archiveFileBatch.storageClassName.setFieldValue(i, event.storageClassName);
archiveFileBatch.creationTime.setFieldValue(i, now);
archiveFileBatch.reconciliationTime.setFieldValue(i, now);
i++;
}
const char *const sql =
"INSERT INTO ARCHIVE_FILE("
"ARCHIVE_FILE_ID,"
"DISK_INSTANCE_NAME,"
"DISK_FILE_ID,"
"DISK_FILE_PATH,"
"DISK_FILE_USER,"
"DISK_FILE_GROUP,"
"DISK_FILE_RECOVERY_BLOB,"
"SIZE_IN_BYTES,"
"CHECKSUM_TYPE,"
"CHECKSUM_VALUE,"
"STORAGE_CLASS_NAME,"
"CREATION_TIME,"
"RECONCILIATION_TIME)"
"VALUES("
":ARCHIVE_FILE_ID,"
":DISK_INSTANCE_NAME,"
":DISK_FILE_ID,"
":DISK_FILE_PATH,"
":DISK_FILE_USER,"
":DISK_FILE_GROUP,"
":DISK_FILE_RECOVERY_BLOB,"
":SIZE_IN_BYTES,"
":CHECKSUM_TYPE,"
":CHECKSUM_VALUE,"
":STORAGE_CLASS_NAME,"
":CREATION_TIME,"
":RECONCILIATION_TIME)";
auto stmt = conn.createStmt(sql, autocommitMode);
rdbms::OcciStmt &occiStmt = dynamic_cast<rdbms::OcciStmt &>(*stmt);
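// Batch error mode makes executeArrayUpdate() report per-row errors instead
// of aborting on the first one, allowing rows that already exist to be
// tolerated below (this is what makes the insert idempotent)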
occiStmt->setBatchErrorMode(true);
occiStmt.setColumn(archiveFileBatch.archiveFileId);
occiStmt.setColumn(archiveFileBatch.diskInstance);
occiStmt.setColumn(archiveFileBatch.diskFileId);
occiStmt.setColumn(archiveFileBatch.diskFilePath);
occiStmt.setColumn(archiveFileBatch.diskFileUser);
occiStmt.setColumn(archiveFileBatch.diskFileGroup);
occiStmt.setColumn(archiveFileBatch.diskFileRecoveryBlob);
occiStmt.setColumn(archiveFileBatch.size);
occiStmt.setColumn(archiveFileBatch.checksumType);
occiStmt.setColumn(archiveFileBatch.checksumValue);
occiStmt.setColumn(archiveFileBatch.storageClassName);
occiStmt.setColumn(archiveFileBatch.creationTime);
occiStmt.setColumn(archiveFileBatch.reconciliationTime);
try {
occiStmt->executeArrayUpdate(archiveFileBatch.nbRows);
} catch(oracle::occi::BatchSQLException &be) {
const int nbFailedRows = be.getFailedRowCount();
exception::Exception ex;
ex.getMessage() << "Caught a BatchSQLException" << nbFailedRows;
bool foundErrorOtherThanUniqueConstraint = false;
for (int row = 0; row < nbFailedRows; row++) {
oracle::occi::SQLException err = be.getException(row);
const unsigned int rowIndex = be.getRowNum(row);
const int errorCode = err.getErrorCode();
// If the error is anything other than a unique constraint violation (ORA-00001)
if(1 != errorCode) {
foundErrorOtherThanUniqueConstraint = true;
ex.getMessage() << ": Row " << rowIndex << " generated ORA error " << errorCode;
}
}
if (foundErrorOtherThanUniqueConstraint) {
throw ex;
}
} catch(std::exception &se) {
throw exception::Exception(std::string("executeArrayUpdate failed: ") + se.what());
}
} catch(exception::Exception &ex) {
throw exception::Exception(std::string(__FUNCTION__) + " failed: " + ex.getMessage().str());
}
}
} // namespace catalogue
} // namespace cta