Commit 34255f98 authored by Eric Cano's avatar Eric Cano
Browse files

Fixed missing fetch after relocking an object store object.

Added many conditional printouts in unit tests. Those printouts are controlled by #defines in a central file.
parent 21ec95e8
......@@ -21,6 +21,9 @@
#include "BackendRados.hpp"
#include "common/exception/Exception.hpp"
#include "BackendRadosTestSwitch.hpp"
#include "tests/TestsCompileTimeSwitches.hpp"
#include <atomic>
#include <future>
namespace unitTests {
......@@ -86,6 +89,58 @@ TEST_P(BackendAbstractTest, LockingInterface) {
ASSERT_FALSE(m_os->exists(nonExistingObject));
}
TEST_P(BackendAbstractTest, MultithreadLockingInterface) {
// This test validates the locking of the object store
// Launch many threads which will bump up a number stored in a file.
// Similarly counted in memory
#ifdef LOOPING_TEST
do {
#endif
const std::string testObjectName = "testObject";
uint64_t val=0;
std::string valStr;
valStr.append((char*)&val, sizeof(val));
m_os->create(testObjectName, valStr);
auto os=m_os;
std::atomic<uint64_t> counter(0);
std::list<std::future<void>> insertCompletions;
std::list<std::function<void()>> lambdas;
const size_t threadCount=100;
const size_t passCount=100;
for (size_t i=0; i<threadCount; i++) {
lambdas.emplace_back([&testObjectName,os,&passCount,&counter](){
for (size_t pass=0; pass<passCount; pass++) {
std::unique_ptr<cta::objectstore::Backend::ScopedLock> sl(os->lockExclusive(testObjectName));
uint64_t val;
os->read(testObjectName).copy((char*)&val,sizeof(val));
val++;
std::string valStr;
valStr.append((char*)&val, sizeof(val));
os->atomicOverwrite(testObjectName, valStr);
counter++;
}
});
insertCompletions.emplace_back(std::async(std::launch::async, lambdas.back()));
}
for (auto &ic: insertCompletions) { ic.wait(); }
insertCompletions.clear();
lambdas.clear();
m_os->read(testObjectName).copy((char*)&val, sizeof(val));
#ifdef LOOPING_TEST
printf(".");
if (counter != val) {
std::cout << "counter=" << counter << " val=" << val << std::endl;
std::cout << "ERROR!! *************************************************************" << std::endl;
while (true) { sleep (1); }
}
#endif
ASSERT_EQ(counter, val);
m_os->remove(testObjectName);
#ifdef LOOPING_TEST
}while(true);
#endif
}
TEST_P(BackendAbstractTest, AsyncIOInterface) {
// Create object to update.
const std::string testValue = "1234";
......
......@@ -20,6 +20,7 @@
#include "common/exception/Errnum.hpp"
#include "common/utils/utils.hpp"
#include "common/utils/Regex.hpp"
#include "tests/TestsCompileTimeSwitches.hpp"
#include <fstream>
#include <stdlib.h>
......@@ -33,8 +34,7 @@
#include <sys/socket.h>
#include <sys/stat.h>
#include <dirent.h>
#undef DEBUG_PRINT_LOGS
#ifdef DEBUG_PRINT_LOGS
#ifdef LOW_LEVEL_TRACING
#include <iostream>
#endif
......@@ -50,7 +50,7 @@ BackendVFS::BackendVFS(int line, const char *file) : m_deleteOnExit(true) {
} else {
throw cta::exception::Errnum("Failed to create temporary directory");
}
#ifdef DEBUG_PRINT_LOGS
#ifdef LOW_LEVEL_TRACING
std::cout << "In BackendVFS::BackendVFS(): created object store " << m_root << " "
<< std::hex << this << file << ":" << line << std::endl;
#endif
......@@ -86,11 +86,11 @@ BackendVFS::~BackendVFS() {
if (m_deleteOnExit) {
// Delete the created directories recursively
nftw (m_root.c_str(), deleteFileOrDirectory, 100, FTW_DEPTH);
#ifdef DEBUG_PRINT_LOGS
#ifdef LOW_LEVEL_TRACING
::printf("In BackendVFS::~BackendVFS(): deleted object store %s 0x%p\n", m_root.c_str(), (void*)this);
#endif
}
#ifdef DEBUG_PRINT_LOGS
#ifdef LOW_LEVEL_TRACING
else {
::printf("In BackendVFS::~BackendVFS(): skipping object store deletion %s 0x%p\n", m_root.c_str(), (void*)this);
}
......@@ -109,6 +109,9 @@ void BackendVFS::create(std::string name, std::string content) {
cta::exception::Errnum::throwOnMinusOne(fd,
"In ObjectStoreVFS::create, failed to open the file");
fileCreated = true;
#ifdef LOW_LEVEL_TRACING
::printf("In BackendVFS::create(): created object %s, tid=%li\n", name.c_str(), ::syscall(SYS_gettid));
#endif
cta::exception::Errnum::throwOnMinusOne(
::write(fd, content.c_str(), content.size()),
"In ObjectStoreVFS::create, failed to write to file");
......@@ -156,6 +159,9 @@ void BackendVFS::atomicOverwrite(std::string name, std::string content) {
cta::exception::Errnum::throwOnMinusOne(
::rename(tempPath.c_str(), targetPath.c_str()),
err.str());
#ifdef LOW_LEVEL_TRACING
::printf("In BackendVFS::atomicOverwrite(): overwrote object %s, tid=%li\n", name.c_str(), ::syscall(SYS_gettid));
#endif
}
std::string BackendVFS::read(std::string name) {
......@@ -172,6 +178,9 @@ std::string BackendVFS::read(std::string name) {
file.read(buff, sizeof (buff));
ret.append(buff, file.gcount());
}
#ifdef LOW_LEVEL_TRACING
::printf("In BackendVFS::read(): read object %s, tid=%li\n", name.c_str(), ::syscall(SYS_gettid));
#endif
return ret;
}
......@@ -180,13 +189,22 @@ void BackendVFS::remove(std::string name) {
std::string lockPath = m_root + "/." + name + ".lock";
cta::exception::Errnum::throwOnNonZero(unlink(path.c_str()), "Failed to remove object file");
cta::exception::Errnum::throwOnNonZero(unlink(lockPath.c_str()), "Failed to remove lock file.");
#ifdef LOW_LEVEL_TRACING
::printf("In BackendVFS::read(): removed object %s, tid=%li\n", name.c_str(), ::syscall(SYS_gettid));
#endif
}
bool BackendVFS::exists(std::string name) {
std::string path = m_root + "/" + name;
std::string lockPath = m_root + "/." + name + ".lock";
struct stat buffer;
return (stat(path.c_str(), &buffer)==0 && stat(lockPath.c_str(), &buffer)==0);
struct stat buffer;
#ifdef LOW_LEVEL_TRACING
bool filePresent=stat(path.c_str(), &buffer)==0 && stat(lockPath.c_str(), &buffer)==0;
::printf("In BackendVFS::exists(): tested presence of object %s, tid=%li, exists=%d\n", name.c_str(), ::syscall(SYS_gettid), filePresent);
return filePresent;
#else
return (stat(path.c_str(), &buffer)==0 && stat(lockPath.c_str(), &buffer)==0);
#endif
}
std::list<std::string> BackendVFS::list() {
......@@ -211,7 +229,7 @@ BackendVFS::Parameters* BackendVFS::getParams() {
void BackendVFS::ScopedLock::release() {
if (!m_fdSet) return;
#ifdef DEBUG_PRINT_LOGS
#ifdef LOW_LEVEL_TRACING
if (m_fd==-1) {
std::cout << "Warning: fd=-1!" << std::endl;
}
......@@ -219,8 +237,8 @@ void BackendVFS::ScopedLock::release() {
::flock(m_fd, LOCK_UN);
::close(m_fd);
m_fdSet = false;
#ifdef DEBUG_PRINT_LOGS
std::cout << "Unlocked " << m_path << " with fd=" << m_fd << " tid=" << syscall(SYS_gettid) << std::endl;
#ifdef LOW_LEVEL_TRACING
::printf("BackendVFS::ScopedLock::release() Unlocked %s with fd=%d tid=%ld\n", m_path.c_str() , m_fd, syscall(SYS_gettid));
#endif
}
......@@ -259,30 +277,30 @@ BackendVFS::ScopedLock * BackendVFS::lockHelper(std::string name, int type) {
throw ex;
}
#ifdef DEBUG_PRINT_LOGS
#ifdef LOW_LEVEL_TRACING
if (ret->m_fd==-1) {
std::cout << "Warning: fd=-1!" << std::endl;
}
#endif
#endif
return ret.release();
}
BackendVFS::ScopedLock * BackendVFS::lockExclusive(std::string name) {
std::unique_ptr<BackendVFS::ScopedLock> ret(lockHelper(name, LOCK_EX));
#ifdef DEBUG_PRINT_LOGS
std::cout << "LockedExclusive " << name << " with fd=" << ret->m_fd
<< " path=" << ret->m_path << " tid=" << syscall(SYS_gettid) << std::endl;
#endif
#ifdef LOW_LEVEL_TRACING
::printf ("In BackendVFS::lockExclusive(): LockedExclusive %s with fd=%d path=%s tid=%ld\n",
name.c_str(), ret->m_fd, ret->m_path.c_str(), syscall(SYS_gettid));
#endif
return ret.release();
}
BackendVFS::ScopedLock * BackendVFS::lockShared(std::string name) {
std::unique_ptr<BackendVFS::ScopedLock> ret(lockHelper(name, LOCK_SH));
#ifdef DEBUG_PRINT_LOGS
std::cout << "LockedShared " << name << " with fd=" << ret->m_fd
<< " path=" << ret->m_path << " tid=" << syscall(SYS_gettid) << std::endl;
#endif
#ifdef LOW_LEVEL_TRACING
::printf ("In BackendVFS::lockShared(): LockedShared %s with fd=%d path=%s tid=%ld\n",
name.c_str(), ret->m_fd, ret->m_path.c_str(), syscall(SYS_gettid));
#endif
return ret.release();
}
......
......@@ -296,6 +296,7 @@ void OStoreDB::getLockedAndFetchedArchiveQueue(cta::objectstore::ArchiveQueue& a
} catch (cta::exception::Exception & ex) {
rel.release();
ScopedExclusiveLock rexl(re);
re.fetch();
archiveQueue.setAddress(re.addOrGetArchiveQueueAndCommit(tapePool, *m_agentReference));
}
}
......
......@@ -16,6 +16,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "objectstore/BackendRadosTestSwitch.hpp"
#include "tests/TestsCompileTimeSwitches.hpp"
#include "common/UserIdentity.hpp"
#include "common/admin/AdminHost.hpp"
#include "common/admin/AdminUser.hpp"
......@@ -26,7 +28,9 @@
#include "OStoreDB/OStoreDBFactory.hpp"
#include "objectstore/BackendRados.hpp"
#include "common/log/DummyLogger.hpp"
#include "objectstore/BackendRadosTestSwitch.hpp"
#ifdef STDOUT_LOGGING
#include "common/log/StdoutLogger.hpp"
#endif
#include <exception>
#include <gtest/gtest.h>
......@@ -127,18 +131,23 @@ const cta::common::dataStructures::SecurityIdentity SchedulerDatabaseTest::s_adm
const cta::common::dataStructures::SecurityIdentity SchedulerDatabaseTest::s_userOnAdminHost(SchedulerDatabaseTest::s_user, SchedulerDatabaseTest::s_adminHost);
const cta::common::dataStructures::SecurityIdentity SchedulerDatabaseTest::s_userOnUserHost(SchedulerDatabaseTest::s_user, SchedulerDatabaseTest::s_userHost);
// unit test is disabled as it is pretty long to run.
TEST_P(SchedulerDatabaseTest, createManyArchiveJobs) {
using namespace cta;
#ifndef STDOUT_LOGGING
cta::log::DummyLogger dl("");
#else
cta::log::StdoutLogger dl("");
#endif
cta::log::LogContext lc(dl);
cta::SchedulerDatabase &db = getDb();
// Inject 1000 archive jobs to the db.
// Inject 100 archive jobs to the db.
const size_t filesToDo = 100;
std::list<std::future<void>> jobInsertions;
std::list<std::function<void()>> lambdas;
#ifdef LOOPING_TEST
do {
#endif
for (size_t i=0; i<filesToDo; i++) {
lambdas.emplace_back(
[i,&db,&lc](){
......@@ -201,11 +210,22 @@ TEST_P(SchedulerDatabaseTest, createManyArchiveJobs) {
else
done = true;
}
#ifdef LOOPING_TEST
if (filesToDo != count) {
std::cout << "ERROR_DETECTED!!! ********************************************* BLocking test" << std::endl;
std::cout << "Missing=" << filesToDo - count << " count=" << count << " filesToDo=" << filesToDo << std::endl;
while (true) { sleep(1);}
} else {
std::cout << "**************************************************************************************************** END OF RUN *******************************************************\n" << std::endl;
}
#endif
ASSERT_EQ(filesToDo, count);
am->complete(time(nullptr));
am.reset(nullptr);
moutInfo.reset(nullptr);
#ifdef LOOPING_TEST
} while (true);
#endif
const size_t filesToDo2 = 200;
for (size_t i=0; i<filesToDo2; i++) {
lambdas.emplace_back(
......@@ -252,10 +272,20 @@ TEST_P(SchedulerDatabaseTest, createManyArchiveJobs) {
// Then load all archive jobs into memory (2nd pass)
// Create mount.
#ifdef LOOPING_TEST
auto moutInfo = db.getMountInfo();
cta::catalogue::TapeForWriting tfw;
tfw.tapePool = "tapePool";
tfw.vid = "vid";
auto am = moutInfo->createArchiveMount(tfw, "drive", "library", "host", time(nullptr));
auto done = false;
auto count = 0;
#else
moutInfo = db.getMountInfo();
am = moutInfo->createArchiveMount(tfw, "drive", "library", "host", time(nullptr));
done = false;
count = 0;
#endif
while (!done) {
auto aj = am->getNextJob(lc);
if (aj.get()) {
......
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#undef LOOPING_TEST
#undef LOW_LEVEL_TRACING
#undef STDOUT_LOGGING
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment