Commit 37a8e072 authored by Michael Davis's avatar Michael Davis
Browse files

[migration] Implements --check feature

parent 9769e893
......@@ -527,6 +527,21 @@ bool EosImportFiles::compareMD(const eos::rpc::FileMdProto &file)
}
unsigned int EosImportFiles::pruneExistingFiles()
{
std::vector<eos::rpc::FileMdProto> filesToCheck;
filesToCheck.swap(m_files);
for(auto &file : filesToCheck) {
if(compareMD(file)) {
m_files.push_back(file);
}
}
return filesToCheck.size() - m_files.size();
}
void EosImportFiles::processFiles()
{
while(true) {
......@@ -627,6 +642,8 @@ std::vector<eos::rpc::FileMdProto> EosImportFiles::getNextBatch()
void EosImportFiles::grpcInject()
{
unsigned int pruned_files;
while(true) {
std::lock_guard<std::mutex> files_lock(m_files_mutex);
......@@ -639,14 +656,24 @@ void EosImportFiles::grpcInject()
if(m_skip > m_files.size()) {
m_skip -= m_files.size();
m_total_files += m_files.size();
auto elapsed_time = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - m_start_time);
std::cerr << "Skipped " << m_files.size() << " files in " << elapsed_time.count() << "s" << std::endl;
m_files.clear();
continue;
} else if (m_skip > 0) {
m_files.erase(m_files.begin(), m_files.begin() + m_skip);
m_total_files += m_skip;
auto elapsed_time = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - m_start_time);
std::cerr << "Skipped " << m_skip << " files in " << elapsed_time.count() << "s" << std::endl;
m_skip = 0;
}
// Remove files that already exist in the namespace
if(m_is_check) {
pruned_files = pruneExistingFiles();
m_total_files += pruned_files;
}
try {
// Inject files into EOS
int num_errors = 0;
......@@ -662,7 +689,9 @@ void EosImportFiles::grpcInject()
}
auto elapsed_time = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - m_start_time);
std::cerr << "Processed " << m_total_files << " files in " << elapsed_time.count() << "s (" << num_errors << " failures)" << std::endl;
std::cerr << "Processed " << m_total_files << " files in " << elapsed_time.count() << "s (";
if(pruned_files > 0) std::cerr << pruned_files << " files already existed, ";
std::cerr << num_errors << " failures)" << std::endl;
// Save errors for later processing
if(m_is_retry || num_errors > 0) {
......
......@@ -43,6 +43,7 @@ public:
private:
void saveFailedFiles(const std::vector<eos::rpc::FileMdProto> &files, const eos::rpc::InsertReply &replies);
bool compareMD(const eos::rpc::FileMdProto &file);
unsigned int pruneExistingFiles();
std::vector<eos::rpc::FileMdProto> getNextBatch();
static std::string convertChecksum(uint32_t adler32);
uint32_t convertChecksum(const std::string &bytes);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment