Commit 0d0b2af2 authored by Eric Cano's avatar Eric Cano
Browse files

Added Tape object creation and removal.

Added support for tape addition in scheduler db interace.
Expanded usage or CreationLog class.
parent 4401eae5
......@@ -18,44 +18,19 @@ add_library (CTAObjectStore SHARED
AgentRegister.cpp
AgentWatchdog.cpp
TapePool.cpp
Tape.cpp
DriveRegister.cpp
#AdminUsersList.cpp
BackendVFS.cpp
BackendRados.cpp
ObjectOps.cpp
ProtocolBuffersAlgorithms.cpp
#FIFO.cpp
GenericObject.cpp
GarbageCollector.cpp)
# add_executable(tapeResourceManagerTest tapeResourceManagerTest.cpp)
# target_link_libraries(tapeResourceManagerTest
# protobuf rados CTAObjectStore)
#
# add_executable(dumpStructure dumpStructure.cpp)
# target_link_libraries(dumpStructure
# protobuf rados CTAObjectStore)
#
# add_executable(jobPoster jobPoster.cpp)
# target_link_libraries(jobPoster
# protobuf rados CTAObjectStore)
#
# add_executable(recaller recaller.cpp)
# target_link_libraries(recaller
# protobuf rados CTAObjectStore)
#
# add_executable(garbageCollector garbageCollector.cpp)
# target_link_libraries(garbageCollector
# protobuf rados CTAObjectStore)
#
# add_executable(createEnvironment createEnvironment.cpp)
# target_link_libraries(createEnvironment
# protobuf rados CTAObjectStore)
set(ObjectStoreUnitTests
BackendTest.cpp
RootEntryTest.cpp
#FIFOTest.cpp
TapeTest.cpp
GarbageCollectorTest.cpp
)
......
......@@ -16,6 +16,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "common/UserIdentity.hpp"
#include "objectstore/cta.pb.h"
#include "scheduler/CreationLog.hpp"
......
......@@ -23,19 +23,14 @@ namespace cta { namespace objectstore {
#define MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(A) \
template <> const serializers::ObjectType ObjectOps<serializers::A>::typeId = serializers::A##_t
MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(GenericObject);
MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(RootEntry);
MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(AgentRegister);
MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(Agent);
MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(TapePool);
MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(DriveRegister);
// MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(JobPool);
// MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(RecallFIFO);
// MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(MigrationFIFO);
// MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(RecallJob);
// MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(Counter);
// MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(FIFO);
// MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(AdminUsersList);
MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(Tape);
#undef MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID
}}
\ No newline at end of file
......@@ -456,6 +456,7 @@ std::string cta::objectstore::RootEntry::addOrGetTapePoolAndCommit(const std::st
// Insert the tape pool, then its pointer, with agent intent log update
// First generate the intent. We expect the agent to be passed locked.
std::string tapePoolAddress = agent.nextId("tapePool");
// TODO Do we expect the agent to be passed locked or not: to be clarified.
ScopedExclusiveLock agl(agent);
agent.fetch();
agent.addToOwnership(tapePoolAddress);
......@@ -762,6 +763,6 @@ std::string cta::objectstore::RootEntry::dump () {
// if (m_payload.has_jobpool()) ret << "jobPool=" << m_payload.jobpool() << std::endl;
/* if (m_payload.has_driveregister()) ret << "driveRegister=" << m_payload.driveregister() << std::endl;
if (m_payload.has_taperegister()) ret << "tapeRegister=" << m_payload.taperegister() << std::endl;*/
ret << ">>>> Root entry dump start" << std::endl;
ret << ">>>> Root entry dump end" << std::endl;
return ret.str();
}
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "Tape.hpp"
#include "GenericObject.hpp"
cta::objectstore::Tape::Tape(const std::string& address, Backend& os):
ObjectOps<serializers::Tape>(os, address) { }
cta::objectstore::Tape::Tape(GenericObject& go):
ObjectOps<serializers::Tape>(go.objectStore()){
// Here we transplant the generic object into the new object
go.transplantHeader(*this);
// And interpret the header.
getPayloadFromHeader();
}
void cta::objectstore::Tape::initialize(const std::string &name) {
ObjectOps<serializers::Tape>::initialize();
// Set the reguired fields
m_payload.set_vid(name);
m_payload.set_bytesstored(0);
m_payload.set_lastfseq(0);
m_payloadInterpreted = true;
}
bool cta::objectstore::Tape::isEmpty() {
checkPayloadReadable();
return !m_payload.retrievaljobs_size();
}
void cta::objectstore::Tape::removeIfEmpty() {
checkPayloadWritable();
if (!isEmpty()) {
throw NotEmpty("In Tape::removeIfEmpty: trying to remove an tape with retrievals queued");
}
remove();
}
void cta::objectstore::Tape::addStoredData(uint64_t bytes) {
checkPayloadWritable();
m_payload.set_bytesstored(m_payload.bytesstored()+bytes);
}
uint64_t cta::objectstore::Tape::getStoredData() {
checkPayloadReadable();
return m_payload.bytesstored();
}
std::string cta::objectstore::Tape::getVid() {
checkPayloadReadable();
return m_payload.vid();
}
std::string cta::objectstore::Tape::dump() {
checkPayloadReadable();
std::stringstream ret;
ret << "<<<< Tape dump start: vid=" << m_payload.vid() << std::endl;
ret << " lastFseq=" << m_payload.lastfseq()
<< " bytesStored=" << m_payload.bytesstored() << std::endl;
ret << " Retrieval jobs queued: " << m_payload.retrievaljobs_size() << std::endl;
if (m_payload.readmounts_size()) {
auto lrm = m_payload.readmounts(0);
ret << " Latest read for mount: " << lrm.host() << " " << lrm.time() << " "
<< lrm.drivevendor() << " " << lrm.drivemodel() << " "
<< lrm.driveserial() << std::endl;
}
if (m_payload.writemounts_size()) {
auto lwm = m_payload.writemounts(0);
ret << " Latest write for mount: " << lwm.host() << " " << lwm.time() << " "
<< lwm.drivevendor() << " " << lwm.drivemodel() << " "
<< lwm.driveserial() << std::endl;
}
ret << ">>>> Tape dump end" << std::endl;
return ret.str();
}
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "ObjectOps.hpp"
#include "objectstore/cta.pb.h"
namespace cta { namespace objectstore {
class Backend;
class Agent;
class GenericObject;
class Tape: public ObjectOps<serializers::Tape> {
public:
Tape(const std::string & address, Backend & os);
Tape(GenericObject & go);
void initialize(const std::string & vid);
void garbageCollect();
bool isEmpty();
CTA_GENERATE_EXCEPTION_CLASS(NotEmpty);
void removeIfEmpty();
std::string dump();
// -- Stored data counting ---------------------------------------------------
uint64_t getStoredData();
std::string getVid();
void setStoredData(uint64_t bytes);
void addStoredData(uint64_t bytes);
};
}}
\ No newline at end of file
......@@ -18,6 +18,9 @@
#include "TapePool.hpp"
#include "GenericObject.hpp"
#include "ProtcolBuffersAlgorithms.hpp"
#include "CreationLog.hpp"
#include "Tape.hpp"
cta::objectstore::TapePool::TapePool(const std::string& address, Backend& os):
ObjectOps<serializers::TapePool>(os, address) { }
......@@ -42,6 +45,107 @@ void cta::objectstore::TapePool::initialize(const std::string& name) {
m_payloadInterpreted = true;
}
namespace {
bool operator == (const std::string & vid,
const cta::objectstore::serializers::TapePointer &t) {
return vid==t.vid();
}
}
std::string cta::objectstore::TapePool::addOrGetTapeAndCommit(const std::string& vid,
const std::string& logicalLibraryName, const uint64_t capacityInBytes,
Agent& agent, const cta::CreationLog& creationLog) {
checkPayloadWritable();
// Check the tape already exists
try {
return serializers::findElement(m_payload.tapes(), vid).address();
} catch (serializers::NotFound &) {}
// Insert the tape, then its pointer, with agent intent log update
// first generate the intent. We expect the agent to be passed locked.
std::string tapeAddress = agent.nextId(std::string("tape_") + vid + "_");
// TODO Do we expect the agent to be passed locked or not: to be clarified.
ScopedExclusiveLock agl(agent);
agent.fetch();
agent.addToOwnership(tapeAddress);
agent.commit();
// The create the tape object
Tape t(tapeAddress, ObjectOps<serializers::TapePool>::m_objectStore);
t.initialize(vid);
t.setOwner(agent.getAddressIfSet());
t.setBackupOwner(getAddressIfSet());
t.insert();
ScopedExclusiveLock tl(t);
// Now reference the tape in the pool
auto * pt = m_payload.mutable_tapes()->Add();
pt->set_address(tapeAddress);
pt->set_capacity(capacityInBytes);
pt->set_library(logicalLibraryName);
pt->set_vid(vid);
objectstore::CreationLog oslog(creationLog);
oslog.serialize(*pt->mutable_log());
commit();
// Switch the tape ownership
t.setOwner(getAddressIfSet());
t.commit();
// Clean up the agent. We're done.
agent.removeFromOwnership(tapeAddress);
agent.commit();
return tapeAddress;
}
void cta::objectstore::TapePool::removeTapeAndCommit(const std::string& vid) {
checkPayloadWritable();
try {
// Find the tape
auto tp = serializers::findElement(m_payload.tapes(), vid);
// Open the tape object
Tape t(tp.address(), m_objectStore);
ScopedExclusiveLock tl(t);
t.fetch();
// Verify this is the tape we're looking for.
if (t.getVid() != vid) {
std::stringstream err;
err << "Unexpected tape VID found in object pointed to for tape: "
<< vid << " found: " << t.getVid();
throw WrongTape(err.str());
}
// We can now delete the tape
t.remove();
// And remove it from our entry
serializers::removeOccurences(m_payload.mutable_tapes(), vid);
// We commit for safety and symmetry with the add operation
commit();
} catch (serializers::NotFound &) {
// No such tape. Nothing to to.
throw NoSuchTape("In TapePool::removeTapeAndCommit: trying to remove non-existing tape");
}
}
auto cta::objectstore::TapePool::dumpTapes() -> std::list<TapeDump>{
checkPayloadReadable();
std::list<TapeDump> ret;
auto & tl = m_payload.tapes();
for (auto t=tl.begin(); t!=tl.end(); t++) {
ret.push_back(TapeDump());
ret.back().address = t->address();
ret.back().vid = t->vid();
ret.back().capacityInBytes = t->capacity();
ret.back().logicalLibraryName = t->library();
ret.back().log.deserialize(t->log());
}
return ret;
}
std::string cta::objectstore::TapePool::getTapeAddress(const std::string& vid) {
checkPayloadReadable();
return serializers::findElement(m_payload.tapes(), vid).address();
}
bool cta::objectstore::TapePool::isEmpty() {
checkPayloadReadable();
// Check we have no tapes in pool
......@@ -57,7 +161,7 @@ bool cta::objectstore::TapePool::isEmpty() {
void cta::objectstore::TapePool::garbageCollect() {
checkPayloadWritable();
if (!isEmpty()) {
throw (NotEmpty("Trying to garbage collect a non-empty AgentRegister: internal error"));
throw (NotEmpty("Trying to garbage collect a non-empty TapePool: internal error"));
}
remove();
}
......
......@@ -22,6 +22,9 @@
#include "ObjectOps.hpp"
#include <string>
#include "objectstore/cta.pb.h"
#include "scheduler/CreationLog.hpp"
#include "CreationLog.hpp"
#include "Agent.hpp"
namespace cta { namespace objectstore {
......@@ -42,6 +45,26 @@ public:
void setName(const std::string & name);
std::string getName();
// Tapes management ==========================================================
std::string addOrGetTapeAndCommit(const std::string &vid,
const std::string &logicalLibraryName, const uint64_t capacityInBytes,
Agent & agent, const cta::CreationLog & CreationLog);
CTA_GENERATE_EXCEPTION_CLASS(NoSuchTape);
CTA_GENERATE_EXCEPTION_CLASS(WrongTape);
void removeTapeAndCommit(const std::string &vid);
std::string getTapeAddress(const std::string &vid);
class TapeDump {
public:
std::string vid;
std::string address;
std::string logicalLibraryName;
uint64_t capacityInBytes;
objectstore::CreationLog log;
};
std::list<TapeDump> dumpTapes();
// Archival jobs management ==================================================
// Check that the tape pool is empty (of both tapes and jobs)
bool isEmpty();
......
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include "Tape.hpp"
#include "BackendVFS.hpp"
#include "Agent.hpp"
namespace unitTests {
TEST(Tape, BasicAccess) {
cta::objectstore::BackendVFS be;
cta::objectstore::Agent agent(be);
agent.generateName("unitTest");
std::string tapeAddress = agent.nextId("Tape");
{
// Try to create the tape entry
cta::objectstore::Tape t(tapeAddress, be);
t.initialize("V12345");
t.insert();
}
{
// Try to read back and dump the tape
cta::objectstore::Tape t(tapeAddress, be);
ASSERT_THROW(t.fetch(), cta::exception::Exception);
cta::objectstore::ScopedSharedLock lock(t);
ASSERT_NO_THROW(t.fetch());
t.dump();
}
// Delete the root entry
cta::objectstore::Tape t(tapeAddress, be);
cta::objectstore::ScopedExclusiveLock lock(t);
t.fetch();
t.removeIfEmpty();
ASSERT_EQ(false, t.exists());
}
}
\ No newline at end of file
......@@ -8,14 +8,7 @@ enum ObjectType {
Agent_t = 2;
TapePool_t = 3;
DriveRegister_t = 4;
// JobPool_t = 3;
// RecallFIFO_t = 4;
// MigrationFIFO_t = 5;
// RecallJob_t = 6;
// Counter_t = 7;
// FIFO_t = 8;
// AdminUsersList_t = 9;
// StorageClassList_t = 10;
Tape_t = 5;
GenericObject_t = 1000;
}
......@@ -149,13 +142,6 @@ message Agent {
repeated string ownedobjects = 2003;
}
////// The registers (simple name arrays)
////message Register {
//// repeated string elements = 150;
////}
////
// The agent register holds 2 lists:
// a full list, and a list of agents not yet watched
message AgentRegister {
repeated string agents = 2100;
repeated string untrackedagents = 2101;
......@@ -168,12 +154,19 @@ message ArchivalJobPointer {
required string address = 3002;
}
message RetrievalJobPointer {
required uint64 size = 3001;
required string address = 3002;
}
// ------------- Tape pools ---------------------------------------------------
message TapePointer {
required string name = 4000;
required string vid = 4000;
required string address = 4001;
required CreationLog log = 4002;
required string library = 4002;
required uint64 capacity = 4003;
required CreationLog log = 4004;
}
message TapePool {
......@@ -183,131 +176,30 @@ message TapePool {
required uint64 ArchivalJobsTotalSize = 4103;
}
// ------------- Tape ----------------------------------------------------------
message MountInfo {
required string host = 4200;
required string drivevendor = 4201;
required string drivemodel = 4202;
required string driveserial = 4203;
required uint64 time = 4204;
}
message Tape {
required string vid = 4300;
required uint64 lastfseq = 4301;
required uint64 bytesstored = 4302;
repeated MountInfo readmounts = 4303;
repeated MountInfo writemounts = 4304;
repeated RetrievalJobPointer retrievaljobs = 4305;
}
// ------------- Drives handling ----------------------------------------------
message DriveRegister {
repeated string drivenames = 5000;
}
////
////// A basic FIFO
////// poping is done by increasing the read pointer, and from time to time
////// collapsing the name array.
////// There is no write pointer because we always append at the end of the name
////// array.
////message FIFO {
//// required uint64 readPointer = 200;
//// repeated string name = 201;
////}
////
////// A basic shared counter
////message Counter {
//// required uint64 count = 300;
////}
////
////// The agents's elements:
////message ObjectCreationIntent {
//// required ObjectType type = 1101;
//// required string name = 1102;
//// required string container = 1103;
////}
////
////message ObjectOwnershipIntent {
//// required ObjectType type = 1111;
//// required string name = 1112;
////}
////
////// The tape record
////message Tape {
//// required string type = 2001;
//// required string format = 2002;
//// required string vid = 2003;
//// required uint64 maxFseq = 2004;
//// required string status = 2005 ;
////}
////
////// The drive record
////message Drive {
//// required string name = 3001;
//// required string status = 3002;
////}
////
////// The jobs
////message MigrationJob {
//// required string owner = 4001;
//// required string status = 4002;
//// required string source = 4003;
//// required string destination = 4004;
////}
////
////message RecallJob {
//// required string owner = 5001;
//// required string status = 5002;
//// required string source = 5003;
//// required string destination = 5004;
////}
////
////// The job pool
////message JobPool {
//// required string migration = 7001;
//// required string recall = 7002;
//// required string recallcounter = 7003;
////}
////
////message RecallFIFO {}
////
////message MigrationFIFO {}
////
////message UserIdentity {
//// required uint32 uid = 8001;
//// required uint32 gid = 8002;
////}
////
////message AdminUser {
//// required UserIdentity user = 8010;
//// required UserIdentity creator = 8011;
//// required uint64 creationTime = 8012;
//// required string comment = 8013;
////}
////
////// AdminHosts are just strings
////