Commit a86e8704 authored by Eric Cano's avatar Eric Cano
Browse files

Added support for multiple items recording in the catalogue.

The catalogue can receive polymorphic items as input for recording.
The strict sequencing of fSeqs is still enforced.
Currently, 2 items types are recorded: files and placehoders.
Files are recorded to the tape catalogue, while place holders are not,
but they are used to enforce the fSeq sequencing.
parent b2f1c4e5
......@@ -34,6 +34,7 @@ set (CATALOGUE_LIB_SRC_FILES
OracleCatalogue.cpp
SqliteCatalogueSchema.cpp
TapeFileWritten.cpp
TapeItemImplementation.cpp
TapePool.cpp
RdbmsArchiveFileItorImpl.cpp
RdbmsCatalogue.cpp
......
......@@ -20,6 +20,7 @@
#include "catalogue/ArchiveFileItor.hpp"
#include "catalogue/TapeFileSearchCriteria.hpp"
#include "catalogue/TapeItemWrittenPointer.hpp"
#include "catalogue/TapeFileWritten.hpp"
#include "catalogue/TapeForWriting.hpp"
#include "catalogue/TapePool.hpp"
......@@ -66,6 +67,7 @@
#include <set>
#include <stdint.h>
#include <string>
#include <memory>
namespace cta {
......@@ -151,7 +153,7 @@ public:
*
* @param events The tape file written events.
*/
virtual void filesWrittenToTape(const std::set<TapeFileWritten> &event) = 0;
virtual void filesWrittenToTape(const std::set<TapeItemWrittenPointer> &event) = 0;
/**
* Notifies the CTA catalogue that the specified tape has been mounted in
......
......@@ -81,7 +81,7 @@ public:
return retryOnLostConnection(m_log, [&]{return m_catalogue->getTapesForWriting(logicalLibraryName);}, m_maxTriesToConnect);
}
void filesWrittenToTape(const std::set<TapeFileWritten> &event) override {
void filesWrittenToTape(const std::set<TapeItemWrittenPointer> &event) override {
return retryOnLostConnection(m_log, [&]{return m_catalogue->filesWrittenToTape(event);}, m_maxTriesToConnect);
}
......
This diff is collapsed.
......@@ -54,7 +54,7 @@ public:
void deleteStorageClass(const std::string& diskInstanceName, const std::string& storageClassName) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void deleteTape(const std::string& vid) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void deleteTapePool(const std::string& name) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void filesWrittenToTape(const std::set<TapeFileWritten>& event) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void filesWrittenToTape(const std::set<TapeItemWrittenPointer>& event) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
std::list<common::dataStructures::AdminUser> getAdminUsers() const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
common::dataStructures::ArchiveFile getArchiveFileById(const uint64_t id) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
ArchiveFileItor getArchiveFiles(const TapeFileSearchCriteria& searchCriteria) const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
......
......@@ -33,6 +33,7 @@
#include "rdbms/rdbms.hpp"
#include "rdbms/wrapper/OcciColumn.hpp"
#include "rdbms/wrapper/OcciStmt.hpp"
#include <algorithm>
namespace cta {
namespace catalogue {
......@@ -652,15 +653,15 @@ common::dataStructures::Tape OracleCatalogue::selectTapeForUpdate(rdbms::Conn &c
//------------------------------------------------------------------------------
// filesWrittenToTape
//------------------------------------------------------------------------------
void OracleCatalogue::filesWrittenToTape(const std::set<TapeFileWritten> &events) {
void OracleCatalogue::filesWrittenToTape(const std::set<TapeItemWrittenPointer> &events) {
try {
if (events.empty()) {
return;
}
auto firstEventItor = events.begin();
const auto &firstEvent = *firstEventItor;
checkTapeFileWrittenFieldsAreSet(__FUNCTION__, firstEvent);
const auto &firstEvent = **firstEventItor;
checkTapeItemWrittenFieldsAreSet(__FUNCTION__, firstEvent);
const time_t now = time(nullptr);
threading::MutexLocker locker(m_mutex);
auto conn = m_connPool.getConn();
......@@ -671,54 +672,72 @@ void OracleCatalogue::filesWrittenToTape(const std::set<TapeFileWritten> &events
uint64_t totalCompressedBytesWritten = 0;
uint32_t i = 0;
TapeFileBatch tapeFileBatch(events.size());
for (const auto &event: events) {
checkTapeFileWrittenFieldsAreSet(__FUNCTION__, event);
// We have a mix of files and items. Only files will be recorded, but items
// allow checking fSeq coherency.
// determine the number of files
size_t filesCount=std::count_if(events.cbegin(), events.cend(),
[](const TapeItemWrittenPointer &e){return typeid(*e)==typeid(TapeFileWritten);});
TapeFileBatch tapeFileBatch(filesCount);
std::set<TapeFileWritten> fileEvents;
for (const auto &eventP: events) {
// Check for all item types.
const auto &event = *eventP;
checkTapeItemWrittenFieldsAreSet(__FUNCTION__, event);
if (event.vid != firstEvent.vid) {
throw exception::Exception(std::string("VID mismatch: expected=") + firstEvent.vid + " actual=" + event.vid);
}
if (expectedFSeq != event.fSeq) {
exception::Exception ex;
ex.getMessage() << "FSeq mismatch for tape " << firstEvent.vid << ": expected=" << expectedFSeq << " actual=" <<
firstEvent.fSeq;
event.fSeq;
throw ex;
}
expectedFSeq++;
totalCompressedBytesWritten += event.compressedSize;
// Store the length of each field and implicitly calculate the maximum field
// length of each column
tapeFileBatch.vid.setFieldLenToValueLen(i, event.vid);
tapeFileBatch.fSeq.setFieldLenToValueLen(i, event.fSeq);
tapeFileBatch.blockId.setFieldLenToValueLen(i, event.blockId);
tapeFileBatch.compressedSize.setFieldLenToValueLen(i, event.compressedSize);
tapeFileBatch.copyNb.setFieldLenToValueLen(i, event.copyNb);
tapeFileBatch.creationTime.setFieldLenToValueLen(i, now);
tapeFileBatch.archiveFileId.setFieldLenToValueLen(i, event.archiveFileId);
try {
// If this is a file (as opposed to a placeholder), do the full processing.
const auto &fileEvent=dynamic_cast<const TapeFileWritten &>(event);
checkTapeFileWrittenFieldsAreSet(__FUNCTION__, fileEvent);
totalCompressedBytesWritten += fileEvent.compressedSize;
// Store the length of each field and implicitly calculate the maximum field
// length of each column
tapeFileBatch.vid.setFieldLenToValueLen(i, fileEvent.vid);
tapeFileBatch.fSeq.setFieldLenToValueLen(i, fileEvent.fSeq);
tapeFileBatch.blockId.setFieldLenToValueLen(i, fileEvent.blockId);
tapeFileBatch.compressedSize.setFieldLenToValueLen(i, fileEvent.compressedSize);
tapeFileBatch.copyNb.setFieldLenToValueLen(i, fileEvent.copyNb);
tapeFileBatch.creationTime.setFieldLenToValueLen(i, now);
tapeFileBatch.archiveFileId.setFieldLenToValueLen(i, fileEvent.archiveFileId);
fileEvents.insert(fileEvent);
} catch (std::bad_cast&) {}
i++;
}
// Update the tape because all the necessary information is now available
auto lastEventItor = events.cend();
lastEventItor--;
const TapeFileWritten &lastEvent = *lastEventItor;
const TapeItemWritten &lastEvent = **lastEventItor;
updateTape(conn, rdbms::AutocommitMode::OFF, lastEvent.vid, lastEvent.fSeq, totalCompressedBytesWritten,
lastEvent.tapeDrive);
// Create the archive file entries, skipping those that already exist
idempotentBatchInsertArchiveFiles(conn, rdbms::AutocommitMode::OFF, events);
idempotentBatchInsertArchiveFiles(conn, rdbms::AutocommitMode::OFF, fileEvents);
insertTapeFileBatchIntoTempTable(conn, rdbms::AutocommitMode::OFF, events);
insertTapeFileBatchIntoTempTable(conn, rdbms::AutocommitMode::OFF, fileEvents);
// Verify that the archive file entries in the catalogue database agree with
// the tape file written events
const auto fileSizesAndChecksums = selectArchiveFileSizesAndChecksums(conn, rdbms::AutocommitMode::OFF, events);
for (const auto &event: events) {
const auto fileSizesAndChecksums = selectArchiveFileSizesAndChecksums(conn, rdbms::AutocommitMode::OFF, fileEvents);
for (const auto &event: fileEvents) {
const auto fileSizeAndChecksumItor = fileSizesAndChecksums.find(event.archiveFileId);
std::ostringstream fileContext;
......@@ -758,7 +777,7 @@ void OracleCatalogue::filesWrittenToTape(const std::set<TapeFileWritten> &events
// Store the value of each field
i = 0;
for (const auto &event: events) {
for (const auto &event: fileEvents) {
tapeFileBatch.vid.setFieldValue(i, event.vid);
tapeFileBatch.fSeq.setFieldValue(i, event.fSeq);
tapeFileBatch.blockId.setFieldValue(i, event.blockId);
......
......@@ -136,7 +136,7 @@ public:
*
* @param events The tape file written events.
*/
void filesWrittenToTape(const std::set<TapeFileWritten> &events) override;
void filesWrittenToTape(const std::set<TapeItemWrittenPointer> &events) override;
private:
......
......@@ -4800,5 +4800,19 @@ void RdbmsCatalogue::checkTapeFileWrittenFieldsAreSet(const std::string &calling
}
}
//------------------------------------------------------------------------------
// checkTapeItemWrittenFieldsAreSet
//------------------------------------------------------------------------------
void RdbmsCatalogue::checkTapeItemWrittenFieldsAreSet(const std::string& callingFunc, const TapeItemWritten& event) const {
try {
if(event.vid.empty()) throw exception::Exception("vid is an empty string");
if(0 == event.fSeq) throw exception::Exception("fSeq is 0");
if(event.tapeDrive.empty()) throw exception::Exception("tapeDrive is an empty string");
} catch (exception::Exception &ex) {
throw exception::Exception(callingFunc + " failed: TapeItemWrittenEvent is invalid: " + ex.getMessage().str());
}
}
} // namespace catalogue
} // namespace cta
......@@ -1134,6 +1134,15 @@ protected:
common::dataStructures::TapeCopyToPoolMap getTapeCopyToPoolMap(rdbms::Conn &conn,
const StorageClass &storageClass) const;
/**
* Throws an exception if one of the fields of the specified event have not
* been set.
*
* @param callingFunc The name of the calling function.
* @param event The evnt to be checked.
*/
void checkTapeItemWrittenFieldsAreSet(const std::string &callingFunc, const TapeItemWritten &event) const;
/**
* Throws an exception if one of the fields of the specified event have not
* been set.
......
......@@ -435,15 +435,15 @@ common::dataStructures::Tape SqliteCatalogue::selectTape(const rdbms::Autocommit
//------------------------------------------------------------------------------
// filesWrittenToTape
//------------------------------------------------------------------------------
void SqliteCatalogue::filesWrittenToTape(const std::set<TapeFileWritten> &events) {
void SqliteCatalogue::filesWrittenToTape(const std::set<TapeItemWrittenPointer> &events) {
try {
if(events.empty()) {
return;
}
auto firstEventItor = events.cbegin();
const auto &firstEvent = *firstEventItor;;
checkTapeFileWrittenFieldsAreSet(__FUNCTION__, firstEvent);
const auto &firstEvent = **firstEventItor;;
checkTapeItemWrittenFieldsAreSet(__FUNCTION__, firstEvent);
// The SQLite implementation of this method relies on the fact that a tape
// cannot be physically mounted in two or more drives at the same time
......@@ -458,8 +458,9 @@ void SqliteCatalogue::filesWrittenToTape(const std::set<TapeFileWritten> &events
uint64_t expectedFSeq = tape.lastFSeq + 1;
uint64_t totalCompressedBytesWritten = 0;
for(const auto &event: events) {
checkTapeFileWrittenFieldsAreSet(__FUNCTION__, event);
for(const auto &eventP: events) {
const auto & event = *eventP;
checkTapeItemWrittenFieldsAreSet(__FUNCTION__, event);
if(event.vid != firstEvent.vid) {
throw exception::Exception(std::string("VID mismatch: expected=") + firstEvent.vid + " actual=" + event.vid);
......@@ -472,18 +473,27 @@ void SqliteCatalogue::filesWrittenToTape(const std::set<TapeFileWritten> &events
throw ex;
}
expectedFSeq++;
totalCompressedBytesWritten += event.compressedSize;
try {
// If this is a file (as opposed to a placeholder), do the full processing.
const auto &fileEvent=dynamic_cast<const TapeFileWritten &>(event);
totalCompressedBytesWritten += fileEvent.compressedSize;
} catch (std::bad_cast&) {}
}
auto lastEventItor = events.cend();
lastEventItor--;
const TapeFileWritten &lastEvent = *lastEventItor;
const TapeItemWritten &lastEvent = **lastEventItor;
updateTape(conn, rdbms::AutocommitMode::ON, lastEvent.vid, lastEvent.fSeq, totalCompressedBytesWritten,
lastEvent.tapeDrive);
for(const auto &event : events) {
fileWrittenToTape(rdbms::AutocommitMode::ON, conn, event);
try {
// If this is a file (as opposed to a placeholder), do the full processing.
const auto &fileEvent=dynamic_cast<const TapeFileWritten &>(*event);
fileWrittenToTape(rdbms::AutocommitMode::ON, conn, fileEvent);
} catch (std::bad_cast&) {}
}
} catch(exception::UserError &) {
throw;
......
......@@ -141,7 +141,7 @@ protected:
*
* @param events The tape file written events.
*/
void filesWrittenToTape(const std::set<TapeFileWritten> &events) override;
void filesWrittenToTape(const std::set<TapeItemWrittenPointer> &events) override;
private:
......
......@@ -27,7 +27,6 @@ namespace catalogue {
TapeFileWritten::TapeFileWritten() :
archiveFileId(0),
size(0),
fSeq(0),
blockId(0),
compressedSize(0),
copyNb(0) {
......@@ -38,6 +37,7 @@ TapeFileWritten::TapeFileWritten() :
//------------------------------------------------------------------------------
bool TapeFileWritten::operator==(const TapeFileWritten &rhs) const {
return
TapeItemWritten::operator ==(rhs) &&
archiveFileId == rhs.archiveFileId &&
diskInstance == rhs.diskInstance &&
diskFileId == rhs.diskFileId &&
......@@ -49,21 +49,12 @@ bool TapeFileWritten::operator==(const TapeFileWritten &rhs) const {
checksumType == rhs.checksumType &&
checksumValue == rhs.checksumValue &&
storageClassName == rhs.storageClassName &&
vid == rhs.vid &&
fSeq == rhs.fSeq &&
blockId == rhs.blockId &&
compressedSize == rhs.compressedSize &&
copyNb == rhs.copyNb &&
tapeDrive == rhs.tapeDrive;
}
//------------------------------------------------------------------------------
// operator<
//------------------------------------------------------------------------------
bool TapeFileWritten::operator<(const TapeFileWritten &rhs) const {
return fSeq < rhs.fSeq;
}
//------------------------------------------------------------------------------
// operator<<
//------------------------------------------------------------------------------
......
......@@ -19,8 +19,8 @@
#pragma once
#include "common/checksum/Checksum.hpp"
#include "TapeItemWritten.hpp"
#include <stdint.h>
#include <string>
namespace cta {
......@@ -29,7 +29,7 @@ namespace catalogue {
/**
* Structure describing the event of having written a file to tape.
*/
struct TapeFileWritten {
struct TapeFileWritten: public TapeItemWritten {
/**
* Constructor.
......@@ -45,21 +45,6 @@ struct TapeFileWritten {
*/
bool operator==(const TapeFileWritten &rhs) const;
/**
* Less than operator.
*
* TapeFileWritten events are ordered by their tape file sequence number.
*
* TapeFileWritten events are written to the catalogue database in batches in
* order to improve performance by reducing the number of network round trips
* to the database. Each batch is ordered by tape file sequence number so
* that the CTA catalogue code can easily assert that files that are written
* to tape are reported correctly.
*
* @param rhs The right hand side of the operator.
*/
bool operator<(const TapeFileWritten &rhs) const;
/**
* The unique identifier of the file being archived.
*/
......@@ -120,16 +105,6 @@ struct TapeFileWritten {
*/
std::string storageClassName;
/**
* The volume identifier of the tape on which the file has been written.
*/
std::string vid;
/**
* The position of the file on tape in the form of its file sequence number.
*/
uint64_t fSeq;
/**
* The position of the file on tape in the form of its logical block
* identifier.
......@@ -147,11 +122,6 @@ struct TapeFileWritten {
*/
uint64_t copyNb;
/**
* The name of the tape drive that wrote the file.
*/
std::string tapeDrive;
}; // struct TapeFileWritten
/**
......
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "catalogue/TapeItemWritten.hpp"
#include "catalogue/TapeItemWrittenPointer.hpp"
#include <iostream>
namespace cta {
namespace catalogue {
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
TapeItemWritten::TapeItemWritten() :
fSeq(0) {
}
//------------------------------------------------------------------------------
// operator==
//------------------------------------------------------------------------------
bool TapeItemWritten::operator==(const TapeItemWritten &rhs) const {
return
fSeq == rhs.fSeq &&
vid == rhs.vid;
}
//------------------------------------------------------------------------------
// operator<
//------------------------------------------------------------------------------
bool TapeItemWritten::operator<(const TapeItemWritten &rhs) const {
return fSeq < rhs.fSeq;
}
//------------------------------------------------------------------------------
// operator<<
//------------------------------------------------------------------------------
std::ostream &operator<<(std::ostream &os, const TapeItemWritten &obj) {
os <<
"{"
"vid=" << obj.vid << ","
"fSeq=" << obj.fSeq << ","
"tapeDrive=" << obj.tapeDrive <<
"}";
return os;
}
bool operator<(const TapeItemWrittenPointer& a, const TapeItemWrittenPointer& b) { return *a < *b; }
} // namespace catalogue
} // namespace cta
\ No newline at end of file
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <stdint.h>
#include <string>
namespace cta {
namespace catalogue {
/**
* Structure describing the event of having written a file to tape.
*/
struct TapeItemWritten {
/**
* Constructor.
*
* Sets the value of all integer member-variables to zero.
*/
TapeItemWritten();
/**
* Virtual trivial destructor to make the object polymorphic.
*/
virtual ~TapeItemWritten() {}
/**
* Equality operator.
*
* @param rhs The right hand side of the operator.
*/
bool operator==(const TapeItemWritten &rhs) const;
/**
* Less than operator.
*
* TapeItemWritten events are ordered by their tape file sequence number.
*
* TapeItemWritten events are written to the catalogue database in batches in
* order to improve performance by reducing the number of network round trips
* to the database. Each batch is ordered by tape file sequence number so
* that the CTA catalogue code can easily assert that files that are written
* to tape are reported correctly.
*
* @param rhs The right hand side of the operator.
*/
bool operator<(const TapeItemWritten &rhs) const;
/**
* The volume identifier of the tape on which the file has been written.
*/
std::string vid;
/**
* The position of the item on tape in the form of its file sequence number.
*/
uint64_t fSeq;
/**
* The name of the tape drive that wrote the item.
*/
std::string tapeDrive;
}; // struct TapeFileWritten
/**
* Output stream operator for an TapeItemWritten object.
*
* This function writes a human readable form of the specified object to the
* specified output stream.
*
* @param os The output stream.
* @param obj The object.
*/
std::ostream &operator<<(std::ostream &os, const TapeItemWritten &obj);
} // namespace catalogue
} // namespace cta
\ No newline at end of file
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "TapeItemWritten.hpp"
#include <memory>
namespace cta {
namespace catalogue {
/**
* A utility struct allowing sorting unique_ptr according to content instead of pointer value
*/
struct TapeItemWrittenPointer: public std::unique_ptr<TapeItemWritten> {
template<typename ... Ts> TapeItemWrittenPointer(Ts...args): std::unique_ptr<TapeItemWritten>::unique_ptr<TapeItemWritten>(args...) {}
};