Commit addfc4e1 authored by Eric Cano's avatar Eric Cano
Browse files

Improved enqueueing logs.

parent e29e2323
......@@ -50,7 +50,7 @@ using namespace objectstore;
// OStoreDB::OStoreDB()
//------------------------------------------------------------------------------
OStoreDB::OStoreDB(objectstore::Backend& be, catalogue::Catalogue & catalogue, log::Logger &logger):
m_objectStore(be), m_catalogue(catalogue), m_logger(logger), m_threadCounter(0) {
m_objectStore(be), m_catalogue(catalogue), m_logger(logger), m_taskQueueSize(0) {
for (size_t i=0; i<10; i++) {
m_enqueueingWorkerThreads.emplace_back(new EnqueueingWorkerThread(m_enqueueingTasksQueue));
m_enqueueingWorkerThreads.back()->start();
......@@ -61,7 +61,7 @@ OStoreDB::OStoreDB(objectstore::Backend& be, catalogue::Catalogue & catalogue, l
// OStoreDB::~OStoreDB()
//------------------------------------------------------------------------------
OStoreDB::~OStoreDB() throw() {
while (m_threadCounter) sleep(1);
while (m_taskQueueSize) sleep(1);
for (__attribute__((unused)) auto &t: m_enqueueingWorkerThreads) m_enqueueingTasksQueue.push(nullptr);
for (auto &t: m_enqueueingWorkerThreads) {
t->wait();
......@@ -89,7 +89,7 @@ void OStoreDB::assertAgentAddressSet() {
// OStoreDB::waitSubthreadsComplete()
//------------------------------------------------------------------------------
void OStoreDB::waitSubthreadsComplete() {
while (m_threadCounter) std::this_thread::yield();
while (m_taskQueueSize) std::this_thread::yield();
}
//------------------------------------------------------------------------------
......@@ -449,15 +449,26 @@ void OStoreDB::queueArchive(const std::string &instanceName, const cta::common::
}
// We create the object here
m_agentReference->addToOwnership(aReq->getAddressIfSet(), m_objectStore);
double agentReferencingTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
aReq->setOwner(m_agentReference->getAgentAddress());
aReq->insert();
double insertionTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
// The request is now safe in the object store. We can now return to the caller and fire (and forget) a thread
// complete the bottom half of it.
m_threadCounter++;
uint64_t taskQueueSize = ++m_taskQueueSize;
// Prepare the logs to avoid multithread access on the object.
log::ScopedParamContainer params(logContext);
params.add("jobObject", aReq->getAddressIfSet())
.add("fileId", aReq->getArchiveFile().archiveFileID)
.add("diskInstance", aReq->getArchiveFile().diskInstance)
.add("diskFilePath", aReq->getArchiveFile().diskFileInfo.path)
.add("diskFileId", aReq->getArchiveFile().diskFileId)
.add("agentReferencingTime", agentReferencingTime)
.add("insertionTime", insertionTime);
m_enqueueingTasksQueue.push(new EnqueueingTask([aReq, this]{
// This unique_ptr's destructor will ensure the OStoreDB object is not deleted before the thread exits.
auto scopedCounterDecrement = [this](void *){
m_threadCounter--;
m_taskQueueSize--;
};
// A bit ugly, but we need a non-null pointer for the "deleter" to be called.
std::unique_ptr<void, decltype(scopedCounterDecrement)> scopedCounterDecrementerInstance((void *)1, scopedCounterDecrement);
......@@ -496,7 +507,7 @@ void OStoreDB::queueArchive(const std::string &instanceName, const cta::common::
.add("queueingTime", qTime)
.add("commitTime", cTime)
.add("queueUnlockTime", qUnlockTime);
logContext.log(log::INFO, "In OStoreDB::queueArchive_bottomHalf(): added job to queue");
logContext.log(log::INFO, "In OStoreDB::queueArchive_bottomHalf(): added job to queue.");
}
} catch (NoSuchArchiveQueue &ex) {
// Unlink the request from already connected tape pools
......@@ -540,7 +551,11 @@ void OStoreDB::queueArchive(const std::string &instanceName, const cta::common::
.add("agentOwnershipResetTime", timer.secs());
logContext.log(log::INFO, "In OStoreDB::queueArchive_bottomHalf(): Finished enqueueing request.");
}));
double taskPostingTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
params.add("taskPostingTime", taskPostingTime)
.add("taskQueueSize", taskQueueSize)
.add("totalTime", agentReferencingTime + insertionTime + taskPostingTime);
logContext.log(log::INFO, "In OStoreDB::queueArchive(): recorded request for queueing (enqueueing posted to thread pool).");
}
//------------------------------------------------------------------------------
......@@ -746,7 +761,7 @@ std::string OStoreDB::queueRetrieve(const cta::common::dataStructures::RetrieveR
rReq->setActiveCopyNumber(criteria.archiveFile.tapeFiles.begin()->second.copyNb);
rReq->insert();
double insertionTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
m_threadCounter++;
uint64_t taskQueueSize = ++m_taskQueueSize;
// Prepare the logs to avoid multithread access on the object.
log::ScopedParamContainer params(logContext);
params.add("vid", bestVid)
......@@ -761,7 +776,7 @@ std::string OStoreDB::queueRetrieve(const cta::common::dataStructures::RetrieveR
m_enqueueingTasksQueue.push(new EnqueueingTask([rReq, job, bestVid, this]{
// This unique_ptr's destructor will ensure the OStoreDB object is not deleted before the thread exits.
auto scopedCounterDecrement = [this](void *){
m_threadCounter--;
m_taskQueueSize--;
};
// A bit ugly, but we need a non-null pointer for the "deleter" to be called.
std::unique_ptr<void, decltype(scopedCounterDecrement)> scopedCounterDecrementerInstance((void *)1, scopedCounterDecrement);
......@@ -798,12 +813,13 @@ std::string OStoreDB::queueRetrieve(const cta::common::dataStructures::RetrieveR
.add("queueUnlockTime", qUnlockTime)
.add("requestUnlockTime", rUnlockTime)
.add("totalTime", rLockTime + qTime + cTime + qUnlockTime + rUnlockTime);
logContext.log(log::INFO, "In OStoreDB::queueRetrieve(): added job to queue (enqueueing finished).");
logContext.log(log::INFO, "In OStoreDB::queueRetrieve()_bottomHalf(): added job to queue (enqueueing finished).");
}));
double taskPostingTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
params.add("taskPostingTime", taskPostingTime)
.add("taskQueueSize", taskQueueSize)
.add("totalTime", vidSelectionTime + agentReferencingTime + insertionTime + taskPostingTime);
logContext.log(log::INFO, "In OStoreDB::queueRetrieve(): recorded job for queueing (enqueueing posted to thread pool).");
logContext.log(log::INFO, "In OStoreDB::queueRetrieve(): recorded request for queueing (enqueueing posted to thread pool).");
}
return bestVid;
}
......
......@@ -374,7 +374,7 @@ private:
catalogue::Catalogue & m_catalogue;
log::Logger & m_logger;
objectstore::AgentReference *m_agentReference = nullptr;
std::atomic<uint64_t> m_threadCounter; ///< This counter ensures destruction happens after the last thread completed.
std::atomic<uint64_t> m_taskQueueSize; ///< This counter ensures destruction happens after the last thread completed.
};
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment