-
Martin Christoph Hierholzer authored
DeviceManager.cc 16.28 KiB
// SPDX-FileCopyrightText: Deutsches Elektronen-Synchrotron DESY, MSK, ChimeraTK Project <chimeratk-support@desy.de>
// SPDX-License-Identifier: LGPL-3.0-or-later
#include "DeviceManager.h"
#include "Utilities.h"
namespace ChimeraTK {
/*********************************************************************************************************************/
/**
 * Create the DeviceManager for the device identified by the given alias or CDD.
 *
 * The module is registered at "/Devices/<name>", where <name> is deviceAliasOrCDD passed through
 * Utilities::escapeName(..., false). The ChimeraTK::Device is constructed from the same alias/CDD, but it is not
 * opened here; opening happens in mainLoop().
 *
 * @param application       owning application (also used to access the testable mode)
 * @param deviceAliasOrCDD  device alias name or CDD
 */
DeviceManager::DeviceManager(Application* application, const std::string& deviceAliasOrCDD)
    : ApplicationModule(application, "/Devices/" + Utilities::escapeName(deviceAliasOrCDD, false), ""),
      _device(deviceAliasOrCDD),
      _deviceAliasOrCDD(deviceAliasOrCDD),
      _owner{application} {}
/*********************************************************************************************************************/
std::vector<VariableNetworkNode> DeviceManager::getNodesList() const {
std::vector<VariableNetworkNode> rv;
// obtain register catalogue
auto catalog = _device.getRegisterCatalogue();
// iterate catalogue, create VariableNetworkNode for all registers
for(const auto& reg : catalog) {
// ignore 2D registers
if(reg.getNumberOfDimensions() > 1) {
continue;
}
// guess direction and determine update mode
VariableDirection direction{};
UpdateMode updateMode;
if(reg.isWriteable()) {
direction = {VariableDirection::consuming, false};
updateMode = UpdateMode::push;
}
else {
direction = {VariableDirection::feeding, false};
if(reg.getSupportedAccessModes().has(AccessMode::wait_for_new_data)) {
updateMode = UpdateMode::push;
}
else {
updateMode = UpdateMode::poll;
}
}
// find minimum type required to represent data
const std::type_info& valTyp = reg.getDataDescriptor().minimumDataType().getAsTypeInfo();
// create node and add to list
rv.emplace_back(reg.getRegisterName(), _deviceAliasOrCDD, reg.getRegisterName(), updateMode, direction, valTyp,
reg.getNumberOfElements());
}
return rv;
}
/*********************************************************************************************************************/
/**
 * Report an exception which occurred while accessing the device.
 *
 * Only the first error after the device became functional is recorded. If the device is already flagged as faulty,
 * the call has no effect. Otherwise:
 *  - the message is queued for mainLoop(), or silently dropped if the error queue is full;
 *  - the device is flagged as faulty;
 *  - a new exception version number is generated.
 *
 * If testable mode is enabled, the caller must hold the testable mode lock (checked by assertion).
 *
 * @param errMsg  error message to report
 */
void DeviceManager::reportException(std::string errMsg) {
  if(_owner->getTestableMode().isEnabled()) {
    assert(_owner->getTestableMode().testLock());
  }

  // The error queue must only be modified while holding both the error mutex and the testable mode mutex, because
  // the testable mode counter has to stay consistent with the queue content. Lock order to avoid deadlocks: the
  // testable mode mutex first, then the error mutex. Holding only the error mutex (e.g. to check the error flag) is
  // allowed, but then the testable mode mutex must not be acquired.
  boost::unique_lock<boost::shared_mutex> errorLock(_errorMutex);

  // An error has already been reported and not yet recovered from: nothing to do (errorLock unlocks on return).
  if(_deviceHasError) {
    return;
  }

  // A failing push means the queue is full, i.e. plenty of errors are pending already; nothing else to do then.
  const bool enqueued = _errorQueue.push(std::move(errMsg));
  if(enqueued && _owner->getTestableMode().isEnabled()) {
    ++_owner->getTestableMode()._counter;
  }

  // Flag the error and mark it with a fresh version number. errorLock is released when leaving the scope.
  _deviceHasError = true;
  _exceptionVersionNumber = {};
}
/*********************************************************************************************************************/
/**
 * Return the version number that was generated when the last exception was reported via reportException().
 * The value is read while holding a shared lock on the error mutex.
 */
VersionNumber DeviceManager::getExceptionVersionNumber() {
  boost::shared_lock<boost::shared_mutex> readLock{_errorMutex};
  return _exceptionVersionNumber;
}
/*********************************************************************************************************************/
/*
 * Main loop of the DeviceManager thread. It repeats the following until the thread is interrupted:
 *  1. (Re-)open the device, retrying every 0.5 s until the device reports being functional.
 *  2. Discard all exceptions which were queued while the device was faulty.
 *  3. Check that all registered write/read register paths have the required access; a ChimeraTK::logic_error
 *     is thrown (and not caught here) otherwise.
 *  4. Run the initialisation handlers.
 *  5. Write all recovery accessors (sorted by write order) while holding the recovery mutex exclusively, then
 *     clear the error flag.
 *  6. Activate asynchronous reads, release the initial value latch (if held), write the OK status and signal
 *     deviceBecameFunctional.
 *  7. Wait for the next exception reported via reportException(), publish it as FAULT status, and wait until all
 *     synchronous transfers have finished.
 * A ChimeraTK::runtime_error in steps 1, 4 or 5 updates the FAULT message (only if it changed) and restarts at
 * step 1.
 * The testable mode lock is held throughout, except while sleeping before an open attempt and while waiting for an
 * exception in step 7.
 */
void DeviceManager::mainLoop() {
Application::registerThread("DM_" + getName());
std::string error;
// We have the testable mode lock. The device has not been initialised yet, but from now on the
// testableMode.deviceInitialisationCounter will take care of it
_testableModeReached = true;
// true until the device has been opened+initialised successfully for the first time; used to suppress the
// "error cleared" message after the very first successful initialisation
bool firstSuccess = true;
while(true) {
// [Spec: 2.3.1] (Re)-open the device.
do {
_owner->getTestableMode().unlock("Wait before open/recover device");
usleep(500000);
boost::this_thread::interruption_point();
_owner->getTestableMode().lock("Attempt open/recover device");
try {
// The globalDeviceOpenMutex is a work around for backends which do not implement open() in a thread-safe
// manner. This seems to be the case for most backends currently, hence it was decided to implement this
// workaround for now (see #11478).
static std::mutex globalDeviceOpenMutex;
std::lock_guard<std::mutex> globalDeviceOpenLock(globalDeviceOpenMutex);
_device.open();
}
catch(ChimeraTK::runtime_error& e) {
assert(_deviceError._status != StatusOutput::Status::OK); // any error must already be reported...
if(std::string(_deviceError._message) != e.what()) {
std::cerr << "Device " << _deviceAliasOrCDD << " reports error: " << e.what() << std::endl;
// set proper error message in very first attempt to open the device
setCurrentVersionNumber({});
_deviceError.write(StatusOutput::Status::FAULT, e.what());
}
continue; // should not be necessary because isFunctional() should return false. But no harm in leaving it in.
}
} while(!_device.isFunctional());
boost::unique_lock<boost::shared_mutex> errorLock(_errorMutex);
// [Spec: 2.3.3] Empty exception reporting queue.
// Each popped entry was counted in the testable mode counter by reportException(), so undo that here.
while(_errorQueue.pop()) {
if(_owner->getTestableMode()._enabled) {
assert(_owner->getTestableMode()._counter > 0);
--_owner->getTestableMode()._counter;
}
}
errorLock.unlock(); // we don't need to hold the lock for now, but we will need it later
// Sanity check of the access rights of all registers which are written/read by the application. A violation is a
// configuration/programming error, hence a logic_error which is intentionally not caught here.
for(auto& writeMe : _writeRegisterPaths) {
auto reg = _device.getOneDRegisterAccessor<std::string>(writeMe); // the user data type does not matter here.
if(!reg.isWriteable()) {
throw ChimeraTK::logic_error(std::string(writeMe) + " is not writeable!");
}
}
for(auto& readMe : _readRegisterPaths) {
auto reg = _device.getOneDRegisterAccessor<std::string>(readMe); // the user data type does not matter here.
if(!reg.isReadable()) {
throw ChimeraTK::logic_error(std::string(readMe) + " is not readable!");
}
}
// [Spec: 2.3.2] Run initialisation handlers
try {
for(auto& initHandler : _initialisationHandlers) {
initHandler(_device);
}
}
catch(ChimeraTK::runtime_error& e) {
assert(_deviceError._status != StatusOutput::Status::OK); // any error must already be reported...
// update error message, since it might have been changed...
if(std::string(_deviceError._message) != e.what()) {
std::cerr << "Device " << _deviceAliasOrCDD << " reports error: " << e.what() << std::endl;
setCurrentVersionNumber({});
_deviceError.write(StatusOutput::Status::FAULT, e.what());
}
// Jump back to re-opening the device
continue;
}
// Write all recovery accessors
// We are now entering the critical recovery section. It is protected by the recovery mutex until the
// deviceHasError flag has been cleared.
// (recoveryLock is a local of the loop body, so the "continue" in the catch below releases it.)
boost::unique_lock<boost::shared_mutex> recoveryLock(_recoveryMutex);
try {
// sort recovery helpers according to write order
_recoveryHelpers.sort([](boost::shared_ptr<RecoveryHelper>& a, boost::shared_ptr<RecoveryHelper>& b) {
return a->writeOrder < b->writeOrder;
});
// Only helpers with a valid (non-null) version number are written.
for(auto& recoveryHelper : _recoveryHelpers) {
if(recoveryHelper->versionNumber != VersionNumber{nullptr}) {
recoveryHelper->accessor->write();
recoveryHelper->wasWritten = true;
}
}
}
catch(ChimeraTK::runtime_error& e) {
// update error message, since it might have been changed...
if(std::string(_deviceError._message) != e.what()) {
std::cerr << "Device " << _deviceAliasOrCDD << " reports error: " << e.what() << std::endl;
setCurrentVersionNumber({});
_deviceError.write(StatusOutput::Status::FAULT, e.what());
}
// Jump back to re-opening the device
continue;
}
errorLock.lock();
_deviceHasError = false;
errorLock.unlock();
recoveryLock.unlock();
// send the trigger that the device is available again
_device.activateAsyncRead();
if(_isHoldingInitialValueLatch) {
_isHoldingInitialValueLatch = false;
_initialValueLatch.count_down();
}
// [Spec: 2.3.5] Reset exception state and wait for the next error to be reported.
_deviceError.writeOk();
deviceBecameFunctional.write();
if(!firstSuccess) {
std::cerr << "Device " << _deviceAliasOrCDD << " error cleared." << std::endl;
}
firstSuccess = false;
// Decrement the special testable mode counter. It was incremented in prepare() before the first iteration, and
// further below after an exception was received for all later iterations, to make sure the initialisation
// completes within one "application step".
// NOTE(review): this reads the enabled flag via Application::getInstance() but modifies the counter via _owner;
// presumably both refer to the same application object.
if(Application::getInstance().getTestableMode()._enabled) {
--_owner->getTestableMode()._deviceInitialisationCounter;
}
// [Spec: 2.3.8] Wait for an exception being reported by the ExceptionHandlingDecorators
// release the testable mode mutex for waiting for the exception.
_owner->getTestableMode().unlock("Wait for exception");
// Do not modify the queue without holding the testable mode lock, because we also consistently have to modify
// the counter protected by that mutex.
// Just call wait(), not pop_wait().
boost::this_thread::interruption_point();
_errorQueue.wait();
boost::this_thread::interruption_point();
_owner->getTestableMode().lock("Process exception");
// increment special testable mode counter to make sure the initialisation completes within one
// "application step"
if(Application::getInstance().getTestableMode()._enabled) {
++_owner->getTestableMode()._deviceInitialisationCounter; // matched above with a decrement
}
errorLock.lock(); // we need both locks to modify the queue
auto popResult = _errorQueue.pop(error);
assert(popResult); // the pop must always succeed here, otherwise the waiting did not work.
(void)popResult; // avoid warning in production build. g++5.4 does not support [[maybe_unused]] yet.
if(_owner->getTestableMode()._enabled) {
assert(_owner->getTestableMode()._counter > 0);
--_owner->getTestableMode()._counter;
}
// [ExceptionHandling Spec: C.3.3.14] report exception to the control system
std::cerr << "Device " << _deviceAliasOrCDD << " reports error: " << error << std::endl;
setCurrentVersionNumber({});
_deviceError.write(StatusOutput::Status::FAULT, error);
// We must not hold the lock while waiting for the synchronousTransferCounter to go back to 0. Only release it
// after deviceError has been written, so the CircularDependencyDetector can read the error message from its
// thread for printing.
errorLock.unlock();
// [ExceptionHandling Spec: C.3.3.15] Wait for all synchronous transfers to finish before starting recovery.
// Polls every 1 ms while still holding the testable mode lock; the loop contains no boost interruption point.
while(_synchronousTransferCounter > 0) {
usleep(1000);
}
} // while(true)
}
/*********************************************************************************************************************/
/**
 * Prepare the DeviceManager before the application threads are started.
 *
 * Publishes an initial FAULT status ("Attempting to open device...") with a fresh version number, because the device
 * has not been opened yet.
 *
 * In testable mode, the special device initialisation counter is incremented here. This ensures the initial
 * opening/initialisation completes within one "application step". mainLoop() decrements the counter once the device
 * has become functional.
 */
void DeviceManager::prepare() {
  // Set initial status to error
  setCurrentVersionNumber({});
  _deviceError.write(StatusOutput::Status::FAULT, "Attempting to open device...");

  // Start with the counter increased (device not initialised yet, so testable mode has to wait). This can be done
  // without holding the testable mode lock because the application is still single threaded at this point.
  // The enabled flag is queried through _owner, i.e. the same object whose counter is modified.
  if(_owner->getTestableMode()._enabled) {
    ++_owner->getTestableMode()._deviceInitialisationCounter; // decremented in mainLoop() after initialisation
  }
}
/*********************************************************************************************************************/
/**
 * Register an initialisation handler.
 *
 * mainLoop() calls every registered handler with the device, in registration order, after each successful
 * (re-)open. If a handler throws ChimeraTK::runtime_error, mainLoop() goes back to re-opening the device.
 *
 * @param initialisationHandler  callable receiving the opened device
 */
void DeviceManager::addInitialisationHandler(std::function<void(ChimeraTK::Device&)> initialisationHandler) {
  _initialisationHandlers.emplace_back(std::move(initialisationHandler));
}
/*********************************************************************************************************************/
/**
 * Register a recovery helper.
 *
 * After the device has been re-opened and initialised, mainLoop() sorts all helpers by their writeOrder. It then
 * writes the accessor of every helper whose version number is not null.
 *
 * @param recoveryAccessor  helper holding the accessor and its last written value metadata
 */
void DeviceManager::addRecoveryAccessor(boost::shared_ptr<RecoveryHelper> recoveryAccessor) {
  _recoveryHelpers.emplace_back(std::move(recoveryAccessor));
}
/*********************************************************************************************************************/
/**
 * Hand out the next write order number.
 *
 * Each call pre-increments the internal counter and returns the new value, so the numbers increase with each call.
 * Presumably this is used to fill RecoveryHelper::writeOrder, which mainLoop() sorts by (verify at call sites).
 */
uint64_t DeviceManager::writeOrder() {
  const uint64_t nextOrder = ++_writeOrderCounter;
  return nextOrder;
}
/*********************************************************************************************************************/
/**
 * Acquire and return a shared lock on the recovery mutex.
 *
 * mainLoop() holds the recovery mutex exclusively while it writes the recovery accessors and clears the error flag.
 * Holding this shared lock therefore excludes a concurrent recovery write.
 */
boost::shared_lock<boost::shared_mutex> DeviceManager::getRecoverySharedLock() {
  boost::shared_lock<boost::shared_mutex> sharedRecoveryLock{_recoveryMutex};
  return sharedRecoveryLock;
}
/*********************************************************************************************************************/
/**
 * Block until the initial value latch has been released.
 *
 * mainLoop() counts the latch down (if it is still holding it) after the device has been opened, initialised and
 * all recovery accessors have been written successfully.
 */
void DeviceManager::waitForInitialValues() {
  _initialValueLatch.wait();
}
/*********************************************************************************************************************/
/**
 * Circular-network scan hook.
 *
 * The DeviceManager does not process the device registers, so networks involving it are not truly circular. For
 * that reason no real circular network checking is done here and the given list is returned unchanged. The only
 * exception is an empty list: the scan might then be about the DeviceManager's own status/control variables, so the
 * DeviceManager itself is added.
 *
 * @param startList  modules collected so far
 * @return startList, with this DeviceManager appended if startList was empty
 */
std::list<EntityOwner*> DeviceManager::getInputModulesRecursively(std::list<EntityOwner*> startList) {
  if(!startList.empty()) {
    return startList;
  }
  startList.push_back(this);
  return startList;
}
/*********************************************************************************************************************/
/**
 * The DeviceManager is never part of a circular network, so this always returns 0.
 */
size_t DeviceManager::getCircularNetworkHash() const {
  constexpr size_t neverCircular = 0;
  return neverCircular;
}
/*********************************************************************************************************************/
/**
 * Not supported on a DeviceManager: always throws.
 *
 * Reaching this indicates that variables/accessors or VariableGroups have the wrong owner.
 *
 * @throws ChimeraTK::logic_error always
 */
void DeviceManager::incrementDataFaultCounter() {
  throw ChimeraTK::logic_error("incrementDataFaultCounter() called on a DeviceManager. "
                               "This is probably caused by incorrect ownership of variables/accessors or "
                               "VariableGroups.");
}
/*********************************************************************************************************************/
/**
 * Not supported on a DeviceManager: always throws.
 *
 * Reaching this indicates that variables/accessors or VariableGroups have the wrong owner.
 *
 * @throws ChimeraTK::logic_error always
 */
void DeviceManager::decrementDataFaultCounter() {
  throw ChimeraTK::logic_error("decrementDataFaultCounter() called on a DeviceManager. "
                               "This is probably caused by incorrect ownership of variables/accessors or "
                               "VariableGroups.");
}
/*********************************************************************************************************************/
/**
 * Stop the DeviceManager thread and wait until it has been joined.
 *
 * The thread is interrupted first. While it is still running, a boost::thread_interrupted exception is pushed into
 * the error queue every 10 ms, so a thread blocked waiting on that queue wakes up and terminates. The push is
 * repeated because a single exception might get overwritten in the queue.
 */
void DeviceManager::terminate() {
  if(_moduleThread.joinable()) {
    _moduleThread.interrupt();

    const auto joinTimeout = boost::chrono::milliseconds(10);
    bool joined = _moduleThread.try_join_for(joinTimeout);
    while(!joined) {
      _errorQueue.push_exception(std::make_exception_ptr(boost::thread_interrupted()));
      joined = _moduleThread.try_join_for(joinTimeout);
    }
  }
  assert(!_moduleThread.joinable());
}
/*********************************************************************************************************************/
} // namespace ChimeraTK