diff --git a/cmdline/EosCtaStub.cpp b/cmdline/EosCtaStub.cpp index 0cbd7f9461dd9b5e8a08c062044d21ead13c4263..0486c39c504f96a8523baa5869c739ee85130cdb 100644 --- a/cmdline/EosCtaStub.cpp +++ b/cmdline/EosCtaStub.cpp @@ -235,8 +235,10 @@ void fillNotification(eos::wfe::Notification ¬ification, bool &isStderr, bool google::protobuf::MapPair<std::string,std::string> sc("CTA_StorageClass", argval); notification.mutable_file()->mutable_xattr()->insert(sc); } - - else if(argstr == "--id") {} // eos::wfe::fxattr:sys.archiveFileId, not used for archive WF + else if(argstr == "--id") { + google::protobuf::MapPair<std::string,std::string> id("CTA_ArchiveFileId", argval); + notification.mutable_file()->mutable_xattr()->insert(id); + } else if(argstr == "--diskpool") {} // = default? else if(argstr == "--throughput") {} // = 10000? else if(argstr == "--recoveryblob:base64") base64Decode(notification, argval); diff --git a/xroot_plugins/XrdSsiCtaRequestProc.cpp b/xroot_plugins/XrdSsiCtaRequestProc.cpp index a8a6e5ced4c47d1524547f35819a98b112c4f138..deb190da9f96b30372a7d7c3b2bfdb22afda555b 100644 --- a/xroot_plugins/XrdSsiCtaRequestProc.cpp +++ b/xroot_plugins/XrdSsiCtaRequestProc.cpp @@ -71,7 +71,7 @@ static void requestProcCLOSEW(cta::Scheduler &scheduler, cta::log::LogContext &l uint64_t archiveFileId = scheduler.queueArchive(cli_username, request, lc); - // Return archiveFileId (deprecated) + // Set archiveFileId in response (deprecated) std::string result_str = "<eos::wfe::path::fxattr:sys.archiveFileId>" + std::to_string(archiveFileId); #ifdef XRDSSI_DEBUG @@ -79,13 +79,121 @@ static void requestProcCLOSEW(cta::Scheduler &scheduler, cta::log::LogContext &l #endif response.set_message_txt(result_str); - // Set metadata + // Set response type response.set_type(eos::wfe::Response::RSP_SUCCESS); } +/*! + * Process the PREPARE workflow event. + * + * @param[in,out] scheduler CTA Scheduler to queue this request + * @param[in,out] lc Log Context for this request + * @param[in] cli_username Client username, from authentication key for the request + * @param[in] notification EOS WFE Notification message + * @param[out] response Response message to return to EOS + */ +static void requestProcPREPARE(cta::Scheduler &scheduler, cta::log::LogContext &lc, + const std::string &cli_username, const eos::wfe::Notification ¬ification, eos::wfe::Response &response) +{ + // Unpack message + + cta::common::dataStructures::UserIdentity originator; + originator.name = notification.cli().user().username(); + originator.group = notification.cli().user().groupname(); + + cta::common::dataStructures::DiskFileInfo diskFileInfo; + diskFileInfo.owner = notification.file().owner().username(); + diskFileInfo.group = notification.file().owner().groupname(); + diskFileInfo.path = notification.file().lpath(); + + cta::common::dataStructures::RetrieveRequest request; + request.requester = originator; + request.dstURL = notification.transport().url(); + request.diskFileInfo = diskFileInfo; + + // CTA Archive ID is an EOS extended attribute, i.e. it is stored as a string, which + // must be converted to a valid uint64_t + + std::string archiveFileIdStr = notification.file().xattr().at("CTA_ArchiveFileId"); + if((request.archiveFileID = strtoul(archiveFileIdStr.c_str(), nullptr, 10)) == 0) + { + throw PbException("Invalid archiveFileID " + archiveFileIdStr); + } + + // Queue the request + + scheduler.queueRetrieve(cli_username, request, lc); + + // Set response type + + response.set_type(eos::wfe::Response::RSP_SUCCESS); +} + + + +/*! + * Process the DELETE workflow event. + * + * @param[in,out] scheduler CTA Scheduler to queue this request + * @param[in,out] lc Log Context for this request + * @param[in] cli_username Client username, from authentication key for the request + * @param[in] notification EOS WFE Notification message + * @param[out] response Response message to return to EOS + */ +#if 0 +static void requestProcDELETE(cta::Scheduler &scheduler, cta::log::LogContext &lc, + const std::string &cli_username, const eos::wfe::Notification ¬ification, eos::wfe::Response &response) +{ + cta::common::dataStructures::UserIdentity originator; + originator.name=user.value(); + originator.group=group.value(); + cta::common::dataStructures::DeleteArchiveRequest request; + request.archiveFileID=id.value(); + request.requester=originator; + const cta::common::dataStructures::ArchiveFile archiveFile = m_scheduler->deleteArchive(m_cliIdentity.username, request); + std::list<cta::log::Param> params; + params.push_back(cta::log::Param("USERNAME", m_cliIdentity.username)); + params.push_back(cta::log::Param("HOST", m_cliIdentity.host)); + params.push_back(cta::log::Param("fileId", std::to_string(archiveFile.archiveFileID))); + params.push_back(cta::log::Param("diskInstance", archiveFile.diskInstance)); + params.push_back(cta::log::Param("diskFileId", archiveFile.diskFileId)); + params.push_back(cta::log::Param("diskFileInfo.path", archiveFile.diskFileInfo.path)); + params.push_back(cta::log::Param("diskFileInfo.owner", archiveFile.diskFileInfo.owner)); + params.push_back(cta::log::Param("diskFileInfo.group", archiveFile.diskFileInfo.group)); + params.push_back(cta::log::Param("diskFileInfo.recoveryBlob", archiveFile.diskFileInfo.recoveryBlob)); + params.push_back(cta::log::Param("fileSize", std::to_string(archiveFile.fileSize))); + params.push_back(cta::log::Param("checksumType", archiveFile.checksumType)); + params.push_back(cta::log::Param("checksumValue", archiveFile.checksumValue)); + params.push_back(cta::log::Param("creationTime", std::to_string(archiveFile.creationTime))); + params.push_back(cta::log::Param("reconciliationTime", std::to_string(archiveFile.reconciliationTime))); + params.push_back(cta::log::Param("storageClass", archiveFile.storageClass)); + for(auto it=archiveFile.tapeFiles.begin(); it!=archiveFile.tapeFiles.end(); it++) { + std::stringstream tapeCopyLogStream; + tapeCopyLogStream << "copy number: " << it->first + << " vid: " << it->second.vid + << " fSeq: " << it->second.fSeq + << " blockId: " << it->second.blockId + << " creationTime: " << it->second.creationTime + << " compressedSize: " << it->second.compressedSize + << " checksumType: " << it->second.checksumType //this shouldn't be here: repeated field + << " checksumValue: " << it->second.checksumValue //this shouldn't be here: repeated field + << " copyNb: " << it->second.copyNb; //this shouldn't be here: repeated field + params.push_back(cta::log::Param("TAPE FILE", tapeCopyLogStream.str())); + } + m_log(log::INFO, "Archive File Deleted", params); + return cmdlineOutput.str(); + + // Set response type + + response.set_type(eos::wfe::Response::RSP_SUCCESS); +} +#endif + + + /*! * Convert a framework exception into a Response */ @@ -135,11 +243,14 @@ void RequestProc<eos::wfe::Notification, eos::wfe::Response, eos::wfe::Alert>::E { using namespace eos::wfe; - case(Workflow::CLOSEW): - requestProcCLOSEW(scheduler, lc, cli_username, m_request, m_metadata); - break; - case(Workflow::PREPARE): - case(Workflow::DELETE): + case Workflow::CLOSEW: + requestProcCLOSEW(scheduler, lc, cli_username, m_request, m_metadata); break; + + case Workflow::PREPARE: + requestProcPREPARE(scheduler, lc, cli_username, m_request, m_metadata); break; + + case Workflow::DELETE: + default: throw PbException("Workflow Event type " + std::to_string(m_request.wf().event()) + " is not supported."); }