Skip to content
Snippets Groups Projects
Commit af7d6735 authored by Eric Cano's avatar Eric Cano
Browse files

Homogenized the behaviors of object store backends.

BackendVFS now throws the same exceptions as BackendRados.
The exceptions of the user provided update callback are now passed through.
A unit test was added.
parent 5f728fee
No related branches found
No related tags found
No related merge requests found
......@@ -288,14 +288,8 @@ void BackendRados::AsyncUpdater::fetchCallback(librados::completion_t completion
std::string("In In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to read buffer: ")
+ ex.what());
}
try {
// Execute the user's callback.
value=au.m_update(value);
} catch (std::exception & ex) {
throw CouldNotUpdateValue(
std::string("In In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to call update(): ")
+ ex.what());
}
// Execute the user's callback. Let exceptions fly through. User knows his own exceptions.
value=au.m_update(value);
try {
// Prepare result in buffer list.
au.m_radosBufferList.clear();
......
......@@ -100,6 +100,39 @@ TEST_P(BackendAbstractTest, AsyncIOInterface) {
m_os->remove(testObjectName);
}
TEST_P(BackendAbstractTest, AsyncIOInterfaceMultithread) {
// Create object to update.
const std::string testValue = "1234";
const std::string testSecondValue = "12345";
const std::string testObjectNameRadix = "testObject";
std::function<std::string(size_t)> testObjectName=[&](size_t i){
std::stringstream tom;
tom << testObjectNameRadix << i;
return tom.str();
};
std::function<std::string(size_t)> value=[&](size_t i){
std::stringstream val;
val << testSecondValue << i;
return val.str();
};
for (size_t i=0; i<10; i++) { try {m_os->remove(testObjectName(i));}catch(...){} }
std::list<std::unique_ptr<cta::objectstore::Backend::AsyncUpdater>> updaters;
std::list<std::function<std::string(const std::string &)>> lambdas;
for (size_t i=0; i<10; i++) {
m_os->create(testObjectName(i), testValue);
// Launch update of object via asynchronous IO
lambdas.emplace_back([i,&value](const std::string &s)->std::string{return value(i);});
updaters.emplace_back(m_os->asyncUpdate(testObjectName(i),lambdas.back()));
}
size_t i=0;
for (auto & u: updaters) {
u->wait();
ASSERT_EQ(value(i), m_os->read(testObjectName(i)));
m_os->remove(testObjectName(i));
i++;
}
}
TEST_P(BackendAbstractTest, ParametersInterface) {
//std::cout << "Type=" << m_os->typeName() << std::endl;
std::unique_ptr<cta::objectstore::Backend::Parameters> params(
......
......@@ -203,8 +203,6 @@ std::list<std::string> BackendVFS::list() {
return ret;
}
BackendVFS::Parameters* BackendVFS::getParams() {
std::unique_ptr<Parameters> ret(new Parameters);
ret->m_path = m_root;
......@@ -236,10 +234,13 @@ BackendVFS::ScopedLock * BackendVFS::lockHelper(std::string name, int type) {
int openErrno = errno;
struct ::stat sBuff;
int statResult = ::stat((m_root + name).c_str(), &sBuff);
int statErrno = errno;
if (ENOENT == openErrno && !statResult) {
ret->set(::open(path.c_str(), O_RDONLY | O_CREAT, S_IRWXU | S_IRGRP | S_IROTH), path);
exception::Errnum::throwOnMinusOne(ret->m_fd, "In BackendVFS::lockHelper(): Failed to recreate missing lock file");
} else {
if (statErrno == ENOENT)
throw Backend::NoSuchObject("In BackendVFS::lockHelper(): no such file");
const std::string errnoStr = utils::errnoToString(errno);
exception::Exception ex;
ex.getMessage() << "In BackendVFS::lockHelper(): Failed to open file " << path <<
......@@ -289,8 +290,32 @@ BackendVFS::AsyncUpdater::AsyncUpdater(BackendVFS & be, const std::string& name,
m_backend(be), m_name(name), m_update(update),
m_job(std::async(std::launch::async,
[&](){
std::unique_ptr<ScopedLock> sl(m_backend.lockExclusive(m_name));
m_backend.atomicOverwrite(m_name, m_update(m_backend.read(m_name)));
std::unique_ptr<ScopedLock> sl;
try { // locking already throws proper exceptions for no such file.
sl.reset(m_backend.lockExclusive(m_name));
} catch (Backend::NoSuchObject &) {
throw;
} catch (cta::exception::Exception & ex) {
throw Backend::CouldNotLock(ex.getMessageValue());
}
std::string preUpdateData;
try {
preUpdateData=m_backend.read(m_name);
} catch (cta::exception::Exception & ex) {
throw Backend::CouldNotFetch(ex.getMessageValue());
}
// Let user's exceptions go through.
std::string postUpdateData=m_update(preUpdateData);
try {
m_backend.atomicOverwrite(m_name, postUpdateData);
} catch (cta::exception::Exception & ex) {
throw Backend::CouldNotCommit(ex.getMessageValue());
}
try {
sl->release();
} catch (cta::exception::Exception & ex) {
throw Backend::CouldNotUnlock(ex.getMessageValue());
}
}))
{}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment