Skip to content
Snippets Groups Projects
Commit 05021162 authored by Steven Murray's avatar Steven Murray
Browse files

CTA front end now receives eos::wfe::Wrapper messages

parent 4bb52a71
Branches
Tags
No related merge requests found
......@@ -55,67 +55,68 @@ int WriteNotificationMsgCmd::exceptionThrowingMain(const int argc, char *const *
return 0;
}
eos::wfe::Notification notification;
notification.mutable_wf()->set_event("notification_workflow_event");
notification.mutable_wf()->set_queue("notification_workflow_queue");
notification.mutable_wf()->set_wfname("notification_workflow_wfname");
notification.mutable_wf()->set_vpath("notification_workflow_vpath");
notification.mutable_wf()->mutable_instance()->set_name("notification_instance_name");
notification.mutable_wf()->mutable_instance()->set_url("notification_instance_url");
notification.mutable_wf()->set_timestamp(1100);
notification.set_turl("notification_turl");
notification.mutable_cli()->mutable_user()->set_n(1111);
notification.mutable_cli()->mutable_user()->set_name("notification_cli_user_name");
notification.mutable_cli()->mutable_sec()->set_host("notification_cli_sec_host");
notification.mutable_cli()->mutable_sec()->set_app("notification_cli_sec_app");
notification.mutable_cli()->mutable_sec()->set_name("notification_cli_sec_name");
notification.mutable_cli()->mutable_sec()->set_prot("notification_cli_sec_prot");
notification.mutable_cli()->mutable_sec()->set_grps("notification_cli_sec_grps");
notification.mutable_file()->set_fid(1122);
notification.mutable_file()->set_pid(1133);
notification.mutable_file()->mutable_ctime()->set_sec(1144);
notification.mutable_file()->mutable_ctime()->set_nsec(1155);
notification.mutable_file()->mutable_mtime()->set_sec(1166);
notification.mutable_file()->mutable_mtime()->set_nsec(1177);
notification.mutable_file()->mutable_btime()->set_sec(1188);
notification.mutable_file()->mutable_btime()->set_nsec(1199);
notification.mutable_file()->mutable_ttime()->set_sec(2200);
notification.mutable_file()->mutable_ttime()->set_nsec(2211);
notification.mutable_file()->mutable_owner()->set_n(2222);
notification.mutable_file()->mutable_owner()->set_name("notification_file_owner_name");
notification.mutable_file()->set_size(2233);
notification.mutable_file()->mutable_cks()->set_value("notification_file_cks_value");
notification.mutable_file()->mutable_cks()->set_name("notification_file_cks_name");
notification.mutable_file()->set_mode(244);
notification.mutable_file()->set_lpath("notification_file_lpath");
(*notification.mutable_file()->mutable_xattr())["notification_file_xattr1"] = "file_xattr1_value";
(*notification.mutable_file()->mutable_xattr())["notification_file_xattr2"] = "file_xattr2_value";
notification.mutable_directory()->set_fid(1122);
notification.mutable_directory()->set_pid(1133);
notification.mutable_directory()->mutable_ctime()->set_sec(1144);
notification.mutable_directory()->mutable_ctime()->set_nsec(1155);
notification.mutable_directory()->mutable_mtime()->set_sec(1166);
notification.mutable_directory()->mutable_mtime()->set_nsec(1177);
notification.mutable_directory()->mutable_btime()->set_sec(1188);
notification.mutable_directory()->mutable_btime()->set_nsec(1199);
notification.mutable_directory()->mutable_ttime()->set_sec(2200);
notification.mutable_directory()->mutable_ttime()->set_nsec(2211);
notification.mutable_directory()->mutable_owner()->set_n(2222);
notification.mutable_directory()->mutable_owner()->set_name("notification_directory");
notification.mutable_directory()->set_size(2233);
notification.mutable_directory()->mutable_cks()->set_value("notification_directory_cks_value");
notification.mutable_directory()->mutable_cks()->set_name("notification_directory_cks_name");
notification.mutable_directory()->set_mode(2244);
notification.mutable_directory()->set_lpath("notification_directory_lpath");
(*notification.mutable_directory()->mutable_xattr())["notification_directory_attr1"] = "directory_xattr1_value";
(*notification.mutable_directory()->mutable_xattr())["notification_directory_attr2"] = "directory_xattr2_value";
eos::wfe::Wrapper wrapper;
wrapper.set_type(eos::wfe::Wrapper::NOTIFICATION);
wrapper.mutable_notification()->mutable_wf()->set_event("notification_workflow_event");
wrapper.mutable_notification()->mutable_wf()->set_queue("notification_workflow_queue");
wrapper.mutable_notification()->mutable_wf()->set_wfname("notification_workflow_wfname");
wrapper.mutable_notification()->mutable_wf()->set_vpath("notification_workflow_vpath");
wrapper.mutable_notification()->mutable_wf()->mutable_instance()->set_name("notification_instance_name");
wrapper.mutable_notification()->mutable_wf()->mutable_instance()->set_url("notification_instance_url");
wrapper.mutable_notification()->mutable_wf()->set_timestamp(1100);
wrapper.mutable_notification()->set_turl("notification_turl");
wrapper.mutable_notification()->mutable_cli()->mutable_user()->set_n(1111);
wrapper.mutable_notification()->mutable_cli()->mutable_user()->set_name("notification_cli_user_name");
wrapper.mutable_notification()->mutable_cli()->mutable_sec()->set_host("notification_cli_sec_host");
wrapper.mutable_notification()->mutable_cli()->mutable_sec()->set_app("notification_cli_sec_app");
wrapper.mutable_notification()->mutable_cli()->mutable_sec()->set_name("notification_cli_sec_name");
wrapper.mutable_notification()->mutable_cli()->mutable_sec()->set_prot("notification_cli_sec_prot");
wrapper.mutable_notification()->mutable_cli()->mutable_sec()->set_grps("notification_cli_sec_grps");
wrapper.mutable_notification()->mutable_file()->set_fid(1122);
wrapper.mutable_notification()->mutable_file()->set_pid(1133);
wrapper.mutable_notification()->mutable_file()->mutable_ctime()->set_sec(1144);
wrapper.mutable_notification()->mutable_file()->mutable_ctime()->set_nsec(1155);
wrapper.mutable_notification()->mutable_file()->mutable_mtime()->set_sec(1166);
wrapper.mutable_notification()->mutable_file()->mutable_mtime()->set_nsec(1177);
wrapper.mutable_notification()->mutable_file()->mutable_btime()->set_sec(1188);
wrapper.mutable_notification()->mutable_file()->mutable_btime()->set_nsec(1199);
wrapper.mutable_notification()->mutable_file()->mutable_ttime()->set_sec(2200);
wrapper.mutable_notification()->mutable_file()->mutable_ttime()->set_nsec(2211);
wrapper.mutable_notification()->mutable_file()->mutable_owner()->set_n(2222);
wrapper.mutable_notification()->mutable_file()->mutable_owner()->set_name("notification_file_owner_name");
wrapper.mutable_notification()->mutable_file()->set_size(2233);
wrapper.mutable_notification()->mutable_file()->mutable_cks()->set_value("notification_file_cks_value");
wrapper.mutable_notification()->mutable_file()->mutable_cks()->set_name("notification_file_cks_name");
wrapper.mutable_notification()->mutable_file()->set_mode(244);
wrapper.mutable_notification()->mutable_file()->set_lpath("notification_file_lpath");
(*wrapper.mutable_notification()->mutable_file()->mutable_xattr())["notification_file_xattr1"] = "file_xattr1_value";
(*wrapper.mutable_notification()->mutable_file()->mutable_xattr())["notification_file_xattr2"] = "file_xattr2_value";
wrapper.mutable_notification()->mutable_directory()->set_fid(1122);
wrapper.mutable_notification()->mutable_directory()->set_pid(1133);
wrapper.mutable_notification()->mutable_directory()->mutable_ctime()->set_sec(1144);
wrapper.mutable_notification()->mutable_directory()->mutable_ctime()->set_nsec(1155);
wrapper.mutable_notification()->mutable_directory()->mutable_mtime()->set_sec(1166);
wrapper.mutable_notification()->mutable_directory()->mutable_mtime()->set_nsec(1177);
wrapper.mutable_notification()->mutable_directory()->mutable_btime()->set_sec(1188);
wrapper.mutable_notification()->mutable_directory()->mutable_btime()->set_nsec(1199);
wrapper.mutable_notification()->mutable_directory()->mutable_ttime()->set_sec(2200);
wrapper.mutable_notification()->mutable_directory()->mutable_ttime()->set_nsec(2211);
wrapper.mutable_notification()->mutable_directory()->mutable_owner()->set_n(2222);
wrapper.mutable_notification()->mutable_directory()->mutable_owner()->set_name("notification_directory");
wrapper.mutable_notification()->mutable_directory()->set_size(2233);
wrapper.mutable_notification()->mutable_directory()->mutable_cks()->set_value("notification_directory_cks_value");
wrapper.mutable_notification()->mutable_directory()->mutable_cks()->set_name("notification_directory_cks_name");
wrapper.mutable_notification()->mutable_directory()->set_mode(2244);
wrapper.mutable_notification()->mutable_directory()->set_lpath("notification_directory_lpath");
(*wrapper.mutable_notification()->mutable_directory()->mutable_xattr())["notification_directory_attr1"] = "directory_xattr1_value";
(*wrapper.mutable_notification()->mutable_directory()->mutable_xattr())["notification_directory_attr2"] = "directory_xattr2_value";
if(cmdLineArgs.writeJsonToStdOut) {
google::protobuf::util::JsonPrintOptions options;
options.add_whitespace = true;
options.always_print_primitive_fields = true;
std::string jsonNotification;
google::protobuf::util::MessageToJsonString(notification, &jsonNotification, options);
google::protobuf::util::MessageToJsonString(wrapper, &jsonNotification, options);
std::cout << jsonNotification;
return 0;
}
......@@ -126,7 +127,7 @@ int WriteNotificationMsgCmd::exceptionThrowingMain(const int argc, char *const *
return 1;
}
notification.SerializeToOstream(&messageFileStream);
wrapper.SerializeToOstream(&messageFileStream);
return 0;
}
......
......@@ -98,34 +98,62 @@ int XrdCtaFilesystem::FSctl(const int cmd, XrdSfsFSctl &args, XrdOucErrInfo &eIn
return SFS_ERROR;
}
const std::string query(args.Arg1, args.Arg1Len);
eos::wfe::Notification notification;
if(!notification.ParseFromString(query)) {
eInfo.setErrInfo(EINVAL, "Failed to parse notification message");
const std::string msgBuffer(args.Arg1, args.Arg1Len);
eos::wfe::Wrapper msg;
if(!msg.ParseFromString(msgBuffer)) {
eInfo.setErrInfo(EINVAL, "Failed to parse incoming wrapper message");
return SFS_ERROR;
}
return processWrapperMsg(msg, eInfo, client);
}
//------------------------------------------------------------------------------
// processWrapperMsg
//------------------------------------------------------------------------------
int XrdCtaFilesystem::processWrapperMsg(const eos::wfe::Wrapper &msg, XrdOucErrInfo &eInfo,
const XrdSecEntity *client) {
switch(msg.type()) {
case eos::wfe::Wrapper::NONE:
eInfo.setErrInfo(EINVAL, "Cannot process a wrapped message of type NONE");
case eos::wfe::Wrapper::NOTIFICATION:
return processNotificationMsg(msg.notification(), eInfo, client);
default:
{
std::ostringstream errMsg;
errMsg << "Cannot process a wrapped message with a numeric type value of " << msg.type();
eInfo.setErrInfo(EINVAL, errMsg.str().c_str());
}
return SFS_ERROR;
}
}
//------------------------------------------------------------------------------
// processNotificationMsg
//------------------------------------------------------------------------------
int XrdCtaFilesystem::processNotificationMsg(const eos::wfe::Notification &msg, XrdOucErrInfo &eInfo,
const XrdSecEntity *client) {
{
std::list<cta::log::Param> params;
params.push_back({"wf.event", notification.wf().event()});
params.push_back({"wf.queue", notification.wf().queue()});
params.push_back({"wf.wfname", notification.wf().wfname()});
params.push_back({"eosfid", notification.file().fid()});
params.push_back({"eoslpath", notification.file().lpath()});
(*m_log)(log::INFO, "FSctl received notification message", params);
params.push_back({"wf.event", msg.wf().event()});
params.push_back({"wf.queue", msg.wf().queue()});
params.push_back({"wf.wfname", msg.wf().wfname()});
params.push_back({"eosfid", msg.file().fid()});
params.push_back({"eoslpath", msg.file().lpath()});
(*m_log)(log::INFO, "Processing notification message", params);
}
const size_t sizeOfMsg = 10*1024*1024;
char *const msg = static_cast<char *>(malloc(sizeOfMsg));
if(nullptr == msg) {
const size_t sizeOfReply = 10*1024*1024;
char *const reply = static_cast<char *>(malloc(sizeOfReply));
if(nullptr == reply) {
(*m_log)(log::ERR, "FSctl failed to allocate reply message");
}
memset(msg, '\0', sizeOfMsg);
char msgTxt[] = "Reply from CTA";
strncpy(msg, msgTxt, sizeOfMsg);
msg[sizeOfMsg - 1] = '\0';
memset(reply, '\0', sizeOfReply);
char replyTxt[] = "Reply from CTA";
strncpy(reply, replyTxt, sizeOfReply);
reply[sizeOfReply - 1] = '\0';
// buf takes ownership of msg
XrdOucBuffer *buf = new XrdOucBuffer(msg, sizeOfMsg);
XrdOucBuffer *buf = new XrdOucBuffer(reply, sizeOfReply);
if(nullptr == buf) {
(*m_log)(log::ERR, "FSctl failed to allocate reply buffer");
}
......
......@@ -97,6 +97,27 @@ protected:
* The logger.
*/
std::unique_ptr<log::Logger> m_log;
/**
* Processes the specified wrapper message.
*
* @param msg The message.
* @param eInfo Same semantic as the XrdCtaFilesystem::FSctl() method.
* @param client Same semantic as the XrdCtaFilesystem::FSctl() method.
* @return Same semantic as the XrdCtaFilesystem::FSctl() method.
*/
int processWrapperMsg(const eos::wfe::Wrapper &msg, XrdOucErrInfo &eInfo, const XrdSecEntity *client);
/**
* Processes the specified notification message.
*
* @param msg The message.
* @param eInfo Same semantic as the XrdCtaFilesystem::FSctl() method.
* @param client Same semantic as the XrdCtaFilesystem::FSctl() method.
* @return Same semantic as the XrdCtaFilesystem::FSctl() method.
*/
int processNotificationMsg(const eos::wfe::Notification &msg, XrdOucErrInfo &eInfo, const XrdSecEntity *client);
}; // XrdCtaFilesystem
}}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment