Commit 0ee7bd2e authored by Eric Cano's avatar Eric Cano
Browse files

Added adaptive delay and semaphore to limit task queue.

The delay will initially slow down the user and then the semaphore will block.
This prevent the queue from going too big and the agent object from growing
so big all becomes slow.
parent 371e6b30
......@@ -50,7 +50,7 @@ using namespace objectstore;
// OStoreDB::OStoreDB()
//------------------------------------------------------------------------------
OStoreDB::OStoreDB(objectstore::Backend& be, catalogue::Catalogue & catalogue, log::Logger &logger):
m_objectStore(be), m_catalogue(catalogue), m_logger(logger), m_taskQueueSize(0) {
m_taskQueueSize(0), m_taskPostingSemaphore(25000), m_objectStore(be), m_catalogue(catalogue), m_logger(logger) {
for (size_t i=0; i<10; i++) {
m_enqueueingWorkerThreads.emplace_back(new EnqueueingWorkerThread(m_enqueueingTasksQueue));
m_enqueueingWorkerThreads.back()->start();
......@@ -122,6 +122,39 @@ void OStoreDB::EnqueueingWorkerThread::run() {
}
}
//------------------------------------------------------------------------------
// OStoreDB::delayIfNecessary()
//------------------------------------------------------------------------------
void OStoreDB::delayIfNecessary(log::LogContext& lc) {
bool delayInserted = false;
utils::Timer t;
uint64_t taskQueueSize = m_taskQueueSize;
double sleepDelay = 0;
double lockDelay = 0;
if (m_taskQueueSize > 10000) {
// Put a linear delay starting at 0 at 10000 element and going to 10s at 25000
double delay = 10.0 * (taskQueueSize - 10000) / 15000;
double delayInt;
::timespec ts;
ts.tv_nsec = std::modf(delay, &delayInt) * 1000 * 1000 *1000;
ts.tv_nsec = delayInt;
nanosleep(&ts, nullptr);
sleepDelay = t.secs(utils::Timer::resetCounter);
delayInserted = true;
}
if (!m_taskPostingSemaphore.tryAcquire()) {
m_taskPostingSemaphore.acquire();
lockDelay = t.secs(utils::Timer::resetCounter);
delayInserted = true;
}
if (delayInserted) {
log::ScopedParamContainer params(lc);
params.add("sleepDelay", sleepDelay)
.add("lockDelay", lockDelay)
.add("taskQueueSize", taskQueueSize);
lc.log(log::INFO, "In OStoreDB::delayIfNecessary(): inserted delay.");
}
}
//------------------------------------------------------------------------------
// OStoreDB::ping()
......@@ -466,10 +499,12 @@ void OStoreDB::queueArchive(const std::string &instanceName, const cta::common::
.add("diskFileId", aFile.diskFileId)
.add("agentReferencingTime", agentReferencingTime)
.add("insertionTime", insertionTime);
delayIfNecessary(logContext);
m_enqueueingTasksQueue.push(new EnqueueingTask([aReq, this]{
// This unique_ptr's destructor will ensure the OStoreDB object is not deleted before the thread exits.
auto scopedCounterDecrement = [this](void *){
m_taskQueueSize--;
m_taskPostingSemaphore.release();
};
// A bit ugly, but we need a non-null pointer for the "deleter" to be called.
std::unique_ptr<void, decltype(scopedCounterDecrement)> scopedCounterDecrementerInstance((void *)1, scopedCounterDecrement);
......@@ -775,10 +810,12 @@ std::string OStoreDB::queueRetrieve(const cta::common::dataStructures::RetrieveR
.add("vidSelectionTime", vidSelectionTime)
.add("agentReferencingTime", agentReferencingTime)
.add("insertionTime", insertionTime);
delayIfNecessary(logContext);
m_enqueueingTasksQueue.push(new EnqueueingTask([rReq, job, bestVid, this]{
// This unique_ptr's destructor will ensure the OStoreDB object is not deleted before the thread exits.
auto scopedCounterDecrement = [this](void *){
m_taskQueueSize--;
m_taskQueueSize--;
m_taskPostingSemaphore.release();
};
// A bit ugly, but we need a non-null pointer for the "deleter" to be called.
std::unique_ptr<void, decltype(scopedCounterDecrement)> scopedCounterDecrementerInstance((void *)1, scopedCounterDecrement);
......
......@@ -75,6 +75,10 @@ private:
cta::threading::BlockingQueue<EnqueueingTask*> & m_enqueueingTasksQueue;
};
std::vector<EnqueueingWorkerThread *> m_enqueueingWorkerThreads;
std::atomic<uint64_t> m_taskQueueSize; ///< This counter ensures destruction happens after the last thread completed.
/// Delay introduced before posting to the task queue when it becomes too long.
void delayIfNecessary(log::LogContext &lc);
cta::threading::Semaphore m_taskPostingSemaphore;
public:
void waitSubthreadsComplete() override;
void setThreadNumber(uint64_t threadNumber);
......@@ -374,7 +378,6 @@ private:
catalogue::Catalogue & m_catalogue;
log::Logger & m_logger;
objectstore::AgentReference *m_agentReference = nullptr;
std::atomic<uint64_t> m_taskQueueSize; ///< This counter ensures destruction happens after the last thread completed.
};
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment