Skip to content
Snippets Groups Projects
Commit ff41bee6 authored by Michael Davis's avatar Michael Davis
Browse files

[EOS-CTA] Integration with catalogue, objectstore and scheduler

parent 3050b99b
Branches
Tags
No related merge requests found
......@@ -202,7 +202,10 @@ void fillNotification(eos::wfe::Notification &notification, bool &isStderr, bool
else if(argstr == "--checksumtype") notification.mutable_file()->mutable_cks()->set_name(argval);
else if(argstr == "--checksumvalue") notification.mutable_file()->mutable_cks()->set_value(argval);
else if(argstr == "--diskfilepath") notification.mutable_file()->set_lpath(argval);
else if(argstr == "--storageclass") {} // this is a xattr
else if(argstr == "--storageclass") {
google::protobuf::MapPair<std::string,std::string> sc("CTA_StorageClass", argval);
notification.mutable_file()->mutable_xattr()->insert(sc);
}
else if(argstr == "--id") {} // eos::wfe::fxattr:sys.archiveFileId, not used for archive WF
else if(argstr == "--diskpool") {} // = default?
......
......@@ -33,7 +33,7 @@ set_target_properties(XrdCtaOfs PROPERTIES INSTALL_RPATH ${PROTOBUF3_RPATH})
add_definitions(-DXRDSSI_DEBUG)
include_directories(../xroot_ssi_pb)
add_library(XrdSsiCta MODULE XrdSsiCtaServiceProvider.cpp XrdSsiCtaRequestProc.cpp)
target_link_libraries(XrdSsiCta ctaeosmessages ${PROTOBUF3_LIBRARIES} XrdSsi-4 XrdSsiLib)
target_link_libraries(XrdSsiCta ctaeosmessages ${PROTOBUF3_LIBRARIES} XrdSsi-4 XrdSsiLib ctacommon ctaobjectstore ctacatalogue ctascheduler)
set_target_properties(XrdSsiCta PROPERTIES INSTALL_RPATH ${PROTOBUF3_RPATH})
INSTALL (TARGETS XrdCtaOfs DESTINATION usr/${CMAKE_INSTALL_LIBDIR})
......
......@@ -17,12 +17,25 @@
*/
#include <iostream>
#include <memory>
#include "../frontend/test_util.h" // for Json output (for debugging)
#include "XrdSsiPbException.h"
#include "XrdSsiPbRequestProc.h"
#include "eos/messages/eos_messages.pb.h"
#include "common/Configuration.hpp"
#include "common/dataStructures/ArchiveRequest.hpp"
#include "common/log/StdoutLogger.hpp"
#include "scheduler/Scheduler.hpp"
#include "objectstore/BackendFactory.hpp"
#include "objectstore/BackendPopulator.hpp"
#include "catalogue/CatalogueFactory.hpp"
#include "rdbms/Login.hpp"
#include "scheduler/OStoreDB/OStoreDBWithAgent.hpp"
#include "common/make_unique.hpp"
/*!
......@@ -32,11 +45,59 @@
template <>
void RequestProc<eos::wfe::Notification, eos::wfe::Response, eos::wfe::Alert>::ExecuteAction()
{
// Instantiate the scheduler
const cta::rdbms::Login catalogueLogin = cta::rdbms::Login::parseFile("/home/mdavis/cernbox/CERNHome/CTAtest/ctatest_image/etc/cta/cta_catalogue_db.conf");
const uint64_t nbConns = 10;
const uint64_t nbArchiveFileListingConns = 2;
std::unique_ptr<cta::catalogue::Catalogue> my_catalogue = cta::catalogue::CatalogueFactory::create(catalogueLogin, nbConns, nbArchiveFileListingConns);
//cta::common::Configuration ctaConf("/etc/cta/cta-frontend.conf"),
//ctaConf.getConfEntString("ObjectStore", "BackendPath", nullptr)).release());
std::string backend_str("/tmp/jobStoreXXXXXXX");
std::unique_ptr<cta::objectstore::Backend> backend(cta::objectstore::BackendFactory::createBackend(backend_str));
cta::objectstore::BackendPopulator backendPopulator(*backend, "Frontend");
cta::OStoreDBWithAgent scheddb(*backend, backendPopulator.getAgentReference());
cta::Scheduler *scheduler = new cta::Scheduler(*my_catalogue, scheddb, 5, 2*1000*1000);
cta::log::StdoutLogger log("ctafrontend");
cta::log::LogContext lc(log);
// Output message in Json format (for debugging)
std::cerr << "Received message:" << std::endl;
OutputJsonString(&m_request);
// Unpack message
cta::common::dataStructures::UserIdentity originator;
originator.name = m_request.cli().user().username();
originator.group = m_request.cli().user().groupname();
cta::common::dataStructures::DiskFileInfo diskFileInfo;
diskFileInfo.owner = m_request.file().owner().username();
diskFileInfo.group = m_request.file().owner().groupname();
diskFileInfo.path = m_request.file().lpath();
cta::common::dataStructures::ArchiveRequest request;
request.checksumType = m_request.file().cks().name();
request.checksumValue = m_request.file().cks().value();
request.diskFileInfo = diskFileInfo;
request.diskFileID = m_request.file().fid();
request.fileSize = m_request.file().size();
request.requester = originator;
request.srcURL = m_request.wf().instance().url();
request.storageClass = m_request.file().xattr().at("CTA_StorageClass");
request.archiveReportURL = "null:";
std::string client_username = m_request.cli().user().username();
// Queue the request
uint64_t archiveFileId = scheduler->queueArchive(client_username, request, lc);
std::cout << "<eos::wfe::path::fxattr:sys.archiveFileId>" << archiveFileId << std::endl;
#if 0
// Set reply
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment