Commit 846a19c4 authored by Michael Davis's avatar Michael Davis
Browse files

[migration] Injects file metadata into EOS

parent b0cde52d
......@@ -243,7 +243,7 @@ void throwUsage(const std::string &program, const std::string &error_txt)
std::stringstream help;
help << program << ": " << error_txt << std::endl
<< "Usage: " << program << " [--config <config_file>]";
<< "Usage: " << program << " [--config <config_file>] [ping]";
throw std::runtime_error(help.str());
}
......
......@@ -134,13 +134,11 @@ bool EosImportFiles::processBatch()
if(delim == ',') std::cout << "]";
}
#if 0
// Inject files into EOS
int retc = m_eosgrpc->FileInsert(files);
if(retc != 0) {
throw std::runtime_error("EosImportFiles::processBatch(): FileInsert failed with error " + std::to_string(retc));
}
#endif
auto elapsed_time = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - m_start_time);
std::cerr << "Processed " << m_total_files << " files in " << elapsed_time.count() << "s" << std::endl;
......@@ -174,20 +172,21 @@ bool EosImportFiles::getNextBatch(std::vector<eos::rpc::FileMdProto> &files)
// Filename and path
file.set_name(m_castordb.getResultColumnString("FILENAME"));
//bytes link_name = 9;
//file.set_path(filename.pathname);
file.set_path("/etc/grpc/");
#if 0
repeated uint32 locations = 13;
repeated uint32 unlink_locations = 14;
#endif
// Extended attributes
std::string archiveId(std::to_string(file.id()));
file.mutable_xattrs()->insert(google::protobuf::MapPair<std::string,std::string>("CTA_ArchiveFileId", archiveId));
auto sc = m_storageClass.at(m_castordb.getResultColumnUint64("CLASSID"));
file.mutable_xattrs()->insert(google::protobuf::MapPair<std::string,std::string>("CTA_StorageClass", sc));
// Indicate that the file is resident on tape
file.mutable_locations()->Add(65535);
#if 0
// What is this?
repeated uint32 unlink_locations = 14;
#endif
files.push_back(file);
++m_total_files;
......@@ -214,8 +213,6 @@ int main(int argc, const char* argv[])
{
std::string configfile = "/etc/cta/castor-migration.conf";
bool doPing = false;
try {
for(auto i = 1; i < argc; ++i) {
std::string option(argv[i]);
......@@ -223,81 +220,17 @@ int main(int argc, const char* argv[])
if(option == "--config" && argc > ++i) {
configfile = argv[i];
continue;
} else if(option == "ping") {
doPing = true;
continue;
}
throwUsage(argv[0], "invalid option " + option);
}
cta::migration::EosImportFiles importFiles(configfile);
if(doPing) {
std::cout << "Pinging EOS MGM using gRPC API..." << std::endl;
auto pingStr = importFiles.ping("Ping from EosImportFiles");
std::cout << "Ping successful. Server responded with: " << pingStr << std::endl;
} else {
importFiles.getStorageClasses();
importFiles.getFileMetadata();
while(importFiles.processBatch()) ;
}
importFiles.getStorageClasses();
importFiles.getFileMetadata();
while(importFiles.processBatch()) ;
} catch(std::runtime_error &ex) {
std::cerr << ex.what() << std::endl;
return -1;
}
return 0;
}
#if 0
std::chrono::steady_clock::time_point watch_global =
std::chrono::steady_clock::now();
for ( std::string line ; std::getline ( input, line ); ) {
n++;
line.insert(0,prefix);
std::cout << n << " " << line << std::endl;
if (line.back() == '/') {
// dir
if (dirmode) {
paths.push_back(line);
} else {
// SEND OFF DIRS
int retc = eosgrpc->FileInsert(paths);
std::cout << "::send::files" << " retc=" << retc << std::endl;
paths.clear();
paths.push_back(line);
dirmode = true;
}
} else {
// file
if (dirmode) {
// SEND OFF FILES
int retc = eosgrpc->ContainerInsert(paths);
std::cout << "::send::dirs " << " retc=" << retc << std::endl;
paths.clear();
paths.push_back(line);
dirmode = false;
} else {
paths.push_back(line);
}
}
if (paths.size() >= bulk) {
if (dirmode) {
// SEND OF DIRS
int retc = eosgrpc->ContainerInsert(paths);
std::cout << "::send::dirs" << " retc=" << retc << std::endl;
paths.clear();
} else {
// SEND OF FILES
int retc = eosgrpc->FileInsert(paths);
std::cout << "::send::files" << " retc=" << retc << std::endl;
paths.clear();
}
}
}
std::chrono::microseconds elapsed_global =
std::chrono::duration_cast<std::chrono::microseconds>
(std::chrono::steady_clock::now() - watch_global);
std::cout << n << " requests took " << elapsed_global.count() <<
" micro seconds" << std::endl;
#endif
......@@ -18,34 +18,6 @@
#include <sys/stat.h>
#include "GrpcClient.hpp"
/*
#include "proto/Rpc.grpc.pb.h"
#include "common/StringConversion.hh"
#include "common/Timing.hh"
#include <grpcpp/grpcpp.h>
#include <grpc/support/log.h>
#include <google/protobuf/util/json_util.h>
*/
/*
using grpc::Channel;
using grpc::ClientAsyncResponseReader;
using grpc::ClientAsyncReader;
using grpc::ClientContext;
using grpc::CompletionQueue;
using grpc::Status;
using eos::rpc::Eos;
using eos::rpc::PingRequest;
using eos::rpc::PingReply;
using eos::rpc::MDRequest;
using eos::rpc::MDResponse;
using eos::rpc::FileInsertRequest;
using eos::rpc::ContainerInsertRequest;
using eos::rpc::InsertReply;
using eos::rpc::ContainerMdProto;
using eos::rpc::FileMdProto;
*/
namespace eos {
namespace client {
......@@ -91,96 +63,11 @@ std::string GrpcClient::ping(const std::string& payload)
}
}
#if 0
std::string
GrpcClient::Md(const std::string& path,
uint64_t id,
uint64_t ino,
bool list)
{
MDRequest request;
if (list) {
request.set_type(eos::rpc::LISTING);
} else {
request.set_type(eos::rpc::CONTAINER);
}
if (path.length()) {
request.mutable_id()->set_path(path);
} else if (id) {
request.mutable_id()->set_id(id);
} else if (ino) {
request.mutable_id()->set_ino(ino);
} else {
return "";
}
request.set_authkey(token());
MDResponse response;
ClientContext context;
std::string responsestring;
CompletionQueue cq;
Status status;
std::unique_ptr<ClientAsyncReader<MDResponse> > rpc(
stub_->AsyncMD(&context, request, &cq, (void*) 1));
void* got_tag;
bool ok = false;
bool ret = cq.Next(&got_tag, &ok);
while (1) {
rpc->Read(&response, (void*) 1);
ok = false;
ret = cq.Next(&got_tag, &ok);
if (!ret || !ok || got_tag != (void*) 1) {
break;
}
google::protobuf::util::JsonPrintOptions options;
options.add_whitespace = true;
options.always_print_primitive_fields = true;
std::string jsonstring;
google::protobuf::util::MessageToJsonString(response,
&jsonstring, options);
responsestring += jsonstring;
}
if (!status.ok()) {
std::cerr << "error: " << status.error_message() << std::endl;
}
return responsestring;
}
#endif
int
GrpcClient::FileInsert(const std::vector<std::string>& paths)
int GrpcClient::FileInsert(const std::vector<eos::rpc::FileMdProto> &files)
{
eos::rpc::FileInsertRequest request;
size_t cnt=0;
for (auto it : paths ) {
struct timespec tsnow;
//eos::common::Timing::GetTimeSpec(tsnow);
cnt++;
eos::rpc::FileMdProto* file = request.add_files();
file->set_path(it);
file->set_uid(2);
file->set_gid(2);
file->set_size(cnt);
file->set_layout_id(0x00100002);
file->mutable_checksum()->set_value("\0\0\0\1",4);
file->set_flags(0);
file->mutable_ctime()->set_sec(tsnow.tv_sec);
file->mutable_ctime()->set_n_sec(tsnow.tv_nsec);
file->mutable_mtime()->set_sec(tsnow.tv_sec);
file->mutable_mtime()->set_n_sec(tsnow.tv_nsec);
file->mutable_locations()->Add(65535);
auto map = file->mutable_xattrs();
(*map)["sys.acl"] = "u:100:rwx";
(*map)["sys.cta.id"] = "fake";
for(auto &file : files) {
*(request.add_files()) = file;
}
request.set_authkey(token());
......@@ -276,35 +163,6 @@ GrpcClient::Create(std::string endpoint,
std::string ca;
bool ssl = false;
#if 0
if (keyfile.length() || certfile.length() || cafile.length()) {
if (!keyfile.length() || !certfile.length() || !cafile.length()) {
return 0;
}
ssl = true;
if (eos::common::StringConversion::LoadFileIntoString(certfile.c_str(),
cert) && !cert.length()) {
fprintf(stderr, "error: unable to load ssl certificate file '%s'\n",
certfile.c_str());
return 0;
}
if (eos::common::StringConversion::LoadFileIntoString(keyfile.c_str(),
key) && !key.length()) {
fprintf(stderr, "unable to load ssl key file '%s'\n", keyfile.c_str());
return 0;
}
if (eos::common::StringConversion::LoadFileIntoString(cafile.c_str(),
ca) && !ca.length()) {
fprintf(stderr, "unable to load ssl ca file '%s'\n", cafile.c_str());
return 0;
}
}
#endif
grpc::SslCredentialsOptions opts = {
ca,
key,
......
......@@ -41,7 +41,7 @@ public:
std::string ping(const std::string& payload);
std::string Md(const std::string& path, uint64_t id = 0, uint64_t ino = 0, bool list = false);
int FileInsert(const std::vector<std::string>& paths);
int FileInsert(const std::vector<eos::rpc::FileMdProto> &paths);
int ContainerInsert(const std::vector<eos::rpc::ContainerMdProto> &dirs);
void set_ssl(bool onoff) {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment