diff --git a/catalogue/RdbmsCatalogue.cpp b/catalogue/RdbmsCatalogue.cpp index aff68c9bf39ec3df53e8b1bd7cb82a824dfe2dca..43b63b1119f7f7e8bd0d31b7898836bf3d317705 100644 --- a/catalogue/RdbmsCatalogue.cpp +++ b/catalogue/RdbmsCatalogue.cpp @@ -4009,7 +4009,7 @@ common::dataStructures::ArchiveFileQueueCriteria RdbmsCatalogue::prepareForNewFi } // Now that we have both the archive routes and the mount policy it's safe to - // consume an archive file identifier + // consume an archive file identifierarchiveFileId const uint64_t archiveFileId = getNextArchiveFileId(conn); return common::dataStructures::ArchiveFileQueueCriteria(archiveFileId, copyToPoolMap, mountPolicy); diff --git a/eos/messages/eos_messages.proto b/eos/messages/eos_messages.proto index 8db9b4372d501e7c14a2d424e3d9ab883d595eb6..45ff886e46c2e83595af67ac1c0b0bebdaa5eec3 100644 --- a/eos/messages/eos_messages.proto +++ b/eos/messages/eos_messages.proto @@ -52,9 +52,17 @@ message Service { } message Workflow { - string event = 1; //< event + enum EventType { + NONE = 0; + OPENR = 1; + OPENW = 2; + CLOSER = 3; + CLOSEW = 4; + DELETE = 5; + PREPARE = 6;} + EventType event = 1; //< event, default value is NONE string queue = 2; //< queue - string wfname = 3; //< workflow + string wfname = 3; //< workflow string vpath = 4; //< vpath Service instance = 5; //< instance information fixed64 timestamp = 6; //< event timestamp diff --git a/xroot_plugins/OpaqueQueryCmd.cpp b/xroot_plugins/OpaqueQueryCmd.cpp index e0cf0eed08404caf4ee74fc0b596b2736dc2db39..096ff46c86c991a6a73377b4b954b06d2d2be853 100644 --- a/xroot_plugins/OpaqueQueryCmd.cpp +++ b/xroot_plugins/OpaqueQueryCmd.cpp @@ -73,7 +73,6 @@ int OpaqueQueryCmd::exceptionThrowingMain(const int argc, char *const *const arg const XrdCl::XRootDStatus status = fs.Query(XrdCl::QueryCode::Opaque, arg, response); std::unique_ptr<XrdCl::Buffer> smartResponse(response); - std::cout << "status.ToStr()=" << status.ToStr() << std::endl; std::cout << "status.IsError()=" << (status.IsError() ? "true" : "false") << std::endl; std::cout << "status.IsFatal()=" << (status.IsFatal() ? "true" : "false") << std::endl; std::cout << "status.IsOK()=" << (status.IsOK() ? "true" : "false") << std::endl; diff --git a/xroot_plugins/WriteNotificationMsgCmd.cpp b/xroot_plugins/WriteNotificationMsgCmd.cpp index f8da2224ad4bdfb828f12ace1a35ead2f665e7ec..2227bc27be513b73b6677cdff19a3d174f4022fa 100644 --- a/xroot_plugins/WriteNotificationMsgCmd.cpp +++ b/xroot_plugins/WriteNotificationMsgCmd.cpp @@ -57,9 +57,9 @@ int WriteNotificationMsgCmd::exceptionThrowingMain(const int argc, char *const * eos::wfe::Wrapper wrapper; wrapper.set_type(eos::wfe::Wrapper::NOTIFICATION); - wrapper.mutable_notification()->mutable_wf()->set_event("notification_workflow_event"); + wrapper.mutable_notification()->mutable_wf()->set_event(eos::wfe::Workflow::CLOSEW); wrapper.mutable_notification()->mutable_wf()->set_queue("notification_workflow_queue"); - wrapper.mutable_notification()->mutable_wf()->set_wfname("notification_workflow_wfname"); + wrapper.mutable_notification()->mutable_wf()->set_wfname("default"); wrapper.mutable_notification()->mutable_wf()->set_vpath("notification_workflow_vpath"); wrapper.mutable_notification()->mutable_wf()->mutable_instance()->set_name("notification_instance_name"); wrapper.mutable_notification()->mutable_wf()->mutable_instance()->set_url("notification_instance_url"); @@ -116,6 +116,7 @@ int WriteNotificationMsgCmd::exceptionThrowingMain(const int argc, char *const * wrapper.mutable_notification()->mutable_directory()->set_lpath("notification_directory_lpath"); (*wrapper.mutable_notification()->mutable_directory()->mutable_xattr())["notification_directory_attr1"] = "directory_xattr1_value"; (*wrapper.mutable_notification()->mutable_directory()->mutable_xattr())["notification_directory_attr2"] = "directory_xattr2_value"; + (*wrapper.mutable_notification()->mutable_directory()->mutable_xattr())["CTA_StorageClass"] = "CTA_StorageClass"; if(cmdLineArgs.writeJsonToStdOut) { google::protobuf::util::JsonPrintOptions options; diff --git a/xroot_plugins/XrdCtaFilesystem.cpp b/xroot_plugins/XrdCtaFilesystem.cpp index d37dd4c1421dd429bf6c8404a9bbf57676a545fe..d40077730906296c9e865ad6d703abedd18e9a75 100644 --- a/xroot_plugins/XrdCtaFilesystem.cpp +++ b/xroot_plugins/XrdCtaFilesystem.cpp @@ -26,7 +26,6 @@ #include "common/archiveRoutes/ArchiveRoute.hpp" #include "common/Configuration.hpp" #include "common/exception/Exception.hpp" -#include "common/make_unique.hpp" #include "common/TapePool.hpp" #include "eos/messages/eos_messages.pb.h" #include "objectstore/RootEntry.hpp" @@ -77,91 +76,185 @@ namespace cta { namespace xrootPlugins { //------------------------------------------------------------------------------ // FSctl //------------------------------------------------------------------------------ -int XrdCtaFilesystem::FSctl(const int cmd, XrdSfsFSctl &args, XrdOucErrInfo &eInfo, const XrdSecEntity *client) -{ - if(SFS_FSCTL_PLUGIO != cmd) { - eInfo.setErrInfo(ENOTSUP, "Not supported: cmd != SFS_FSCTL_PLUGIO"); - return SFS_ERROR; - } +int XrdCtaFilesystem::FSctl(const int cmd, XrdSfsFSctl &args, XrdOucErrInfo &eInfo, const XrdSecEntity *client) { + std::ostringstream errMsg; - { - std::list<cta::log::Param> params; - params.push_back({"args.Arg1Len", args.Arg1Len}); - params.push_back({"args.Arg2Len", args.Arg2Len}); - params.push_back({"client->host", client->host}); - params.push_back({"client->name", client->name}); - (*m_log)(log::INFO, "FSctl called", params); - } + try { + if (SFS_FSCTL_PLUGIO != cmd) { + eInfo.setErrInfo(ENOTSUP, "Not supported: cmd != SFS_FSCTL_PLUGIO"); + return SFS_ERROR; + } - if(args.Arg1 == nullptr || args.Arg1Len == 0) { - eInfo.setErrInfo(EINVAL, "Did not receive a query argument"); - return SFS_ERROR; + { + std::list<cta::log::Param> params; + params.push_back({"args.Arg1Len", args.Arg1Len}); + params.push_back({"args.Arg2Len", args.Arg2Len}); + params.push_back({"client->host", client->host}); + params.push_back({"client->name", client->name}); + (*m_log)(log::INFO, "FSctl called", params); + } + + if (args.Arg1 == nullptr || args.Arg1Len == 0) { + throw cta::exception::Exception("Did not receive a query argument"); + } + + const std::string msgBuffer(args.Arg1, args.Arg1Len); + eos::wfe::Wrapper msg; + if (!msg.ParseFromString(msgBuffer)) { + throw cta::exception::Exception("Failed to parse incoming wrapper message"); + } + + auto reply = processWrapperMsg(msg, client); + eInfo.setErrInfo(reply->BuffSize(), reply.release()); + } catch(cta::exception::Exception &ex) { + errMsg << __FUNCTION__ << " failed: " << ex.getMessage().str(); + } catch(std::exception &se) { + errMsg << __FUNCTION__ << " failed: " << se.what(); + } catch(...) { + errMsg << __FUNCTION__ << " failed: Caught an unknown exception"; } - const std::string msgBuffer(args.Arg1, args.Arg1Len); - eos::wfe::Wrapper msg; - if(!msg.ParseFromString(msgBuffer)) { - eInfo.setErrInfo(EINVAL, "Failed to parse incoming wrapper message"); + try { + eos::wfe::Wrapper wrapper; + wrapper.set_type(eos::wfe::Wrapper::ERROR); + eos::wfe::Error *const error = wrapper.mutable_error(); + error->set_audience(eos::wfe::Error::EOSLOG); + error->set_code(ECANCELED); + error->set_message(errMsg.str()); + + std::string replyString = wrapper.SerializeAsString(); + auto reply = make_UniqueXrdOucBuffer(replyString.size()); + memcpy(reply->Buffer(), replyString.c_str(), replyString.size()); + + eInfo.setErrInfo(replyString.size(), reply.release()); + return SFS_DATA; + } catch(...) { + eInfo.setErrInfo(ECANCELED, "Failed to create reply eos::wfe::Error message"); return SFS_ERROR; } - - return processWrapperMsg(msg, eInfo, client); } //------------------------------------------------------------------------------ // processWrapperMsg //------------------------------------------------------------------------------ -int XrdCtaFilesystem::processWrapperMsg(const eos::wfe::Wrapper &msg, XrdOucErrInfo &eInfo, - const XrdSecEntity *client) { +XrdCtaFilesystem::UniqueXrdOucBuffer XrdCtaFilesystem::processWrapperMsg(const eos::wfe::Wrapper &msg, + const XrdSecEntity *const client) { switch(msg.type()) { case eos::wfe::Wrapper::NONE: - eInfo.setErrInfo(EINVAL, "Cannot process a wrapped message of type NONE"); + throw cta::exception::Exception("Cannot process a wrapped message of type NONE"); case eos::wfe::Wrapper::NOTIFICATION: - return processNotificationMsg(msg.notification(), eInfo, client); + return processNotificationMsg(msg.notification(), client); default: { - std::ostringstream errMsg; - errMsg << "Cannot process a wrapped message with a numeric type value of " << msg.type(); - eInfo.setErrInfo(EINVAL, errMsg.str().c_str()); + cta::exception::Exception ex; + ex.getMessage() << "Cannot process a wrapped message with a numeric type value of " << msg.type(); + throw ex; } - return SFS_ERROR; } } //------------------------------------------------------------------------------ // processNotificationMsg //------------------------------------------------------------------------------ -int XrdCtaFilesystem::processNotificationMsg(const eos::wfe::Notification &msg, XrdOucErrInfo &eInfo, - const XrdSecEntity *client) { +XrdCtaFilesystem::UniqueXrdOucBuffer XrdCtaFilesystem::processNotificationMsg(const eos::wfe::Notification &msg, + const XrdSecEntity *const client) { { std::list<cta::log::Param> params; params.push_back({"wf.event", msg.wf().event()}); params.push_back({"wf.queue", msg.wf().queue()}); params.push_back({"wf.wfname", msg.wf().wfname()}); - params.push_back({"eosfid", msg.file().fid()}); - params.push_back({"eoslpath", msg.file().lpath()}); + params.push_back({"fid", msg.file().fid()}); + params.push_back({"path", msg.file().lpath()}); (*m_log)(log::INFO, "Processing notification message", params); } - const size_t sizeOfReply = 10*1024*1024; - char *const reply = static_cast<char *>(malloc(sizeOfReply)); - if(nullptr == reply) { - (*m_log)(log::ERR, "FSctl failed to allocate reply message"); + switch(msg.wf().event()) { + case eos::wfe::Workflow::NONE: + throw cta::exception::Exception("Cannot process a NONE workflow event"); + case eos::wfe::Workflow::CLOSEW: + return processCLOSEW(msg, client); + default: + { + cta::exception::Exception ex; + ex.getMessage() << "Workflow events with numeric value " << msg.wf().event() << " are not supported"; + throw ex; + } } - memset(reply, '\0', sizeOfReply); - char replyTxt[] = "Reply from CTA"; - strncpy(reply, replyTxt, sizeOfReply); - reply[sizeOfReply - 1] = '\0'; - // buf takes ownership of msg - XrdOucBuffer *buf = new XrdOucBuffer(reply, sizeOfReply); - if(nullptr == buf) { - (*m_log)(log::ERR, "FSctl failed to allocate reply buffer"); +} + +//------------------------------------------------------------------------------ +// processCLOSEW +//------------------------------------------------------------------------------ +XrdCtaFilesystem::UniqueXrdOucBuffer XrdCtaFilesystem::processCLOSEW(const eos::wfe::Notification &msg, + const XrdSecEntity *const client) { + if(msg.wf().wfname() == "default") { + return processDefaultCLOSEW(msg, client); + } else { + cta::exception::Exception ex; + ex.getMessage() << "Cannot process a CLOSEW event for a " << msg.wf().wfname() << " workflow"; + throw ex; } +} - // eInfo takes ownership of buf - eInfo.setErrInfo(buf->BuffSize(), buf); +//------------------------------------------------------------------------------ +// processDefaultCLOSEW +//------------------------------------------------------------------------------ +XrdCtaFilesystem::UniqueXrdOucBuffer XrdCtaFilesystem::processDefaultCLOSEW(const eos::wfe::Notification &msg, +const XrdSecEntity *const client) { + cta::common::dataStructures::DiskFileInfo diskFileInfo; + diskFileInfo.recoveryBlob = toJson(msg); + diskFileInfo.group = msg.file().owner().groupname(); + diskFileInfo.owner = msg.file().owner().username(); + diskFileInfo.path = msg.file().lpath(); + + cta::common::dataStructures::UserIdentity requester; + requester.name = msg.cli().user().username(); + requester.group = msg.cli().user().groupname(); + + std::ostringstream archiveReportURL; + archiveReportURL << "eosQuery://" << msg.wf().instance().name() << "//eos/wfe/passwd?mgm.pcmd=event&mgm.fid=" << + std::hex << msg.file().fid() << + "&mgm.logid=cta&mgm.event=archived&mgm.workflow=default&mgm.path=/eos/wfe/passwd&mgm.ruid=0&mgm.rgid=0"; + + cta::common::dataStructures::ArchiveRequest request; + request.checksumType = msg.file().cks().name(); + request.checksumValue = msg.file().cks().value(); + request.diskFileInfo = diskFileInfo; + request.diskFileID = msg.file().fid(); + request.fileSize = msg.file().size(); + request.requester = requester; + request.srcURL = msg.turl(); + request.storageClass = getDirStorageClass(msg.directory()); + request.archiveReportURL = archiveReportURL.str(); + + log::LogContext lc(*m_log); + const uint64_t archiveFileId = m_scheduler->queueArchive(msg.cli().user().username(), request, lc); + + eos::wfe::Wrapper wrapper; + wrapper.set_type(eos::wfe::Wrapper::XATTR); + eos::wfe::Xattr *const xattr = wrapper.mutable_xattr(); + xattr->set_fid(msg.file().fid()); + xattr->set_op(eos::wfe::Xattr::ADD); + (*xattr->mutable_xattrs())["sys.archiveFileId"] = std::to_string(archiveFileId); + + std::string replyString = wrapper.SerializeAsString(); + auto reply = make_UniqueXrdOucBuffer(replyString.size()); + memcpy(reply->Buffer(), replyString.c_str(), replyString.size()); + + return reply; +} - return SFS_DATA; +//------------------------------------------------------------------------------ +// getDirStorageClass +//------------------------------------------------------------------------------ +std::string XrdCtaFilesystem::getDirStorageClass(const eos::wfe::Md &dir) const { + const auto itor = dir.xattr().find("CTA_StorageClass"); + if(itor == dir.xattr().end()) { + cta::exception::Exception ex; + ex.getMessage() << "Directory " << dir.lpath() << " has no CTA_StorageClass"; + throw ex; + } + return itor->second; } //------------------------------------------------------------------------------ diff --git a/xroot_plugins/XrdCtaFilesystem.hpp b/xroot_plugins/XrdCtaFilesystem.hpp index 344194149386e2f2e709fc7addc89890772e5de6..827afb46ceeb35164df45ad1fd669fe7da13f2e8 100644 --- a/xroot_plugins/XrdCtaFilesystem.hpp +++ b/xroot_plugins/XrdCtaFilesystem.hpp @@ -21,13 +21,14 @@ #include "catalogue/Catalogue.hpp" #include "common/Configuration.hpp" #include "common/log/Logger.hpp" +#include "common/make_unique.hpp" #include "objectstore/BackendPopulator.hpp" #include "objectstore/BackendVFS.hpp" #include "scheduler/OStoreDB/OStoreDBWithAgent.hpp" #include "scheduler/Scheduler.hpp" - #include "XrdSfs/XrdSfsInterface.hh" +#include <google/protobuf/util/json_util.h> #include <memory> @@ -98,25 +99,113 @@ protected: */ std::unique_ptr<log::Logger> m_log; + /** + * A deleter for instances of the XrdOucBuffer class. + * + * The destructor of the XrdOucBuffer class is private. The Recycle() + * method can be called on an instance of the XrdOucBuffer class in order to + * effective delete that instance. + */ + struct XrdOucBufferDeleter { + void operator()(XrdOucBuffer* buf) { + buf->Recycle(); + } + }; + + /** + * Convenience typedef for an std::unique_ptr type that can delete instances + * of the XrdOucBuffer class. + */ + typedef std::unique_ptr<XrdOucBuffer, XrdOucBufferDeleter> UniqueXrdOucBuffer; + + /** + * Convenience method to create a UniqueXrdOucBuffer. + */ + static UniqueXrdOucBuffer make_UniqueXrdOucBuffer(const size_t bufSize) { + char *const cbuf = static_cast<char *>(malloc(bufSize)); + if(nullptr == cbuf) { + cta::exception::Exception ex; + ex.getMessage() << __FUNCTION__ << " failed: Failed to malloc " << bufSize << " bytes"; + throw ex; + } + XrdOucBuffer *xbuf = new XrdOucBuffer(cbuf, bufSize); + if(nullptr == xbuf) { + cta::exception::Exception ex; + ex.getMessage() << __FUNCTION__ << " failed: Failed to allocate an XrdOucBuffer"; + throw ex; + } + return UniqueXrdOucBuffer(xbuf); + } + /** * Processes the specified wrapper message. * * @param msg The message. - * @param eInfo Same semantic as the XrdCtaFilesystem::FSctl() method. * @param client Same semantic as the XrdCtaFilesystem::FSctl() method. - * @return Same semantic as the XrdCtaFilesystem::FSctl() method. + * @return The result in the form of an XrdOucBuffer to be sent back to the + * client. */ - int processWrapperMsg(const eos::wfe::Wrapper &msg, XrdOucErrInfo &eInfo, const XrdSecEntity *client); + UniqueXrdOucBuffer processWrapperMsg(const eos::wfe::Wrapper &msg, const XrdSecEntity *const client); /** * Processes the specified notification message. * * @param msg The message. - * @param eInfo Same semantic as the XrdCtaFilesystem::FSctl() method. * @param client Same semantic as the XrdCtaFilesystem::FSctl() method. - * @return Same semantic as the XrdCtaFilesystem::FSctl() method. + * @return The result in the form of an XrdOucBuffer to be sent back to the + * client. + */ + UniqueXrdOucBuffer processNotificationMsg(const eos::wfe::Notification &msg, const XrdSecEntity *const client); + + /** + * Processes the specified CLOSEW event. + * + * @param msg The notification message. + * @param client Same semantic as the XrdCtaFilesystem::FSctl() method. + * @return The result in the form of an XrdOucBuffer to be sent back to the + * client. + */ + UniqueXrdOucBuffer processCLOSEW(const eos::wfe::Notification &msg, const XrdSecEntity *const client); + + /** + * Processes the specified CLOSEW event triggered by an EOS user writing a + * file to disk, as opposed to a tape server. + * + * A user uses the "default" workflow when they write a file to disk. A tape + * server uses the "cta" workflow when it writes a file to disk. + * + * @param msg The message. + * @param client Same semantic as the XrdCtaFilesystem::FSctl() method. + * @return The result in the form of an XrdOucBuffer to be sent back to the + * client. + */ + UniqueXrdOucBuffer processDefaultCLOSEW(const eos::wfe::Notification &msg, const XrdSecEntity *const client); + + /** + * Return the JSON representation of teh specified Google protocol buffer + * message. + * @param protobufMsg The Google protocol buffer message. + * @return The JSON string. + */ + template <typename T> static std::string toJson(T protobufMsg) { + google::protobuf::util::JsonPrintOptions jsonPrintOptions; + jsonPrintOptions.add_whitespace = false; + jsonPrintOptions.always_print_primitive_fields = false; + std::string json; + google::protobuf::util::MessageToJsonString(protobufMsg, &json, jsonPrintOptions); + return json; + } + + /** + * Returns the CTA_StorageClass of the specified directory. + * + * This method throws an exception if the specified directory does not have a + * CTA_StorageClass. + * + * @param dir The directory. + * @return The storage class. */ - int processNotificationMsg(const eos::wfe::Notification &msg, XrdOucErrInfo &eInfo, const XrdSecEntity *client); + std::string getDirStorageClass(const eos::wfe::Md &dir) const; }; // XrdCtaFilesystem