Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • chimeratk-mirror/ApplicationCore
1 result
Show changes
Commits on Source (18)
......@@ -12,6 +12,7 @@
#include <boost/thread/latch.hpp>
#include <barrier>
#include <memory>
namespace ChimeraTK {
......@@ -171,7 +172,7 @@ namespace ChimeraTK {
* Version number of the last exception. Only access under the error mutex. Intentionally not initialised with
* nullptr. It is propagated as long as the device is not successfully opened.
*/
VersionNumber _exceptionVersionNumber = {};
VersionNumber _exceptionVersionNumber;
/** The error flag whether the device is functional. protected by the errorMutex. */
bool _deviceHasError{true};
......@@ -207,39 +208,66 @@ namespace ChimeraTK {
friend class ExceptionHandlingDecorator;
/**
* The shared state of a group of devices which are recovered together.
* The shared state of a group of DeviceManagers which are recovering together.
*/
struct RecoveryGroup {
enum class RecoveryStage { NO_ERROR, DETECTION, OPEN, INIT_HANDERS, RECOVERY_ACCESSORS };
/**
 * Convert a RecoveryStage value into a human-readable string literal.
 *
 * Each enumerator maps to the text "RecoveryStage::<NAME>". Note that the enumerator is spelled INIT_HANDERS
 * while the returned text reads "RecoveryStage::INIT_HANDLERS".
 *
 * @param stage The recovery stage to convert.
 * @return Pointer to a string literal naming the stage.
 * @throws ChimeraTK::logic_error if the value does not match any enumerator handled in the switch.
 */
static constexpr const char* stageToString(RecoveryStage stage) {
  const char* stageName = nullptr;

  // The switch has no default label: a value matching none of the cases leaves stageName unset.
  switch(stage) {
    case RecoveryStage::NO_ERROR:
      stageName = "RecoveryStage::NO_ERROR";
      break;
    case RecoveryStage::DETECTION:
      stageName = "RecoveryStage::DETECTION";
      break;
    case RecoveryStage::OPEN:
      stageName = "RecoveryStage::OPEN";
      break;
    case RecoveryStage::INIT_HANDERS:
      stageName = "RecoveryStage::INIT_HANDLERS";
      break;
    case RecoveryStage::RECOVERY_ACCESSORS:
      stageName = "RecoveryStage::RECOVERY_ACCESSORS";
      break;
  }

  if(stageName == nullptr) {
    throw ChimeraTK::logic_error("Unknown recovery stage, cannot convert to string.");
  }
  return stageName;
}
std::set<DeviceBackend::BackendID> recoveryBackendIDs; ///< All backend ID in this recovery group
Application* app{nullptr}; ///< Pointer to the application to access the recovery lock.
/**
* A barrier is used to ensure that each stage of the recovery process is completed
* A barrier is used to ensure that each stage of the recovery process is completed
* by all DeviceManagers in the recovery group before the next stage is started.
* \li Detection of the error condition
* \li Re-opening of the device
* \li Running the initialisation handlers
* \li Writing the recovery accessors
*/
std::shared_ptr<std::barrier<>> recoveryBarrier;
std::atomic<size_t> errorAtStage{0}; ///< Flag whether recovery has to be repeated.
std::set<DeviceBackend::BackendID> recoveryBackendIDs; ///< All backend ID in this recovery group
Application* app{nullptr}; ///< Pointer to the application to access the recovery lock.
std::barrier<> recoveryBarrier{1};
/** Indicator whether recovery has to be repeated, and from which barrier. */
std::atomic<RecoveryStage> errorAtStage{RecoveryStage::NO_ERROR};
/**
* Protect the device open/close actions in a group. It ensures that no other DeviceManager can perform an open
* or close action while this lock is being held. This is needed in several places:
*
* 1. Devices are closed when running init handlers, and no other DeviceManager must close the device to run its
* init handler while an init handler is accessing the device.
* 2. In backends, open() and close() are not thread safe. This prevents concurrent open() calls, concurrent
* close() calls, and calling open()/close() at the same time from different threads.
*/
std::mutex deviceOpenCloseMutex;
// Wait at the barrier for a stage to complete.
// Returns 'true' if the stage was completed successfully.
bool waitForRecoveryStage(size_t stage);
bool waitForRecoveryStage(RecoveryStage stage);
void setErrorAtStage(size_t stage);
void setErrorAtStage(RecoveryStage stage);
// Contains a barrier which waits until all threads have seen the change.
void resetErrorStage();
void resetErrorAtStage();
};
std::shared_ptr<RecoveryGroup> _recoveryGroup;
/**
* The globalDeviceOpenMutex is a work around for backends which do not implement open() in a thread-safe
* manner. This seems to be the case for most backends currently, hence it was decided to implement this
* workaround for now (see g).
*/
static std::mutex globalDeviceOpenMutex;
/// Helper function for better error messages
std::string stageToString(RecoveryGroup::RecoveryStage stage);
};
/********************************************************************************************************************/
......
......@@ -15,14 +15,16 @@ namespace ChimeraTK {
: ApplicationModule(application, "/Devices/" + Utilities::escapeName(deviceAliasOrCDD, false), ""),
_device(deviceAliasOrCDD), _deviceAliasOrCDD(deviceAliasOrCDD), _owner{application} {
auto involvedBackends = _device.getInvolvedBackendIDs();
_recoveryGroup =
std::make_shared<RecoveryGroup>(std::make_shared<std::barrier<>>(1), true, involvedBackends, _owner);
// Create a recovery group with barrier size 1.
_recoveryGroup = std::make_shared<RecoveryGroup>(involvedBackends, _owner);
// loop all already existing DeviceManagers and look for shared backends
size_t recoveryGroupSize{1};
int64_t recoveryGroupSize{1};
for(const auto& [alias, existingDeviceManager] : Application::getInstance().getDeviceManagerMap()) {
for(auto backendID : involvedBackends) {
if(existingDeviceManager->_recoveryGroup->recoveryBackendIDs.contains(backendID)) {
// Note: The next line modifies involvedBackends while iterating over it.
// This is only allowed because the iteration is terminated with a break below!
involvedBackends.merge(existingDeviceManager->_recoveryGroup->recoveryBackendIDs);
existingDeviceManager->_recoveryGroup = _recoveryGroup;
++recoveryGroupSize;
......@@ -34,7 +36,10 @@ namespace ChimeraTK {
if(recoveryGroupSize > 1) {
// update the recovery group
_recoveryGroup->recoveryBackendIDs = involvedBackends;
_recoveryGroup->recoveryBarrier = std::make_shared<std::barrier<>>(recoveryGroupSize);
// The barrier does not allow modification of the number of participants.
// We use a placement new to replace it. Before this, we have to call the destructor of the old instance.
_recoveryGroup->recoveryBarrier.~barrier();
new(&_recoveryGroup->recoveryBarrier) std::barrier(recoveryGroupSize);
}
}
......@@ -140,7 +145,7 @@ namespace ChimeraTK {
mainLoopImpl();
}
catch(...) {
_recoveryGroup->recoveryBarrier->arrive_and_drop();
_recoveryGroup->recoveryBarrier.arrive_and_drop();
throw;
}
}
......@@ -160,13 +165,13 @@ namespace ChimeraTK {
while(true) {
/****************************************************************************************************************/
// Sync point (stage 1 complete):
// Sync point DETECTION:
// The manager has seen an error and (re)starts recovery. Wait until all
// involved DeviceManagers have seen it.
/****************************************************************************************************************/
_recoveryGroup->waitForRecoveryStage(1);
// Reset error stage to 0. Contains a barrier to make sure all threads have seen it.
_recoveryGroup->resetErrorStage();
_recoveryGroup->waitForRecoveryStage(RecoveryGroup::RecoveryStage::DETECTION);
// Reset error stage to NO_ERROR. Contains a barrier to make sure all threads have seen it.
_recoveryGroup->resetErrorAtStage();
// Starting stage 2
// [Spec: 2.3.1] (Re)-open the device.
......@@ -177,7 +182,7 @@ namespace ChimeraTK {
_owner->getTestableMode().lock("Attempt open/recover device");
try {
std::lock_guard<std::mutex> globalDeviceOpenLock(globalDeviceOpenMutex);
std::lock_guard<std::mutex> deviceOpenLock(_recoveryGroup->deviceOpenCloseMutex);
_device.open();
}
catch(ChimeraTK::runtime_error& e) {
......@@ -219,24 +224,24 @@ namespace ChimeraTK {
}
/****************************************************************************************************************/
// Sync point (stage 2 complete): Device opened. Synchronise before stating init scripts.
// Sync point (stage OPEN complete): Device opened. Synchronise before starting init scripts.
/****************************************************************************************************************/
assert(_recoveryGroup->errorAtStage == 0); // no other thread must have modified the flag until here.
assert(_recoveryGroup->errorAtStage ==
RecoveryGroup::RecoveryStage::NO_ERROR); // no other thread must have modified the flag until here.
// no need to check the return value. No error reported in stage 2
_recoveryGroup->waitForRecoveryStage(2);
// no need to check the return value. No error reported in the OPEN stage.
_recoveryGroup->waitForRecoveryStage(RecoveryGroup::RecoveryStage::OPEN);
// Starting stage 3
// Starting stage INIT_HANDLERS
// [Spec: 2.3.2] Run initialisation handlers
try {
for(auto& initHandler : _initialisationHandlers) {
{
std::lock_guard<std::mutex> globalDeviceOpenLock(globalDeviceOpenMutex);
// Hold the open/close lock while executing the init handler, so no other
// DeviceManager closes the device while the init handler is running.
std::lock_guard<std::mutex> openCloseLock(_recoveryGroup->deviceOpenCloseMutex);
_device.close();
}
initHandler(_device);
{
std::lock_guard<std::mutex> globalDeviceOpenLock(globalDeviceOpenMutex);
initHandler(_device);
_device.open();
}
}
......@@ -251,21 +256,22 @@ namespace ChimeraTK {
}
// Mark recovery as failed. All DeviceManagers will return to the beginning of the recovery after the next
// synchronisation point
_recoveryGroup->setErrorAtStage(3);
_recoveryGroup->setErrorAtStage(RecoveryGroup::RecoveryStage::INIT_HANDERS);
}
/****************************************************************************************************************/
// Sync point (stage 3 complete): Wait until all init scripts are done before writing recovery accessors.
// Sync point (stage INIT_HANDLERS complete): Wait until all init scripts are done before writing recovery
// accessors.
/****************************************************************************************************************/
if(!_recoveryGroup->waitForRecoveryStage(3)) {
// If another thread has already continued and set an error for recovery stage 4,
// waitForRecoveryStage(3) will still return 'true', so all threads arrive at the
// barrier for stage 4.
// If there was error in stage 3, all threads will see it here and continue.
if(!_recoveryGroup->waitForRecoveryStage(RecoveryGroup::RecoveryStage::INIT_HANDERS)) {
// If another thread has already continued and set an error for recovery stage RECOVERY_ACCESSORS,
// waitForRecoveryStage(INIT_HANDLERS) will still return 'true', so all threads arrive at the
// barrier for stage RECOVERY_ACCESSORS.
// If there was an error in stage INIT_HANDLERS, all threads will see it here and continue.
continue;
}
// Starting stage 4
// Starting stage RECOVERY_ACCESSORS
// Write all recovery accessors
// We are now entering the critical recovery section. It is protected by the recovery mutex until the
// deviceHasError flag has been cleared.
......@@ -291,13 +297,13 @@ namespace ChimeraTK {
}
// Mark recovery as failed. All DeviceManagers will return to the beginning of the recovery after the next
// synchronisation point
_recoveryGroup->setErrorAtStage(4);
_recoveryGroup->setErrorAtStage(RecoveryGroup::RecoveryStage::RECOVERY_ACCESSORS);
}
/****************************************************************************************************************/
// Sync point (stage 4 complete): All recovery accessors written.
// Sync point (stage RECOVERY_ACCESSORS complete): All recovery accessors have been written.
/****************************************************************************************************************/
if(!_recoveryGroup->waitForRecoveryStage(4)) {
if(!_recoveryGroup->waitForRecoveryStage(RecoveryGroup::RecoveryStage::RECOVERY_ACCESSORS)) {
// In case of error, jump back to the beginning of the recovery/open procedure
continue;
}
......@@ -475,16 +481,12 @@ namespace ChimeraTK {
/********************************************************************************************************************/
// static variables
std::mutex DeviceManager::globalDeviceOpenMutex;
/********************************************************************************************************************/
bool DeviceManager::RecoveryGroup::waitForRecoveryStage(size_t stage) {
app->getTestableMode().unlock("Sync after after " + std::to_string(stage));
recoveryBarrier->arrive_and_wait();
bool DeviceManager::RecoveryGroup::waitForRecoveryStage(RecoveryStage stage) {
app->getTestableMode().unlock(std::string("DeviceManager: Sync device recovery after ") + stageToString(stage));
recoveryBarrier.arrive_and_wait();
boost::this_thread::interruption_point();
app->getTestableMode().lock("Starting stage " + std::to_string(stage + 1));
app->getTestableMode().lock(
std::string("DeviceManager: Starting next device recovery stage after ") + stageToString(stage));
// Return false if errorAtStage is the current stage.
return !(errorAtStage == stage);
......@@ -492,20 +494,20 @@ namespace ChimeraTK {
/********************************************************************************************************************/
void DeviceManager::RecoveryGroup::setErrorAtStage(size_t stage) {
assert((errorAtStage == 0) || (errorAtStage == stage));
void DeviceManager::RecoveryGroup::setErrorAtStage(RecoveryStage stage) {
assert((errorAtStage == RecoveryStage::NO_ERROR) || (errorAtStage == stage));
errorAtStage = stage;
}
/********************************************************************************************************************/
void DeviceManager::RecoveryGroup::resetErrorStage() {
errorAtStage = 0;
void DeviceManager::RecoveryGroup::resetErrorAtStage() {
errorAtStage = RecoveryStage::NO_ERROR;
app->getTestableMode().unlock("Sync before resetting recovery group stage");
recoveryBarrier->arrive_and_wait();
app->getTestableMode().unlock("DeviceManager: Sync after resetting recovery group stage");
recoveryBarrier.arrive_and_wait();
boost::this_thread::interruption_point();
app->getTestableMode().lock("Starting recovery");
app->getTestableMode().lock("DeviceManager: Starting recovery");
}
} // namespace ChimeraTK
Raw1 (ExceptionDummy:1?map=test3.map)
Raw2 (ExceptionDummy:2?map=test3.map)
Raw3 (ExceptionDummy:3?map=test3.map)
Raw4 (ExceptionDummy:4?map=test3.map)
Raw5 (ExceptionDummy:5?map=test3.map)
Use1 (logicalNameMap?map=oneTarget.xlmap&target=Raw1)
Use2 (logicalNameMap?map=oneTarget.xlmap&target=Raw2)
Use3 (logicalNameMap?map=oneTarget.xlmap&target=Raw3)
Use4 (logicalNameMap?map=oneTarget.xlmap&target=Raw4)
Use5 (logicalNameMap?map=oneTarget.xlmap&target=Raw5)
Use12 (logicalNameMap?map=twoTargets.xlmap&target1=Raw1&target2=Raw2)
Use34 (logicalNameMap?map=twoTargetsRO.xlmap&target1=Raw3&target2=Raw4)
Use23 (logicalNameMap?map=twoTargets.xlmap&target1=Raw2&target2=Raw3)
Use12ReadOnly (logicalNameMap?map=twoTargetsRO.xlmap&target1=Raw1&target2=Raw2)
......@@ -270,12 +270,17 @@ namespace Tests::testDeviceInitialisationHandler {
std::cout << "TestDeviceClosedInInitHandler" << std::endl;
TestApplication app;
app.dev.addInitialisationHandler([](ctk::Device& d) { BOOST_CHECK(!d.isOpened()); });
// Cache the opened state in the init handler in a variable. BOOST_CHECK is not thread safe and
// cannot directly be used in the handler.
bool isOpenedInInitHandler{
true}; // We expect false, so we set the starting value to true to know the test is sensitive.
app.dev.addInitialisationHandler([&](ctk::Device& d) { isOpenedInInitHandler = d.isOpened(); });
ctk::TestFacility testFacility(app);
testFacility.runApplication();
// The testFacility in testable mode guarantees that the device has been opened at this point. So we know the init
// handler with the test has been run at this point.
BOOST_CHECK(!isOpenedInInitHandler);
}
} // namespace Tests::testDeviceInitialisationHandler