Commit ff27302b authored by Michael Davis's avatar Michael Davis
Browse files

[migration] Implements eos-import-files --retry

parent 66797f87
......@@ -37,14 +37,14 @@ CREATE INDEX I_CTAFiles_vid_filesize ON CTAFilesHelper (vid, filesize);
CREATE TABLE CTAFilesFailed (fileid INTEGER NOT NULL PRIMARY KEY, parent_fileid INTEGER, filename VARCHAR2(255), disk_uid INTEGER, disk_gid INTEGER,
filemode INTEGER, btime INTEGER, ctime INTEGER, mtime INTEGER, classid INTEGER,
filesize INTEGER, checksum NUMBER, copyno INTEGER, VID VARCHAR2(6), fseq INTEGER,
blockId INTEGER, s_mtime INTEGER);
blockId INTEGER, s_mtime INTEGER, retc INTEGER, message VARCHAR2(1024));
CREATE INDEX I_CTAFilesFailed_parent_id ON CTAFilesFailed (parent_fileid);
CREATE INDEX I_CTAFilesFailed_vid_filesize ON CTAFilesFailed (vid, filesize);
CREATE GLOBAL TEMPORARY TABLE CTAFilesFailedTemp(
fileid INTEGER NOT NULL PRIMARY KEY, parent_fileid INTEGER, filename VARCHAR2(255), disk_uid INTEGER, disk_gid INTEGER, filemode INTEGER,
btime INTEGER, ctime INTEGER, mtime INTEGER, classid INTEGER, filesize INTEGER, checksum NUMBER, copyno INTEGER, VID VARCHAR2(6), fseq INTEGER,
blockId INTEGER, s_mtime INTEGER)
blockId INTEGER, s_mtime INTEGER, retc INTEGER, message VARCHAR2(1024))
ON COMMIT PRESERVE ROWS;
......
......@@ -187,155 +187,170 @@ void EosImportFiles::getFileMetadata()
void EosImportFiles::saveFailedFiles(const std::vector<eos::rpc::FileMdProto> &files,
const eos::rpc::InsertReply &replies)
{
#if 0
std::cerr << "Updating list of failed inserts...";
auto dir_it = dirs.begin();
auto file_it = files.begin();
auto retc_it = replies.retc().begin();
auto message_it = replies.message().begin();
// Put the set of dirs into a temporary table.
// Put the set of files into a temporary table.
// Ensure the temporary table is empty (as the API does a commit after every transaction,
// we have to preserve rows on commit)
const std::string sql_truncate = "TRUNCATE TABLE CTADIRSFAILEDTEMP";
const std::string sql_truncate = "TRUNCATE TABLE CTAFILESFAILEDTEMP";
auto stmt_truncate = m_dbConn.createStmt(sql_truncate);
stmt_truncate.executeNonQuery();
// This could be optimised by doing a batch insert.
for( ; retc_it != replies.retc().end(); ++dir_it, ++retc_it, ++message_it) {
for( ; retc_it != replies.retc().end(); ++file_it, ++retc_it, ++message_it) {
auto retc = *retc_it;
// We only care about successes if we were reading from the failed table (because we need to
// delete successful retries in this case)
if(!m_is_retry && retc == 0) continue;
// If the failure mode was "Attempted to create container with id=<id>, which already exists",
// If the failure mode was "Attempted to create file with id=<id>, which already exists",
// verify that the metadata in the EOS namespace is what we expect. If so, ignore the error and
// treat it as a successful injection.
if(m_is_retry && retc == EINVAL && compareMD(*dir_it)) retc = 0;
if(m_is_retry && retc == EINVAL && compareMD(*file_it)) retc = 0;
const std::string sql_insert = "INSERT INTO CTADIRSFAILEDTEMP("
const std::string sql_insert = "INSERT INTO CTAFILESFAILEDTEMP("
" FILEID,"
" PATH,"
" PARENT_FILEID,"
" FILENAME,"
" DISK_UID,"
" DISK_GID,"
" FILEMODE,"
" BTIME,"
" CTIME,"
" MTIME,"
" CLASSID,"
" DEPTH,"
" FILESIZE,"
" CHECKSUM,"
" RETC,"
" MESSAGE) "
"VALUES("
" :FILEID,"
" :PATH,"
" :PARENT_FILEID,"
" :FILENAME,"
" :DISK_UID,"
" :DISK_GID,"
" :FILEMODE,"
" :BTIME,"
" :CTIME,"
" :MTIME,"
" :CLASSID,"
" :DEPTH, "
" :RETC, "
" :FILESIZE,"
" :CHECKSUM,"
" :RETC,"
" :MESSAGE)";
auto stmt = m_dbConn.createStmt(sql_insert);
stmt.bindString(":FILEID", std::to_string(dir_it->id()));
// Remove EOS prefix as it will be added again when we retry
stmt.bindString(":PATH", dir_it->path().substr(m_eos_prefix.length()-1));
stmt.bindString(":DISK_UID", std::to_string(dir_it->uid()));
stmt.bindString(":DISK_GID", std::to_string(dir_it->gid()));
stmt.bindString(":FILEMODE", std::to_string(dir_it->mode()));
stmt.bindString(":CTIME", std::to_string(dir_it->ctime().sec()));
stmt.bindString(":MTIME", std::to_string(dir_it->mtime().sec()));
stmt.bindString(":CLASSID", m_storageClassRev.at(dir_it->xattrs().at("CTA_StorageClass")));
// Calculate depth. Note that this may be different from the depth in the original table, because
// the path prefix has changed. The exact value doesn't matter, we just need an ordinal value so
// that directories are created top-down.
stmt.bindString(":DEPTH", std::to_string(std::count(dir_it->path().begin(), dir_it->path().end(), '/')));
stmt.bindString(":RETC", std::to_string(retc));
stmt.bindUint64(":FILEID", file_it->id());
stmt.bindUint64(":PARENT_FILEID", file_it->cont_id());
stmt.bindString(":FILENAME", file_it->name());
stmt.bindUint64(":DISK_UID", file_it->uid());
stmt.bindUint64(":DISK_GID", file_it->gid());
stmt.bindUint64(":FILEMODE", file_it->flags());
stmt.bindString(":BTIME", file_it->xattrs().at("eos.btime"));
stmt.bindUint64(":CTIME", file_it->ctime().sec());
stmt.bindUint64(":MTIME", file_it->mtime().sec());
stmt.bindString(":CLASSID", m_storageClassRev.at(file_it->xattrs().at("CTA_StorageClass")));
stmt.bindUint64(":FILESIZE", file_it->size());
stmt.bindUint64(":CHECKSUM", convertChecksum(file_it->checksum().value()));
stmt.bindUint64(":RETC", retc);
stmt.bindString(":MESSAGE", message_it->empty() ? "-" : *message_it);
stmt.executeNonQuery();
}
// Merge the temporary table into the failure table
std::cerr << "merging...";
const std::string sql_merge = "MERGE INTO CTADIRSFAILED A USING CTADIRSFAILEDTEMP B "
const std::string sql_merge = "MERGE INTO CTAFILESFAILED A USING CTAFILESFAILEDTEMP B "
"ON(A.FILEID = B.FILEID) "
"WHEN MATCHED THEN UPDATE SET"
" A.PATH = B.PATH,"
" A.PARENT_FILEID = B.PARENT_FILEID,"
" A.FILENAME = B.FILENAME,"
" A.DISK_UID = B.DISK_UID,"
" A.DISK_GID = B.DISK_GID,"
" A.FILEMODE = B.FILEMODE,"
" A.BTIME = B.BTIME,"
" A.CTIME = B.CTIME,"
" A.MTIME = B.MTIME,"
" A.CLASSID = B.CLASSID,"
" A.DEPTH = B.DEPTH,"
" A.FILESIZE = B.FILESIZE,"
" A.CHECKSUM = B.CHECKSUM,"
" A.RETC = B.RETC,"
" A.MESSAGE = B.MESSAGE "
"DELETE WHERE B.RETC = 0 "
"WHEN NOT MATCHED THEN INSERT("
" A.FILEID,"
" A.PATH,"
" A.PARENT_FILEID,"
" A.FILENAME,"
" A.DISK_UID,"
" A.DISK_GID,"
" A.FILEMODE,"
" A.BTIME,"
" A.CTIME,"
" A.MTIME,"
" A.CLASSID,"
" A.DEPTH,"
" A.FILESIZE,"
" A.CHECKSUM,"
" A.RETC,"
" A.MESSAGE) "
"VALUES("
" B.FILEID,"
" B.PATH,"
" B.PARENT_FILEID,"
" B.FILENAME,"
" B.DISK_UID,"
" B.DISK_GID,"
" B.FILEMODE,"
" B.BTIME,"
" B.CTIME,"
" B.MTIME,"
" B.CLASSID,"
" B.DEPTH,"
" B.FILESIZE,"
" B.CHECKSUM,"
" B.RETC,"
" B.MESSAGE) "
"WHERE B.RETC != 0";
auto stmt_merge = m_dbConn.createStmt(sql_merge);
stmt_merge.executeNonQuery();
std::cerr << "done." << std::endl;
#endif
}
bool EosImportFiles::compareMD(const eos::rpc::FileMdProto &file)
{
return false;
#if 0
auto remote_dir = m_eosgrpc->GetMD(eos::rpc::CONTAINER, dir.id(), dir.path());
// Check directory metadata is the same
if(dir.id() != remote_dir.cmd().id() ||
dir.uid() != remote_dir.cmd().uid() ||
dir.gid() != remote_dir.cmd().gid() ||
dir.ctime().sec() != remote_dir.cmd().ctime().sec() ||
dir.name() != remote_dir.cmd().name() ||
(dir.path() + '/') != remote_dir.cmd().path()) {
auto remote_file = m_eosgrpc->GetMD(eos::rpc::CONTAINER, file.id(), file.path());
// Check file metadata is the same
if(file.id() != remote_file.cmd().id() ||
file.uid() != remote_file.cmd().uid() ||
file.gid() != remote_file.cmd().gid() ||
file.ctime().sec() != remote_file.cmd().ctime().sec() ||
file.name() != remote_file.cmd().name() ||
(file.path() + '/') != remote_file.cmd().path()) {
return false;
}
if(dir.mode() != remote_dir.cmd().mode()) {
if(file.mode() != remote_file.cmd().mode()) {
// EOS gRPC bug prevents us reading back the file mode
std::cerr << "Warning: file " << dir.id() << ": injected mode=" << dir.mode() <<", read back mode=" << remote_dir.cmd().mode() << std::endl;
std::cerr << "Warning: file " << file.id() << ": injected mode=" << file.mode() <<", read back mode=" << remote_file.cmd().mode() << std::endl;
}
// Check xattrs
for(const auto &xattr : dir.xattrs()) {
auto xattr_it = remote_dir.cmd().xattrs().find(xattr.first);
if(xattr_it == remote_dir.cmd().xattrs().end() || xattr.second != xattr_it->second) {
std::cerr << "file " << dir.id() << ": xattrs do not match" << std::endl;
for(const auto &xattr : file.xattrs()) {
auto xattr_it = remote_file.cmd().xattrs().find(xattr.first);
if(xattr_it == remote_file.cmd().xattrs().end() || xattr.second != xattr_it->second) {
std::cerr << "file " << file.id() << ": xattrs do not match" << std::endl;
return false;
}
}
#endif
return true;
#endif
}
......@@ -456,4 +471,12 @@ std::string EosImportFiles::convertChecksum(uint32_t adler32)
return std::string(bytes, 4);
}
uint32_t EosImportFiles::convertChecksum(const std::string &bytes)
{
uint32_t adler32;
for(int i = 0; i < 4; ++i, adler32 <<= 8) { adler32 += bytes[i]; }
return adler32;
}
}} // namespace cta::migration
......@@ -41,6 +41,7 @@ private:
bool compareMD(const eos::rpc::FileMdProto &file);
std::vector<eos::rpc::FileMdProto> getNextBatch();
static std::string convertChecksum(uint32_t adler32);
uint32_t convertChecksum(const std::string &bytes);
std::chrono::steady_clock::time_point m_start_time; //!< Start the clock
unsigned int m_total_files; //!< Count how many have been processed
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment