Skip to content
Snippets Groups Projects
Commit d7ad0872 authored by Steven Murray's avatar Steven Murray
Browse files

OracleCatalogue::filesWrittenToTape() used a batched insert

parent a1e4fafc
No related branches found
No related tags found
No related merge requests found
......@@ -15,6 +15,22 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
cmake_minimum_required (VERSION 2.6)
# OCCI support is on by default
set (OCCI_SUPPORT ON)
# Switch OCCI support off if using gcc 5.1 or higher
if (CMAKE_COMPILER_IS_GNUCC)
if (GCC_VERSION VERSION_GREATER 5.0)
message(STATUS "Detected gcc >= 5.1 - Switching off OCCI support")
set (OCCI_SUPPORT OFF)
endif (GCC_VERSION VERSION_GREATER 5.0)
endif (CMAKE_COMPILER_IS_GNUCC)
if(OCCI_SUPPORT)
find_package (oracle-instantclient REQUIRED)
include_directories (${ORACLE-INSTANTCLIENT_INCLUDE_DIRS})
endif(OCCI_SUPPORT)
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wshadow")
set (CATALOGUE_LIB_SRC_FILES
......
......@@ -24,6 +24,9 @@
#include "common/utils/utils.hpp"
#include "rdbms/AutoRollback.hpp"
#include "rdbms/ConnFactoryFactory.hpp"
#include "rdbms/OcciStmt.hpp"
#include <string.h>
namespace cta {
namespace catalogue {
......@@ -282,13 +285,14 @@ common::dataStructures::Tape OracleCatalogue::selectTapeForUpdate(rdbms::PooledC
//------------------------------------------------------------------------------
void OracleCatalogue::filesWrittenToTape(const std::list<TapeFileWritten> &events) {
try {
if(events.empty()) {
if (events.empty()) {
return;
}
const auto &firstEvent = events.front();
checkTapeFileWrittenFieldsAreSet(firstEvent);
const time_t now = time(nullptr);
const std::string nowStr = std::to_string(now);
std::lock_guard<std::mutex> m_lock(m_mutex);
auto conn = m_connPool.getConn();
rdbms::AutoRollback autoRollback(conn);
......@@ -297,53 +301,90 @@ void OracleCatalogue::filesWrittenToTape(const std::list<TapeFileWritten> &event
uint64_t expectedFSeq = tape.lastFSeq + 1;
uint64_t totalCompressedBytesWritten = 0;
for(const auto &event: events) {
uint32_t vidMaxFieldSize = 0;
uint32_t fSeqMaxFieldSize = 0;
uint32_t blockIdMaxFieldSize = 0;
uint32_t compressedSizeMaxFieldSize = 0;
uint32_t copyNbMaxFieldSize = 0;
const uint32_t creationTimeMaxFieldSize = nowStr.length() + 1;
uint32_t archiveFileIdMaxFieldSize = 0;
std::unique_ptr<ub2[]> vidFieldSizeArray(new ub2[events.size()]);
std::unique_ptr<ub2[]> fSeqFieldSizeArray(new ub2[events.size()]);
std::unique_ptr<ub2[]> blockIdFieldSizeArray(new ub2[events.size()]);
std::unique_ptr<ub2[]> compressedSizeFieldSizeArray(new ub2[events.size()]);
std::unique_ptr<ub2[]> copyNbFieldSizeArray(new ub2[events.size()]);
std::unique_ptr<ub2[]> creationTimeFieldSizeArray(new ub2[events.size()]);
std::unique_ptr<ub2[]> archiveFileIdFieldSizeArray(new ub2[events.size()]);
uint32_t i = 0;
for (const auto &event: events) {
checkTapeFileWrittenFieldsAreSet(firstEvent);
if(event.vid != firstEvent.vid) {
if (event.vid != firstEvent.vid) {
throw exception::Exception(std::string("VID mismatch: expected=") + firstEvent.vid + " actual=event.vid");
}
if(expectedFSeq != event.fSeq) {
if (expectedFSeq != event.fSeq) {
exception::Exception ex;
ex.getMessage() << "FSeq mismatch for tape " << firstEvent.vid << ": expected=" << expectedFSeq << " actual=" <<
firstEvent.fSeq;
firstEvent.fSeq;
throw ex;
}
expectedFSeq++;
totalCompressedBytesWritten += event.compressedSize;
vidFieldSizeArray[i] = event.vid.length() + 1;
if (vidFieldSizeArray[i] > vidMaxFieldSize) {
vidMaxFieldSize = vidFieldSizeArray[i];
}
const std::string fSeqStr = std::to_string(event.fSeq);
fSeqFieldSizeArray[i] = fSeqStr.length() + 1;
if (fSeqFieldSizeArray[i] > fSeqMaxFieldSize) {
fSeqMaxFieldSize = fSeqFieldSizeArray[i];
}
const std::string blockIdStr = std::to_string(event.blockId);
blockIdFieldSizeArray[i] = blockIdStr.length() + 1;
if (blockIdFieldSizeArray[i] > blockIdMaxFieldSize) {
blockIdMaxFieldSize = blockIdFieldSizeArray[i];
}
const std::string compressedSizeStr = std::to_string(event.compressedSize);
compressedSizeFieldSizeArray[i] = compressedSizeStr.length() + 1;
if (compressedSizeFieldSizeArray[i] > compressedSizeMaxFieldSize) {
compressedSizeMaxFieldSize = compressedSizeFieldSizeArray[i];
}
const std::string copyNbStr = std::to_string(event.copyNb);
copyNbFieldSizeArray[i] = copyNbStr.length() + 1;
if (copyNbFieldSizeArray[i] > copyNbMaxFieldSize) {
copyNbMaxFieldSize = copyNbFieldSizeArray[i];
}
creationTimeFieldSizeArray[i] = creationTimeMaxFieldSize;
const std::string archiveFileIdStr = std::to_string(event.archiveFileId);
archiveFileIdFieldSizeArray[i] = archiveFileIdStr.length() + 1;
if (archiveFileIdFieldSizeArray[i] > archiveFileIdMaxFieldSize) {
archiveFileIdMaxFieldSize = archiveFileIdFieldSizeArray[i];
}
i++;
}
const TapeFileWritten &lastEvent = events.back();
updateTape(conn, rdbms::Stmt::AutocommitMode::OFF, lastEvent.vid, lastEvent.fSeq, totalCompressedBytesWritten,
lastEvent.tapeDrive);
const char *const sql =
"INSERT INTO TAPE_FILE("
"VID,"
"FSEQ,"
"BLOCK_ID,"
"COMPRESSED_SIZE_IN_BYTES,"
"COPY_NB,"
"CREATION_TIME,"
"ARCHIVE_FILE_ID)"
"VALUES("
":VID,"
":FSEQ,"
":BLOCK_ID,"
":COMPRESSED_SIZE_IN_BYTES,"
":COPY_NB,"
":CREATION_TIME,"
":ARCHIVE_FILE_ID)";
for(const auto &event: events) {
checkTapeFileWrittenFieldsAreSet(firstEvent);
// To be moved to the queueing of the archive request
for (const auto &event: events) {
std::unique_ptr<common::dataStructures::ArchiveFile> archiveFile = getArchiveFile(conn, event.archiveFileId);
// If the archive file does not already exist
if(nullptr == archiveFile.get()) {
if (nullptr == archiveFile.get()) {
// Create one
ArchiveFileRow row;
row.archiveFileId = event.archiveFileId;
......@@ -361,22 +402,98 @@ void OracleCatalogue::filesWrittenToTape(const std::list<TapeFileWritten> &event
} else {
throwIfCommonEventDataMismatch(*archiveFile, event);
}
}
auto stmt = conn.createStmt(sql, rdbms::Stmt::AutocommitMode::OFF);
stmt->bindString(":VID", event.vid);
stmt->bindUint64(":FSEQ", event.fSeq);
stmt->bindUint64(":BLOCK_ID", event.blockId);
stmt->bindUint64(":COMPRESSED_SIZE_IN_BYTES", event.compressedSize);
stmt->bindUint64(":COPY_NB", event.copyNb);
stmt->bindUint64(":CREATION_TIME", now);
stmt->bindUint64(":ARCHIVE_FILE_ID", event.archiveFileId);
stmt->executeNonQuery();
} // for(const auto &event: events)
std::unique_ptr<char[]> vidCStrArray(new char[events.size() * vidMaxFieldSize]);
std::unique_ptr<char[]> fSeqCStrArray(new char[events.size() * fSeqMaxFieldSize]);
std::unique_ptr<char[]> blockIdCStrArray(new char[events.size() * blockIdMaxFieldSize]);
std::unique_ptr<char[]> compressedSizeCStrArray(new char[events.size() * compressedSizeMaxFieldSize]);
std::unique_ptr<char[]> copyNbCStrArray(new char[events.size() * copyNbMaxFieldSize]);
std::unique_ptr<char[]> creationTimeCStrArray(new char[events.size() * creationTimeMaxFieldSize]);
std::unique_ptr<char[]> archiveFileIdCStrArray(new char[events.size() * archiveFileIdMaxFieldSize]);
i = 0;
for (const auto &event: events) {
{
char *const element = vidCStrArray.get() + i * vidMaxFieldSize;
strncpy(element, event.vid.c_str(), vidMaxFieldSize);
element[vidMaxFieldSize - 1] = '\0';
}
{
char *const element = fSeqCStrArray.get() + i * fSeqMaxFieldSize;
strncpy(element, std::to_string(event.fSeq).c_str(), fSeqMaxFieldSize);
element[fSeqMaxFieldSize - 1] = '\0';
}
{
char *const element = blockIdCStrArray.get() + i * blockIdMaxFieldSize;
strncpy(element, std::to_string(event.blockId).c_str(), blockIdMaxFieldSize);
element[blockIdMaxFieldSize - 1] = '\0';
}
{
char *const element = compressedSizeCStrArray.get() + i * compressedSizeMaxFieldSize;
strncpy(element, std::to_string(event.compressedSize).c_str(), compressedSizeMaxFieldSize);
element[compressedSizeMaxFieldSize - 1] = '\0';
}
{
char *const element = copyNbCStrArray.get() + i * copyNbMaxFieldSize;
strncpy(element, std::to_string(event.copyNb).c_str(), copyNbMaxFieldSize);
element[copyNbMaxFieldSize - 1] = '\0';
}
{
char *const element = creationTimeCStrArray.get() + i * creationTimeMaxFieldSize;
strncpy(element, nowStr.c_str(), creationTimeMaxFieldSize);
element[creationTimeMaxFieldSize - 1] = '\0';
}
{
char *const element = archiveFileIdCStrArray.get() + i * archiveFileIdMaxFieldSize;
strncpy(element, std::to_string(event.archiveFileId).c_str(), archiveFileIdMaxFieldSize);
element[archiveFileIdMaxFieldSize - 1] = '\0';
}
i++;
}
const char *const sql =
"INSERT INTO TAPE_FILE("
"VID,"
"FSEQ,"
"BLOCK_ID,"
"COMPRESSED_SIZE_IN_BYTES,"
"COPY_NB,"
"CREATION_TIME,"
"ARCHIVE_FILE_ID)"
"VALUES("
":VID,"
":FSEQ,"
":BLOCK_ID,"
":COMPRESSED_SIZE_IN_BYTES,"
":COPY_NB,"
":CREATION_TIME,"
":ARCHIVE_FILE_ID)";
auto stmt = conn.createStmt(sql, rdbms::Stmt::AutocommitMode::OFF);
rdbms::OcciStmt &occiStmt = dynamic_cast<rdbms::OcciStmt &>(*stmt);
occiStmt->setDataBuffer(occiStmt.getParamIdx(":VID"), vidCStrArray.get(), oracle::occi::OCCI_SQLT_STR,
vidMaxFieldSize, vidFieldSizeArray.get());
occiStmt->setDataBuffer(occiStmt.getParamIdx(":FSEQ"), fSeqCStrArray.get(), oracle::occi::OCCI_SQLT_STR,
fSeqMaxFieldSize, fSeqFieldSizeArray.get());
occiStmt->setDataBuffer(occiStmt.getParamIdx(":BLOCK_ID"), blockIdCStrArray.get(), oracle::occi::OCCI_SQLT_STR,
blockIdMaxFieldSize, blockIdFieldSizeArray.get());
occiStmt->setDataBuffer(occiStmt.getParamIdx(":COMPRESSED_SIZE_IN_BYTES"), compressedSizeCStrArray.get(),
oracle::occi::OCCI_SQLT_STR, compressedSizeMaxFieldSize, compressedSizeFieldSizeArray.get());
occiStmt->setDataBuffer(occiStmt.getParamIdx(":COPY_NB"), copyNbCStrArray.get(), oracle::occi::OCCI_SQLT_STR,
copyNbMaxFieldSize, copyNbFieldSizeArray.get());
occiStmt->setDataBuffer(occiStmt.getParamIdx(":CREATION_TIME"), creationTimeCStrArray.get(),
oracle::occi::OCCI_SQLT_STR, creationTimeMaxFieldSize, creationTimeFieldSizeArray.get());
occiStmt->setDataBuffer(occiStmt.getParamIdx(":ARCHIVE_FILE_ID"), archiveFileIdCStrArray.get(),
oracle::occi::OCCI_SQLT_STR, archiveFileIdMaxFieldSize, archiveFileIdFieldSizeArray.get());
occiStmt->executeArrayUpdate(events.size());
conn.commit();
} catch(exception::Exception &ex) {
throw exception::Exception(std::string(__FUNCTION__) + " failed: " + ex.getMessage().str());
} catch(std::exception &se) {
throw exception::Exception(std::string(__FUNCTION__) + " failed: " + se.what());
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment