Skip to content
Snippets Groups Projects
Application.cc 52.6 KiB
Newer Older
/*
 * Application.cc
 *
 *  Created on: Jun 10, 2016
 *      Author: Martin Hierholzer
 */

#include <string>
#include <thread>
#include <ChimeraTK/BackendFactory.h>
#include "Application.h"
#include "ApplicationModule.h"
#include "ArrayAccessor.h"
#include "ConstantAccessor.h"
#include "DebugPrintAccessorDecorator.h"
#include "DeviceModule.h"
#include "FeedingFanOut.h"
#include "ThreadedFanOut.h"
#include "TriggerFanOut.h"
#include "VariableNetworkGraphDumpingVisitor.h"
#include "VariableNetworkNode.h"
#include "Visitor.h"
#include "XMLGeneratorVisitor.h"
#include "ExceptionHandlingDecorator.h"
/*********************************************************************************************************************/

Application::Application(const std::string& name) : ApplicationBase(name), EntityOwner(name, "") {
  if(applicationName == "") {
    throw ChimeraTK::logic_error("Error: An instance of Application must have its applicationName set.");
  // check if application name contains illegal characters
  std::string legalChars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890_";
  bool nameContainsIllegalChars = name.find_first_not_of(legalChars) != std::string::npos;
  if(nameContainsIllegalChars) {
    throw ChimeraTK::logic_error("Error: The application name may only contain "
                                 "alphanumeric characters and underscores.");
}

/*********************************************************************************************************************/

  if(initialiseCalled) {
    throw ChimeraTK::logic_error("Application::initialise() was already called before.");
  }

  // call the user-defined defineConnections() function which describes the
  // structure of the application
  // connect any unconnected accessors with constant values
  processUnconnectedNodes();

  // realise the connections between variable accessors as described in the
  // initialise() function

  // set flag to prevent further calls to this function and to prevent definition of additional connections.
  initialiseCalled = true;
}

/*********************************************************************************************************************/

/** Functor class to create a constant for otherwise unconnected variables,
 * suitable for boost::fusion::for_each(). */
  struct CreateConstantForUnconnectedVar {
    /// @todo test unconnected variables for all types!
    CreateConstantForUnconnectedVar(const std::type_info& typeInfo, bool makeFeeder, size_t length)
    : _typeInfo(typeInfo), _makeFeeder(makeFeeder), _length(length) {}

    template<typename PAIR>
    void operator()(PAIR&) const {
      if(typeid(typename PAIR::first_type) != _typeInfo) return;
      theNode = VariableNetworkNode::makeConstant<typename PAIR::first_type>(
          _makeFeeder, typename PAIR::first_type(), _length);
      done = true;
    }
    const std::type_info& _typeInfo;
    bool _makeFeeder;
    size_t _length;
    mutable bool done{false};
    mutable VariableNetworkNode theNode;
  };
} // namespace

/*********************************************************************************************************************/

  for(auto& module : getSubmoduleListRecursive()) {
    for(auto& accessor : module->getAccessorList()) {
      if(!accessor.hasOwner()) {
        if(enableUnconnectedVariablesWarning) {
          std::cerr << "*** Warning: Variable '" << accessor.getQualifiedName()
                    << "' is not connected. " // LCOV_EXCL_LINE
                       "Reading will always result in 0, writing will be ignored."
                    << std::endl; // LCOV_EXCL_LINE
        bool makeFeeder = !(networkList.back().hasFeedingNode());
        size_t length = accessor.getNumberOfElements();
        auto callable = CreateConstantForUnconnectedVar(accessor.getValueType(), makeFeeder, length);
        boost::fusion::for_each(ChimeraTK::userTypeMap(), std::ref(callable));
        assert(callable.done);
        constantList.emplace_back(callable.theNode);
/*********************************************************************************************************************/

void Application::checkConnections() {
  // check all networks for validity
  for(auto& network : networkList) {
  // note: this in principle cannot happen, since processUnconnectedNodes() is
  // called before
  for(auto& module : getSubmoduleListRecursive()) {
    for(auto& accessor : module->getAccessorList()) {
      if(!accessor.hasOwner()) {
        throw ChimeraTK::logic_error("The accessor '" + accessor.getName() + "' of the module '" +
            module->getName() +      // LCOV_EXCL_LINE
            "' was not connected!"); // LCOV_EXCL_LINE
      }
    }
  }
}

/*********************************************************************************************************************/

void Application::run() {
  if(testableMode) {
    if(!testFacilityRunApplicationCalled) {
      throw ChimeraTK::logic_error(
          "Testable mode enabled but Application::run() called directly. Call TestFacility::runApplication() instead.");
    }
  }

  if(runCalled) {
    throw ChimeraTK::logic_error("Application::run() has already been called before.");
  }
  runCalled = true;

  // set all initial version numbers in the modules to the same value
  for(auto& module : getSubmoduleListRecursive()) {
    if(module->getModuleType() != ModuleType::ApplicationModule) continue;
  for(auto& module : getSubmoduleListRecursive()) {
  for(auto& deviceModule : deviceModuleList) {
    deviceModule->prepare();
  }
  // check for application PVs which have a value, which needs to be propagated as initial value
  for(auto& module : getSubmoduleListRecursive()) {
    for(auto& var : module->getAccessorList()) {
      if(!var.getAppAccessorNoType().isWriteable()) continue;
      if(var.getAppAccessorNoType().getVersionNumber() >= getStartVersion()) {
  // Switch life-cycle state to run
  lifeCycleState = LifeCycleState::run;

  // start the necessary threads for the FanOuts etc.
  for(auto& internalModule : internalModuleList) {
  for(auto& deviceModule : deviceModuleList) {
    deviceModule->run();
  }
  for(auto& module : getSubmoduleListRecursive()) {
    module->run();
  }
}

/*********************************************************************************************************************/

  // switch life-cycle state
  lifeCycleState = LifeCycleState::shutdown;

  // first allow to run the application threads again, if we are in testable
  // mode
  if(testableMode && testableModeTestLock()) {
  // deactivate the FanOuts first, since they have running threads inside
  // accessing the modules etc. (note: the modules are members of the
  // Application implementation and thus get destroyed after this destructor)
  for(auto& internalModule : internalModuleList) {
  }

  // next deactivate the modules, as they have running threads inside as well
  for(auto& module : getSubmoduleListRecursive()) {
  for(auto& deviceModule : deviceModuleList) {
    deviceModule->terminate();
  }
  ApplicationBase::shutdown();
}
/*********************************************************************************************************************/

  // also search for unconnected nodes - this is here only executed to print the
  // warnings
  // create connections for exception handling
  for(auto& devModule : deviceModuleList) {
    devModule->defineConnections();
  }

  // finalise connections: decide still-undecided details, in particular for
  // control-system and device varibales, which get created "on the fly".
  XMLGeneratorVisitor visitor;
  visitor.dispatch(*this);
  visitor.save(applicationName + ".xml");
}

/*********************************************************************************************************************/

VariableNetwork& Application::connect(VariableNetworkNode a, VariableNetworkNode b) {
  // if one of the nodes has the value type AnyType, set it to the type of the
  // other if both are AnyType, nothing changes.
  if(a.getValueType() == typeid(AnyType)) {
  }
  else if(b.getValueType() == typeid(AnyType)) {
  // if one of the nodes has not yet a defined number of elements, set it to the
  // number of elements of the other. if both are undefined, nothing changes.
  if(a.getNumberOfElements() == 0) {
    a.setNumberOfElements(b.getNumberOfElements());
  }
  else if(b.getNumberOfElements() == 0) {
    b.setNumberOfElements(a.getNumberOfElements());
  }
  if(a.getNumberOfElements() != b.getNumberOfElements()) {
    what << "*** ERROR: Cannot connect array variables with difference number "
            "of elements!"
         << std::endl;
    what << "Node A:" << std::endl;
    a.dump(what);
    what << "Node B:" << std::endl;
    b.dump(what);
    throw ChimeraTK::logic_error(what.str());
  // if both nodes already have an owner, we are either already done (same
  // owners) or we need to try to merge the networks
  if(a.hasOwner() && b.hasOwner()) {
    if(&(a.getOwner()) != &(b.getOwner())) {
      auto& networkToMerge = b.getOwner();
      bool success = a.getOwner().merge(networkToMerge);
      if(!success) {
        what << "*** ERROR: Trying to connect two nodes which are already part "
                "of different networks, and merging these"
                " networks is not possible (cannot have two non-control-system "
                "or two control-system feeders)!"
             << std::endl;
        what << "Node A:" << std::endl;
        a.dump(what);
        what << "Node B:" << std::endl;
        b.dump(what);
        what << "Owner of node A:" << std::endl;
        a.getOwner().dump("", what);
        what << "Owner of node B:" << std::endl;
        b.getOwner().dump("", what);
        throw ChimeraTK::logic_error(what.str());
      }
      for(auto n = networkList.begin(); n != networkList.end(); ++n) {
        if(&*n == &networkToMerge) {
  else if(a.hasOwner()) {
  else if(b.hasOwner()) {
  else {
    networkList.emplace_back();
    networkList.back().addNode(a);
    networkList.back().addNode(b);
  }
/*********************************************************************************************************************/

template<typename UserType>
boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> Application::createDeviceVariable(
    const std::string& deviceAlias, const std::string& registerName, VariableDirection direction, UpdateMode mode,
    size_t nElements) {
  if(deviceMap.count(deviceAlias) == 0) {
    deviceMap[deviceAlias] = ChimeraTK::BackendFactory::getInstance().createBackend(deviceAlias);
  // use wait_for_new_data mode if push update mode was requested
  // Feeding to the network means reading from a device to feed it into the network.
  ChimeraTK::AccessModeFlags flags{};
  if(mode == UpdateMode::push && direction.dir == VariableDirection::feeding) flags = {AccessMode::wait_for_new_data};
  // obtain the register accessor from the device
  auto accessor = deviceMap[deviceAlias]->getRegisterAccessor<UserType>(registerName, nElements, 0, flags);
  // find the right DeviceModule for this alias name - required for exception handling
  DeviceModule* devmod = nullptr;
  for(auto& dm : deviceModuleList) {
    if(dm->deviceAliasOrURI == deviceAlias) {
      devmod = dm;
      break;
    }
  }
  assert(devmod != nullptr);

  // decorate the accessor with a ExceptionHandlingDecorator and return it
  // Consuming from the network means writing to the device what you consumed.
  if(direction.dir == VariableDirection::consuming) {
    // writable registers additionally get a recoveryAccessor
    // Notice: Don't use the fact that there is a recovery accessor to determine the direction in the ExceptionHandlingDecorator.
    // There will be write-accessors without recovery accessors in future (intentionally turned off by the application programmer)
    auto recoveryAccessor = deviceMap[deviceAlias]->getRegisterAccessor<UserType>(registerName, nElements, 0, flags);
    return boost::make_shared<ExceptionHandlingDecorator<UserType>>(accessor, *devmod, direction, recoveryAccessor);
  else {
    return boost::make_shared<ExceptionHandlingDecorator<UserType>>(accessor, *devmod, direction);
}

/*********************************************************************************************************************/

template<typename UserType>
boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> Application::createProcessVariable(
    VariableNetworkNode const& node) {
  // determine the SynchronizationDirection
  SynchronizationDirection dir;
  if(node.getDirection().withReturn) {
  }
  else if(node.getDirection().dir == VariableDirection::feeding) {
    dir = SynchronizationDirection::controlSystemToDevice;
    dir = SynchronizationDirection::deviceToControlSystem;
  if(node.getDirection().dir == VariableDirection::consuming) { // Application-to-controlsystem must be
                                                                // push-type
    flags = {AccessMode::wait_for_new_data};
  }
  else {
    for(auto& consumer : node.getOwner().getConsumingNodes()) {
      if(consumer.getMode() == UpdateMode::push) flags = {AccessMode::wait_for_new_data};
  auto pvar = _processVariableManager->createProcessArray<UserType>(dir, node.getPublicName(),
      node.getNumberOfElements(), node.getOwner().getUnit(), node.getOwner().getDescription(), {}, 3, flags);
  auto varId = getNextVariableId();
  pvIdMap[pvar->getUniqueId()] = varId;
  // Decorate the process variable if testable mode is enabled and this is the
  // receiving end of the variable (feeding to the network). Also don't decorate, if the mode is polling.
  // Instead flag the variable to be polling, so the TestFacility is aware of
  // this.
  if(testableMode && node.getDirection().dir == VariableDirection::feeding) {
    // The transfer mode of this process variable is considered to be polling,
    // if only one consumer exists and this consumer is polling. Reason:
    // mulitple consumers will result in the use of a FanOut, so the
    // communication up to the FanOut will be push-type, even if all consumers
    // are poll-type.
    /// @todo Check if this is true!
    auto mode = UpdateMode::push;
    if(node.getOwner().countConsumingNodes() == 1) {
      if(node.getOwner().getConsumingNodes().front().getMode() == UpdateMode::poll) mode = UpdateMode::poll;
    if(mode != UpdateMode::poll) {
      auto pvarDec = boost::make_shared<TestableModeAccessorDecorator<UserType>>(pvar, true, false, varId, varId);
      testableMode_names[varId] = "ControlSystem:" + node.getPublicName();
/*********************************************************************************************************************/

template<typename UserType>
std::pair<boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>>,
    boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>>>
    Application::createApplicationVariable(VariableNetworkNode const& node, VariableNetworkNode const& consumer) {
  // obtain the meta data
  size_t nElements = node.getNumberOfElements();
  std::string name = node.getName();
  if(consumer.getType() != NodeType::invalid) {
    if(consumer.getMode() == UpdateMode::push) flags = {AccessMode::wait_for_new_data};
  }
  else {
    if(node.getMode() == UpdateMode::push) flags = {AccessMode::wait_for_new_data};
  std::pair<boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>>,
      boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>>>
  if(consumer.getType() != NodeType::invalid)
    assert(node.getDirection().withReturn == consumer.getDirection().withReturn);
  if(!node.getDirection().withReturn) {
    pvarPair = createSynchronizedProcessArray<UserType>(
        nElements, name, node.getUnit(), node.getDescription(), {}, 3, {}, flags);
    pvarPair = createBidirectionalSynchronizedProcessArray<UserType>(
        nElements, name, node.getUnit(), node.getDescription(), {}, 3, {}, {}, flags);
  assert(pvarPair.first->getName() != "");
  assert(pvarPair.second->getName() != "");
  size_t varId = getNextVariableId();
  size_t varIdReturn;
  if(node.getDirection().withReturn) varIdReturn = getNextVariableId();
  // decorate the process variable if testable mode is enabled and mode is
  // push-type
  if(testableMode && node.getMode() == UpdateMode::push) {
    if(!node.getDirection().withReturn) {
      pvarPair.first =
          boost::make_shared<TestableModeAccessorDecorator<UserType>>(pvarPair.first, false, true, varId, varId);
      pvarPair.second =
          boost::make_shared<TestableModeAccessorDecorator<UserType>>(pvarPair.second, true, false, varId, varId);
    }
    else {
      pvarPair.first =
          boost::make_shared<TestableModeAccessorDecorator<UserType>>(pvarPair.first, true, true, varIdReturn, varId);
      pvarPair.second =
          boost::make_shared<TestableModeAccessorDecorator<UserType>>(pvarPair.second, true, true, varId, varIdReturn);
    testableMode_names[varId] = "Internal:" + node.getQualifiedName();
    if(consumer.getType() != NodeType::invalid) {
      testableMode_names[varId] += "->" + consumer.getQualifiedName();
    if(node.getDirection().withReturn) testableMode_names[varIdReturn] = testableMode_names[varId] + " (return)";
  }

  // if debug mode was requested for either node, decorate both accessors
  if(debugMode_variableList.count(node.getUniqueId()) ||
      (consumer.getType() != NodeType::invalid && debugMode_variableList.count(consumer.getUniqueId()))) {
    if(consumer.getType() != NodeType::invalid) {
      assert(node.getDirection().dir == VariableDirection::feeding);
      assert(consumer.getDirection().dir == VariableDirection::consuming);
      pvarPair.first =
          boost::make_shared<DebugPrintAccessorDecorator<UserType>>(pvarPair.first, node.getQualifiedName());
      pvarPair.second =
          boost::make_shared<DebugPrintAccessorDecorator<UserType>>(pvarPair.second, consumer.getQualifiedName());
    }
    else {
      pvarPair.first =
          boost::make_shared<DebugPrintAccessorDecorator<UserType>>(pvarPair.first, node.getQualifiedName());
      pvarPair.second =
          boost::make_shared<DebugPrintAccessorDecorator<UserType>>(pvarPair.second, node.getQualifiedName());
/*********************************************************************************************************************/

void Application::makeConnections() {
  for(auto& devModule : deviceModuleList) {
    devModule->defineConnections();
  }
  // finalise connections: decide still-undecided details, in particular for
  // control-system and device varibales, which get created "on the fly".
  finaliseNetworks();
  // note: checks may not be run before since sometimes networks may only be
  // valid after optimisations
  for(auto& network : networkList) {
/*********************************************************************************************************************/
void Application::finaliseNetworks() {
  // check for control system variables which should be made bidirectional
  for(auto& network : networkList) {
    size_t nBidir = network.getFeedingNode().getDirection().withReturn ? 1 : 0;
    for(auto& consumer : network.getConsumingNodes()) {
      if(consumer.getDirection().withReturn) ++nBidir;
    if(nBidir != 1)
      continue; // only if there is exactly one node with return channel we need
                // to guess its peer
    if(network.getFeedingNode().getType() != NodeType::ControlSystem) {
      // only a feeding control system variable can be made bidirectional
      continue;
    network.getFeedingNode().setDirection({VariableDirection::feeding, true});
}

/*********************************************************************************************************************/

void Application::optimiseConnections() {
  // list of iterators of networks to be removed from the networkList after the
  // merge operation
  std::list<VariableNetwork*> deleteNetworks;
  // search for networks with the same feeder
  for(auto it1 = networkList.begin(); it1 != networkList.end(); ++it1) {
    for(auto it2 = it1; it2 != networkList.end(); ++it2) {
      if(it1 == it2) continue;
      auto feeder1 = it1->getFeedingNode();
      auto feeder2 = it2->getFeedingNode();
      // this optimisation is only necessary for device-type nodes, since
      // application and control-system nodes will automatically create merged
      // networks when having the same feeder
      /// @todo check if this assumtion is true! control-system nodes can be
      /// created with different types, too!
      if(feeder1.getType() != NodeType::Device || feeder2.getType() != NodeType::Device) continue;
      // check if referrring to same register
      if(feeder1.getDeviceAlias() != feeder2.getDeviceAlias()) continue;
      if(feeder1.getRegisterName() != feeder2.getRegisterName()) continue;
      // check if directions are the same
      if(feeder1.getDirection() != feeder2.getDirection()) continue;
      // check if value types and number of elements are compatible
      if(feeder1.getValueType() != feeder2.getValueType()) continue;
      if(feeder1.getNumberOfElements() != feeder2.getNumberOfElements()) continue;
      // check if transfer mode is the same
      if(feeder1.getMode() != feeder2.getMode()) continue;
      // check if triggers are compatible, if present
      if(feeder1.hasExternalTrigger() != feeder2.hasExternalTrigger()) continue;
      if(feeder1.hasExternalTrigger()) {
        if(feeder1.getExternalTrigger() != feeder2.getExternalTrigger()) continue;
      // everything should be compatible at this point: merge the networks. We
      // will merge the network of the outer loop into the network of the inner
      // loop, since the network of the outer loop will not be found a second
      // time in the inner loop.
      for(auto consumer : it1->getConsumingNodes()) {
        consumer.clearOwner();
        it2->addNode(consumer);
      }
      // if trigger present, remove corresponding trigger receiver node from the
      // trigger network
      if(feeder1.hasExternalTrigger()) {
        feeder1.getExternalTrigger().getOwner().removeNodeToTrigger(it1->getFeedingNode());
      // schedule the outer loop network for deletion and stop processing it
      deleteNetworks.push_back(&(*it1));
      break;
    }
  }
  // remove networks from the network list
  for(auto net : deleteNetworks) {
    networkList.remove(*net);
  }
}

/*********************************************************************************************************************/
void Application::dumpConnections(std::ostream& stream) {                                         // LCOV_EXCL_LINE
  stream << "==== List of all variable connections of the current Application ====" << std::endl; // LCOV_EXCL_LINE
  for(auto& network : networkList) {                                                              // LCOV_EXCL_LINE
    network.dump("", stream);                                                                     // LCOV_EXCL_LINE
  }                                                                                               // LCOV_EXCL_LINE
  stream << "=====================================================================" << std::endl; // LCOV_EXCL_LINE
} // LCOV_EXCL_LINE
void Application::dumpConnectionGraph(const std::string& fileName) {
  std::fstream file{fileName, std::ios_base::out};
  VariableNetworkGraphDumpingVisitor visitor{file};
  visitor.dispatch(*this);
/*********************************************************************************************************************/

Application::TypedMakeConnectionCaller::TypedMakeConnectionCaller(Application& owner, VariableNetwork& network)
: _owner(owner), _network(network) {}

/*********************************************************************************************************************/

template<typename PAIR>
void Application::TypedMakeConnectionCaller::operator()(PAIR&) const {
  if(typeid(typename PAIR::first_type) != _network.getValueType()) return;
  _owner.typedMakeConnection<typename PAIR::first_type>(_network);
  done = true;
}

/*********************************************************************************************************************/

void Application::makeConnectionsForNetwork(VariableNetwork& network) {
  // if the network has been created already, do nothing
  if(network.isCreated()) return;
  // if the trigger type is external, create the trigger first
  if(network.getFeedingNode().hasExternalTrigger()) {
    VariableNetwork& dependency = network.getFeedingNode().getExternalTrigger().getOwner();
    if(!dependency.isCreated()) makeConnectionsForNetwork(dependency);
  // defer actual network creation to templated function
  auto callable = TypedMakeConnectionCaller(*this, network);
  boost::fusion::for_each(ChimeraTK::userTypeMap(), std::ref(callable));
}

/*********************************************************************************************************************/

template<typename UserType>
void Application::typedMakeConnection(VariableNetwork& network) {
  try {                          // catch exceptions to add information about the failed network
    bool connectionMade = false; // to check the logic...

    size_t nNodes = network.countConsumingNodes() + 1;
    auto feeder = network.getFeedingNode();
    auto consumers = network.getConsumingNodes();
    bool useExternalTrigger = network.getTriggerType() == VariableNetwork::TriggerType::external;
    bool useFeederTrigger = network.getTriggerType() == VariableNetwork::TriggerType::feeder;
    bool constantFeeder = feeder.getType() == NodeType::Constant;

    // 1st case: the feeder requires a fixed implementation
    if(feeder.hasImplementation() && !constantFeeder) {
      // Create feeding implementation. Note: though the implementation is derived
      // from the feeder, it will be used as the implementation of the (or one of
      // the) consumer. Logically, implementations are always pairs of
      // implementations (sender and receiver), but in this case the feeder
      // already has a fixed implementation pair. So our feedingImpl will contain
      // the consumer-end of the implementation pair. This is the reason why the
      // functions createProcessScalar() and createDeviceAccessor() get the
      // VariableDirection::consuming.
      boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> feedingImpl;
      if(feeder.getType() == NodeType::Device) {
        feedingImpl = createDeviceVariable<UserType>(feeder.getDeviceAlias(), feeder.getRegisterName(),
            {VariableDirection::feeding, false}, feeder.getMode(), feeder.getNumberOfElements());
      else if(feeder.getType() == NodeType::ControlSystem) {
        feedingImpl = createProcessVariable<UserType>(feeder);
      else {
        throw ChimeraTK::logic_error("Unexpected node type!"); // LCOV_EXCL_LINE (assert-like)
      }

      // if we just have two nodes, directly connect them
      if(nNodes == 2 && !useExternalTrigger) {
        auto consumer = consumers.front();
        if(consumer.getType() == NodeType::Application) {
          consumer.setAppAccessorImplementation(feedingImpl);
          //check if the feedingImpl is from a device. In this case it has been decorated with an ExceptionHandlingDecorator
          auto feedingDeviceImpl =
              boost::dynamic_pointer_cast<ChimeraTK::ExceptionHandlingDecorator<UserType>>(feedingImpl);
          if(feedingDeviceImpl) {
            auto owningModule = consumer.getOwningModule(); // application module or variable group
            feedingDeviceImpl->setOwner(owningModule);
          }
        else if(consumer.getType() == NodeType::Device) {
          auto consumingImpl = createDeviceVariable<UserType>(consumer.getDeviceAlias(), consumer.getRegisterName(),
              {VariableDirection::consuming, false}, consumer.getMode(), consumer.getNumberOfElements());
          // connect the Device with e.g. a ControlSystem node via a
          // ThreadedFanOut
          auto fanOut = boost::make_shared<ThreadedFanOut<UserType>>(feedingImpl, network);
          fanOut->addSlave(consumingImpl, consumer);
          internalModuleList.push_back(fanOut);
          connectionMade = true;
        }
        else if(consumer.getType() == NodeType::ControlSystem) {
          auto consumingImpl = createProcessVariable<UserType>(consumer);
          // connect the ControlSystem with e.g. a Device node via an
          // ThreadedFanOut
          auto fanOut = boost::make_shared<ThreadedFanOut<UserType>>(feedingImpl, network);
          fanOut->addSlave(consumingImpl, consumer);
          internalModuleList.push_back(fanOut);
          connectionMade = true;
        }
        else if(consumer.getType() == NodeType::TriggerReceiver) {
          consumer.getNodeToTrigger().getOwner().setExternalTriggerImpl(feedingImpl);
          connectionMade = true;
          throw ChimeraTK::logic_error("Unexpected node type!"); // LCOV_EXCL_LINE (assert-like)
      else { /* !(nNodes == 2 && !useExternalTrigger) */

        // create the right FanOut type
        boost::shared_ptr<FanOut<UserType>> fanOut;
        boost::shared_ptr<ConsumingFanOut<UserType>> consumingFanOut;
        if(useExternalTrigger) {
          // if external trigger is enabled, use externally triggered threaded
          // FanOut. Create one per external trigger impl.
          void* triggerImplId = network.getExternalTriggerImpl().get();
          auto triggerFanOut = triggerMap[triggerImplId];
            // find the right DeviceModule for this alias name - required for exception handling
            DeviceModule* devmod = nullptr;
            std::string deviceAlias = feeder.getDeviceAlias();
            for(auto& dm : deviceModuleList) {
              if(dm->deviceAliasOrURI == deviceAlias) {
                devmod = dm;
                break;
              }
            }
            assert(devmod != nullptr);

            // create the trigger fan out and store it in the map and the internalModuleList
            triggerFanOut = boost::make_shared<TriggerFanOut>(network.getExternalTriggerImpl(), *devmod, network);
            internalModuleList.push_back(triggerFanOut);
          }
          fanOut = triggerFanOut->addNetwork(feedingImpl);
        }
        else if(useFeederTrigger) {
          // if the trigger is provided by the pushing feeder, use the treaded
          // version of the FanOut to distribute new values immediately to all
          // consumers. Depending on whether we have a return channel or not, pick
          // the right implementation of the FanOut
          boost::shared_ptr<ThreadedFanOut<UserType>> threadedFanOut;
          if(!feeder.getDirection().withReturn) {
            threadedFanOut = boost::make_shared<ThreadedFanOut<UserType>>(feedingImpl, network);
            threadedFanOut = boost::make_shared<ThreadedFanOutWithReturn<UserType>>(feedingImpl, network);
          }
          internalModuleList.push_back(threadedFanOut);
          fanOut = threadedFanOut;
        }
        else {
          assert(network.hasApplicationConsumer()); // checkConnections should
                                                    // catch this
          consumingFanOut = boost::make_shared<ConsumingFanOut<UserType>>(feedingImpl);
          fanOut = consumingFanOut;
        }
        // In case we have one or more trigger receivers among our consumers, we
        // produce one consuming application variable for each device. Later this will create a TriggerFanOut for
        // each trigger consumer, i.e. one per device so one blocking device does not affect the others.
        /** Map of deviceAliases to their corresponding TriggerFanOuts. */
        std::map<std::string, boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>>> triggerFanOuts;

        // add all consumers to the FanOut
        for(auto& consumer : consumers) {
          if(consumer.getType() == NodeType::Application) {
            if(consumingFanOut && consumer.getMode() == UpdateMode::poll) {
              consumer.setAppAccessorImplementation<UserType>(consumingFanOut);
              consumingFanOut.reset();
            }
            else {
              auto impls = createApplicationVariable<UserType>(consumer);
              fanOut->addSlave(impls.first, consumer);
              consumer.setAppAccessorImplementation<UserType>(impls.second);
            }
          }
          else if(consumer.getType() == NodeType::ControlSystem) {
            auto impl = createProcessVariable<UserType>(consumer);
            fanOut->addSlave(impl, consumer);
          }
          else if(consumer.getType() == NodeType::Device) {
            auto impl = createDeviceVariable<UserType>(consumer.getDeviceAlias(), consumer.getRegisterName(),
                {VariableDirection::consuming, false}, consumer.getMode(), consumer.getNumberOfElements());
            fanOut->addSlave(impl, consumer);
          }
          else if(consumer.getType() == NodeType::TriggerReceiver) {
            std::string deviceAlias = consumer.getNodeToTrigger().getOwner().getFeedingNode().getDeviceAlias();
            auto triggerFanOut = triggerFanOuts[deviceAlias];
            if(!triggerFanOut) { // triggerFanOut is a shared pointer, which evaluates false if default constructed.
              // create a new process variable pair and set the sender/feeder to the fan out
              auto triggerConnection = createApplicationVariable<UserType>(feeder);
              triggerFanOut = triggerConnection.second;
              triggerFanOuts[deviceAlias] = triggerFanOut;
              fanOut->addSlave(triggerConnection.first, consumer);
            }
            consumer.getNodeToTrigger().getOwner().setExternalTriggerImpl(triggerFanOut);
            throw ChimeraTK::logic_error("Unexpected node type!"); // LCOV_EXCL_LINE (assert-like)
        connectionMade = true;
      }
    }
    // 2nd case: the feeder does not require a fixed implementation
    else if(!constantFeeder) { /* !feeder.hasImplementation() */
      // we should be left with an application feeder node
      if(feeder.getType() != NodeType::Application) {
        throw ChimeraTK::logic_error("Unexpected node type!"); // LCOV_EXCL_LINE (assert-like)
      }
      assert(!useExternalTrigger);
      // if we just have two nodes, directly connect them
      if(nNodes == 2) {
        auto consumer = consumers.front();
        if(consumer.getType() == NodeType::Application) {
          auto impls = createApplicationVariable<UserType>(feeder, consumer);
          feeder.setAppAccessorImplementation<UserType>(impls.first);
          consumer.setAppAccessorImplementation<UserType>(impls.second);
          connectionMade = true;
        }
        else if(consumer.getType() == NodeType::ControlSystem) {
          auto impl = createProcessVariable<UserType>(consumer);
          feeder.setAppAccessorImplementation<UserType>(impl);
          connectionMade = true;
        }
        else if(consumer.getType() == NodeType::Device) {
          auto impl = createDeviceVariable<UserType>(consumer.getDeviceAlias(), consumer.getRegisterName(),
              {VariableDirection::consuming, false}, consumer.getMode(), consumer.getNumberOfElements());
          feeder.setAppAccessorImplementation<UserType>(impl);
          connectionMade = true;
        }
        else if(consumer.getType() == NodeType::TriggerReceiver) {
          auto impls = createApplicationVariable<UserType>(feeder, consumer);
          feeder.setAppAccessorImplementation<UserType>(impls.first);
          consumer.getNodeToTrigger().getOwner().setExternalTriggerImpl(impls.second);
          connectionMade = true;
        }
        else if(consumer.getType() == NodeType::Constant) {
          auto impl = consumer.getConstAccessor<UserType>();
          feeder.setAppAccessorImplementation<UserType>(impl);
          connectionMade = true;
        }
        else {
          throw ChimeraTK::logic_error("Unexpected node type!"); // LCOV_EXCL_LINE (assert-like)
        // create FanOut and use it as the feeder implementation
        auto fanOut = boost::make_shared<FeedingFanOut<UserType>>(feeder.getName(), feeder.getUnit(),
            feeder.getDescription(), feeder.getNumberOfElements(), feeder.getDirection().withReturn);
        feeder.setAppAccessorImplementation<UserType>(fanOut);

        // In case we have one or more trigger receivers among our consumers, we
        // produce one consuming application variable for each device. Later this will create a TriggerFanOut for
        // each trigger consimer, i.e. one per device so one blocking device does not affect the others.
        /** Map of deviceAliases to their corresponding TriggerFanOuts. */
        std::map<std::string, boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>>> triggerFanOuts;

        for(auto& consumer : consumers) {
          if(consumer.getType() == NodeType::Application) {
            auto impls = createApplicationVariable<UserType>(consumer);
            fanOut->addSlave(impls.first, consumer);
            consumer.setAppAccessorImplementation<UserType>(impls.second);
          }
          else if(consumer.getType() == NodeType::ControlSystem) {
            auto impl = createProcessVariable<UserType>(consumer);
            fanOut->addSlave(impl, consumer);
          }
          else if(consumer.getType() == NodeType::Device) {
            auto impl = createDeviceVariable<UserType>(consumer.getDeviceAlias(), consumer.getRegisterName(),
                {VariableDirection::consuming, false}, consumer.getMode(), consumer.getNumberOfElements());
            fanOut->addSlave(impl, consumer);
          }
          else if(consumer.getType() == NodeType::TriggerReceiver) {
            std::string deviceAlias = consumer.getNodeToTrigger().getOwner().getFeedingNode().getDeviceAlias();
            auto triggerFanOut = triggerFanOuts[deviceAlias];
            if(!triggerFanOut) { // triggerFanOut is a shared pointer, which evaluates false if default constructed.
              // create a new process variable pair and set the sender/feeder to the fan out
              auto triggerConnection = createApplicationVariable<UserType>(feeder);
              triggerFanOut = triggerConnection.second;
              triggerFanOuts[deviceAlias] = triggerFanOut;
              fanOut->addSlave(triggerConnection.first, consumer);
            }
            consumer.getNodeToTrigger().getOwner().setExternalTriggerImpl(triggerFanOut);
          }
          else {
            throw ChimeraTK::logic_error("Unexpected node type!"); // LCOV_EXCL_LINE (assert-like)
          }
        }
        connectionMade = true;
    else { /* constantFeeder */
      assert(feeder.getType() == NodeType::Constant);
      auto feedingImpl = feeder.getConstAccessor<UserType>();
      assert(feedingImpl != nullptr);
      for(auto& consumer : consumers) {
        if(consumer.getType() == NodeType::Application) {
          if(testableMode) {
            auto varId = getNextVariableId();
            auto pvarDec =
                boost::make_shared<TestableModeAccessorDecorator<UserType>>(feedingImpl, true, false, varId, varId);
            testableMode_names[varId] = "Constant";
            consumer.setAppAccessorImplementation<UserType>(pvarDec);
          }
          else {
            consumer.setAppAccessorImplementation<UserType>(feedingImpl);
          }
        }
        else if(consumer.getType() == NodeType::ControlSystem) {
          auto impl = createProcessVariable<UserType>(consumer);
          impl->accessChannel(0) = feedingImpl->accessChannel(0);
          impl->write();
        }
        else if(consumer.getType() == NodeType::Device) {
          auto impl = createDeviceVariable<UserType>(consumer.getDeviceAlias(), consumer.getRegisterName(),
              {VariableDirection::consuming, false}, consumer.getMode(), consumer.getNumberOfElements());
          impl->accessChannel(0) = feedingImpl->accessChannel(0);
          // find the right DeviceModule for this alias name
          DeviceModule* devmod = nullptr;