Commit 208388b1 authored by Eric Cano's avatar Eric Cano
Browse files

Added annotations and mutexes for helgrind.

This makes helgrind aware of the succession of events.
Also reduced the default queue depth for unit tests.
parent eadcd818
......@@ -35,6 +35,7 @@
#include "scheduler/LogicalLibrary.hpp"
#include "common/TapePool.hpp"
#include "common/dataStructures/MountPolicy.hpp"
#include "common/make_unique.hpp"
#include "tapeserver/castor/tape/tapeserver/daemon/TapeSessionStats.hpp"
#include <algorithm>
#include <stdlib.h> /* srand, rand */
......@@ -50,8 +51,8 @@ using namespace objectstore;
// OStoreDB::OStoreDB()
//------------------------------------------------------------------------------
OStoreDB::OStoreDB(objectstore::Backend& be, catalogue::Catalogue & catalogue, log::Logger &logger):
m_taskQueueSize(0), m_taskPostingSemaphore(25000), m_objectStore(be), m_catalogue(catalogue), m_logger(logger) {
for (size_t i=0; i<10; i++) {
m_taskQueueSize(0), m_taskPostingSemaphore(5), m_objectStore(be), m_catalogue(catalogue), m_logger(logger) {
for (size_t i=0; i<5; i++) {
m_enqueueingWorkerThreads.emplace_back(new EnqueueingWorkerThread(m_enqueueingTasksQueue));
m_enqueueingWorkerThreads.back()->start();
}
......@@ -111,6 +112,12 @@ void OStoreDB::setThreadNumber(uint64_t threadNumber) {
}
}
//------------------------------------------------------------------------------
// OStoreDB::setBootomHalfQueueSize()
//------------------------------------------------------------------------------
/**
 * Raises the bottom-half task queue depth by releasing extra permits on the
 * task posting semaphore.
 *
 * The semaphore is constructed with 5 permits, so only the difference between
 * the requested depth and that default is released. This function can only
 * grow the depth: a request below the default leaves it unchanged. Each call
 * releases permits relative to the default, so repeated calls accumulate.
 *
 * @param tasksNumber the requested queue depth.
 */
void OStoreDB::setBootomHalfQueueSize(uint64_t tasksNumber) {
  // 5 is the default queue size.
  const uint64_t defaultQueueSize = 5;
  // tasksNumber is unsigned: without this guard a value below the default
  // would wrap around and release a practically unbounded number of permits.
  if (tasksNumber < defaultQueueSize) return;
  m_taskPostingSemaphore.release(tasksNumber - defaultQueueSize);
}
//------------------------------------------------------------------------------
// OStoreDB::EnqueueingWorkerThread::run()
//------------------------------------------------------------------------------
......@@ -118,7 +125,9 @@ void OStoreDB::EnqueueingWorkerThread::run() {
// Worker loop: pop enqueueing tasks one by one and execute them until a null
// task is popped.
while (true) {
// Take ownership of the popped task; the unique_ptr deletes it at the end of
// the iteration.
std::unique_ptr<EnqueueingTask> et(m_enqueueingTasksQueue.pop());
// A null pointer ends the loop.
// NOTE(review): presumably the shutdown signal — confirm against the code that pushes it.
if (!et.get()) break;
// Helgrind annotation: counterpart of the ANNOTATE_HAPPENS_BEFORE(et) issued
// by the producers right before they push the task, keyed on the same task
// address.
ANNOTATE_HAPPENS_AFTER(et.get());
// Execute the task.
(*et)();
// Helgrind annotation on the task address, issued before the task is deleted.
// NOTE(review): presumably clears the happens-before history for this address
// so that a later allocation reusing it is not linked to this task — confirm
// against the helgrind documentation.
ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(et.get());
}
}
......@@ -445,6 +454,9 @@ void OStoreDB::queueArchive(const std::string &instanceName, const cta::common::
const cta::common::dataStructures::ArchiveFileQueueCriteriaAndFileId &criteria, log::LogContext &logContext) {
assertAgentAddressSet();
cta::utils::Timer timer;
auto mutexForHelgrind = cta::make_unique<cta::threading::Mutex>();
cta::threading::MutexLocker mlForHelgrind(*mutexForHelgrind);
auto * mutexForHelgrindAddr = mutexForHelgrind.release();
// Construct the archive request object in memory
auto aReq = std::make_shared<cta::objectstore::ArchiveRequest> (m_agentReference->nextId("ArchiveRequest"), m_objectStore);
aReq->initialize();
......@@ -500,7 +512,9 @@ void OStoreDB::queueArchive(const std::string &instanceName, const cta::common::
.add("agentReferencingTime", agentReferencingTime)
.add("insertionTime", insertionTime);
delayIfNecessary(logContext);
m_enqueueingTasksQueue.push(new EnqueueingTask([aReq, this]{
auto * et = new EnqueueingTask([aReq, mutexForHelgrindAddr, this]{
std::unique_ptr<cta::threading::Mutex> mutexForHelgrind(mutexForHelgrindAddr);
cta::threading::MutexLocker mlForHelgrind(*mutexForHelgrind);
// This unique_ptr's destructor will ensure the OStoreDB object is not deleted before the thread exits.
auto scopedCounterDecrement = [this](void *){
m_taskQueueSize--;
......@@ -590,7 +604,10 @@ void OStoreDB::queueArchive(const std::string &instanceName, const cta::common::
+ arTotalQueueUnlockTime + arOwnerResetTime + arLockRelease
+ agOwnershipResetTime);
logContext.log(log::INFO, "In OStoreDB::queueArchive_bottomHalf(): Finished enqueueing request.");
}));
});
ANNOTATE_HAPPENS_BEFORE(et);
mlForHelgrind.unlock();
m_enqueueingTasksQueue.push(et);
double taskPostingTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
params.add("taskPostingTime", taskPostingTime)
.add("taskQueueSize", taskQueueSize)
......@@ -749,6 +766,9 @@ std::list<SchedulerDatabase::RetrieveQueueStatistics> OStoreDB::getRetrieveQueue
std::string OStoreDB::queueRetrieve(const cta::common::dataStructures::RetrieveRequest& rqst,
const cta::common::dataStructures::RetrieveFileQueueCriteria& criteria, log::LogContext &logContext) {
assertAgentAddressSet();
auto mutexForHelgrind = cta::make_unique<cta::threading::Mutex>();
cta::threading::MutexLocker mlForHelgrind(*mutexForHelgrind);
auto *mutexForHelgrindAddr = mutexForHelgrind.release();
cta::utils::Timer timer;
// Get the best vid from the cache
std::set<std::string> candidateVids;
......@@ -815,7 +835,9 @@ std::string OStoreDB::queueRetrieve(const cta::common::dataStructures::RetrieveR
.add("agentReferencingTime", agentReferencingTime)
.add("insertionTime", insertionTime);
delayIfNecessary(logContext);
m_enqueueingTasksQueue.push(new EnqueueingTask([rReq, job, bestVid, this]{
auto * et = new EnqueueingTask([rReq, job, bestVid, mutexForHelgrindAddr, this]{
std::unique_ptr<cta::threading::Mutex> mutexForHelgrind(mutexForHelgrindAddr);
cta::threading::MutexLocker mlForHelgrind(*mutexForHelgrind);
// This unique_ptr's destructor will ensure the OStoreDB object is not deleted before the thread exits.
auto scopedCounterDecrement = [this](void *){
m_taskQueueSize--;
......@@ -862,7 +884,10 @@ std::string OStoreDB::queueRetrieve(const cta::common::dataStructures::RetrieveR
.add("totalTime", rLockTime + qTime + cTime + qUnlockTime
+ rUnlockTime + agOwnershipResetTime);
logContext.log(log::INFO, "In OStoreDB::queueRetrieve_bottomHalf(): added job to queue (enqueueing finished).");
}));
});
ANNOTATE_HAPPENS_BEFORE(et);
mlForHelgrind.unlock();
m_enqueueingTasksQueue.push(et);
double taskPostingTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
params.add("taskPostingTime", taskPostingTime)
.add("taskQueueSize", taskQueueSize)
......
......@@ -82,6 +82,7 @@ private:
public:
void waitSubthreadsComplete() override;
void setThreadNumber(uint64_t threadNumber);
void setBootomHalfQueueSize(uint64_t tasksNumber);
/*============ Basic IO check: validate object store access ===============*/
void ping() override;
......
......@@ -110,10 +110,10 @@ bool XrdSsiCtaServiceProvider::Init(XrdSsiLogger *logP, XrdSsiCluster *clsP, con
if (threadPoolSize.first) {
m_scheddb->setThreadNumber(threadPoolSize.second);
}
m_scheddb->setThreadNumber(25000);
// Initialise the Scheduler
m_scheduler = cta::make_unique<cta::Scheduler>(*m_catalogue, *m_scheddb, 5, 2*1000*1000);
try {
// If the backend is a VFS, make sure we don't delete it on exit
dynamic_cast<objectstore::BackendVFS &>(*m_backend).noDeleteOnExit();
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment