Commit c7d7f4cf authored by Daniele Kruse's avatar Daniele Kruse
Browse files

Introduced the retrieve and archive messages from CTA

parent 91243b45
// This file is part of the Castor project.
// See http://castor.web.cern.ch/castor
//
// Copyright (C) 2003 CERN
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
//
// @author Castor Dev team, castor-dev@cern.ch
package castor.messages;
message ArchiveJobFromCTA {
required string vid = 1;
required string unitname = 2;
}
......@@ -58,7 +58,9 @@ enum MsgType {
/* 23 */ MSG_TYPE_ACSDISMOUNTTAPE,
/* 24 */ MSG_TYPE_ACSFORCEDISMOUNTTAPE,
/* 25 */ MSG_TYPE_ADDLOGPARAMS,
/* 26 */ MSG_TYPE_DELETELOGPARAMS
/* 26 */ MSG_TYPE_DELETELOGPARAMS,
/* 27 */ MSG_TYPE_ARCHIVEJOBFROMCTA,
/* 28 */ MSG_TYPE_RETRIEVEJOBFROMCTA
};
enum ProtocolVersion {
......
// This file is part of the Castor project.
// See http://castor.web.cern.ch/castor
//
// Copyright (C) 2003 CERN
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
//
// @author Castor Dev team, castor-dev@cern.ch
package castor.messages;
message RetrieveJobFromCTA {
required string vid = 1;
required string unitname = 2;
}
......@@ -20,6 +20,7 @@
*****************************************************************************/
#include "castor/messages/AddLogParams.pb.h"
#include "castor/messages/ArchiveJobFromCTA.pb.h"
#include "castor/messages/Constants.hpp"
#include "castor/messages/DeleteLogParams.pb.h"
#include "castor/messages/MutexLocker.hpp"
......@@ -32,6 +33,7 @@
#include "castor/messages/NbFilesOnTape.pb.h"
#include "castor/messages/RecallJobFromReadTp.pb.h"
#include "castor/messages/RecallJobFromTapeGateway.pb.h"
#include "castor/messages/RetrieveJobFromCTA.pb.h"
#include "castor/messages/ReturnValue.pb.h"
#include "castor/messages/TapeMountedForRecall.pb.h"
#include "castor/messages/TapeMountedForMigration.pb.h"
......@@ -55,7 +57,23 @@ castor::messages::TapeserverProxyZmq::TapeserverProxyZmq(log::Logger &log,
//------------------------------------------------------------------------------
uint32_t castor::messages::TapeserverProxyZmq::gotArchiveJobFromCTA(
const std::string &vid, const std::string &unitName) {
return 0;
MutexLocker lock(&m_mutex);
try {
const Frame rqst = createArchiveJobFromCTAFrame(vid, unitName);
sendFrame(m_serverSocket, rqst);
NbFilesOnTape reply;
recvTapeReplyOrEx(m_serverSocket, reply);
return reply.nbfiles();
} catch(castor::exception::Exception &ne) {
castor::exception::Exception ex;
ex.getMessage() <<
"Failed to notify tapeserver of archive job from CTA: " <<
"vid=" << vid << " unitName=" << unitName << ": " <<
ne.getMessage().str();
throw ex;
}
}
//------------------------------------------------------------------------------
......@@ -63,6 +81,29 @@ uint32_t castor::messages::TapeserverProxyZmq::gotArchiveJobFromCTA(
//------------------------------------------------------------------------------
void castor::messages::TapeserverProxyZmq::gotRetrieveJobFromCTA(
const std::string &vid, const std::string &unitName) {
MutexLocker lock(&m_mutex);
try {
const Frame rqst = createRetrieveJobFromCTAFrame(vid, unitName);
sendFrame(m_serverSocket, rqst);
ReturnValue reply;
recvTapeReplyOrEx(m_serverSocket, reply);
if(0 != reply.value()) {
// Should never get here
castor::exception::Exception ex;
ex.getMessage() << "Received an unexpected return value"
": expected=0 actual=" << reply.value();
throw ex;
}
} catch(castor::exception::Exception &ne) {
castor::exception::Exception ex;
ex.getMessage() <<
"Failed to notify tapeserver of retrieve job from CTA: " <<
"vid=" << vid << " unitName=" << unitName << ": " <<
ne.getMessage().str();
throw ex;
}
}
//------------------------------------------------------------------------------
......@@ -122,6 +163,32 @@ castor::messages::Frame castor::messages::TapeserverProxyZmq::
}
}
//------------------------------------------------------------------------------
// createRetrieveJobFromCTAFrame
//------------------------------------------------------------------------------
castor::messages::Frame castor::messages::TapeserverProxyZmq::
createRetrieveJobFromCTAFrame(const std::string &vid,
const std::string &unitName) {
try {
Frame frame;
frame.header = messages::protoTapePreFillHeader();
frame.header.set_msgtype(messages::MSG_TYPE_RETRIEVEJOBFROMCTA);
frame.header.set_bodysignature("PIPO");
RetrieveJobFromCTA body;
body.set_vid(vid);
body.set_unitname(unitName);
frame.serializeProtocolBufferIntoBody(body);
return frame;
} catch(castor::exception::Exception &ne) {
castor::exception::Exception ex;
ex.getMessage() << "Failed to create RetrieveJobFromCTA frame: " <<
ne.getMessage().str();
throw ex;
}
}
//------------------------------------------------------------------------------
// gotRecallJobFromReadTp
......@@ -231,6 +298,33 @@ castor::messages::Frame castor::messages::TapeserverProxyZmq::
}
}
//------------------------------------------------------------------------------
// createArchiveJobFromCTAFrame
//------------------------------------------------------------------------------
castor::messages::Frame castor::messages::TapeserverProxyZmq::
createArchiveJobFromCTAFrame(const std::string &vid,
const std::string &unitName) {
try {
Frame frame;
frame.header = messages::protoTapePreFillHeader();
frame.header.set_msgtype(messages::MSG_TYPE_ARCHIVEJOBFROMCTA);
frame.header.set_bodysignature("PIPO");
ArchiveJobFromCTA body;
body.set_vid(vid);
body.set_unitname(unitName);
frame.serializeProtocolBufferIntoBody(body);
return frame;
} catch(castor::exception::Exception &ne) {
castor::exception::Exception ex;
ex.getMessage() << "Failed to create ArchiveJobFromCTA frame: " <<
ne.getMessage().str();
throw ex;
}
}
//------------------------------------------------------------------------------
// gotMigrationJobFromWriteTp
//------------------------------------------------------------------------------
......
......@@ -210,6 +210,26 @@ private:
* Socket connecting this tape server proxy to the tape server daemon.
*/
ZmqSocketST m_serverSocket;
/**
* Creates a frame containing a ArchiveJobFromCTA message.
*
* @param vid The volume identifier of the tape.
* @param unitName The unit name of the tape drive.
* @return The frame.
*/
Frame createArchiveJobFromCTAFrame(const std::string &vid,
const std::string &unitName);
/**
* Creates a frame containing a RetrieveJobFromCTA message.
*
* @param vid The volume identifier of the tape.
* @param unitName The unit name of the tape drive.
* @return The frame.
*/
Frame createRetrieveJobFromCTAFrame(const std::string &vid,
const std::string &unitName);
/**
* Creates a frame containing a RecallJobFromTapeGateway message.
......
......@@ -21,6 +21,7 @@
#include "castor/messages/AddLogParams.pb.h"
#include "castor/messages/ArchiveJobFromCTA.pb.h"
#include "castor/messages/Constants.hpp"
#include "castor/messages/DeleteLogParams.pb.h"
#include "castor/messages/Exception.pb.h"
......@@ -32,6 +33,7 @@
#include "castor/messages/NbFilesOnTape.pb.h"
#include "castor/messages/RecallJobFromReadTp.pb.h"
#include "castor/messages/RecallJobFromTapeGateway.pb.h"
#include "castor/messages/RetrieveJobFromCTA.pb.h"
#include "castor/messages/ReturnValue.pb.h"
#include "castor/messages/TapeMountedForMigration.pb.h"
#include "castor/messages/TapeMountedForRecall.pb.h"
......@@ -190,6 +192,12 @@ castor::messages::Frame castor::tape::tapeserver::daemon::TapeMessageHandler::
case messages::MSG_TYPE_HEARTBEAT:
return handleHeartbeat(rqst);
case messages::MSG_TYPE_ARCHIVEJOBFROMCTA:
return handleArchiveJobFromCTA(rqst);
case messages::MSG_TYPE_RETRIEVEJOBFROMCTA:
return handleRetrieveJobFromCTA(rqst);
case messages::MSG_TYPE_MIGRATIONJOBFROMTAPEGATEWAY:
return handleMigrationJobFromTapeGateway(rqst);
......@@ -318,6 +326,39 @@ castor::messages::Frame castor::tape::tapeserver::daemon::TapeMessageHandler::
}
}
//------------------------------------------------------------------------------
// handleArchiveJobFromCTA
//------------------------------------------------------------------------------
castor::messages::Frame castor::tape::tapeserver::daemon::TapeMessageHandler::
handleArchiveJobFromCTA(const messages::Frame &rqst) {
m_log(LOG_INFO, "Handling ArchiveJobFromCTA message");
try {
castor::messages::ArchiveJobFromCTA rqstBody;
rqst.parseBodyIntoProtocolBuffer(rqstBody);
CatalogueDrive &drive =
m_driveCatalogue.findDrive(rqstBody.unitname());
drive.getTransferSession().receivedMigrationJob(rqstBody.vid());
{
std::ostringstream msg;
msg << __FUNCTION__ << ": Not fully implemented because the number of"
" files on tape is not known because there is no vmgr in the CTA"
" project";
throw castor::exception::Exception(msg.str());
}
//messages::Frame reply = createNbFilesOnTapeFrame(tapeInfo.nbFiles);
//return reply;
} catch(castor::exception::Exception &ne) {
castor::exception::Exception ex;
ex.getMessage() <<
"Failed to handle ArchiveJobFromCTA message: " <<
ne.getMessage().str();
throw ex;
}
}
//------------------------------------------------------------------------------
// createNbFilesOnTapeFrame
//------------------------------------------------------------------------------
......@@ -421,6 +462,32 @@ castor::messages::Frame castor::tape::tapeserver::daemon::TapeMessageHandler::
}
}
//------------------------------------------------------------------------------
// handleRetrieveJobFromCTA
//------------------------------------------------------------------------------
castor::messages::Frame castor::tape::tapeserver::daemon::TapeMessageHandler::
handleRetrieveJobFromCTA(const messages::Frame &rqst) {
m_log(LOG_INFO, "Handling RecallJobFromTapeGateway message");
try {
messages::RetrieveJobFromCTA rqstBody;
rqst.parseBodyIntoProtocolBuffer(rqstBody);
CatalogueDrive &drive =
m_driveCatalogue.findDrive(rqstBody.unitname());
drive.getTransferSession().receivedRecallJob(rqstBody.vid());
const messages::Frame reply = createReturnValueFrame(0);
return reply;
} catch(castor::exception::Exception &ne) {
castor::exception::Exception ex;
ex.getMessage() <<
"Failed to handle RetrieveJobFromCTA message: " <<
ne.getMessage().str();
throw ex;
}
}
//------------------------------------------------------------------------------
// handleTapeMountedForMigration
//------------------------------------------------------------------------------
......
......@@ -177,6 +177,15 @@ private:
*/
messages::Frame handleMigrationJobFromTapeGateway(
const messages::Frame &rqst);
/**
* Handles the specified request.
*
* @param rqst The request.
* @return The reply.
*/
messages::Frame handleArchiveJobFromCTA(
const messages::Frame &rqst);
/**
* Creates a message frame containing a NbFilesOnTape message.
......@@ -201,6 +210,14 @@ private:
* @return The reply.
*/
messages::Frame handleRecallJobFromTapeGateway(const messages::Frame &rqst);
/**
* Handles the specified request.
*
* @param rqst The request.
* @return The reply.
*/
messages::Frame handleRetrieveJobFromCTA(const messages::Frame &rqst);
/**
* Handles the specified request.
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment