Commit cafadcf9 authored by Eric Cano's avatar Eric Cano
Browse files

Split the DriveRegister into separate DriveState objects for each drive.

This will remove contention on the drive register as drives update their statuses.
Also simplified structures in OStoreDB implementation (less references with added friend relations).
parent 8f0dc5c1
......@@ -158,6 +158,34 @@ public:
virtual ~AsyncDeleter() {}
};
/**
 * Interface for an asynchronous (lockfree) fetch of an object.
 * The fetch runs in the background; its outcome, either the object content
 * or an exception, is delivered to the caller through wait().
 */
class AsyncLockfreeFetcher {
public:
  /**
   * Virtual destructor, so implementations can be destroyed through this interface.
   */
  virtual ~AsyncLockfreeFetcher() = default;

  /**
   * Blocks until the fetch has completed.
   * On success, returns the content of the object.
   * On failure, throws an exception.
   */
  virtual std::string wait() = 0;
};
/**
* Triggers the asynchronous object fetch, as described in
* AsyncLockfreeFetcher class description.
*
* @param name The name of the object to be fetched.
* @return pointer to a newly created AsyncLockfreeFetcher. The visible
* implementations allocate it with new, so the caller is presumably
* responsible for deleting it (e.g. by holding it in a std::unique_ptr).
*/
virtual AsyncLockfreeFetcher * asyncLockfreeFetch(const std::string & name) = 0;
/**
* Triggers the asynchronous object delete sequence, as described in
* AsyncDeleter class description.
......
......@@ -750,28 +750,65 @@ void BackendRados::AsyncDeleter::deleteCallback(librados::completion_t completio
}
}
void BackendRados::AsyncDeleter::deleteEmptyCallback(librados::completion_t completion, void* pThis) {
AsyncDeleter & au = *((AsyncDeleter *) pThis);
/**
 * Starts an asynchronous lock-free fetch of the named object on this backend.
 * The fetcher's constructor launches the read immediately and throws if the
 * read could not be launched. The returned object is allocated with new:
 * the caller is expected to delete it once done with it.
 *
 * @param name The name of the object to fetch.
 * @return pointer to the newly created fetcher.
 */
Backend::AsyncLockfreeFetcher* BackendRados::asyncLockfreeFetch(const std::string& name) {
return new AsyncLockfreeFetcher(*this, name);
}
/**
 * Blocks until the asynchronous delete job is over.
 * m_jobFuture.get() returns when the result of the job is available.
 * NOTE(review): m_jobFuture is declared outside this view; if it is a
 * std::future, get() also rethrows any exception stored by the job - confirm.
 */
void BackendRados::AsyncDeleter::wait() {
m_jobFuture.get();
// Annotations keyed on the address of m_job, presumably for a race detector:
// they are placed after the result of the job has been collected.
ANNOTATE_HAPPENS_AFTER(&m_job);
ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&m_job);
}
/**
 * Starts the asynchronous read of the object at construction time.
 * The future is extracted from the promise (m_job) in the initializer list,
 * before the read is launched. The result of the read is delivered to
 * fetchCallback.
 *
 * @param be   The rados backend whose IO context is used for the read.
 * @param name The name of the object to fetch.
 * Throws Backend::NoSuchObject if aio_read() returns a non-zero code.
 */
