Commit c62224fa authored by Eric Cano's avatar Eric Cano
Browse files

Created a new Helpers class to store the multi-objects algorithms.

Factored a first function out of OStoreDB into Herlpers.
parent c94727ae
......@@ -68,7 +68,8 @@ add_library (ctaobjectstore SHARED
GenericObject.cpp
GarbageCollector.cpp
SchedulerGlobalLock.cpp
ValueCountMap.cpp)
ValueCountMap.cpp
Helpers.cpp)
target_link_libraries(ctaobjectstore rados json-c ctautils)
set_source_files_properties(BackendRados.cpp PROPERTIES COMPILE_FLAGS -Wno-deprecated-declarations)
install (TARGETS ctaobjectstore DESTINATION usr/${CMAKE_INSTALL_LIBDIR})
......
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "Helpers.hpp"
#include "Backend.hpp"
#include "ArchiveQueue.hpp"
#include "AgentReference.hpp"
#include "RootEntry.hpp"
namespace cta { namespace objectstore {
//------------------------------------------------------------------------------
// OStoreDB::getLockedAndFetchedArchiveQueue()
//------------------------------------------------------------------------------
void Helpers::getLockedAndFetchedArchiveQueue(ArchiveQueue& archiveQueue,
ScopedExclusiveLock& archiveQueueLock, AgentReference & agentReference,
const std::string& tapePool) {
// TODO: if necessary, we could use a singleton caching object here to accelerate
// lookups.
// Getting a locked AQ is the name of the game.
// Try and find an existing one first, create if needed
Backend & be = archiveQueue.m_objectStore;
for (size_t i=0; i<5; i++) {
{
RootEntry re (be);
ScopedSharedLock rel(re);
re.fetch();
try {
archiveQueue.setAddress(re.getArchiveQueueAddress(tapePool));
} catch (cta::exception::Exception & ex) {
rel.release();
ScopedExclusiveLock rexl(re);
re.fetch();
archiveQueue.setAddress(re.addOrGetArchiveQueueAndCommit(tapePool, agentReference));
}
}
try {
archiveQueueLock.lock(archiveQueue);
archiveQueue.fetch();
return;
} catch (cta::exception::Exception & ex) {
// We have a (rare) opportunity for a race condition, where we identify the
// queue and it gets deleted before we manage to lock it.
// The locking of fetching will fail in this case.
// We hence allow ourselves to retry a couple times.
continue;
}
}
throw cta::exception::Exception(std::string(
"In OStoreDB::getLockedArchiveQueue(): failed to find or create and lock archive queue after 5 retries for tapepool: ")
+ tapePool);
}
}} // namespace cta::objectstore.
\ No newline at end of file
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <string>
/**
* A collection of helper functions for commonly used multi-object operations
*/
namespace cta { namespace objectstore {
class ArchiveQueue;
class ScopedExclusiveLock;
class AgentReference;
/**
* A class with static functions allowing multi-object operations
*/
class Helpers {
public:
/**
* Find or create an archive queue, and return it locked and fetched to the caller
* (ArchiveQueue and ScopedExclusiveLock objects are provided empty)
* @param archiveQueue the ArchiveQueue object, empty
* @param archiveQueueLock the lock, not initialized
* @param tapePool the name of the needed tape pool
*/
static void getLockedAndFetchedArchiveQueue(ArchiveQueue & archiveQueue,
ScopedExclusiveLock & archiveQueueLock, AgentReference & agentReference,
const std::string & tapePool);
};
}} // namespace cta::objectstore
\ No newline at end of file
......@@ -31,6 +31,7 @@ class ObjectOpsBase {
friend class ScopedSharedLock;
friend class ScopedExclusiveLock;
friend class GenericObject;
friend class Helpers;
protected:
ObjectOpsBase(Backend & os): m_nameSet(false), m_objectStore(os),
m_headerInterpreted(false), m_payloadInterpreted(false),
......
......@@ -19,6 +19,7 @@
#include "common/helgrind_annotator.hpp"
#include "MemQueues.hpp"
#include "OStoreDB.hpp"
#include "objectstore/Helpers.hpp"
namespace cta { namespace ostoredb {
......@@ -104,7 +105,7 @@ std::shared_ptr<SharedQueueLock> MemArchiveQueue::sharedAddToArchiveQueueWithNew
ret->m_lock.reset(new objectstore::ScopedExclusiveLock);
auto & aq = *ret->m_queue;
auto & aql = *ret->m_lock;
oStoreDB.getLockedAndFetchedArchiveQueue(aq, aql, job.tapePool);
objectstore::Helpers::getLockedAndFetchedArchiveQueue(aq, aql, *oStoreDB.m_agentReference, job.tapePool);
size_t aqSizeBefore=aq.dumpJobs().size();
size_t addedJobs=1;
// First add the job for this thread
......
......@@ -292,46 +292,6 @@ std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo>
}
*/
//------------------------------------------------------------------------------
// OStoreDB::getLockedAndFetchedArchiveQueue()
//------------------------------------------------------------------------------
void OStoreDB::getLockedAndFetchedArchiveQueue(cta::objectstore::ArchiveQueue& archiveQueue,
cta::objectstore::ScopedExclusiveLock& archiveQueueLock, const std::string& tapePool) {
// TODO: if necessary, we could use a singleton caching object here to accelerate
// lookups.
// Getting a locked AQ is the name of the game.
// Try and find an existing one first, create if needed
for (size_t i=0; i<5; i++) {
{
RootEntry re (m_objectStore);
ScopedSharedLock rel(re);
re.fetch();
try {
archiveQueue.setAddress(re.getArchiveQueueAddress(tapePool));
} catch (cta::exception::Exception & ex) {
rel.release();
ScopedExclusiveLock rexl(re);
re.fetch();
archiveQueue.setAddress(re.addOrGetArchiveQueueAndCommit(tapePool, *m_agentReference));
}
}
try {
archiveQueueLock.lock(archiveQueue);
archiveQueue.fetch();
return;
} catch (cta::exception::Exception & ex) {
// We have a (rare) opportunity for a race condition, where we identify the
// queue and it gets deleted before we manage to lock it.
// The locking of fetching will fail in this case.
// We hence allow ourselves to retry a couple times.
continue;
}
}
throw cta::exception::Exception(std::string(
"In OStoreDB::getLockedArchiveQueue(): failed to find or create and lock archive queue after 5 retries for tapepool: ")
+ tapePool);
}
//------------------------------------------------------------------------------
// OStoreDB::queueArchive()
//------------------------------------------------------------------------------
......
......@@ -165,19 +165,6 @@ public:
void queueArchive(const std::string &instanceName, const cta::common::dataStructures::ArchiveRequest &request,
const cta::common::dataStructures::ArchiveFileQueueCriteria &criteria, log::LogContext &logContext) override;
private:
/**
* Find or create an archive queue, and return it locked and fetched to the caller
* (ArchiveQueue and ScopedExclusiveLock objects are provided empty)
* @param archiveQueue the ArchiveQueue object, empty
* @param archiveQueueLock the lock, not initialized
* @param tapePool the name of the needed tape pool
*/
void getLockedAndFetchedArchiveQueue(cta::objectstore::ArchiveQueue & archiveQueue,
cta::objectstore::ScopedExclusiveLock & archiveQueueLock,
const std::string & tapePool);
public:
CTA_GENERATE_EXCEPTION_CLASS(NoSuchArchiveRequest);
CTA_GENERATE_EXCEPTION_CLASS(ArchiveRequestAlreadyDeleted);
virtual void deleteArchiveRequest(const std::string &diskInstanceName, uint64_t fileId) override;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment