Skip to content
Snippets Groups Projects
Commit 9a81a99a authored by Michael Davis's avatar Michael Davis
Browse files

[EOS-CTA] Implements prepare workflow => queue retrieve request

parent 416fe6e9
Branches
Tags
No related merge requests found
......@@ -235,8 +235,10 @@ void fillNotification(eos::wfe::Notification &notification, bool &isStderr, bool
google::protobuf::MapPair<std::string,std::string> sc("CTA_StorageClass", argval);
notification.mutable_file()->mutable_xattr()->insert(sc);
}
else if(argstr == "--id") {} // eos::wfe::fxattr:sys.archiveFileId, not used for archive WF
else if(argstr == "--id") {
google::protobuf::MapPair<std::string,std::string> id("CTA_ArchiveFileId", argval);
notification.mutable_file()->mutable_xattr()->insert(id);
}
else if(argstr == "--diskpool") {} // = default?
else if(argstr == "--throughput") {} // = 10000?
else if(argstr == "--recoveryblob:base64") base64Decode(notification, argval);
......
......@@ -71,7 +71,7 @@ static void requestProcCLOSEW(cta::Scheduler &scheduler, cta::log::LogContext &l
uint64_t archiveFileId = scheduler.queueArchive(cli_username, request, lc);
// Return archiveFileId (deprecated)
// Set archiveFileId in response (deprecated)
std::string result_str = "<eos::wfe::path::fxattr:sys.archiveFileId>" + std::to_string(archiveFileId);
#ifdef XRDSSI_DEBUG
......@@ -79,13 +79,121 @@ static void requestProcCLOSEW(cta::Scheduler &scheduler, cta::log::LogContext &l
#endif
response.set_message_txt(result_str);
// Set metadata
// Set response type
response.set_type(eos::wfe::Response::RSP_SUCCESS);
}
/*!
* Process the PREPARE workflow event.
*
* @param[in,out] scheduler CTA Scheduler to queue this request
* @param[in,out] lc Log Context for this request
* @param[in] cli_username Client username, from authentication key for the request
* @param[in] notification EOS WFE Notification message
* @param[out] response Response message to return to EOS
*/
static void requestProcPREPARE(cta::Scheduler &scheduler, cta::log::LogContext &lc,
const std::string &cli_username, const eos::wfe::Notification &notification, eos::wfe::Response &response)
{
// Unpack message
cta::common::dataStructures::UserIdentity originator;
originator.name = notification.cli().user().username();
originator.group = notification.cli().user().groupname();
cta::common::dataStructures::DiskFileInfo diskFileInfo;
diskFileInfo.owner = notification.file().owner().username();
diskFileInfo.group = notification.file().owner().groupname();
diskFileInfo.path = notification.file().lpath();
cta::common::dataStructures::RetrieveRequest request;
request.requester = originator;
request.dstURL = notification.transport().url();
request.diskFileInfo = diskFileInfo;
// CTA Archive ID is an EOS extended attribute, i.e. it is stored as a string, which
// must be converted to a valid uint64_t
std::string archiveFileIdStr = notification.file().xattr().at("CTA_ArchiveFileId");
if((request.archiveFileID = strtoul(archiveFileIdStr.c_str(), nullptr, 10)) == 0)
{
throw PbException("Invalid archiveFileID " + archiveFileIdStr);
}
// Queue the request
scheduler.queueRetrieve(cli_username, request, lc);
// Set response type
response.set_type(eos::wfe::Response::RSP_SUCCESS);
}
/*!
* Process the DELETE workflow event.
*
* @param[in,out] scheduler CTA Scheduler to queue this request
* @param[in,out] lc Log Context for this request
* @param[in] cli_username Client username, from authentication key for the request
* @param[in] notification EOS WFE Notification message
* @param[out] response Response message to return to EOS
*/
#if 0
static void requestProcDELETE(cta::Scheduler &scheduler, cta::log::LogContext &lc,
const std::string &cli_username, const eos::wfe::Notification &notification, eos::wfe::Response &response)
{
cta::common::dataStructures::UserIdentity originator;
originator.name=user.value();
originator.group=group.value();
cta::common::dataStructures::DeleteArchiveRequest request;
request.archiveFileID=id.value();
request.requester=originator;
const cta::common::dataStructures::ArchiveFile archiveFile = m_scheduler->deleteArchive(m_cliIdentity.username, request);
std::list<cta::log::Param> params;
params.push_back(cta::log::Param("USERNAME", m_cliIdentity.username));
params.push_back(cta::log::Param("HOST", m_cliIdentity.host));
params.push_back(cta::log::Param("fileId", std::to_string(archiveFile.archiveFileID)));
params.push_back(cta::log::Param("diskInstance", archiveFile.diskInstance));
params.push_back(cta::log::Param("diskFileId", archiveFile.diskFileId));
params.push_back(cta::log::Param("diskFileInfo.path", archiveFile.diskFileInfo.path));
params.push_back(cta::log::Param("diskFileInfo.owner", archiveFile.diskFileInfo.owner));
params.push_back(cta::log::Param("diskFileInfo.group", archiveFile.diskFileInfo.group));
params.push_back(cta::log::Param("diskFileInfo.recoveryBlob", archiveFile.diskFileInfo.recoveryBlob));
params.push_back(cta::log::Param("fileSize", std::to_string(archiveFile.fileSize)));
params.push_back(cta::log::Param("checksumType", archiveFile.checksumType));
params.push_back(cta::log::Param("checksumValue", archiveFile.checksumValue));
params.push_back(cta::log::Param("creationTime", std::to_string(archiveFile.creationTime)));
params.push_back(cta::log::Param("reconciliationTime", std::to_string(archiveFile.reconciliationTime)));
params.push_back(cta::log::Param("storageClass", archiveFile.storageClass));
for(auto it=archiveFile.tapeFiles.begin(); it!=archiveFile.tapeFiles.end(); it++) {
std::stringstream tapeCopyLogStream;
tapeCopyLogStream << "copy number: " << it->first
<< " vid: " << it->second.vid
<< " fSeq: " << it->second.fSeq
<< " blockId: " << it->second.blockId
<< " creationTime: " << it->second.creationTime
<< " compressedSize: " << it->second.compressedSize
<< " checksumType: " << it->second.checksumType //this shouldn't be here: repeated field
<< " checksumValue: " << it->second.checksumValue //this shouldn't be here: repeated field
<< " copyNb: " << it->second.copyNb; //this shouldn't be here: repeated field
params.push_back(cta::log::Param("TAPE FILE", tapeCopyLogStream.str()));
}
m_log(log::INFO, "Archive File Deleted", params);
return cmdlineOutput.str();
// Set response type
response.set_type(eos::wfe::Response::RSP_SUCCESS);
}
#endif
/*!
* Convert a framework exception into a Response
*/
......@@ -135,11 +243,14 @@ void RequestProc<eos::wfe::Notification, eos::wfe::Response, eos::wfe::Alert>::E
{
using namespace eos::wfe;
case(Workflow::CLOSEW):
requestProcCLOSEW(scheduler, lc, cli_username, m_request, m_metadata);
break;
case(Workflow::PREPARE):
case(Workflow::DELETE):
case Workflow::CLOSEW:
requestProcCLOSEW(scheduler, lc, cli_username, m_request, m_metadata); break;
case Workflow::PREPARE:
requestProcPREPARE(scheduler, lc, cli_username, m_request, m_metadata); break;
case Workflow::DELETE:
default:
throw PbException("Workflow Event type " + std::to_string(m_request.wf().event()) + " is not supported.");
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment