Skip to content
Snippets Groups Projects
Commit 1920f62c authored by Michael Davis's avatar Michael Davis
Browse files

[EOS-CTA] Removes code depending on eos::wfe::Wrapper

parent 8ab98edc
Branches
Tags
No related merge requests found
......@@ -24,8 +24,7 @@ include_directories(${CMAKE_SOURCE_DIR}/tapeserver)
find_package(Protobuf3 REQUIRED)
include_directories(${PROTOBUF3_INCLUDE_DIRS})
#add_library (XrdCtaOfs MODULE ListArchiveFilesCmd.cpp XrdCtaFilesystem.cpp XrdCtaFile.cpp XrdCtaDir.cpp)
add_library (XrdCtaOfs MODULE ListArchiveFilesCmd.cpp XrdCtaFile.cpp XrdCtaDir.cpp)
add_library (XrdCtaOfs MODULE ListArchiveFilesCmd.cpp XrdCtaFilesystem.cpp XrdCtaFile.cpp XrdCtaDir.cpp)
target_link_libraries (XrdCtaOfs ctacatalogue ctaeosmessages ctascheduler ctacommon ${PROTOBUF3_LIBRARIES} ctaobjectstore cryptopp)
set_target_properties(XrdCtaOfs PROPERTIES INSTALL_RPATH ${PROTOBUF3_RPATH})
......
......@@ -73,273 +73,6 @@ extern "C"
namespace cta { namespace xroot_plugins {
//------------------------------------------------------------------------------
// FSctl
//------------------------------------------------------------------------------
int XrdCtaFilesystem::FSctl(const int cmd, XrdSfsFSctl &args, XrdOucErrInfo &eInfo, const XrdSecEntity *client) {
std::ostringstream errMsg;
try {
if (SFS_FSCTL_PLUGIO != cmd) {
eInfo.setErrInfo(ENOTSUP, "Not supported: cmd != SFS_FSCTL_PLUGIO");
return SFS_ERROR;
}
{
std::list<cta::log::Param> params;
params.push_back({"args.Arg1Len", args.Arg1Len});
params.push_back({"args.Arg2Len", args.Arg2Len});
params.push_back({"client->host", client->host});
params.push_back({"client->name", client->name});
(*m_log)(log::INFO, "FSctl called", params);
}
if (args.Arg1 == nullptr || args.Arg1Len == 0) {
throw cta::exception::Exception("Did not receive a query argument");
}
const std::string msgBuffer(args.Arg1, args.Arg1Len);
eos::wfe::Wrapper msg;
if (!msg.ParseFromString(msgBuffer)) {
throw cta::exception::Exception("Failed to parse incoming wrapper message");
}
auto reply = processWrapperMsg(msg, client);
const int replySize = reply->DataLen();
eInfo.setErrInfo(replySize, reply.release());
return SFS_DATA;
} catch(cta::exception::Exception &ex) {
errMsg << __FUNCTION__ << " failed: " << ex.getMessage().str();
} catch(std::exception &se) {
errMsg << __FUNCTION__ << " failed: " << se.what();
} catch(...) {
errMsg << __FUNCTION__ << " failed: Caught an unknown exception";
}
// Reaching this point means an exception was thrown and errMsg has been set
try {
{
std::list<cta::log::Param> params;
params.push_back({"errMsg", errMsg.str()});
(*m_log)(log::ERR, "FSctl encountered an unexpected exception", params);
}
eos::wfe::Wrapper wrapper;
wrapper.set_type(eos::wfe::Wrapper::ERROR);
eos::wfe::Error *const error = wrapper.mutable_error();
error->set_audience(eos::wfe::Error::EOSLOG);
error->set_code(ECANCELED);
error->set_message(errMsg.str());
std::string replyString = wrapper.SerializeAsString();
auto reply = make_UniqueXrdOucBuffer(replyString.size(), replyString.c_str());
const int replySize = reply->DataLen();
eInfo.setErrInfo(replySize, reply.release());
return SFS_DATA;
} catch(...) {
eInfo.setErrInfo(ECANCELED, "Failed to create reply eos::wfe::Error message");
return SFS_ERROR;
}
}
//------------------------------------------------------------------------------
// processWrapperMsg
//------------------------------------------------------------------------------
XrdCtaFilesystem::UniqueXrdOucBuffer XrdCtaFilesystem::processWrapperMsg(const eos::wfe::Wrapper &msg,
const XrdSecEntity *const client) {
switch(msg.type()) {
case eos::wfe::Wrapper::NONE:
throw cta::exception::Exception("Cannot process a wrapped message of type NONE");
case eos::wfe::Wrapper::NOTIFICATION:
return processNotificationMsg(msg.notification(), client);
default:
{
cta::exception::Exception ex;
ex.getMessage() << "Cannot process a wrapped message with a numeric type value of " << msg.type();
throw ex;
}
}
}
//------------------------------------------------------------------------------
// processNotificationMsg
//------------------------------------------------------------------------------
XrdCtaFilesystem::UniqueXrdOucBuffer XrdCtaFilesystem::processNotificationMsg(const eos::wfe::Notification &msg,
const XrdSecEntity *const client) {
{
std::list<cta::log::Param> params;
params.push_back({"wf.event", msg.wf().event()});
params.push_back({"wf.queue", msg.wf().queue()});
params.push_back({"wf.wfname", msg.wf().wfname()});
params.push_back({"fid", msg.file().fid()});
params.push_back({"path", msg.file().lpath()});
(*m_log)(log::INFO, "Processing notification message", params);
}
switch(msg.wf().event()) {
case eos::wfe::Workflow::NONE:
throw cta::exception::Exception("Cannot process a NONE workflow event");
case eos::wfe::Workflow::OPENR:
throw cta::exception::Exception("Cannot process a OPENR workflow event");
case eos::wfe::Workflow::OPENW:
throw cta::exception::Exception("Cannot process a OPENW workflow event");
case eos::wfe::Workflow::CLOSER:
throw cta::exception::Exception("Cannot process a CLOSER workflow event");
case eos::wfe::Workflow::CLOSEW:
return processCLOSEW(msg, client);
case eos::wfe::Workflow::DELETE:
throw cta::exception::Exception("Cannot process a DELETE workflow event");
case eos::wfe::Workflow::PREPARE:
return processPREPARE(msg, client);
default:
{
cta::exception::Exception ex;
ex.getMessage() << "Workflow events with numeric value " << msg.wf().event() << " are not supported";
throw ex;
}
}
}
//------------------------------------------------------------------------------
// processCLOSEW
//------------------------------------------------------------------------------
XrdCtaFilesystem::UniqueXrdOucBuffer XrdCtaFilesystem::processCLOSEW(const eos::wfe::Notification &msg,
const XrdSecEntity *const client) {
if(msg.wf().wfname() == "default") {
return processDefaultCLOSEW(msg, client);
} else {
cta::exception::Exception ex;
ex.getMessage() << "Cannot process a CLOSEW event for a " << msg.wf().wfname() << " workflow";
throw ex;
}
}
//------------------------------------------------------------------------------
// processDefaultCLOSEW
//------------------------------------------------------------------------------
XrdCtaFilesystem::UniqueXrdOucBuffer XrdCtaFilesystem::processDefaultCLOSEW(const eos::wfe::Notification &msg,
const XrdSecEntity *const client) {
cta::common::dataStructures::DiskFileInfo diskFileInfo;
diskFileInfo.recoveryBlob = toJson(msg);
diskFileInfo.group = msg.file().owner().groupname();
diskFileInfo.owner = msg.file().owner().username();
diskFileInfo.path = msg.file().lpath();
cta::common::dataStructures::UserIdentity requester;
requester.name = msg.cli().user().username();
requester.group = msg.cli().user().groupname();
std::ostringstream archiveReportURL;
archiveReportURL << "eosQuery://" << msg.wf().instance().name() << "//eos/wfe/passwd?mgm.pcmd=event&mgm.fid=" <<
std::hex << msg.file().fid() <<
"&mgm.logid=cta&mgm.event=archived&mgm.workflow=default&mgm.path=/eos/wfe/passwd&mgm.ruid=0&mgm.rgid=0";
cta::common::dataStructures::ArchiveRequest request;
request.checksumType = msg.file().cks().name();
request.checksumValue = msg.file().cks().value();
request.diskFileInfo = diskFileInfo;
request.diskFileID = msg.file().fid();
request.fileSize = msg.file().size();
request.requester = requester;
request.srcURL = msg.turl();
request.storageClass = getDirXattr("CTA_StorageClass", msg.directory());
request.archiveReportURL = archiveReportURL.str();
const std::string diskInstance = msg.wf().instance().name();
log::LogContext lc(*m_log);
const uint64_t archiveFileId = m_scheduler->queueArchive(diskInstance, request, lc);
eos::wfe::Wrapper wrapper;
wrapper.set_type(eos::wfe::Wrapper::XATTR);
eos::wfe::Xattr *const xattr = wrapper.mutable_xattr();
xattr->set_fid(msg.file().fid());
xattr->set_op(eos::wfe::Xattr::ADD);
(*xattr->mutable_xattrs())["sys.archiveFileId"] = std::to_string(archiveFileId);
std::string replyString = wrapper.SerializeAsString();
return make_UniqueXrdOucBuffer(replyString.size(), replyString.c_str());
}
//------------------------------------------------------------------------------
// getDirXattr
//------------------------------------------------------------------------------
std::string XrdCtaFilesystem::getDirXattr(const std::string &attributeName, const eos::wfe::Md &dir) {
const auto itor = dir.xattr().find(attributeName);
if(itor == dir.xattr().end()) {
cta::exception::Exception ex;
ex.getMessage() << "Directory " << dir.lpath() << " has no attribute named " << attributeName;
throw ex;
}
return itor->second;
}
//------------------------------------------------------------------------------
// processPREPARE
//------------------------------------------------------------------------------
XrdCtaFilesystem::UniqueXrdOucBuffer XrdCtaFilesystem::processPREPARE(const eos::wfe::Notification &msg,
const XrdSecEntity *const client) {
if(msg.wf().wfname() == "default") {
auto reply = processDefaultPREPARE(msg, client);
return UniqueXrdOucBuffer(reply.release());
} else {
cta::exception::Exception ex;
ex.getMessage() << "Cannot process a PREPARE event for a " << msg.wf().wfname() << " workflow";
throw ex;
}
}
//------------------------------------------------------------------------------
// processDefaultPREPARE
//------------------------------------------------------------------------------
XrdCtaFilesystem::UniqueXrdOucBuffer XrdCtaFilesystem::processDefaultPREPARE(const eos::wfe::Notification &msg,
const XrdSecEntity *const client) {
cta::common::dataStructures::DiskFileInfo diskFileInfo;
diskFileInfo.recoveryBlob = toJson(msg);
diskFileInfo.group = msg.file().owner().groupname();
diskFileInfo.owner = msg.file().owner().username();
diskFileInfo.path = msg.file().lpath();
cta::common::dataStructures::UserIdentity requester;
requester.name = msg.cli().user().username();
requester.group = msg.cli().user().groupname();
const std::string archiveFileIdStr = getFileXattr("sys.archiveFileId", msg.file());
cta::common::dataStructures::RetrieveRequest request;
request.diskFileInfo = diskFileInfo;
request.archiveFileID = cta::utils::toUint64(archiveFileIdStr);
request.requester = requester;
request.dstURL = msg.turl();
const std::string diskInstance = msg.wf().instance().name();
log::LogContext lc(*m_log);
m_scheduler->queueRetrieve(diskInstance, request, lc);
eos::wfe::Wrapper wrapper;
wrapper.set_type(eos::wfe::Wrapper::ERROR); // Actually success - error code will be 0
eos::wfe::Error *const error = wrapper.mutable_error();
error->set_audience(eos::wfe::Error::EOSLOG);
error->set_code(0);
error->set_message("");
std::string replyString = wrapper.SerializeAsString();
return make_UniqueXrdOucBuffer(replyString.size(), replyString.c_str());
}
//------------------------------------------------------------------------------
// getFileXattr
//------------------------------------------------------------------------------
std::string XrdCtaFilesystem::getFileXattr(const std::string &attributeName, const eos::wfe::Md &file) {
const auto itor = file.xattr().find(attributeName);
if(itor == file.xattr().end()) {
cta::exception::Exception ex;
ex.getMessage() << "File " << file.lpath() << " has no attribute named " << attributeName;
throw ex;
}
return itor->second;
}
//------------------------------------------------------------------------------
// newFile
//------------------------------------------------------------------------------
......
......@@ -47,7 +47,6 @@ public:
virtual int chmod(const char *path, XrdSfsMode mode, XrdOucErrInfo &eInfo, const XrdSecEntity *client = 0, const char *opaque = 0);
virtual void Disc(const XrdSecEntity *client = 0);
virtual void EnvInfo(XrdOucEnv *envP);
virtual int FSctl(const int cmd, XrdSfsFSctl &args, XrdOucErrInfo &eInfo, const XrdSecEntity *client = 0);
virtual int fsctl(const int cmd, const char *args, XrdOucErrInfo &eInfo, const XrdSecEntity *client = 0);
virtual int getStats(char *buff, int blen);
virtual const char *getVersion();
......@@ -173,110 +172,6 @@ protected:
xbuf->SetLen(dataSize);
return UniqueXrdOucBuffer(xbuf);
}
/**
* Processes the specified wrapper message.
*
* @param msg The message.
* @param client Same semantic as the XrdCtaFilesystem::FSctl() method.
* @return The result in the form of an XrdOucBuffer to be sent back to the
* client.
*/
UniqueXrdOucBuffer processWrapperMsg(const eos::wfe::Wrapper &msg, const XrdSecEntity *const client);
/**
* Processes the specified notification message.
*
* @param msg The message.
* @param client Same semantic as the XrdCtaFilesystem::FSctl() method.
* @return The result in the form of an XrdOucBuffer to be sent back to the
* client.
*/
UniqueXrdOucBuffer processNotificationMsg(const eos::wfe::Notification &msg, const XrdSecEntity *const client);
/**
* Processes the specified CLOSEW workflow event.
*
* @param msg The notification message.
* @param client Same semantic as the XrdCtaFilesystem::FSctl() method.
* @return The result in the form of an XrdOucBuffer to be sent back to the
* client.
*/
UniqueXrdOucBuffer processCLOSEW(const eos::wfe::Notification &msg, const XrdSecEntity *const client);
/**
* Processes the specified CLOSEW workflow event triggered by an EOS user
* writing a file to disk, as opposed to a tape server writing a file to disk.
*
* A user uses the "default" workflow when they write a file to disk. A tape
* server uses the "cta" workflow when it writes a file to disk.
*
* @param msg The message.
* @param client Same semantic as the XrdCtaFilesystem::FSctl() method.
* @return The result in the form of an XrdOucBuffer to be sent back to the
* client.
*/
UniqueXrdOucBuffer processDefaultCLOSEW(const eos::wfe::Notification &msg, const XrdSecEntity *const client);
/**
* Processes the specified PREPARE workflow event.
*
* @param msg The notification message.
* @param client Same semantic as the XrdCtaFilesystem::FSctl() method.
* @return The result in the form of an XrdOucBuffer to be sent back to the
* client.
*/
UniqueXrdOucBuffer processPREPARE(const eos::wfe::Notification &msg, const XrdSecEntity *const client);
/**
* Processes the specified PREPARE workflow event for a default workflow.
*
* @param msg The notification message.
* @param client Same semantic as the XrdCtaFilesystem::FSctl() method.
* @return The result in the form of an XrdOucBuffer to be sent back to the
* client.
*/
UniqueXrdOucBuffer processDefaultPREPARE(const eos::wfe::Notification &msg, const XrdSecEntity *const client);
/**
* Return the JSON representation of teh specified Google protocol buffer
* message.
* @param protobufMsg The Google protocol buffer message.
* @return The JSON string.
*/
template <typename T> static std::string toJson(T protobufMsg) {
google::protobuf::util::JsonPrintOptions jsonPrintOptions;
jsonPrintOptions.add_whitespace = false;
jsonPrintOptions.always_print_primitive_fields = false;
std::string json;
google::protobuf::util::MessageToJsonString(protobufMsg, &json, jsonPrintOptions);
return json;
}
/**
* Returns the value of the specified attribute of the specified directory.
*
* This method throws an exception if the specified directory does not have
* the specified attribute.
*
* @param attributeName The name pf the attribute.
* @param dir The directory.
* @return The storage class.
*/
static std::string getDirXattr(const std::string &attributeName, const eos::wfe::Md &dir);
/**
* Returns the value of the specified attribute of the specified file.
*
* This method throws an exception if the specified file does not have the
* specified attribute.
*
* @param attributeName The name pf the attribute.
* @param file The file.
* @return The storage class.
*/
static std::string getFileXattr(const std::string &attributeName, const eos::wfe::Md &file);
}; // XrdCtaFilesystem
}}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment