Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • chimeratk-mirror/ApplicationCore
1 result
Show changes
Commits on Source (4)
Showing
with 647 additions and 47 deletions
......@@ -35,16 +35,17 @@ namespace ChimeraTK {
const Model::ProcessVariableProxy* proxy{nullptr};
// Variables related to the current network
VariableNetworkNode feeder{};
VariableNetworkNode feeder;
std::map<std::string, boost::shared_ptr<TriggerFanOut>> triggerImpl;
std::list<VariableNetworkNode> consumers;
const std::type_info* valueType{&typeid(AnyType)};
size_t valueLength{0};
std::string description{};
std::string unit{};
std::string description;
std::string unit;
size_t numberOfBidirectionalNodes{0};
size_t numberOfPollingConsumers{0};
bool useExternalTrigger{false};
bool useReverseRecovery{false};
};
std::set<std::string> _triggerNetworks{};
std::map<std::string, NetworkInformation> _networks{};
......
......@@ -5,8 +5,11 @@
#include "Application.h"
#include "RecoveryHelper.h"
#include <ChimeraTK/NDRegisterAccessor.h>
#include <ChimeraTK/NDRegisterAccessorDecorator.h>
#include <boost/smart_ptr/shared_ptr.hpp>
namespace ChimeraTK {
/** Decorator of the NDRegisterAccessor which facilitates tests of the
......@@ -15,7 +18,7 @@ namespace ChimeraTK {
class ExceptionHandlingDecorator : public ChimeraTK::NDRegisterAccessorDecorator<UserType> {
public:
/**
* Decorate the accessors which is handed in the constuctor.
* Decorate the accessors which is handed in the constructor.
* All information to get the DeviceModule and to create a recovery accessor are
* taken from the VariableNetworkNode.
*/
......
......@@ -86,7 +86,6 @@ namespace ChimeraTK {
ChimeraTK::NDRegisterAccessor<UserType>::buffer_2D[0].resize(numberOfElements);
if(_withReturn) {
this->_readQueue = cppext::future_queue<void>(3);
this->_accessModeFlags = {AccessMode::wait_for_new_data};
}
......@@ -203,7 +202,7 @@ namespace ChimeraTK {
if(!hasNewData && type != TransferType::read) {
// No post read handling for readNonBlocking and readLatest if there was no new data, since there was actually no
// corresponding read operation on any of the unterlying accessors (just checking the notification queue).
// corresponding read operation on any of the underlying accessors (just checking the notification queue).
return;
}
......
......@@ -13,14 +13,17 @@ namespace ChimeraTK {
* when the accessor is written, but delay the writing do a later point in time.
*/
struct RecoveryHelper {
enum class Direction { fromDevice, toDevice };
boost::shared_ptr<TransferElement> accessor;
VersionNumber versionNumber;
uint64_t writeOrder;
bool wasWritten{false};
cppext::future_queue<void> notificationQueue;
Direction recoveryDirection{Direction::toDevice};
explicit RecoveryHelper(
boost::shared_ptr<TransferElement> a, VersionNumber v = VersionNumber(nullptr), uint64_t order = 0)
: accessor(std::move(a)), versionNumber(v), writeOrder(order) {}
explicit RecoveryHelper(boost::shared_ptr<TransferElement> a, VersionNumber v = VersionNumber(nullptr),
uint64_t order = 0, Direction direction = Direction::toDevice)
: accessor(std::move(a)), versionNumber(v), writeOrder(order), recoveryDirection(direction) {}
};
} // end of namespace ChimeraTK
// SPDX-FileCopyrightText: Deutsches Elektronen-Synchrotron DESY, MSK, ChimeraTK Project <chimeratk-support@desy.de>
// SPDX-License-Identifier: LGPL-3.0-or-later
#pragma once
#include "ExceptionHandlingDecorator.h"
#include <ChimeraTK/Exception.h>
namespace ChimeraTK {
template<typename UserType>
class ReverseRecoveryDecorator : public ChimeraTK::ExceptionHandlingDecorator<UserType> {
public:
ReverseRecoveryDecorator(
boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> accessor, const VariableNetworkNode& networkNode);
void interrupt() override;
protected:
using ExceptionHandlingDecorator<UserType>::_recoveryHelper;
using ExceptionHandlingDecorator<UserType>::_target;
};
DECLARE_TEMPLATE_FOR_CHIMERATK_USER_TYPES(ReverseRecoveryDecorator);
} // namespace ChimeraTK
\ No newline at end of file
......@@ -6,6 +6,7 @@
#include "InversionOfControlAccessor.h"
#include <ChimeraTK/ScalarRegisterAccessor.h>
#include <ChimeraTK/SystemTags.h>
#include <boost/smart_ptr/shared_ptr.hpp>
#include <boost/thread.hpp>
......@@ -118,6 +119,16 @@ namespace ChimeraTK {
using ScalarAccessor<UserType>::operator=;
};
/********************************************************************************************************************/
/** Convenience class for output scalar accessors with return channel ("read back") (always UpdateMode::push) */
template<typename UserType>
struct ScalarOutputReverseRecovery : public ScalarAccessor<UserType> {
ScalarOutputReverseRecovery(Module* owner, const std::string& name, std::string unit,
const std::string& description, const std::unordered_set<std::string>& tags = {});
ScalarOutputReverseRecovery() : ScalarAccessor<UserType>() {}
using ScalarAccessor<UserType>::operator=;
};
/********************************************************************************************************************/
/********************************************************************************************************************/
/* Implementations below this point */
......@@ -247,5 +258,16 @@ namespace ChimeraTK {
owner, name, {VariableDirection::feeding, true}, unit, UpdateMode::push, description, tags) {}
/********************************************************************************************************************/
/********************************************************************************************************************/
template<typename UserType>
ScalarOutputReverseRecovery<UserType>::ScalarOutputReverseRecovery(Module* owner, const std::string& name,
std::string unit, const std::string& description, const std::unordered_set<std::string>& tags)
: ScalarAccessor<UserType>(
owner, name, {VariableDirection::feeding, true}, unit, UpdateMode::push, description, tags) {
this->addTag(ChimeraTK::SystemTags::reverseRecovery);
}
/********************************************************************************************************************/
} /* namespace ChimeraTK */
......@@ -5,9 +5,16 @@
#include "Application.h"
#include "FanOut.h"
#include "InternalModule.h"
#include "ReverseRecoveryDecorator.h"
#include <ChimeraTK/NDRegisterAccessor.h>
#include <ChimeraTK/ReadAnyGroup.h>
#include <ChimeraTK/SupportedUserTypes.h>
#include <ChimeraTK/SystemTags.h>
#include <boost/smart_ptr/shared_ptr.hpp>
#include <string>
namespace ChimeraTK {
......@@ -60,6 +67,7 @@ namespace ChimeraTK {
/** Thread handling the synchronisation, if needed */
boost::thread _thread;
boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> _initialValueProvider;
std::vector<boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>>> _inputChannels;
// using ThreadedFanOut<UserType>::_network;
......@@ -174,6 +182,8 @@ namespace ChimeraTK {
ConsumerImplementationPairs<UserType> const& consumerImplementationPairs)
: ThreadedFanOut<UserType>(feedingImpl, consumerImplementationPairs) {
_inputChannels.push_back(feedingImpl);
// By default, we take the initial value from the feeder
_initialValueProvider = feedingImpl;
for(auto el : consumerImplementationPairs) {
ThreadedFanOutWithReturn<UserType>::addSlave(el.first, el.second);
}
......@@ -187,6 +197,12 @@ namespace ChimeraTK {
// TODO Adding slaves is currently by done by the ThreadedFanOut base class constructor.
// Refactor constructors and addSlaves for all FanOuts?
// FanOut<UserType>::addSlave(slave, consumer);
if(consumer.getTags().contains(ChimeraTK::SystemTags::reverseRecovery)) {
_initialValueProvider = slave;
// FIXME: Do we need to check here that there is only one reverse recovery accessor
}
if(consumer.getDirection().withReturn) {
_inputChannels.push_back(slave);
}
......@@ -206,10 +222,14 @@ namespace ChimeraTK {
}
accessors[FanOut<UserType>::_impl->getId()] = FanOut<UserType>::_impl;
// For reading the initial value, swap out _impl, because readInitialValues()
// operates on it
std::swap(FanOut<UserType>::_impl, _initialValueProvider);
TransferElementID changedVariable = FanOut<UserType>::_impl->getId();
VersionNumber version{nullptr};
version = readInitialValues();
std::swap(FanOut<UserType>::_impl, _initialValueProvider);
ReadAnyGroup group(_inputChannels.begin(), _inputChannels.end());
......@@ -227,6 +247,7 @@ namespace ChimeraTK {
}
bool dataLoss = accessor->writeDestructively(version);
if(dataLoss) {
Application::incrementDataLossCounter(accessor->getName());
}
......
......@@ -246,7 +246,7 @@ namespace ChimeraTK {
/** Pointer to the module owning this node */
EntityOwner* owningModule{nullptr};
/** Hash which idientifies a circular network. 0 if the node is not part if a circular dependency. */
/** Hash which identifies a circular network. 0 if the node is not part if a circular dependency. */
size_t circularNetworkHash{0};
/** Model representation of this variable */
......
......@@ -5,8 +5,10 @@
#include "Application.h"
#include "CircularDependencyDetector.h"
#include "Flags.h"
#include "ModuleGroup.h"
#include <ChimeraTK/SystemTags.h>
#include <ChimeraTK/Utilities.h>
#include <iterator>
......@@ -122,10 +124,17 @@ namespace ChimeraTK {
}
}
}
for(auto& variable : getAccessorListRecursive()) {
if(variable.getDirection().dir != VariableDirection::consuming) {
// According to spec, readback values are not considered in the distribution of initial values
// However, if they are declared as providing reverse recovery, they have to be considered.
auto doNotSkipInIvDistribution = variable.getDirection().dir == VariableDirection::feeding &&
variable.getDirection().withReturn && variable.getTags().contains(ChimeraTK::SystemTags::reverseRecovery);
if(!doNotSkipInIvDistribution && variable.getDirection().dir != VariableDirection::consuming) {
continue;
}
if(variable.getMode() == UpdateMode::push) {
Application::getInstance().getTestableMode().unlock("Initial value read for push-type " + variable.getName());
Application::getInstance()._circularDependencyDetector.registerDependencyWait(variable);
......
......@@ -9,11 +9,16 @@
#include "DeviceManager.h"
#include "ExceptionHandlingDecorator.h"
#include "FanOut.h"
#include "Flags.h"
#include "ReverseRecoveryDecorator.h"
#include "TestableMode.h"
#include "ThreadedFanOut.h"
#include "TriggerFanOut.h"
#include <ChimeraTK/NDRegisterAccessor.h>
#include <ChimeraTK/SystemTags.h>
#include <algorithm>
namespace ChimeraTK {
......@@ -21,10 +26,21 @@ namespace ChimeraTK {
NetworkVisitor::NetworkInformation NetworkVisitor::checkNetwork(Model::ProcessVariableProxy& proxy) {
NetworkInformation net{&proxy};
debug("Checking network \"" + proxy.getName() + "\" consistency");
// Sanity check for the type and lengths of the nodes, extract the feeding node if any
VariableNetworkNode firstNodeWithType; // used for helpful error message only
net.useReverseRecovery = proxy.getTags().contains(ChimeraTK::SystemTags::reverseRecovery);
if(net.useReverseRecovery) {
debug(" Network has reverse recovery");
}
else {
debug(" Network does not have reverse recovery");
}
for(const auto& node : proxy.getNodes()) {
if(node->getDirection().withReturn) {
net.numberOfBidirectionalNodes++;
......@@ -32,7 +48,11 @@ namespace ChimeraTK {
if(node->getDirection().dir == VariableDirection::feeding) {
std::stringstream ss;
node->dump(ss);
debug(" Feeder: ", ss.str());
auto nodeDump = ss.str();
// Remove trailing newline
nodeDump.erase(nodeDump.length() - 1);
debug(" Feeder: ", nodeDump);
if(net.feeder.getType() == NodeType::invalid) {
net.feeder = *node;
......@@ -54,7 +74,9 @@ namespace ChimeraTK {
else if(node->getDirection().dir == VariableDirection::consuming) {
std::stringstream ss;
node->dump(ss);
debug(" Consumer: ", ss.str());
auto consumerDump = ss.str();
consumerDump.erase(consumerDump.length() - 1);
debug(" Consumer: ", consumerDump);
net.consumers.push_back(*node);
if(node->getMode() == UpdateMode::poll) {
net.numberOfPollingConsumers++;
......@@ -152,6 +174,7 @@ namespace ChimeraTK {
/*********************************************************************************************************************/
void NetworkVisitor::finaliseNetwork(NetworkInformation& net) {
debug("Finalising network \"" + net.proxy->getName() + "\"");
// check whether this is a constant created via ApplicationModule::constant()
bool isConstant{!net.consumers.empty() &&
boost::starts_with(net.consumers.front().getName(), ApplicationModule::namePrefixConstant)};
......@@ -177,7 +200,7 @@ namespace ChimeraTK {
bool neededFeeder{false};
if(not net.feeder.isValid()) {
debug(" No feeder in network, creating ControlSystem feeder ", net.proxy->getFullyQualifiedPath());
debug(" Bi-directional consumers: ", net.numberOfBidirectionalNodes);
debug(" Bi-directional consumers: ", net.numberOfBidirectionalNodes);
// If we have a bi-directional consumer, mark this CS feeder as bidirectional as well
net.feeder = VariableNetworkNode(net.proxy->getFullyQualifiedPath(),
......@@ -191,9 +214,13 @@ namespace ChimeraTK {
if(not neededFeeder and not isConstant) {
// Only add CS consumer if we did not previously add CS feeder, we will add one or the other, but never both
// Also we will not add CS consumers for constants.
//
// If this is a one-on-one network with reverse recovery, we have to make the CS feeder bi-directional
auto needReturn = net.useReverseRecovery && net.consumers.empty();
debug(" Network has a non-CS feeder, can create additional ControlSystem consumer");
net.consumers.push_back(VariableNetworkNode(
net.proxy->getFullyQualifiedPath(), {VariableDirection::consuming, false}, *net.valueType, net.valueLength));
debug(" with" + std::string(needReturn ? "" : "out") + " return");
net.consumers.push_back(VariableNetworkNode(net.proxy->getFullyQualifiedPath(),
{VariableDirection::consuming, needReturn}, *net.valueType, net.valueLength));
}
assert(not net.consumers.empty());
......@@ -220,7 +247,6 @@ namespace ChimeraTK {
}
}
AccessModeFlags give_me_a_name;
this->createProcessVariable<UserType>(net.feeder, net.valueLength, net.unit, net.description, flags);
}
});
......@@ -230,6 +256,7 @@ namespace ChimeraTK {
<< net.proxy->getFullyQualifiedPath() << std::endl;
throw;
}
debug();
}
/*********************************************************************************************************************/
......@@ -256,10 +283,10 @@ namespace ChimeraTK {
auto deviceTrigger = p.getTrigger();
if(deviceTrigger.isValid()) {
debug(" Found Feeding device ", p.getAliasOrCdd(), " with trigger ", p.getTrigger().getFullyQualifiedPath());
debug(" Found Feeding device ", p.getAliasOrCdd(), " with trigger ", p.getTrigger().getFullyQualifiedPath());
}
else {
debug(" Feeding from device ", p.getAliasOrCdd(), " but without any trigger");
debug(" Feeding from device ", p.getAliasOrCdd(), " but without any trigger");
}
return std::make_pair(deviceTrigger, p);
......@@ -291,18 +318,17 @@ namespace ChimeraTK {
makeDirectConnectionForFeederWithImplementation(_networks.at(path));
}
else {
// More than one consuming node
debug(" More than one consuming node or having external trigger, setting up FanOut");
makeFanOutConnectionForFeederWithImplementation(_networks.at(path), device, trigger);
}
}
else if(not constantFeeder) {
debug(" Feeder '", _networks.at(path).feeder.getName(), "' does not require a fixed implementation.");
debug(" Feeder '", _networks.at(path).feeder.getName(), "' does not require a fixed implementation.");
assert(not trigger.isValid());
makeConnectionForFeederWithoutImplementation(_networks.at(path));
}
else { // constant feeder
debug(" Using constant feeder '", _networks.at(path).feeder.getName(), "'.");
debug(" Using constant feeder '", _networks.at(path).feeder.getName(), "'.");
makeConnectionForConstantFeeder(_networks.at(path));
}
......@@ -316,10 +342,11 @@ namespace ChimeraTK {
_app._circularDependencyNetworks[circularNetworkHash] = circularNetwork;
_app._circularNetworkInvalidityCounters[circularNetworkHash] = 0;
debug(" Circular network detected: " + proxy.getFullyQualifiedPath() + " is part of " +
debug(" Circular network detected: " + proxy.getFullyQualifiedPath() + " is part of " +
std::to_string(circularNetworkHash));
}
}
debug();
}
/*********************************************************************************************************************/
......@@ -329,8 +356,8 @@ namespace ChimeraTK {
_app.getTestableMode()._debugDecorating = _debugConnections;
debug(" Preparing trigger networks");
debug(" Collecting triggers");
debug("Preparing trigger networks");
debug("Collecting triggers");
// Collect all triggers, add a TriggerReceiver placeholder for every device associated with that trigger
std::list<Model::DeviceModuleProxy> dmProxyList;
......@@ -345,16 +372,21 @@ namespace ChimeraTK {
VariableNetworkNode placeholder(proxy.getAliasOrCdd(), 0);
proxy.addVariable(trigger, placeholder);
}
debug(" Found " + std::to_string(_triggers.size()) + " trigger(s)");
debug(" Finalising trigger networks");
debug("---------------------------");
debug("Finalising trigger networks");
debug("---------------------------");
for(auto trigger : _triggers) {
auto info = checkAndFinaliseNetwork(trigger);
_triggerNetworks.insert(trigger.getFullyQualifiedPath());
_networks.insert({trigger.getFullyQualifiedPath(), info});
debug(" trigger network: " + trigger.getFullyQualifiedPath());
debug(" trigger network: " + trigger.getFullyQualifiedPath());
}
debug(" Finalising other networks");
debug("-------------------------");
debug("Finalising other networks");
debug("-------------------------");
auto connectingVisitor = [&](auto proxy) {
if(_triggerNetworks.count(proxy.getFullyQualifiedPath()) != 0) {
return;
......@@ -378,12 +410,16 @@ namespace ChimeraTK {
// Improve: Likely no need to distinguish trigger and normal networks here... Also just iterate _networks instead
// of the model!
debug(" Connecting trigger networks");
debug("---------------------------");
debug("Connecting trigger networks");
debug("---------------------------");
for(auto trigger : _triggers) {
connectNetwork(trigger);
}
debug(" Connecting other networks");
debug("-------------------------");
debug("Connecting other networks");
debug("-------------------------");
auto connectingVisitor = [&](auto proxy) {
if(_triggerNetworks.count(proxy.getFullyQualifiedPath()) != 0) {
return;
......@@ -452,10 +488,20 @@ namespace ChimeraTK {
}
if(needsFanOut) {
debug(" needing an additional fan-out");
assert(consumingImpl != nullptr);
auto consumerImplPair = ConsumerImplementationPairs<UserType>{{consumingImpl, consumer}};
auto fanOut = boost::make_shared<ThreadedFanOut<UserType>>(feedingImpl, consumerImplPair);
_app._internalModuleList.push_back(fanOut);
boost::shared_ptr<ThreadedFanOut<UserType>> threadedFanOut;
if(not net.feeder.getDirection().withReturn) {
debug(" No return channel");
threadedFanOut = boost::make_shared<ThreadedFanOut<UserType>>(feedingImpl, consumerImplPair);
}
else {
debug(" With return channel");
threadedFanOut = boost::make_shared<ThreadedFanOutWithReturn<UserType>>(feedingImpl, consumerImplPair);
}
_app._internalModuleList.push_back(threadedFanOut);
}
});
}
......@@ -473,6 +519,7 @@ namespace ChimeraTK {
boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> feedingImpl;
if(net.feeder.getType() == NodeType::Device) {
debug(" Device feeder, creating Device variable");
feedingImpl = createDeviceVariable<UserType>(net.feeder);
}
else if(net.feeder.getType() == NodeType::ControlSystem) {
......@@ -527,7 +574,7 @@ namespace ChimeraTK {
debug(" No trigger, using consuming fanout.");
consumingFanOut = boost::make_shared<ConsumingFanOut<UserType>>(feedingImpl, consumerImplementationPairs);
// TODO Is this correct? we already added all consumer as slaves in the fanout constructor.
// TODO Is this correct? we already added all consumer as slaves in the fanout constructor.
// Maybe assert that we only have a single poll-type node (is there a check in checkConnections?)
for(const auto& consumer : net.consumers) {
if(consumer.getMode() == UpdateMode::poll) {
......@@ -567,7 +614,7 @@ namespace ChimeraTK {
dir = SynchronizationDirection::deviceToControlSystem;
}
debug(" calling createProcessArray()");
debug(" calling createProcessArray()");
auto pv = _app.getPVManager()->createProcessArray<UserType>(
dir, node.getPublicName(), length, unit, description, {}, 3, flags);
......@@ -627,7 +674,7 @@ namespace ChimeraTK {
// Receiving accessors should be faulty after construction,
// see data validity propagation spec 2.6.1
if(node.getDirection().dir == VariableDirection::feeding) {
if(node.getDirection().dir == VariableDirection::feeding || node.getDirection().withReturn) {
accessor->setDataValidity(DataValidity::faulty);
}
......@@ -636,6 +683,11 @@ namespace ChimeraTK {
accessor = _app.getTestableMode().decorate(accessor, detail::TestableMode::DecoratorType::READ);
}
if(node.getDirection().dir == VariableDirection::consuming && node.getDirection().withReturn) {
std::cerr << "Adding reverse recovery accessor for " << accessor->getName() << std::endl;
return boost::make_shared<ReverseRecoveryDecorator<UserType>>(accessor, node);
}
return boost::make_shared<ExceptionHandlingDecorator<UserType>>(accessor, node);
}
......@@ -824,7 +876,7 @@ namespace ChimeraTK {
}
}
else if(net.consumers.size() > 1) {
debug(" More than one consumer, using fan-out as feeder impl");
debug(" More than one consumer, using fan-out as feeder impl");
callForType(*net.valueType, [&](auto t) {
using UserType = decltype(t);
auto consumerImplementationPairs = setConsumerImplementations<UserType>(net);
......@@ -836,7 +888,7 @@ namespace ChimeraTK {
});
}
else {
debug(" No consumer (presumably optimised out)");
debug(" No consumer (presumably optimised out)");
net.feeder.setAppAccessorConstImplementation(VariableNetworkNode(net.valueType, true, net.valueLength));
}
}
......@@ -867,6 +919,8 @@ namespace ChimeraTK {
auto dev = deviceManager->getDevice().getBackend();
auto impl =
dev->getRegisterAccessor<UserType>(consumer.getRegisterName(), consumer.getNumberOfElements(), 0, {});
auto catalog = deviceManager->getDevice().getRegisterCatalogue();
auto tags = catalog.getRegister(consumer.getRegisterName()).getTags();
// Set the value
impl->accessChannel(0) =
......@@ -876,8 +930,12 @@ namespace ChimeraTK {
// version number and have a recovery accessors (RecoveryHelper to be exact) which we can register at the
// DeviceModule. As this is a constant we don't need to change it later and don't have to store it somewhere
// else.
deviceManager->addRecoveryAccessor(
boost::make_shared<RecoveryHelper>(impl, VersionNumber(), deviceManager->writeOrder()));
// If this register is considered for reverse recovery (Device pushes to application), do not add an
// accessor at all, since pushing to a constant does not make any sense.
if(!tags.contains(ChimeraTK::SystemTags::reverseRecovery)) {
deviceManager->addRecoveryAccessor(
boost::make_shared<RecoveryHelper>(impl, VersionNumber(), deviceManager->writeOrder()));
}
}
else if(consumer.getType() == NodeType::TriggerReceiver) {
throw ChimeraTK::logic_error("Using constants as triggers is not supported!");
......@@ -896,7 +954,20 @@ namespace ChimeraTK {
auto& network = _networks.at(name);
// if the control system is the feeder, change it into a constant
if(network.feeder.getType() == NodeType::ControlSystem) {
network.feeder = VariableNetworkNode(network.valueType, true, network.valueLength);
if(network.useReverseRecovery) {
// We need to promote the accessor with the reverse recovery tag to the network feeder
// to prevent writing down the constant value into the device and propagating the
// recovery value to the other consumers instead.
auto reverseConsumer = std::ranges::find_if(network.consumers, [](auto& consumer) {
return consumer.getType() == NodeType::Device &&
consumer.getTags().contains(ChimeraTK::SystemTags::reverseRecovery);
});
network.consumers.remove(*reverseConsumer);
network.feeder = *reverseConsumer;
}
else {
network.feeder = VariableNetworkNode(network.valueType, true, network.valueLength);
}
}
else {
// control system is a consumer: remove it from the list of consumers
......
......@@ -3,8 +3,14 @@
#include "DeviceManager.h"
#include "RecoveryHelper.h"
#include "Utilities.h"
#include <ChimeraTK/NDRegisterAccessor.h>
#include <ChimeraTK/SupportedUserTypes.h>
#include <boost/smart_ptr/shared_ptr.hpp>
namespace ChimeraTK {
/*********************************************************************************************************************/
......@@ -48,9 +54,16 @@ namespace ChimeraTK {
// find minimum type required to represent data
const auto* valTyp = &(reg.getDataDescriptor().minimumDataType().getAsTypeInfo());
if(reg.getTags().contains(SystemTags::reverseRecovery) && reg.isReadable()) {
direction.withReturn = true;
}
// create node and add to list
rv.emplace_back(reg.getRegisterName(), _deviceAliasOrCDD, reg.getRegisterName(), updateMode, direction, *valTyp,
reg.getNumberOfElements());
for(const auto& tag : reg.getTags()) {
rv.back().addTag(tag);
}
}
return rv;
......@@ -65,9 +78,9 @@ namespace ChimeraTK {
// The error queue must only be modified when holding both mutexes (error mutex and testable mode mutex), because
// the testable mode counter must always be consistent with the content of the queue.
// To avoid deadlocks you must always first aquire the testable mode mutex if you need both.
// To avoid deadlocks you must always first acquire the testable mode mutex if you need both.
// You can hold the error mutex without holding the testable mode mutex (for instance for checking the error
// predicate), but then you must not try to aquire the testable mode mutex!
// predicate), but then you must not try to acquire the testable mode mutex!
boost::unique_lock<boost::shared_mutex> errorLock(_errorMutex);
if(!_deviceHasError) { // only report new errors if the device does not have reported errors already
......@@ -186,9 +199,16 @@ namespace ChimeraTK {
return a->writeOrder < b->writeOrder;
});
for(auto& recoveryHelper : _recoveryHelpers) {
if(recoveryHelper->versionNumber != VersionNumber{nullptr}) {
recoveryHelper->accessor->write();
recoveryHelper->wasWritten = true;
if(recoveryHelper->recoveryDirection == RecoveryHelper::Direction::toDevice) {
if(recoveryHelper->versionNumber != VersionNumber{nullptr}) {
recoveryHelper->accessor->write();
recoveryHelper->wasWritten = true;
}
}
else if(recoveryHelper->recoveryDirection == RecoveryHelper::Direction::fromDevice) {
if(recoveryHelper->accessor->isReadable()) {
recoveryHelper->notificationQueue.push();
}
}
}
}
......
......@@ -3,8 +3,10 @@
#include "ExceptionHandlingDecorator.h"
#include "DeviceManager.h"
#include "RecoveryHelper.h"
#include <functional>
#include <ChimeraTK/SystemTags.h>
#include <ChimeraTK/TransferElement.h>
namespace ChimeraTK {
......@@ -43,9 +45,17 @@ namespace ChimeraTK {
// add recovery accessor to DeviceManager so the last known value is restored during device recovery, unless
// the data type is Void, in which case there is no value to recover and writing will likely trigger some unwanted
// action.
if(networkNode.getTags().contains(ChimeraTK::SystemTags::reverseRecovery)) {
_recoveryHelper->recoveryDirection = RecoveryHelper::Direction::fromDevice;
}
if(!std::is_same<UserType, ChimeraTK::Void>::value) {
deviceManager->addRecoveryAccessor(_recoveryHelper);
}
if(_direction.withReturn && _recoveryAccessor->isReadable()) {
deviceManager->_readRegisterPaths.emplace_back(registerName);
}
}
else if(_direction.dir == VariableDirection::feeding) {
deviceManager->_readRegisterPaths.emplace_back(registerName);
......@@ -126,7 +136,7 @@ namespace ChimeraTK {
{
auto recoverylock{deviceManager->getRecoverySharedLock()};
// the transfer was successful or doPostRead did not throw and we reach this point,
// so we matk these data as written
// so we mark these data as written
_recoveryHelper->wasWritten = true;
} // end scope for recovery lock
}
......
// SPDX-FileCopyrightText: Deutsches Elektronen-Synchrotron DESY, MSK, ChimeraTK Project <chimeratk-support@desy.de>
// SPDX-License-Identifier: LGPL-3.0-or-later
#include "ReverseRecoveryDecorator.h"
namespace ChimeraTK {
/*********************************************************************************************************************/
template<typename UserType>
ReverseRecoveryDecorator<UserType>::ReverseRecoveryDecorator(
boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> accessor, const VariableNetworkNode& networkNode)
: ExceptionHandlingDecorator<UserType>(std::move(accessor), networkNode) {
// Check if we are wrapping a push-type variable and forbid that
if(TransferElement::getAccessModeFlags().has(AccessMode::wait_for_new_data)) {
throw ChimeraTK::logic_error("Cannot use reverse recovery on push-type input");
}
// Set ourselves as wfnd:
TransferElement::_accessModeFlags.add(AccessMode::wait_for_new_data);
_recoveryHelper->notificationQueue = cppext::future_queue<void>(3);
// Set the read queue as continuation of the notification queue
// The continuation will just trigger a read on the target accessor
this->_readQueue = _recoveryHelper->notificationQueue.template then<void>(
[&, this]() {
std::cout << "Notification triggered" << std::endl;
_target->read();
},
std::launch::deferred);
}
/*********************************************************************************************************************/
template<typename UserType>
void ReverseRecoveryDecorator<UserType>::interrupt() {
this->interrupt_impl(this->_recoveryHelper->notificationQueue);
}
/*********************************************************************************************************************/
INSTANTIATE_TEMPLATE_FOR_CHIMERATK_USER_TYPES(ReverseRecoveryDecorator);
} // namespace ChimeraTK
\ No newline at end of file
readOnly 1 0 4 0 32 0 1 RO
readWrite 1 4 4 0 32 0 1 RW
writeOnly 1 8 4 0 32 0 1 WO
writeOnlyRB 1 8 4 0 32 0 1 RO
secondReadWrite 1 12 4 0 32 0 1 RW
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<logicalNameMap>
<redirectedRegister name="taggedReadOnly">
<targetDevice>baseDevice</targetDevice>
<targetRegister>readOnly</targetRegister>
<plugin name="tagModifier">
<parameter name="set">_ChimeraTK_DeviceRegister_reverseRecovery</parameter>
</plugin>
</redirectedRegister>
<redirectedRegister name="taggedReadWrite">
<targetDevice>baseDevice</targetDevice>
<targetRegister>readWrite</targetRegister>
<plugin name="tagModifier">
<parameter name="set">_ChimeraTK_DeviceRegister_reverseRecovery</parameter>
</plugin>
</redirectedRegister>
<redirectedRegister name="taggedWriteOnly">
<targetDevice>baseDevice</targetDevice>
<targetRegister>writeOnly</targetRegister>
<plugin name="tagModifier">
<parameter name="set">_ChimeraTK_DeviceRegister_reverseRecovery</parameter>
</plugin>
</redirectedRegister>
<redirectedRegister name="untagged">
<targetDevice>baseDevice</targetDevice>
<targetRegister>secondReadWrite</targetRegister>
</redirectedRegister>
</logicalNameMap>
baseDevice (sharedMemoryDummy?map=baseDevice.map)
taggedDevice (logicalNameMap?map=tagged.xlmap)
// SPDX-FileCopyrightText: Deutsches Elektronen-Synchrotron DESY, MSK, ChimeraTK Project <chimeratk-support@desy.de>
// SPDX-License-Identifier: LGPL-3.0-or-later
#include "ScalarAccessor.h"
#include <ChimeraTK/DeviceBackend.h>
#include <ChimeraTK/ReadAnyGroup.h>
#include <ChimeraTK/SharedDummyBackend.h>
#include <ChimeraTK/TransferElement.h>
#include <ChimeraTK/VoidRegisterAccessor.h>
#include <boost/smart_ptr/shared_ptr.hpp>
#define BOOST_TEST_MODULE reverseRecoveryTest
#include "Application.h"
#include "ApplicationModule.h"
#include "check_timeout.h"
#include "DeviceModule.h"
#include "Logger.h"
#include "ScalarAccessor.h"
#include "TestFacility.h"
#include <ChimeraTK/BackendFactory.h>
#include <ChimeraTK/Device.h>
#include <boost/test/included/unit_test.hpp>
#include <cstdint>
namespace ctk = ChimeraTK;
struct TestApplication : ctk::Application {
TestApplication() : ctk::Application("tagTestApplication") {}
~TestApplication() override { shutdown(); }
struct : public ctk::ApplicationModule {
using ctk::ApplicationModule::ApplicationModule;
std::function<void()> doMainLoop;
void mainLoop() final { doMainLoop(); }
} mod{this, "Module", ""};
};
BOOST_AUTO_TEST_CASE(testDirectThreadedFanOutWithReturn) {
std::cout << "testTaggedRegisterNotWrittenOnRecovery" << std::endl;
ctk::BackendFactory::getInstance().setDMapFilePath("testTagged.dmap");
TestApplication app;
ctk::DeviceModule devModule{&app, "taggedDevice", "/trigger"};
std::atomic<bool> up{false};
app.mod.doMainLoop = [&]() {
std::cout << "In mainloop" << std::endl;
up = true;
up.notify_one();
};
ctk::TestFacility test(app, false);
ctk::Device dev;
dev.open("baseDevice");
// Initialize the device with some values
dev.write<int32_t>("/readWrite", 4);
dev.write<int32_t>("/writeOnlyRB.DUMMY_WRITEABLE", 8);
dev.write<int32_t>("/secondReadWrite", 16);
// Set initial values for the variables
test.setScalarDefault<int32_t>("/taggedReadWrite", 12);
test.setScalarDefault<int32_t>("/taggedWriteOnly", 24);
test.setScalarDefault<int32_t>("/untagged", 36);
test.runApplication();
// Wait for the device to become ready
CHECK_EQUAL_TIMEOUT(test.readScalar<int32_t>("/Devices/taggedDevice/status"), 0, 1000);
up.wait(false);
auto taggedReadWriteCs = test.getScalar<int32_t>("/taggedReadWrite");
auto taggedWriteOnlyCs = test.getScalar<int32_t>("/taggedWriteOnly");
auto untagged = test.getScalar<int32_t>("/untagged");
// Check that the values are still on the values we have written explicitly
// into the device, and not the initial values we configured above
BOOST_TEST(dev.read<int32_t>("/readWrite") == 4);
BOOST_TEST(dev.read<int32_t>("/writeOnlyRB") == 8);
// Check that instead those values have been propagated to the CS (where applicable)
CHECK_EQUAL_TIMEOUT((taggedReadWriteCs.readLatest(), int(taggedReadWriteCs)), 4, 2000);
// The untagged register should have received the initial value from the CS
BOOST_TEST(dev.read<int32_t>("/secondReadWrite") == 36);
// Just do normal operations
taggedReadWriteCs.setAndWrite(48);
taggedWriteOnlyCs.setAndWrite(96);
untagged.setAndWrite(128);
CHECK_EQUAL_TIMEOUT(dev.read<int32_t>("/readWrite"), 48, 2000);
CHECK_EQUAL_TIMEOUT(dev.read<int32_t>("/writeOnlyRB"), 96, 2000);
CHECK_EQUAL_TIMEOUT(dev.read<int32_t>("/secondReadWrite"), 128, 2000);
dev.write<int32_t>("/readWrite", 3);
dev.write<int32_t>("/writeOnlyRB.DUMMY_WRITEABLE", 7);
dev.write<int32_t>("/secondReadWrite", 15);
devModule.reportException("Trigger device recovery");
// Wait for ApplicationCore to recover
CHECK_EQUAL_TIMEOUT(test.readScalar<int32_t>("/Devices/taggedDevice/status"), 0, 1000);
// The two tagged registers should keep their values, the untagged register should receive the value written before
CHECK_EQUAL_TIMEOUT(dev.read<int32_t>("/readWrite"), 3, 1000);
CHECK_EQUAL_TIMEOUT(dev.read<int32_t>("/writeOnlyRB"), 7, 1000);
CHECK_EQUAL_TIMEOUT(dev.read<int32_t>("/secondReadWrite"), 128, 1000);
// The read-write register should have propagated its value to the CS
CHECK_EQUAL_TIMEOUT((taggedReadWriteCs.readLatest(), int32_t(taggedReadWriteCs)), 3, 2000);
app.shutdown();
}
// Create a ThreadedFanOutWithReturn and check that we can use the
// just the recovery value as an input
BOOST_AUTO_TEST_CASE(testThreadedFanOutWithReturnOnlyRecoverValue) {
ctk::BackendFactory::getInstance().setDMapFilePath("testTagged.dmap");
ctk::Device dev;
dev.open("baseDevice");
// Initialize the device with some values
dev.write<int32_t>("/readWrite", 4);
TestApplication app;
ctk::DeviceModule devModule{&app, "taggedDevice", "/trigger"};
std::atomic<bool> up{false};
ctk::ScalarPushInput<int32_t> deviceInput{&app.mod, "/taggedReadWrite", "", ""};
app.mod.doMainLoop = [&]() {
up = true;
up.notify_one();
};
ctk::TestFacility test(app, false);
// Set initial values for the variables
test.setScalarDefault<int32_t>("/taggedReadWrite", 12);
test.runApplication();
up.wait(false);
// Check that the device did not receive the initial value in this setup
CHECK_EQUAL_TIMEOUT(dev.read<int32_t>("/readWrite"), 4, 1000);
// Check that the input is having the value from the device
CHECK_EQUAL_TIMEOUT(deviceInput, 4, 1000);
dev.write<int32_t>("/readWrite", 8);
devModule.reportException("Trigger device recovery");
// Wait for ApplicationCore to recover
CHECK_EQUAL_TIMEOUT(test.readScalar<int32_t>("/Devices/taggedDevice/status"), 0, 1000);
deviceInput.read();
CHECK_EQUAL_TIMEOUT(int32_t(deviceInput), 8, 1000);
app.shutdown();
}
// Force the connection maker to create a direct connection with constant
// feeder
BOOST_AUTO_TEST_CASE(testConstantFeederInversion) {
ctk::BackendFactory::getInstance().setDMapFilePath("testTagged.dmap");
ctk::Device dev;
dev.open("baseDevice");
// Initialize the device with some values
dev.write<int32_t>("/readWrite", 4);
TestApplication app;
ctk::DeviceModule devModule{&app, "taggedDevice", "/trigger"};
std::atomic<bool> up{false};
ctk::ScalarPushInput<int32_t> deviceInput{&app.mod, "/taggedReadWrite", "", ""};
app.mod.doMainLoop = [&]() {
up = true;
up.notify_one();
};
ctk::TestFacility test(app, false);
app.optimiseUnmappedVariables({"/taggedReadWrite"});
test.runApplication();
up.wait(false);
CHECK_EQUAL_TIMEOUT(int32_t(deviceInput), 4, 1000);
CHECK_EQUAL_TIMEOUT(dev.read<int32_t>("/readWrite"), 4, 1000);
devModule.reportException("Trigger device recovery");
// Wait for ApplicationCore to recover
CHECK_EQUAL_TIMEOUT(test.readScalar<int32_t>("/Devices/taggedDevice/status"), 0, 1000);
deviceInput.read();
CHECK_EQUAL_TIMEOUT(int32_t(deviceInput), 4, 1000);
app.shutdown();
}
// Have an application module that has an explicit accessor requesting reverse recovery
BOOST_AUTO_TEST_CASE(testFeedingFanOutWithExplicitAccessor) {
ctk::BackendFactory::getInstance().setDMapFilePath("testTagged.dmap");
ctk::Device dev;
dev.open("baseDevice");
// Initialize the device with some values
dev.write<int32_t>("/readWrite", 4);
TestApplication app;
ctk::DeviceModule devModule{&app, "taggedDevice", "/trigger"};
std::atomic<bool> up{false};
ctk::ScalarOutputReverseRecovery<int32_t> deviceInput{&app.mod, "/taggedReadWrite", "", ""};
app.mod.doMainLoop = [&]() {
up = true;
up.notify_one();
};
ctk::TestFacility test(app, false);
test.runApplication();
up.wait(false);
CHECK_EQUAL_TIMEOUT((int32_t(deviceInput)), 4, 1000);
CHECK_EQUAL_TIMEOUT(dev.read<int32_t>("/readWrite"), 4, 1000);
// Check that we can still write down to the device properly
deviceInput.setAndWrite(44);
CHECK_EQUAL_TIMEOUT(dev.read<int32_t>("/readWrite"), 44, 1000);
// Manipulate the device so we can check that the value is propagated
// from the device to the application, as expected, after the device recovers
dev.write<int32_t>("/readWrite", 111);
devModule.reportException("Trigger device recovery");
// Wait for ApplicationCore to recover
CHECK_EQUAL_TIMEOUT(test.readScalar<int32_t>("/Devices/taggedDevice/status"), 0, 1000);
deviceInput.read();
CHECK_EQUAL_TIMEOUT(int32_t(deviceInput), 111, 1000);
app.shutdown();
}
// Have an application module that has an explicit accessor requesting reverse recovery
BOOST_AUTO_TEST_CASE(testFanOutWithExplicitAccessor02) {
ctk::BackendFactory::getInstance().setDMapFilePath("testTagged.dmap");
ctk::Device dev;
dev.open("baseDevice");
// Initialize the device with some values
dev.write<int32_t>("/readWrite", 4);
TestApplication app;
ctk::DeviceModule devModule{&app, "taggedDevice", "/trigger"};
std::atomic<bool> up{false};
ctk::ScalarOutputReverseRecovery<int32_t> deviceInput{&app.mod, "/taggedReadWrite", "", ""};
app.mod.doMainLoop = [&]() {
up = true;
up.notify_one();
};
ctk::TestFacility test(app, false);
app.optimiseUnmappedVariables({"/taggedReadWrite"});
test.runApplication();
up.wait(false);
CHECK_EQUAL_TIMEOUT((int32_t(deviceInput)), 4, 1000);
CHECK_EQUAL_TIMEOUT(dev.read<int32_t>("/readWrite"), 4, 1000);
// Check that we can still write down to the device properly
deviceInput.setAndWrite(44);
CHECK_EQUAL_TIMEOUT(dev.read<int32_t>("/readWrite"), 44, 1000);
// Manipulate the device so we can check that the value is propagated
// from the device to the application, as expected, after the device recovers
dev.write<int32_t>("/readWrite", 111);
devModule.reportException("Trigger device recovery");
// Wait for ApplicationCore to recover
CHECK_EQUAL_TIMEOUT(test.readScalar<int32_t>("/Devices/taggedDevice/status"), 0, 1000);
deviceInput.read();
CHECK_EQUAL_TIMEOUT(int32_t(deviceInput), 111, 1000);
app.shutdown();
}
// Special case: Reverse recovery, but without any device
BOOST_AUTO_TEST_CASE(testReverseRecoveryFromCS) {
TestApplication app;
std::atomic<bool> up{false};
ctk::ScalarOutputReverseRecovery<int32_t> csOutput{&app.mod, "/taggedReadWrite", "", ""};
app.mod.doMainLoop = [&]() {
up = true;
up.notify_one();
};
ctk::TestFacility test(app, false);
test.setScalarDefault("/taggedReadWrite", 4711);
test.runApplication();
up.wait(false);
CHECK_EQUAL_TIMEOUT(csOutput, 4711, 2000);
app.shutdown();
}
\ No newline at end of file