Commit e94555ed authored by Michael Davis's avatar Michael Davis
Browse files

Merge remote-tracking branch 'origin/master' into cta_eos

Conflicts:
	cta.spec.in
parents fc5782ab 0fd88391
......@@ -17,4 +17,32 @@ cheers, Germán
-----------------------------------------------------------------
Date: Mon, 14 Aug 2017 14:40:02 +0200
From: Steven Murray <Steven.Murray@cern.ch>
To: Vlado Bahyl <Vladimir.Bahyl@cern.ch>
CC: German Cancio Melia <German.Cancio.Melia@cern.ch>, Eric Cano <Eric.Cano@cern.ch>
Subject: Fwd: add ceiling for concurrent jobs
[-- Autoprezeranie použitím /usr/bin/w3m -dump -I 'Windows-1252' -T text/html ''/tmp/vlado/mutt.html'' --]
Hi Vlado,
Andreas has asked us to try the “space.wfe.ntx" configuration parameter again.
We know that German was not convinced of the usefulness of this parameter,
however Andreas says that we should monitor the “ntx” column of “eos space ls”
command in order to see what is happening. Andreas did a test of his own and
saw the thread limiting working as expected. Let’s see if we are experiencing
a different behaviour with our production eosctatape instance.
The command to set the “space.wfe.ntx” parameter is as follows:
eos space config default space.wfe.ntx=10
The command to monitor the current number of threads is
eos space ls
Cheers,
Steve
-----------------------------------------------------------------
......@@ -61,7 +61,9 @@ else (OCCI_SUPPORT)
endif (OCCI_SUPPORT)
add_library (ctacatalogue SHARED
${CATALOGUE_LIB_SRC_FILES})
${CATALOGUE_LIB_SRC_FILES})
set_property(TARGET ctacatalogue PROPERTY SOVERSION "${CTA_SOVERSION}")
set_property(TARGET ctacatalogue PROPERTY VERSION "${CTA_LIBVERSION}")
install (TARGETS ctacatalogue DESTINATION usr/${CMAKE_INSTALL_LIBDIR})
......@@ -94,6 +96,8 @@ set(IN_MEMORY_CATALOGUE_UNIT_TESTS_LIB_SRC_FILES
add_library (ctainmemorycatalogueunittests SHARED
${IN_MEMORY_CATALOGUE_UNIT_TESTS_LIB_SRC_FILES})
set_property(TARGET ctainmemorycatalogueunittests PROPERTY SOVERSION "${CTA_SOVERSION}")
set_property(TARGET ctainmemorycatalogueunittests PROPERTY VERSION "${CTA_LIBVERSION}")
target_link_libraries (ctainmemorycatalogueunittests
ctacatalogue)
......@@ -106,6 +110,8 @@ set (CATALOGUE_UNIT_TESTS_LIB_SRC_FILES
add_library (ctacatalogueunittests SHARED
${CATALOGUE_UNIT_TESTS_LIB_SRC_FILES})
set_property(TARGET ctacatalogueunittests PROPERTY SOVERSION "${CTA_SOVERSION}")
set_property(TARGET ctacatalogueunittests PROPERTY VERSION "${CTA_LIBVERSION}")
target_link_libraries (ctacatalogueunittests
ctacatalogue)
......@@ -195,4 +201,7 @@ set (CATALOGUE_CMD_LINE_UNIT_TESTS_LIB_SRC_FILES
add_library (ctacataloguecmdlineunittests SHARED
${CATALOGUE_CMD_LINE_UNIT_TESTS_LIB_SRC_FILES})
set_property(TARGET ctacataloguecmdlineunittests PROPERTY SOVERSION "${CTA_SOVERSION}")
set_property(TARGET ctacataloguecmdlineunittests PROPERTY VERSION "${CTA_LIBVERSION}")
install (TARGETS ctacataloguecmdlineunittests DESTINATION usr/${CMAKE_INSTALL_LIBDIR})
#Default values
# Default CTA Version
set(CTA_VERSION 0)
set(CTA_RELEASE 0)
#Get version number from environment if set.
# Shared object internal version (used in SONAME)
set(CTA_SOVERSION 0)
# Shared object external version (used in filename)
set(CTA_SOMAJOR ${CTA_SOVERSION})
set(CTA_SOMINOR 1)
set(CTA_SOPATCH 0)
# Get version number from environment if set.
if(NOT $ENV{CTA_VERSION} STREQUAL "")
set(CTA_VERSION $ENV{CTA_VERSION})
message(STATUS "Got CTA_VERSION from environment: ${CTA_VERSION}")
......@@ -11,7 +18,7 @@ else(NOT $ENV{CTA_VERSION} STREQUAL "")
message(STATUS "Using default CTA_VERSION: ${CTA_VERSION}")
endif(NOT $ENV{CTA_VERSION} STREQUAL "")
#Get release number from environment if set
# Get release number from environment if set
if(NOT $ENV{CTA_RELEASE} STREQUAL "")
set(CTA_RELEASE $ENV{CTA_RELEASE})
message(STATUS "Got CTA_RELEASE from environment: ${CTA_RELEASE}")
......@@ -26,3 +33,8 @@ if(DEFINED VCS_VERSION)
endif(DEFINED VCS_VERSION)
message(STATUS "CTA version is ${CTA_VERSION}-${CTA_RELEASE}")
# Shared library versioning
set(CTA_LIBVERSION ${CTA_SOMAJOR}.${CTA_SOMINOR}.${CTA_SOPATCH})
message(STATUS "CTA shared object version is ${CTA_LIBVERSION} (${CTA_SOVERSION})")
......@@ -135,6 +135,9 @@ set (COMMON_LIB_SRC_FILES
add_library (ctacommon SHARED
${COMMON_LIB_SRC_FILES})
set_property(TARGET ctacommon PROPERTY SOVERSION "${CTA_SOVERSION}")
set_property(TARGET ctacommon PROPERTY VERSION "${CTA_LIBVERSION}")
install (TARGETS ctacommon DESTINATION usr/${CMAKE_INSTALL_LIBDIR})
target_link_libraries (ctacommon
......@@ -176,6 +179,8 @@ set (COMMON_UNIT_TESTS_LIB_SRC_FILES
add_library (ctacommonunittests SHARED
${COMMON_UNIT_TESTS_LIB_SRC_FILES})
set_property(TARGET ctacommonunittests PROPERTY SOVERSION "${CTA_SOVERSION}")
set_property(TARGET ctacommonunittests PROPERTY VERSION "${CTA_LIBVERSION}")
install(TARGETS ctacommonunittests DESTINATION usr/${CMAKE_INSTALL_LIBDIR})
......
......@@ -4,5 +4,7 @@ include_directories(${PROJECT_SOURCE_DIR}/tapeserver)
add_library (ctaexceptionunittests SHARED
ExceptionTest.cpp)
set_property(TARGET ctaexceptionunittests PROPERTY SOVERSION "${CTA_SOVERSION}")
set_property(TARGET ctaexceptionunittests PROPERTY VERSION "${CTA_LIBVERSION}")
install(TARGETS ctaexceptionunittests DESTINATION usr/${CMAKE_INSTALL_LIBDIR})
......@@ -152,7 +152,10 @@ test -f /CANSTART && echo OK || exit 1
xrdcp /etc/group root://${eoshost}:/${EOS_TMP_DIR}/testFile
# prepare EOS workflow
# enable eos workflow engine
eos space config default space.wfe=on
# set the thread-pool size of concurrently running workflows
eos space config default space.wfe.ntx=3
# ATTENTION
# for sss authorisation unix has to be replaced by sss
......
......@@ -12,10 +12,14 @@
%define mydist %{?dist}
%endif
# Skipping unit tests (for developpers)
#--------------------------------------
# Skipping unit tests (for developers)
#-------------------------------------
%define skip_unit_tests @SKIP_UNIT_TESTS@
# Build with XRootD SSI extensions (for developers)
#--------------------------------------------------
%define build_xroot_ssi 0@BUILD_XROOT_SSI@
# General settings
#-----------------
Summary: CERN Tape Archive
......@@ -37,10 +41,17 @@ BuildRequires: xrootd-client-devel = 1:4.2.3
BuildRequires: xrootd-devel = 1:4.2.3
BuildRequires: cryptopp-devel >= 5.6.2
%else
BuildRequires: xrootd-client-devel = 1:20170801.af5dd76
BuildRequires: xrootd-devel = 1:20170801.af5dd76
BuildRequires: xrootd-server-devel = 1:20170801.af5dd76
BuildRequires: xrootd-private-devel = 1:20170801.af5dd76
%if "%{?build_xroot_ssi}" == "0"
BuildRequires: xrootd-client-devel = 1:4.4.1
BuildRequires: xrootd-devel = 1:4.4.1
BuildRequires: xrootd-server-devel = 1:4.4.1
BuildRequires: xrootd-private-devel = 1:4.4.1
%else
BuildRequires: xrootd-client-devel = 1:4.7.0
BuildRequires: xrootd-devel = 1:4.7.0
BuildRequires: xrootd-server-devel = 1:4.7.0
BuildRequires: xrootd-private-devel = 1:4.7.0
%endif
BuildRequires: librados-devel >= 11.0, libradosstriper-devel >= 11.0,
BuildRequires: protobuf3-compiler >= 3.3.1 protobuf3-devel >= 3.3.1
BuildRequires: gmock-devel >= 1.5.0 gtest-devel >= 1.5.0
......@@ -155,11 +166,11 @@ The xroot plugin
/usr/bin/getent group cta || /usr/sbin/groupadd cta
/usr/bin/getent passwd cta || /usr/sbin/useradd -s /bin/nologin -c "CTA system account" -g cta cta
%files -n cta-frontend
%defattr(-,root,root)
%defattr(0755,root,root)
%{_libdir}/libXrdCtaOfs.so*
%{_libdir}/libXrdSsiCta.so*
%attr(0755,cta,cta) %dir /var/log/cta
%attr(0644,root,root) %config(noreplace) /etc/logrotate.d/cta-frontend
%attr(0755,root,root) %{_libdir}/libXrdCtaOfs.so
%attr(0755,root,root) %{_libdir}/libXrdSsiCta.so
%attr(0644,root,root) %config(noreplace) /etc/xrootd/xrootd-cta.cfg
%attr(0644,root,root) %config(noreplace) %{_sysconfdir}/cta/cta-frontend.conf
......@@ -187,17 +198,17 @@ Requires: oracle-instantclient12.1-basic
CERN Tape Archive:
The shared libraries
%files -n cta-lib
%defattr(-,root,root)
%attr(0755,root,root) %{_libdir}/libctacatalogue.so
%attr(0755,root,root) %{_libdir}/libctacommon.so
%defattr(0755,root,root,-)
%{_libdir}/libctacatalogue.so*
%{_libdir}/libctacommon.so*
#TODO: merge util and common
%attr(0755,root,root) %{_libdir}/libctautils.so
%attr(0755,root,root) %{_libdir}/libctascheduler.so
%attr(0755,root,root) %{_libdir}/libctaobjectstore.so
%attr(0755,root,root) %{_libdir}/libctamediachanger.so
%attr(0755,root,root) %{_libdir}/libctamessages.so
%attr(0755,root,root) %{_libdir}/libctamessagesutils.so
%attr(0755,root,root) %{_libdir}/libctardbms.so
%{_libdir}/libctautils.so*
%{_libdir}/libctascheduler.so*
%{_libdir}/libctaobjectstore.so*
%{_libdir}/libctamediachanger.so*
%{_libdir}/libctamessages.so*
%{_libdir}/libctamessagesutils.so*
%{_libdir}/libctardbms.so*
%attr(0644,root,root) %config(noreplace) %{_sysconfdir}/cta/cta_catalogue_db.conf.example
#CTA-lib installs libraries so we need ldconfig.
......@@ -213,29 +224,30 @@ Requires: cta-taped = %{ctaVersion}-%{ctaRelease}%{mydist}
CERN Tape Archive:
Unit tests and system tests with virtual tape drives
%files -n cta-systemtests
%attr(0755,root,root) %{_libdir}/libsystemTestHelperTests.so
%attr(0755,root,root) %{_libdir}/libcta-tapedSystemTests.so
%attr(0755,root,root) %{_bindir}/cta-catalogueUnitTests
%attr(0755,root,root) %{_bindir}/cta-unitTests
%attr(0755,root,root) %{_bindir}/cta-unitTests-multiProcess
%attr(0755,root,root) %{_bindir}/cta-valgrindUnitTests.sh
%attr(0755,root,root) %{_bindir}/cta-unitPlusSystemTests.sh
%attr(0755,root,root) %{_libdir}/libctacatalogueunittests.so
%attr(0755,root,root) %{_libdir}/libctacataloguecmdlineunittests.so
%attr(0755,root,root) %{_libdir}/libctacommonunittests.so
%attr(0755,root,root) %{_libdir}/libctaexceptionunittests.so
%attr(0755,root,root) %{_libdir}/libctainmemorycatalogueunittests.so
%attr(0755,root,root) %{_libdir}/libctaobjectstoreunittests.so
%attr(0755,root,root) %{_libdir}/libctardbmsunittests.so
%attr(0755,root,root) %{_libdir}/libctaschedulerunittests.so
%attr(0755,root,root) %{_libdir}/libctatapeserverdaemonunittests.so
%attr(0755,root,root) %{_libdir}/libctatapeserverdriveunittests.so
%attr(0755,root,root) %{_libdir}/libctatapeserverfileunittests.so
%attr(0755,root,root) %{_libdir}/libctatapeserverscsiunittests.so
%attr(0755,root,root) %{_libdir}/libctadaemonunittests.so
%attr(0755,root,root) %{_libdir}/libctamediachangerunittests.so
%attr(0755,root,root) %{_bindir}/cta-systemTests
%attr(0755,root,root) %{_libdir}/libctadaemonunittests-multiprocess.so
%defattr(0755,root,root,-)
%{_libdir}/libsystemTestHelperTests.so*
%{_libdir}/libcta-tapedSystemTests.so*
%{_bindir}/cta-catalogueUnitTests
%{_bindir}/cta-unitTests
%{_bindir}/cta-unitTests-multiProcess
%{_bindir}/cta-valgrindUnitTests.sh
%{_bindir}/cta-unitPlusSystemTests.sh
%{_libdir}/libctacatalogueunittests.so*
%{_libdir}/libctacataloguecmdlineunittests.so*
%{_libdir}/libctacommonunittests.so*
%{_libdir}/libctaexceptionunittests.so*
%{_libdir}/libctainmemorycatalogueunittests.so*
%{_libdir}/libctaobjectstoreunittests.so*
%{_libdir}/libctardbmsunittests.so*
%{_libdir}/libctaschedulerunittests.so*
%{_libdir}/libctatapeserverdaemonunittests.so*
%{_libdir}/libctatapeserverdriveunittests.so*
%{_libdir}/libctatapeserverfileunittests.so*
%{_libdir}/libctatapeserverscsiunittests.so*
%{_libdir}/libctadaemonunittests.so*
%{_libdir}/libctamediachangerunittests.so*
%{_bindir}/cta-systemTests
%{_libdir}/libctadaemonunittests-multiprocess.so*
%attr(0644,root,root) %{_datadir}/%{name}-%{ctaVersion}/unittest/*.suppr
%package -n cta-objectstore-tools
......
......@@ -25,3 +25,4 @@ add_library (ctaeos
DiskReporter.cpp
DiskReporterFactory.cpp
EOSReporter.cpp)
......@@ -55,6 +55,8 @@ set (MEDIACHANGER_LIB_SRC_FILES
include_directories (${PROTOBUF3_INCLUDE_DIRS})
add_library (ctamediachanger SHARED
${MEDIACHANGER_LIB_SRC_FILES})
set_property(TARGET ctamediachanger PROPERTY SOVERSION "${CTA_SOVERSION}")
set_property(TARGET ctamediachanger PROPERTY VERSION "${CTA_LIBVERSION}")
target_link_libraries (ctamediachanger
ctacommon
......@@ -77,6 +79,8 @@ set (MEDIACHANGER_UNIT_TESTS_LIB_SRC_FILES
add_library (ctamediachangerunittests SHARED
${MEDIACHANGER_UNIT_TESTS_LIB_SRC_FILES})
set_property(TARGET ctamediachangerunittests PROPERTY SOVERSION "${CTA_SOVERSION}")
set_property(TARGET ctamediachangerunittests PROPERTY VERSION "${CTA_LIBVERSION}")
target_link_libraries (ctamediachangerunittests
ctamediachanger)
......
......@@ -108,6 +108,7 @@ public:
CTA_GENERATE_EXCEPTION_CLASS(CouldNotFetch);
CTA_GENERATE_EXCEPTION_CLASS(CouldNotUpdateValue);
CTA_GENERATE_EXCEPTION_CLASS(CouldNotCommit);
CTA_GENERATE_EXCEPTION_CLASS(CouldNotDelete);
CTA_GENERATE_EXCEPTION_CLASS(CouldNotUnlock);
CTA_GENERATE_EXCEPTION_CLASS(AsyncUpdateWithDelete);
......@@ -138,6 +139,34 @@ public:
*/
virtual AsyncUpdater * asyncUpdate(const std::string & name, std::function <std::string(const std::string &)> & update) = 0;
/**
* A base class handling asynchronous sequence of lock exclusive, delete.
* Each operation will be asynchronous, and the result
* (success or exception) will be returned via the wait() function call.
*/
class AsyncDeleter {
public:
/**
* Waits for completion (success) of throws exception (failure).
*/
virtual void wait() = 0;
/**
* Destructor
*/
virtual ~AsyncDeleter() {}
};
/**
* Triggers the asynchronous object delete sequence, as described in
* AsyncDeleter class description.
*
* @param name The name of the object to be deleted.
* @return pointer to a newly created AsyncDeleter
*/
virtual AsyncDeleter * asyncDelete(const std::string & name) = 0;
/**
* Base class for the representation of the parameters of the BackendStore.
*/
......
......@@ -338,12 +338,17 @@ void BackendRados::AsyncUpdater::statCallback(librados::completion_t completion,
std::string("In BackendRados::AsyncUpdater::statCallback(): could not stat object: ") + au.m_name);
throw Backend::NoSuchObject(errnum.getMessageValue());
}
// Check the size. If zero, we locked an empty object: delete and throw an exception.
// Check the size. If zero, we locked an empty object: delete and throw an exception in the deleteCallback
if (!au.m_size) {
// TODO. This is going to lock the callback thread of the rados context for a while.
// As this is not supposde to happen often, this is acceptable.
au.m_backend.remove(au.m_name);
throw Backend::NoSuchObject(std::string("In BackendRados::AsyncUpdater::statCallback(): no such object: ") + au.m_name);
// launch the delete operation (async).
librados::AioCompletion * aioc = librados::Rados::aio_create_completion(&au, deleteEmptyCallback, nullptr);
auto rc=au.m_backend.m_radosCtx.aio_remove(au.m_name, aioc);
aioc->release();
if (rc) {
cta::exception::Errnum errnum (-rc, std::string("In BackendRados::AsyncUpdater::statCallback(): failed to launch aio_remove(): ")+au.m_name);
throw Backend::CouldNotDelete(errnum.getMessageValue());
}
return;
}
// Stat is done, we can launch the read operation (async).
librados::AioCompletion * aioc = librados::Rados::aio_create_completion(&au, fetchCallback, nullptr);
......@@ -359,6 +364,23 @@ void BackendRados::AsyncUpdater::statCallback(librados::completion_t completion,
}
}
void BackendRados::AsyncUpdater::deleteEmptyCallback(librados::completion_t completion, void* pThis) {
AsyncUpdater & au = *((AsyncUpdater *) pThis);
try {
// Check that the object could be deleted.
if (rados_aio_get_return_value(completion)) {
cta::exception::Errnum errnum(-rados_aio_get_return_value(completion),
std::string("In BackendRados::AsyncUpdater::deleteEmptyCallback(): could not delete object: ") + au.m_name);
throw Backend::CouldNotDelete(errnum.getMessageValue());
}
// object deleted then throw an exception
throw Backend::NoSuchObject(std::string("In BackendRados::AsyncUpdater::deleteEmptyCallback(): no such object: ") + au.m_name);
} catch (...) {
ANNOTATE_HAPPENS_BEFORE(&au.m_job);
au.m_job.set_exception(std::current_exception());
}
}
void BackendRados::AsyncUpdater::fetchCallback(librados::completion_t completion, void* pThis) {
AsyncUpdater & au = *((AsyncUpdater *) pThis);
try {
......@@ -484,6 +506,153 @@ void BackendRados::AsyncUpdater::wait() {
ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&m_job);
}
Backend::AsyncDeleter* BackendRados::asyncDelete(const std::string & name)
{
return new AsyncDeleter(*this, name);
}
BackendRados::AsyncDeleter::AsyncDeleter(BackendRados& be, const std::string& name):
m_backend(be), m_name(name), m_job(), m_jobFuture(m_job.get_future()) {
// At construction time, we just fire a lock.
try {
// Rados does not have aio_lock, so we do it in an async.
// Operation is lock (synchronous), and then launch an async stat, then read.
// The async function never fails, exceptions go to the promise (as everywhere).
m_lockAsync.reset(new std::future<void>(std::async(std::launch::async,
[this](){
try {
m_lockClient = BackendRados::createUniqueClientId();
struct timeval tv;
tv.tv_usec = 0;
tv.tv_sec = 60;
int rc;
// TODO: could be improved (but need aio_lock in rados, not available at the time
// of writing).
// Crude backoff: we will measure the RTT of the call and backoff a faction of this amount multiplied
// by the number of tries (and capped by a maximum). Then the value will be randomized
// (betweend and 50-150%)
size_t backoff=1;
utils::Timer t;
while (true) {
rc = m_backend.m_radosCtx.lock_exclusive(m_name, "lock", m_lockClient, "", &tv, 0);
if (-EBUSY != rc) break;
timespec ts;
auto wait=t.usecs(utils::Timer::resetCounter)*backoff++/c_backoffFraction;
wait = std::min(wait, c_maxWait);
if (backoff>c_maxBackoff) backoff=1;
// We need to get a random number [50, 150]
std::default_random_engine dre(std::chrono::system_clock::now().time_since_epoch().count());
std::uniform_int_distribution<size_t> distribution(50, 150);
decltype(wait) randFactor=distribution(dre);
wait=(wait * randFactor)/100;
ts.tv_sec = wait/(1000*1000);
ts.tv_nsec = (wait % (1000*1000)) * 1000;
nanosleep(&ts, nullptr);
}
if (rc) {
cta::exception::Errnum errnum(-rc,
std::string("In BackendRados::AsyncDeleter::statCallback::lock_lambda(): failed to librados::IoCtx::lock_exclusive: ") +
m_name + "/" + "lock" + "/" + m_lockClient + "//");
throw CouldNotLock(errnum.getMessageValue());
}
// Locking is done, we can launch the stat operation (async).
librados::AioCompletion * aioc = librados::Rados::aio_create_completion(this, statCallback, nullptr);
rc=m_backend.m_radosCtx.aio_stat(m_name, aioc, &m_size, &date);
aioc->release();
if (rc) {
cta::exception::Errnum errnum (-rc, std::string("In BackendRados::AsyncDeleter::AsyncDeleter::lock_lambda(): failed to launch aio_stat(): ")+m_name);
throw Backend::NoSuchObject(errnum.getMessageValue());
}
} catch (...) {
ANNOTATE_HAPPENS_BEFORE(&m_job);
m_job.set_exception(std::current_exception());
}
}
)));
} catch (...) {
ANNOTATE_HAPPENS_BEFORE(&m_job);
m_job.set_exception(std::current_exception());
}
}
void BackendRados::AsyncDeleter::statCallback(librados::completion_t completion, void* pThis) {
AsyncDeleter & au = *((AsyncDeleter *) pThis);
try {
// Get the object size (it's already locked).
if (rados_aio_get_return_value(completion)) {
cta::exception::Errnum errnum(-rados_aio_get_return_value(completion),
std::string("In BackendRados::AsyncDeleter::statCallback(): could not stat object: ") + au.m_name);
throw Backend::NoSuchObject(errnum.getMessageValue());
}
// Check the size. If zero, we locked an empty object: delete and throw an exception.
if (!au.m_size) {
// launch the delete operation (async).
librados::AioCompletion * aioc = librados::Rados::aio_create_completion(&au, deleteEmptyCallback, nullptr);
auto rc=au.m_backend.m_radosCtx.aio_remove(au.m_name, aioc);
aioc->release();
if (rc) {
cta::exception::Errnum errnum (-rc, std::string("In BackendRados::AsyncDeleter::statCallback():"
" failed to launch aio_remove() for zero size object: ")+au.m_name);
throw Backend::CouldNotDelete(errnum.getMessageValue());
}
return;
}
// Stat is done, we can launch the delete operation (async).
librados::AioCompletion * aioc = librados::Rados::aio_create_completion(&au, deleteCallback, nullptr);
auto rc=au.m_backend.m_radosCtx.aio_remove(au.m_name, aioc);
aioc->release();
if (rc) {
cta::exception::Errnum errnum (-rc, std::string("In BackendRados::AsyncUpdater::statCallback(): failed to launch aio_remove(): ")+au.m_name);
throw Backend::CouldNotDelete(errnum.getMessageValue());
}
} catch (...) {
ANNOTATE_HAPPENS_BEFORE(&au.m_job);
au.m_job.set_exception(std::current_exception());
}
}
void BackendRados::AsyncDeleter::deleteCallback(librados::completion_t completion, void* pThis) {
AsyncDeleter & au = *((AsyncDeleter *) pThis);
try {
// Check that the object could be deleted.
if (rados_aio_get_return_value(completion)) {
cta::exception::Errnum errnum(-rados_aio_get_return_value(completion),
std::string("In BackendRados::AsyncDeleter::deleteCallback(): could not delete object: ") + au.m_name);
throw Backend::CouldNotDelete(errnum.getMessageValue());
}
// Done!
ANNOTATE_HAPPENS_BEFORE(&au.m_job);
au.m_job.set_value();
} catch (...) {
ANNOTATE_HAPPENS_BEFORE(&au.m_job);
au.m_job.set_exception(std::current_exception());
}
}
void BackendRados::AsyncDeleter::deleteEmptyCallback(librados::completion_t completion, void* pThis) {
AsyncDeleter & au = *((AsyncDeleter *) pThis);
try {
// Check that the object could be deleted.
if (rados_aio_get_return_value(completion)) {
cta::exception::Errnum errnum(-rados_aio_get_return_value(completion),
std::string("In BackendRados::AsyncDeleter::deleteEmptyCallback(): could not delete object: ") + au.m_name);
throw Backend::CouldNotDelete(errnum.getMessageValue());
}
// object deleted then throw an exception
throw Backend::NoSuchObject(std::string("In BackendRados::AsyncDeleter::deleteEmptyCallback(): no such object: ") + au.m_name);
} catch (...) {
ANNOTATE_HAPPENS_BEFORE(&au.m_job);
au.m_job.set_exception(std::current_exception());
}
}
void BackendRados::AsyncDeleter::wait() {
m_jobFuture.get();
ANNOTATE_HAPPENS_AFTER(&m_job);
ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&m_job);
}
std::string BackendRados::Parameters::toStr() {
std::stringstream ret;
ret << "userId=" << m_userId << " pool=" << m_pool;
......
......@@ -120,6 +120,8 @@ public:
std::unique_ptr<std::future<void>> m_updateAsync;
/** The first callback operation (after checking existence) */
static void statCallback(librados::completion_t completion, void *pThis);
/** Async delete in case of zero sized object */
static void deleteEmptyCallback(librados::completion_t completion, void *pThis);
/** The second callback operation (after reading) */
static void fetchCallback(librados::completion_t completion, void *pThis);
/** The third callback operation (after writing) */
......@@ -130,6 +132,45 @@ public:
Backend::AsyncUpdater* asyncUpdate(const std::string & name, std::function <std::string(const std::string &)> & update) override;
/**
* A class following up the check existence-lock-delete.
* Constructor implicitly starts the lock step.
*/
class AsyncDeleter: public Backend::AsyncDeleter {
public:
AsyncDeleter(BackendRados & be, const std::string & name);
void wait() override;
private:
/** A reference to the backend */
BackendRados &m_backend;
/** The object name */
const std::string m_name;
/** Storage for stat operation (size) */
uint64_t m_size;
/** Storage for stat operation (date) */
time_t date;
/** The promise that will both do the job and allow synchronization with the caller. */
std::promise<void> m_job;
/** The future from m_jobs, which will be extracted before any thread gets a chance to play with it. */
std::future<void> m_jobFuture;
/** A future used to hold the structure of the lock operation. It will be either empty of complete at
destruction time */
std::unique_ptr<std::future<void>> m_lockAsync;
/** A string used to identify the locker */
std::string m_lockClient;
/** A future the hole the the structure of the update operation. It will be either empty of complete at
destruction time */
std::unique_ptr<std::future<void>> m_updateAsync;
/** The first callback operation (after checking existence) */
static void statCallback(librados::completion_t completion, void *pThis);
/** The second callback operation (after deleting) */
static void deleteCallback(librados::completion_t completion, void *pThis);
/** Async delete in case of zero sized object */
static void deleteEmptyCallback(librados::completion_t completion, void *pThis);
};
Backend::AsyncDeleter* asyncDelete(const std::string & name) override;
class Parameters: public Backend::Parameters {
friend class BackendRados;
public:
......
......@@ -377,6 +377,46 @@ void BackendVFS::AsyncUpdater::wait() {
ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&m_job);
}
BackendVFS::AsyncDeleter::AsyncDeleter(BackendVFS & be, const std::string& name):
m_backend(be), m_name(name),
m_job(std::async(std::launch::async,
[&](){
std::unique_ptr<ScopedLock> sl;
try { // locking already throws proper exceptions for no such file.