BackendRados::AsyncLockfreeFetcher::AsyncLockfreeFetcher(BackendRados& be, const std::string& name):
m_backend(be), m_name(name), m_job(), m_jobFuture(m_job.get_future()) {
// At construction time, we just start the read.
// The completion carries `this` as the callback argument; fetchCallback casts it back to the fetcher.
librados::AioCompletion * aioc = librados::Rados::aio_create_completion(this, fetchCallback, nullptr);
// rtl times the launch call below; the member logger is reset here and is checked in fetchCallback.
RadosTimeoutLogger rtl;
m_radosTimeoutLogger.reset();
// The data will land in m_radosBufferList. The last two arguments are presumably the maximum
// length and the offset of the read (confirm against the librados API).
auto rc=m_backend.m_radosCtx.aio_read(m_name, aioc, &m_radosBufferList, std::numeric_limits<int32_t>::max(), 0);
rtl.logIfNeeded("BackendRados::AsyncLockfreeFetcher::AsyncLockfreeFetcher(): m_radosCtx.aio_read()", m_name);
// The completion is released right after the launch, whether it succeeded or not.
aioc->release();
if (rc) {
// NOTE(review): any failure to launch the read is reported as NoSuchObject, whatever the error code is.
cta::exception::Errnum errnum (-rc, std::string("In BackendRados::AsyncLockfreeFetcher::AsyncLockfreeFetcher(): failed to launch aio_read(): ")+m_name);
throw Backend::NoSuchObject(errnum.getMessageValue());
}
}
void BackendRados::AsyncLockfreeFetcher::fetchCallback(librados::completion_t completion, void* pThis) {
AsyncLockfreeFetcher & au = *((AsyncLockfreeFetcher *) pThis);
au.m_radosTimeoutLogger.logIfNeeded("In BackendRados::AsyncLockfreeFetcher::fetchCallback(): aio_read callback", au.m_name);
try {
// Check that the object could be deleted.
if (rados_aio_get_return_value(completion)) {
// Check that the object could be read.
if (rados_aio_get_return_value(completion)<0) {
cta::exception::Errnum errnum(-rados_aio_get_return_value(completion),
std::string("In BackendRados::AsyncDeleter::deleteEmptyCallback(): could not delete object: ") + au.m_name);
throw Backend::CouldNotDelete(errnum.getMessageValue());
std::string("In BackendRados::AsyncLockfreeFetcher::fetchCallback(): could not read object: ") + au.m_name);
throw Backend::CouldNotFetch(errnum.getMessageValue());
}
// object deleted then throw an exception
throw Backend::NoSuchObject(std::string("In BackendRados::AsyncDeleter::deleteEmptyCallback(): no such object: ") + au.m_name);
// The data is in the buffer list.
std::string value;
try {
au.m_radosBufferList.copy(0, au.m_radosBufferList.length(), value);
} catch (std::exception & ex) {
throw CouldNotUpdateValue(
std::string("In In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to read buffer: ") +
au.m_name + ": "+ ex.what());
}
ANNOTATE_HAPPENS_BEFORE(&au.m_job);
au.m_job.set_value(value);
} catch (...) {
ANNOTATE_HAPPENS_BEFORE(&au.m_job);
au.m_job.set_exception(std::current_exception());
}
}
}
void BackendRados::AsyncDeleter::wait() {
m_jobFuture.get();
std::string BackendRados::AsyncLockfreeFetcher::wait() {
auto ret=m_jobFuture.get();
ANNOTATE_HAPPENS_AFTER(&m_job);
ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&m_job);
}
return ret;
}
std::string BackendRados::Parameters::toStr() {
std::stringstream ret;
......
......@@ -253,19 +253,41 @@ public:
std::unique_ptr<std::future<void>> m_lockAsync;
/** A string used to identify the locker */
std::string m_lockClient;
/** A future that holds the structure of the update operation. It will be either empty or complete at
destruction time */
std::unique_ptr<std::future<void>> m_updateAsync;
/** The second callback operation (after deleting) */
static void deleteCallback(librados::completion_t completion, void *pThis);
/** Async delete in case of zero sized object */
static void deleteEmptyCallback(librados::completion_t completion, void *pThis);
/** Instrumentation for rados calls timing */
RadosTimeoutLogger m_radosTimeoutLogger;
};
Backend::AsyncDeleter* asyncDelete(const std::string & name) override;
/**
* A class following up the async lockfree fetch.
* Constructor implicitly starts the fetch step.
* The content of the object (or the failure, as an exception) is collected with wait().
*/
class AsyncLockfreeFetcher: public Backend::AsyncLockfreeFetcher {
public:
/** Launches the asynchronous read of the object called name on the backend be. */
AsyncLockfreeFetcher(BackendRados & be, const std::string & name);
/** Blocks until the fetch is over and returns the content of the object. */
std::string wait() override;
private:
/** A reference to the backend */
BackendRados &m_backend;
/** The object name */
const std::string m_name;
/** The promise through which the callback hands the result (value or exception) over to the caller.
 * It has to stay declared before m_jobFuture, which is initialized from it in the constructor. */
std::promise<std::string> m_job;
/** The future from m_job, which will be extracted before any thread gets a chance to play with it. */
std::future<std::string> m_jobFuture;
/** The rados bufferlist receiving the object data read by the asynchronous read */
::librados::bufferlist m_radosBufferList;
/** The callback for the fetch operation; pThis is the fetcher that launched the read */
static void fetchCallback(librados::completion_t completion, void *pThis);
/** Instrumentation for rados calls timing */
RadosTimeoutLogger m_radosTimeoutLogger;
};
/** Starts an asynchronous lock-free fetch of the object called name and returns the newly created fetcher. */
Backend::AsyncLockfreeFetcher* asyncLockfreeFetch(const std::string& name) override;
class Parameters: public Backend::Parameters {
friend class BackendRados;
public:
......
......@@ -162,6 +162,9 @@ TEST_P(BackendAbstractTest, AsyncIOInterface) {
std::unique_ptr<cta::objectstore::Backend::AsyncUpdater> updater(m_os->asyncUpdate(testObjectName,updaterCallback));
updater->wait();
ASSERT_EQ(testSecondValue, m_os->read(testObjectName));
// Async re-read
std::unique_ptr<cta::objectstore::Backend::AsyncLockfreeFetcher> reader(m_os->asyncLockfreeFetch(testObjectName));
ASSERT_EQ(testSecondValue, reader->wait());
m_os->remove(testObjectName);
}
......
......@@ -436,6 +436,26 @@ void BackendVFS::AsyncDeleter::wait() {
ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&m_job);
}
/**
 * Starts the fetch as a C++ async task at construction time.
 * The task reads the object through the backend; its result (the content of
 * the object, or an exception) is collected later by wait() through m_job.
 *
 * The task captures only `this` and takes the object name from m_name, the
 * fetcher's own copy, instead of the `name` constructor argument. `name` is a
 * reference to a string owned by the caller, which can be gone by the time
 * the task runs (for example when a temporary was passed). m_backend and
 * m_name are declared before m_job, so they are initialized before the task
 * is launched.
 *
 * @param be   The backend to read the object from.
 * @param name The name of the object to fetch.
 */
