diff --git a/objectstore/BackendRados.cpp b/objectstore/BackendRados.cpp
index 9f7a12c30a4be0c29d31f24e538da1ed66f43f90..81ea269d07bef4dffcd303253ed181d1a3f59d02 100644
--- a/objectstore/BackendRados.cpp
+++ b/objectstore/BackendRados.cpp
@@ -26,6 +26,58 @@
 #include <valgrind/helgrind.h>
 #include <random>
 
+// This macro should be defined to get printouts to understand timings of locking.
+// Usually while running BackendTestRados/BackendAbstractTest.MultithreadLockingInterface
+// Also define TEST_RADOS in objectstore/BackendRadosTestSwitch.hpp.
+// Number of threads/passes should be reduced in the test for any usefulness.
+#undef DEBUG_RADOS_LOCK_TIMINGS
+#ifdef DEBUG_RADOS_LOCK_TIMINGS
+
+namespace {
+  std::atomic<double> previousSec;
+  std::atomic<bool> everReleased{false};
+  std::atomic<double> lastReleased;
+
+// Prints a timestamp, the delay since the previous print (from any thread), a short thread id and two labels.
+void timestampedPrint (const char * f, const char *s) {
+  struct ::timeval tv;
+  ::gettimeofday(&tv, nullptr);
+  double localPreviousSec=previousSec;
+  double secs=previousSec=tv.tv_sec % 1000 + tv.tv_usec / 1000.0 / 1000;
+  uint8_t tid = syscall(__NR_gettid) % 100;
+  ::printf ("%03.06f %02.06f %02d %s %s\n", secs, secs - localPreviousSec, tid, f, s);
+  ::fflush(::stdout);
+}
+
+// Records the time of the latest lock release.
+void notifyReleased() {
+  struct ::timeval tv;
+  ::gettimeofday(&tv, nullptr);
+  lastReleased=tv.tv_sec + tv.tv_usec / 1000.0 / 1000;
+  everReleased=true;
+}
+
+// Prints the delay since the latest recorded release, if there was one.
+void notifyLocked() {
+  if (everReleased) {
+    struct ::timeval tv;
+    ::gettimeofday(&tv, nullptr);
+    ::printf ("Relocked after %02.06f\n", (tv.tv_sec + tv.tv_usec / 1000.0 / 1000) - lastReleased);
+    ::fflush(::stdout);
+  }
+}
+
+}
+
+#define TIMESTAMPEDPRINT(A) timestampedPrint(__PRETTY_FUNCTION__, (A))
+#define NOTIFYLOCKED() notifyLocked()
+#define NOTIFYRELEASED() notifyReleased()
+#else
+#define TIMESTAMPEDPRINT(A)
+#define NOTIFYLOCKED()
+#define NOTIFYRELEASED()
+#endif
+
 namespace cta { namespace objectstore {
 
 BackendRados::BackendRados(const std::string & userId, const std::string & pool, const std::string &radosNameSpace) :
@@ -115,6 +167,7 @@ void BackendRados::ScopedLock::release() {
   // We should be tolerant with unlocking a deleted object, which is part
   // of the lock-remove-(implicit unlock) cycle when we delete an object
   // we hence overlook the ENOENT errors.
+  TIMESTAMPEDPRINT("Pre-release");
   int rc=m_context.unlock(m_oid, "lock", m_clientId);
   switch (-rc) {
     case ENOENT:
@@ -125,7 +178,25 @@ void BackendRados::ScopedLock::release() {
           m_oid);
       break;
   }
+  // The unlock above already happened: record it before notifying, so that a failing
+  // notification (which throws below) does not leave m_lockSet claiming the lock is still held.
+  m_lockSet = false;
+  NOTIFYRELEASED();
+  TIMESTAMPEDPRINT("Post-release/pre-notify");
+  // Notify potential waiters to take their chances now on the lock.
+  utils::Timer t;
+  librados::bufferlist bl;
+  //librados::bufferlist * pBl = new librados::bufferlist;
+  //librados::AioCompletion * completion = librados::Rados::aio_create_completion((void *)pBl, notifyCallback, nullptr);
+  librados::AioCompletion * completion = librados::Rados::aio_create_completion(nullptr, nullptr, nullptr);
+  const int notifyRc = m_context.aio_notify(m_oid, completion, bl, 10000, nullptr);
+  // Drop our reference on the completion before checking the result: throwing first would leak it.
+  completion->release();
+  cta::exception::Errnum::throwOnReturnedErrno(-notifyRc,
+      "In BackendRados::ScopedLock::release(): failed to aio_notify()");
+  //printf(" %0.06f", t.secs()); fflush(stdout);
+  //printf("N"); fflush(stdout);
-  m_lockSet = false;
+  TIMESTAMPEDPRINT("Post-notify");
 }
 
 void BackendRados::ScopedLock::set(const std::string& oid, const std::string clientId) {
@@ -138,6 +209,70 @@ BackendRados::ScopedLock::~ScopedLock() {
   release();
 }
 
+//void BackendRados::notifyCallback(librados::completion_t cb, void* pBp) {
+//  delete (librados::bufferlist *)pBp;
+//}
+
+
+// Registers a watch on the object "name"; the handle_*() callbacks below then wake up wait().
+// Throws if watch2() reports an error.
+BackendRados::LockWatcher::LockWatcher(librados::IoCtx& context, const std::string& name):
+  m_context(context), m_name(name) {
+  m_future = m_promise.get_future();
+//  m_watchHandleFuture = m_watchHandlePromise.get_future();
+//  TIMESTAMPEDPRINT("Pre-aio-watch2");
+//  librados::AioCompletion * completion = librados::Rados::aio_create_completion(this, watchCallback, nullptr);
+//  cta::exception::Errnum::throwOnReturnedErrno(-m_context.aio_watch2(name, completion, &m_watchHandle, this, 10000));
+//  completion->release();
+//  TIMESTAMPEDPRINT("Post-aio-watch2");
+  TIMESTAMPEDPRINT("Pre-watch2");
+  cta::exception::Errnum::throwOnReturnedErrno(-m_context.watch2(name, &m_watchHandle, this));
+  TIMESTAMPEDPRINT("Post-watch2");
+}
+
+// Callback for errors on the watch: wake the waiter up.
+void BackendRados::LockWatcher::handle_error(uint64_t cookie, int err) {
+  //printf("rne %s ",m_name.c_str()); fflush(stdout);
+  //printf("rne "); fflush(stdout);
+  // The promise is one-shot: a second event on the same watch makes set_value() throw
+  // std::future_error. The waiter is already woken by the first event, so later ones are ignored.
+  try { m_promise.set_value(); } catch (const std::future_error &) {}
+  TIMESTAMPEDPRINT("");
+}
+
+// Callback for notifications on the watched object: wake the waiter up.
+void BackendRados::LockWatcher::handle_notify(uint64_t notify_id, uint64_t cookie, uint64_t notifier_id, librados::bufferlist& bl) {
+  //printf("rnn %s ",m_name.c_str()); fflush(stdout);
+  // printf("n"); fflush(stdout);
+  // The promise is one-shot: a second event on the same watch makes set_value() throw
+  // std::future_error. The waiter is already woken by the first event, so later ones are ignored.
+  try { m_promise.set_value(); } catch (const std::future_error &) {}
+  TIMESTAMPEDPRINT("");
+}
+
+// Blocks until one of the callbacks above has fired or "timeout" has elapsed, whichever comes first.
+void BackendRados::LockWatcher::wait(const durationUs& timeout) {
+  TIMESTAMPEDPRINT("Pre-wait");
+  m_future.wait_for(timeout);
+  TIMESTAMPEDPRINT("Post-wait");
+}
+
+// Removes the watch; the return code of unwatch2() is not checked.
+BackendRados::LockWatcher::~LockWatcher() {
+//  TIMESTAMPEDPRINT("Pre-wait");
+//  m_watchHandleFuture.wait();
+//  TIMESTAMPEDPRINT("Post-wait/pre-unwatch2");
+  TIMESTAMPEDPRINT("Pre-unwatch2");
+  m_context.unwatch2(m_watchHandle);
+  TIMESTAMPEDPRINT("Post-unwatch2");
+}
+
+//void BackendRados::LockWatcher::watchCallback(librados::completion_t cb, void* arg) {
+//  ((LockWatcher *)arg)->m_watchHandlePromise.set_value();
+//  TIMESTAMPEDPRINT("");
+//}
+
+
 std::string BackendRados::createUniqueClientId() {
   // Build a unique client name: host:thread
   char buff[200];
@@ -159,29 +294,44 @@ BackendRados::ScopedLock* BackendRados::lockExclusive(std::string name, uint64_t
   tv.tv_sec = 240;
   int rc;
   std::unique_ptr<ScopedLock> ret(new ScopedLock(m_radosCtx));
-  // Crude backoff: we will measure the RTT of the call and backoff a faction of this amount multiplied
-  // by the number of tries (and capped by a maximum). Then the value will be randomized
-  // (betweend and 50-150%)
-  size_t backoff=1;
   utils::Timer t, timeoutTimer;
   while (true) {
+    TIMESTAMPEDPRINT("Pre-lock");
     rc = m_radosCtx.lock_exclusive(name, "lock", client, "", &tv, 0);
+    if (!rc) {
+      TIMESTAMPEDPRINT("Post-lock (got it)");
+      NOTIFYLOCKED();
+    } else {
+      TIMESTAMPEDPRINT("Post-lock");
+    }
     if (-EBUSY != rc) break;
+    // The lock is taken. Start a watch on it immediately. Inspired from the algorithm listed here:
+    // https://zookeeper.apache.org/doc/r3.1.2/recipes.html#sc_recipes_Locks
+    TIMESTAMPEDPRINT("Pre-watch-setup");
+    LockWatcher watcher(m_radosCtx, name);
+    TIMESTAMPEDPRINT("Post-watch-setup/Pre-relock");
+    // We need to retry the lock after establishing the watch: it could have been released during that time.
+    rc = m_radosCtx.lock_exclusive(name, "lock", client, "", &tv, 0);
+    if (!rc) {
+      TIMESTAMPEDPRINT("Post-relock (got it)");
+      NOTIFYLOCKED();
+    } else {
+      TIMESTAMPEDPRINT("Post-relock");
+    }
+    if (-EBUSY != rc) break;
+    LockWatcher::durationUs watchTimeout = LockWatcher::durationUs(5L * 1000 * 1000); // We will poll at least every 5 second.
+    // If we are dealing with a user-defined timeout, take it into account.
+    if (timeout_us) {
+      watchTimeout = std::min(watchTimeout,
+          LockWatcher::durationUs(timeout_us) - LockWatcher::durationUs(timeoutTimer.usecs()));
+      // Make sure the value makes sense if we just crossed the deadline.
+      watchTimeout = std::max(watchTimeout, LockWatcher::durationUs(1));
+    }
+    watcher.wait(watchTimeout);
+    TIMESTAMPEDPRINT("Post-wait");
     if (timeout_us && (timeoutTimer.usecs() > (int64_t)timeout_us)) {
       throw exception::Exception("In BackendRados::lockExclusive(): timeout.");
     }
-    timespec ts;
-    auto wait=t.usecs(utils::Timer::resetCounter)*backoff++/c_backoffFraction;
-    wait = std::min(wait, c_maxWait);
-    if (backoff>c_maxBackoff) backoff=1;
-    // We need to get a random number [50, 150]
-    std::default_random_engine dre(std::chrono::system_clock::now().time_since_epoch().count());
-    std::uniform_int_distribution<size_t> distribution(50, 150);
-    decltype(wait) randFactor=distribution(dre);
-    wait=(wait * randFactor)/100;
-    ts.tv_sec = wait/(1000*1000);
-    ts.tv_nsec = (wait % (1000*1000)) * 1000;
-    nanosleep(&ts, nullptr);
   }
   cta::exception::Errnum::throwOnReturnedErrno(-rc,
       std::string("In ObjectStoreRados::lockExclusive, failed to librados::IoCtx::lock_exclusive: ") +
diff --git a/objectstore/BackendRados.hpp b/objectstore/BackendRados.hpp
index bb70a1ce4815c8e7f203cd34a3c37f717fe48909..2edea965151f6d24e9d12078bd9fe2fdc88b0989 100644
--- a/objectstore/BackendRados.hpp
+++ b/objectstore/BackendRados.hpp
@@ -84,7 +84,34 @@ public:
   ScopedLock * lockExclusive(std::string name, uint64_t timeout_us=0) override;
 
   ScopedLock * lockShared(std::string name, uint64_t timeout_us=0) override;
+private:
+  /**
+   * A class handling the watch part when waiting for a lock.
+   */
+  class LockWatcher: public librados::WatchCtx2 {
+  public:
+    LockWatcher(librados::IoCtx & context, const std::string & name);
+    void handle_error(uint64_t cookie, int err) override;
+    void handle_notify(uint64_t notify_id, uint64_t cookie, uint64_t notifier_id, librados::bufferlist& bl) override;
+    virtual ~LockWatcher();
+    typedef std::chrono::microseconds durationUs;
+    void wait(const durationUs & timeout);
+  private:
+    std::promise<void> m_promise;
+    std::future<void> m_future;
+    librados::IoCtx & m_context;
+    std::string m_name;
+    uint64_t m_watchHandle;
+//    std::promise<void> m_watchHandlePromise;
+//    std::future<void> m_watchHandleFuture;
+//    static void watchCallback(librados::completion_t cb, void* arg);
+  };
 
+//  // A very simple callback for rados::aio_notify() so we basically
+//  // notify in a fire and forget fashion.
+//  static void notifyCallback(librados::completion_t cb, void *pBp);
+
+public:
   /**
    * A class following up the check existence-lock-fetch-update-write-unlock. Constructor implicitly
    * starts the lock step.
diff --git a/objectstore/BackendTest.cpp b/objectstore/BackendTest.cpp
index 495a75ec464a5c89aabdb182379cc249cc61ff40..940803216540f726b3541b6e35a85660ea5c3203 100644
--- a/objectstore/BackendTest.cpp
+++ b/objectstore/BackendTest.cpp
@@ -20,6 +20,7 @@
 #include "BackendVFS.hpp"
 #include "BackendRados.hpp"
 #include "common/exception/Exception.hpp"
+#include "common/Timer.hpp"
 #include "BackendRadosTestSwitch.hpp"
 #include "tests/TestsCompileTimeSwitches.hpp"
 #include <atomic>
@@ -32,6 +33,8 @@ TEST_P(BackendAbstractTest, BasicReadWrite) {
   const std::string testValue = "1234";
   const std::string testSecondValue = "12345";
   const std::string testObjectName = "testObject";
+  // Make sure there is no leftover from previous runs
+  try { m_os->remove(testObjectName); } catch (...) {}
   // Check we can verify the absence of an object
   ASSERT_FALSE(m_os->exists(testObjectName));
   // Check that an update attempt fails on a non-existing object
@@ -59,6 +62,8 @@ TEST_P(BackendAbstractTest, LockingInterface) {
   //std::cout << "Type=" << m_os->typeName() << std::endl;
   const std::string testObjectName = "testObject";
   const std::string nonExistingObject = "thisObjectShouldNotExist";
+  // Make sure we will recreate the object (no leftover from previous runs)
+  try { m_os->remove(testObjectName); } catch (...) {}
   m_os->create(testObjectName, "X");
   {
     // If we don't scope the object, the release will blow up after
@@ -100,13 +105,14 @@ TEST_P(BackendAbstractTest, MultithreadLockingInterface) {
   uint64_t val=0;
   std::string valStr;
   valStr.append((char*)&val, sizeof(val));
+  try { m_os->remove(testObjectName); } catch (...) {}
   m_os->create(testObjectName, valStr);
   auto os=m_os;
   std::atomic<uint64_t> counter(0);
   std::list<std::future<void>> insertCompletions;
   std::list<std::function<void()>> lambdas;
-  const size_t threadCount=100;
-  const size_t passCount=100;
+  const size_t threadCount=20;
+  const size_t passCount=20;
   for (size_t i=0; i<threadCount; i++) {
     lambdas.emplace_back([&testObjectName,os,&passCount,&counter,i](){
       for (size_t pass=0; pass<passCount; pass++) {
@@ -118,11 +124,10 @@ TEST_P(BackendAbstractTest, MultithreadLockingInterface) {
         valStr.append((char*)&val, sizeof(val));
         os->atomicOverwrite(testObjectName, valStr);
         counter++;
-//        printf("%03ld ",(uint64_t)i);
-//        fflush(stdout);
+        //printf("%03ld ",(uint64_t)i); fflush(stdout);
       }
-//      printf("--- ");
-//      fflush(stdout);
+      //printf("--- ");
+      //fflush(stdout);
     });
     insertCompletions.emplace_back(std::async(std::launch::async, lambdas.back()));
   }