BackendVFS::AsyncLockfreeFetcher::AsyncLockfreeFetcher(BackendVFS& be, const std::string& name):
  m_backend(be), m_name(name),
  m_job(std::async(std::launch::async,
    [this](){
      ANNOTATE_HAPPENS_BEFORE(&m_job);
      return m_backend.read(m_name);
    }))
{ }
/**
 * Starts an asynchronous lock-free fetch of the named object on this backend.
 * The fetcher's constructor launches the read task immediately. The returned
 * object is allocated with new: the caller is expected to delete it once
 * done with it.
 *
 * @param name The name of the object to fetch.
 * @return pointer to the newly created fetcher.
 */
Backend::AsyncLockfreeFetcher* BackendVFS::asyncLockfreeFetch(const std::string& name) {
return new AsyncLockfreeFetcher(*this, name);
}
/**
 * Blocks until the asynchronous read task is over and hands back its result.
 * An exception raised by the task is rethrown to the caller by the future.
 *
 * @return the content of the fetched object.
 */
std::string BackendVFS::AsyncLockfreeFetcher::wait() {
  std::string objectContent = m_job.get();
  // Annotations keyed on the address of m_job, placed once the result has been collected.
  ANNOTATE_HAPPENS_AFTER(&m_job);
  ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&m_job);
  return objectContent;
}
std::string BackendVFS::Parameters::toStr() {
std::stringstream ret;
ret << "path=" << m_path;
......
......@@ -124,10 +124,28 @@ public:
std::future<void> m_job;
};
/**
* A class mimicking AIO using C++ async tasks.
* The constructor launches the read task; wait() collects its result.
*/
class AsyncLockfreeFetcher: public Backend::AsyncLockfreeFetcher {
public:
/** Launches the asynchronous read of the object called name on the backend be. */
AsyncLockfreeFetcher(BackendVFS & be, const std::string & name);
/** Blocks until the read task is over and returns the content of the object. */
std::string wait() override;
private:
/** A reference to the backend */
BackendVFS &m_backend;
/** The object name */
const std::string m_name;
/** The future that will both do the job and allow synchronization with the caller.
 * It has to stay the last data member: the task is launched when it is initialized
 * and uses the members declared above it. */
std::future<std::string> m_job;
};
Backend::AsyncUpdater* asyncUpdate(const std::string & name, std::function <std::string(const std::string &)> & update) override;
Backend::AsyncDeleter* asyncDelete(const std::string & name) override;
/** Starts an asynchronous lock-free fetch of the object called name and returns the newly created fetcher. */
Backend::AsyncLockfreeFetcher* asyncLockfreeFetch(const std::string& name) override;
class Parameters: public Backend::Parameters {
friend class BackendVFS;
public:
......
......@@ -32,6 +32,8 @@ PROTOBUF3_GENERATE_CPP(CTAProtoSources CTAProtoHeaders ${CTAProtoFiles})
set (CTAProtoDependants objectstore/Agent.hpp
objectstore/ArchiveRequest.hpp
objectstore/CreationLog.hpp
objectstore/DriveRegister.hpp
objectstore/DriveState.hpp
objectstore/GenericObject.hpp
objectstore/ObjectOps.hpp
objectstore/RetrieveRequest.hpp
......@@ -60,6 +62,7 @@ add_library (ctaobjectstore SHARED
ArchiveRequest.cpp
RetrieveRequest.cpp
DriveRegister.cpp
DriveState.cpp
BackendVFS.cpp
BackendRados.cpp
BackendPopulator.cpp
......
......@@ -101,168 +101,46 @@ void DriveRegister::garbageCollect(const std::string &presumedOwner, AgentRefere
}
//------------------------------------------------------------------------------
// DriveRegister::getAllDrivesState())
// DriveRegister::getDriveAddresses()
//------------------------------------------------------------------------------
std::list<cta::common::dataStructures::DriveState> DriveRegister::getAllDrivesState() {
using cta::common::dataStructures::DriveState;
// Get all drives states
std::list<DriveRegister::DriveAddress> DriveRegister::getDriveAddresses() {
checkPayloadReadable();
std::list<cta::common::dataStructures::DriveState> ret;
std::list<DriveAddress> ret;
for (auto & d: m_payload.drives()) {
ret.push_back(DriveState());
ret.back().driveName = d.drivename();
ret.back().host = d.host();
ret.back().logicalLibrary = d.logicallibrary();
ret.back().sessionId = d.sessionid();
ret.back().bytesTransferredInSession = d.bytestransferedinsession();
ret.back().filesTransferredInSession = d.filestransferedinsession();
ret.back().latestBandwidth = d.latestbandwidth();
ret.back().sessionStartTime = d.sessionstarttime();
ret.back().mountStartTime = d.mountstarttime();
ret.back().transferStartTime = d.transferstarttime();
ret.back().unloadStartTime = d.unloadstarttime();
ret.back().unmountStartTime = d.unmountstarttime();
ret.back().drainingStartTime = d.drainingstarttime();
ret.back().downOrUpStartTime = d.downorupstarttime();
ret.back().probeStartTime = d.probestarttime();
ret.back().cleanupStartTime = d.cleanupstarttime();
ret.back().lastUpdateTime = d.lastupdatetime();
ret.back().startStartTime = d.startstarttime();
ret.back().shutdownTime = d.shutdowntime();
ret.back().mountType = (common::dataStructures::MountType) d.mounttype();
ret.back().driveStatus = (common::dataStructures::DriveStatus) d.drivestatus();
ret.back().desiredDriveState.up = d.desiredup();
ret.back().desiredDriveState.forceDown = d.desiredforcedown();
ret.back().currentVid = d.currentvid();
ret.back().currentTapePool = d.currenttapepool();
if (d.has_nextmounttype()) { ret.back().nextMountType = (common::dataStructures::MountType) d.nextmounttype(); }
if (d.has_nexttapepool()) { ret.back().nextTapepool = d.nexttapepool(); }
if (d.has_nextvid()) { ret.back().nextVid = d.nextvid(); }
ret.push_back(DriveAddress());
ret.back().driveName=d.drivename();
ret.back().driveStateAddress=d.drivestateaddress();
}
return ret;
}
//------------------------------------------------------------------------------
// DriveRegister::getDriveState())
// DriveRegister::getDriveAddress()
//------------------------------------------------------------------------------
cta::common::dataStructures::DriveState DriveRegister::getDriveState(const std::string& driveName) {
using cta::common::dataStructures::DriveState;
// Find the drive for which we want the status
std::string DriveRegister::getDriveAddress(const std::string& driveName) {
checkPayloadReadable();
for (auto & d: m_payload.drives()) {
if (d.drivename() == driveName) {
cta::common::dataStructures::DriveState ret;
ret.driveName = d.drivename();
ret.host = d.host();
ret.logicalLibrary = d.logicallibrary();
ret.sessionId = d.sessionid();
ret.bytesTransferredInSession = d.bytestransferedinsession();
ret.filesTransferredInSession = d.filestransferedinsession();
ret.latestBandwidth = d.latestbandwidth();
ret.sessionStartTime = d.sessionstarttime();
ret.mountStartTime = d.mountstarttime();
ret.transferStartTime = d.transferstarttime();
ret.unloadStartTime = d.unloadstarttime();
ret.unmountStartTime = d.unmountstarttime();
ret.drainingStartTime = d.drainingstarttime();
ret.downOrUpStartTime = d.downorupstarttime();
ret.probeStartTime = d.probestarttime();
ret.cleanupStartTime = d.cleanupstarttime();
ret.lastUpdateTime = d.lastupdatetime();
ret.startStartTime = d.startstarttime();
ret.shutdownTime = d.shutdowntime();
ret.mountType = (common::dataStructures::MountType) d.mounttype();
ret.driveStatus = (common::dataStructures::DriveStatus) d.drivestatus();
ret.desiredDriveState.up = d.desiredup();
ret.desiredDriveState.forceDown = d.desiredforcedown();
ret.currentVid = d.currentvid();
ret.currentTapePool = d.currenttapepool();
return ret;
}
}
throw cta::exception::Exception("In DriveRegister::getDriveState(): no such drive.");
for (auto & d:m_payload.drives())
if (d.drivename() == driveName)
return d.drivestateaddress();
throw NoSuchDrive("In DriveRegister::getDriveAddresse(): no such drive.");
}
//------------------------------------------------------------------------------
// DriveRegister::setDriveState())
// DriveRegister::setDriveAddress()
//------------------------------------------------------------------------------
void DriveRegister::setDriveState(const cta::common::dataStructures::DriveState driveState) {
using cta::common::dataStructures::DriveState;
void DriveRegister::setDriveAddress(const std::string& driveName, const std::string& driveAddress) {
checkPayloadWritable();
// Find the drive to update (new or existing)
serializers::DriveState * ds = nullptr;
for (ssize_t i=0; i<m_payload.mutable_drives()->size(); i++) {
if (m_payload.mutable_drives(i)->drivename() == driveState.driveName) {
ds = m_payload.mutable_drives(i);
goto update;
}
}
// The drive was not found. We will create it.
ds = m_payload.mutable_drives()->Add();
ds->set_drivename(driveState.driveName);
update:
ds->set_host(driveState.host);
ds->set_logicallibrary(driveState.logicalLibrary);
ds->set_sessionid(driveState.sessionId);
ds->set_bytestransferedinsession(driveState.bytesTransferredInSession);
ds->set_filestransferedinsession(driveState.filesTransferredInSession);
ds->set_latestbandwidth(driveState.latestBandwidth);
ds->set_sessionstarttime(driveState.sessionStartTime);
ds->set_mountstarttime(driveState.mountStartTime);
ds->set_transferstarttime(driveState.transferStartTime);
ds->set_unloadstarttime(driveState.unloadStartTime);
ds->set_unmountstarttime(driveState.unmountStartTime);
ds->set_drainingstarttime(driveState.drainingStartTime);
ds->set_downorupstarttime(driveState.downOrUpStartTime);
ds->set_probestarttime(driveState.probeStartTime);
ds->set_cleanupstarttime(driveState.cleanupStartTime);
ds->set_lastupdatetime(driveState.lastUpdateTime);
ds->set_startstarttime(driveState.startStartTime);
ds->set_shutdowntime(driveState.shutdownTime);
ds->set_mounttype((uint32_t)driveState.mountType);
ds->set_drivestatus((uint32_t)driveState.driveStatus);
ds->set_desiredup(driveState.desiredDriveState.up);
ds->set_desiredforcedown(driveState.desiredDriveState.forceDown);
ds->set_currentvid(driveState.currentVid);
ds->set_currenttapepool(driveState.currentTapePool);
}
//------------------------------------------------------------------------------
// DriveRegister::setNextDriveState())
//------------------------------------------------------------------------------
void DriveRegister::setNextDriveState(const cta::common::dataStructures::DriveNextState driveNextState) {
using cta::common::dataStructures::MountType;
checkPayloadWritable();
// Find the drive to update (new or existing)
serializers::DriveState * ds = nullptr;
for (ssize_t i=0; i<m_payload.mutable_drives()->size(); i++) {
if (m_payload.mutable_drives(i)->drivename() == driveNextState.driveName) {
ds = m_payload.mutable_drives(i);
goto update;
for (int i=0; i< m_payload.mutable_drives()->size(); i++) {
auto d=m_payload.mutable_drives(i);
if (d->drivename() == driveName) {
d->set_drivestateaddress(driveAddress);
return;
}
}
// The drive was not found. We will create it.
ds = m_payload.mutable_drives()->Add();
ds->set_drivename(driveNextState.driveName);
update:
switch(driveNextState.mountType) {
case MountType::Archive:
ds->set_nexttapepool(driveNextState.tapepool);
// No break on purpose.
case MountType::Label:
case MountType::Retrieve:
ds->set_nextmounttype((int)driveNextState.mountType);
ds->set_nextvid(driveNextState.vid);
break;
default:
{
std::stringstream err;
err << "In DriveRegister::setNextDriveState(): unknown mount type: "
<< cta::common::dataStructures::toString(driveNextState.mountType);
throw cta::exception::Exception(err.str());
}
}
auto d=m_payload.mutable_drives()->Add();
d->set_drivename(driveName);
d->set_drivestateaddress(driveAddress);
return;
}
//------------------------------------------------------------------------------
......
......@@ -32,8 +32,8 @@ class GenericObject;
class EntryLogSerDeser;
class DriveRegister: public ObjectOps<serializers::DriveRegister, serializers::DriveRegister_t> {
CTA_GENERATE_EXCEPTION_CLASS(DuplicateEntry);
public:
CTA_GENERATE_EXCEPTION_CLASS(NoSuchDrive);
DriveRegister(const std::string & address, Backend & os);
DriveRegister(GenericObject & go);
void initialize();
......@@ -42,39 +42,39 @@ public:
cta::catalogue::Catalogue & catalogue) override;
bool isEmpty();
// Drives management =========================================================
/**
* Returns all the drive states stored in the drive registry.
* @return a list of all the drive states
* A drive register entry (drive name + object address)
*/
std::list<cta::common::dataStructures::DriveState> getAllDrivesState();
struct DriveAddress {
std::string driveName;
std::string driveStateAddress;
};
// Drives management =========================================================
/**
* Query the complete drive state for one drive.
* @param driveName
* @return complete drive state, or throws an exception if the drive is not
* known.
* Returns the addresses of all the drive state objects stored in the drive register.
* @return a list of (drive name, drive state address) entries
*/
cta::common::dataStructures::DriveState getDriveState(const std::string &driveName);
std::list<DriveAddress> getDriveAddresses();
/**
* Set the state of a drive. Either creates or overwrites the entry.
* @param driveState Full drive state (drive name is part of the structure).
* Returns the address of the drive state object of one drive.
* @param driveName
* @return the drive state address (throws NoSuchDrive if the drive is not in the register)
*/
void setDriveState(const cta::common::dataStructures::DriveState driveState);
std::string getDriveAddress(const std::string & driveName);
/**
* Set the next state of a drive, following a mount decision (scheduling from
* idle or preemption).
* @param driveNextState Partial drive state.
* Adds a drive status reference to the register.
* @param driveName
* @param driveAddress
*/
void setNextDriveState(const cta::common::dataStructures::DriveNextState driveNextState);
void setDriveAddress(const std::string & driveName, const std::string &driveAddress);
/**
* Remove the drive from the register.
* @param name
* Removes entry from drive addresses.
* @param driveName
*/
void removeDrive (const std::string & driveName);
void removeDrive(const std::string & driveName);
/**
* JSON dump of the drive
......
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "DriveState.hpp"
#include "GenericObject.hpp"
namespace cta { namespace objectstore {
/**
 * Builds a DriveState out of a generic object.
 * The new object uses the object store of go; the header of go is then
 * transplanted into it and the payload is extracted from that header.
 *
 * @param go The generic object to convert.
 */
DriveState::DriveState(GenericObject& go):
ObjectOps<serializers::DriveState, serializers::DriveState_t>(go.objectStore()) {
// Here we transplant the generic object into the new object
go.transplantHeader(*this);
// And interpret the header.
getPayloadFromHeader();
}
/**
 * Garbage collection of a drive state: the object is simply deleted, as a
 * drive state is easy to recreate. Nothing is deleted when the object is no
 * longer owned by the presumed owner.
 *
 * @param presumedOwner  The owner the object is expected to have.
 * @param agentReference Not used by this implementation.
 * @param lc             Not used by this implementation.
 * @param catalogue      Not used by this implementation.
 */
void DriveState::garbageCollect(const std::string& presumedOwner, AgentReference& agentReference, log::LogContext& lc, cta::catalogue::Catalogue& catalogue) {
  checkPayloadWritable();
  const bool ownedByPresumedOwner = (presumedOwner == m_header.owner());
  if (ownedByPresumedOwner) {
    remove();
  }
}
/**
 * Prepares a brand new drive state for the drive called driveName.
 * The drive name is recorded; strings are set empty, counters and timestamps
 * are zeroed, the mount type is NoMount, the status is Down and both desired
 * state flags are false.
 *
 * @param driveName The name of the drive this state belongs to.
 */
void DriveState::initialize(const std::string & driveName) {
  // The base class sets up the underlying object first.
  ObjectOps<serializers::DriveState, serializers::DriveState_t>::initialize();

  // Identity and location of the drive.
  m_payload.set_drivename(driveName);
  m_payload.set_host("");
  m_payload.set_logicallibrary("");

  // Session counters.
  m_payload.set_sessionid(0);
  m_payload.set_bytestransferedinsession(0);
  m_payload.set_filestransferedinsession(0);
  m_payload.set_latestbandwidth(0);

  // Timestamps of the various phases.
  m_payload.set_sessionstarttime(0);
  m_payload.set_mountstarttime(0);
  m_payload.set_transferstarttime(0);
  m_payload.set_unloadstarttime(0);
  m_payload.set_unmountstarttime(0);
  m_payload.set_drainingstarttime(0);
  m_payload.set_downorupstarttime(0);
  m_payload.set_probestarttime(0);
  m_payload.set_cleanupstarttime(0);
  m_payload.set_lastupdatetime(0);
  m_payload.set_startstarttime(0);
  m_payload.set_shutdowntime(0);

  // Current and desired status.
  m_payload.set_mounttype(static_cast<uint32_t>(common::dataStructures::MountType::NoMount));
  m_payload.set_drivestatus(static_cast<uint32_t>(common::dataStructures::DriveStatus::Down));
  m_payload.set_desiredup(false);
  m_payload.set_desiredforcedown(false);

  // Nothing is mounted.
  m_payload.set_currentvid("");
  m_payload.set_currenttapepool("");

  // This object is good to go (to storage)
  m_payloadInterpreted = true;
}
/**
 * Constructs a DriveState for the object at the given address in the backend os.
 * Both arguments are forwarded to the ObjectOps base class; nothing else is done here.
 */
DriveState::DriveState(const std::string& address, Backend& os):
ObjectOps<serializers::DriveState, serializers::DriveState_t>(os, address) { }
/**
 * Constructs a DriveState on the backend os, without an object address.
 * The backend is forwarded to the ObjectOps base class; nothing else is done here.
 */
DriveState::DriveState(Backend& os):
ObjectOps<serializers::DriveState, serializers::DriveState_t>(os) { }
cta::common::dataStructures::DriveState DriveState::getState() {