Skip to content
Snippets Groups Projects
Application.cc 74.12 KiB
// SPDX-FileCopyrightText: Deutsches Elektronen-Synchrotron DESY, MSK, ChimeraTK Project <chimeratk-support@desy.de>
// SPDX-License-Identifier: LGPL-3.0-or-later
#include "Application.h"

#include "ApplicationModule.h"
#include "ArrayAccessor.h"
#include "ConstantAccessor.h"
#include "ConsumingFanOut.h"
#include "DebugPrintAccessorDecorator.h"
#include "DeviceModule.h"
#include "ExceptionHandlingDecorator.h"
#include "FeedingFanOut.h"
#include "ScalarAccessor.h"
#include "TestableModeAccessorDecorator.h"
#include "ThreadedFanOut.h"
#include "TriggerFanOut.h"
#include "VariableNetworkGraphDumpingVisitor.h"
#include "VariableNetworkModuleGraphDumpingVisitor.h"
#include "VariableNetworkNode.h"
#include "Visitor.h"
#include "VoidAccessor.h"
#include "XMLGeneratorVisitor.h"

#include <ChimeraTK/BackendFactory.h>

#include <boost/fusion/container/map.hpp>

#include <exception>
#include <fstream>
#include <string>
#include <thread>

using namespace ChimeraTK;

std::timed_mutex Application::testableMode_mutex;

/*********************************************************************************************************************/

Application::Application(const std::string& name) : ApplicationBase(name), EntityOwner(name, "") {
  // check if the application name has been set
  if(applicationName == "") {
    shutdown();
    throw ChimeraTK::logic_error("Error: An instance of Application must have its applicationName set.");
  }
  // check if application name contains illegal characters
  std::string legalChars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890_";
  bool nameContainsIllegalChars = name.find_first_not_of(legalChars) != std::string::npos;
  if(nameContainsIllegalChars) {
    shutdown();
    throw ChimeraTK::logic_error("Error: The application name may only contain "
                                 "alphanumeric characters and underscores.");
  }
}

/*********************************************************************************************************************/

void Application::defineConnections() {
  ControlSystemModule cs;
  findTag(".*").connectTo(cs);
}

/*********************************************************************************************************************/

void Application::enableTestableMode() {
  threadName() = "TEST THREAD";
  testableMode = true;
  testableModeLock("enableTestableMode");
}

/*********************************************************************************************************************/

bool Application::testableModeTestLock() {
  if(!getInstance().testableMode) return false;
  return getTestableModeLockObject().owns_lock();
}

/*********************************************************************************************************************/

void Application::registerThread(const std::string& name) {
  Application::getInstance().setThreadName(name);
  pthread_setname_np(pthread_self(), name.substr(0, std::min<std::string::size_type>(name.length(), 15)).c_str());
}

/*********************************************************************************************************************/

void Application::incrementDataLossCounter(const std::string& name) {
  if(getInstance().debugDataLoss) {
    std::cout << "Data loss in variable " << name << std::endl;
  }
  getInstance().dataLossCounter++;
}

/*********************************************************************************************************************/

size_t Application::getAndResetDataLossCounter() {
  size_t counter = getInstance().dataLossCounter.load(std::memory_order_relaxed);
  while(!getInstance().dataLossCounter.compare_exchange_weak(
      counter, 0, std::memory_order_release, std::memory_order_relaxed)) {
  }
  return counter;
}

/*********************************************************************************************************************/

void Application::initialise() {
  if(initialiseCalled) {
    throw ChimeraTK::logic_error("Application::initialise() was already called before.");
  }

  // call the user-defined defineConnections() function which describes the structure of the application
  defineConnections();
  for(auto& module : getSubmoduleListRecursive()) {
    module->defineConnections();
  }

  // call defineConnections() for alldevice modules
  for(auto& devModule : deviceModuleMap) {
    devModule.second->defineConnections();
  }

  // find and handle constant nodes
  findConstantNodes();

  // connect any unconnected accessors with constant values
  processUnconnectedNodes();

  // realise the connections between variable accessors as described in the
  // initialise() function
  makeConnections();

  // set flag to prevent further calls to this function and to prevent definition of additional connections.
  initialiseCalled = true;
}

/*********************************************************************************************************************/

void Application::optimiseUnmappedVariables(const std::set<std::string>& names) {
  for(const auto& pv : names) {
    auto& node = controlSystemVariables.at(pv);
    auto& network = node.getOwner();
    if(network.getFeedingNode() == node) {
      // cannot optimise if network is fed by unmapped control system variable (anyway not eating CPU ressources)
      continue;
    }
    if(network.getConsumingNodes().size() == 1) {
      if(network.getFeedingNode().getType() == NodeType::Application) {
        callForType(network.getValueType(), [&](auto t) {
          using UserType = decltype(t);

          // replace comsuming node with constant in the model
          network.removeNode(network.getConsumingNodes().front());
          auto constNode = VariableNetworkNode::makeConstant<UserType>(
              false, UserType(), network.getFeedingNode().getNumberOfElements());
          network.addNode(constNode);

          // change application accessor into constant
          auto& acc = network.getFeedingNode().getAppAccessor<UserType>();
          auto constImpl = constNode.template createConstAccessor<UserType>({});
          acc.replace(constImpl);
        });
      }
      else {
        auto fanOut = network.getFanOut();
        assert(fanOut);
        // FanOut is present: disable it
        fanOut->disable();
      }
    }
    else {
      // more than one consumer: we have a fan out -> remove control system consumer from it.
      auto fanOut = network.getFanOut();
      assert(fanOut != nullptr);
      fanOut->removeSlave(_processVariableManager->getProcessVariable(pv));
    }
  }
}

/*********************************************************************************************************************/

/** Functor class to create a constant for otherwise unconnected variables,
 * suitable for boost::fusion::for_each(). */
namespace {
  struct CreateConstantForUnconnectedVar {
    /// @todo test unconnected variables for all types!
    CreateConstantForUnconnectedVar(const std::type_info& typeInfo, bool makeFeeder, size_t length)
    : _typeInfo(typeInfo), _makeFeeder(makeFeeder), _length(length) {}

    template<typename PAIR>
    void operator()(PAIR&) const {
      if(typeid(typename PAIR::first_type) != _typeInfo) return;
      theNode = VariableNetworkNode::makeConstant<typename PAIR::first_type>(
          _makeFeeder, typename PAIR::first_type(), _length);
      done = true;
    }

    const std::type_info& _typeInfo;
    bool _makeFeeder;
    size_t _length;
    mutable bool done{false};
    mutable VariableNetworkNode theNode;
  };
} // namespace

/*********************************************************************************************************************/

void Application::processUnconnectedNodes() {
  for(auto& module : getSubmoduleListRecursive()) {
    for(auto& accessor : module->getAccessorList()) {
      if(!accessor.hasOwner()) {
        if(enableUnconnectedVariablesWarning) {
          std::cerr << "*** Warning: Variable '" << accessor.getQualifiedName()
                    << "' is not connected. " // LCOV_EXCL_LINE
                       "Reading will always result in 0, writing will be ignored."
                    << std::endl; // LCOV_EXCL_LINE
        }
        networkList.emplace_back();
        networkList.back().addNode(accessor);

        bool makeFeeder = !(networkList.back().hasFeedingNode());
        size_t length = accessor.getNumberOfElements();
        auto callable = CreateConstantForUnconnectedVar(accessor.getValueType(), makeFeeder, length);
        boost::fusion::for_each(ChimeraTK::userTypeMap(), std::ref(callable));
        assert(callable.done);
        constantList.emplace_back(callable.theNode);
        networkList.back().addNode(constantList.back());
      }
    }
  }
}

/*********************************************************************************************************************/

void Application::checkConnections() {
  // check all networks for validity
  for(auto& network : networkList) {
    network.check();
  }

  // check if all accessors are connected
  // note: this in principle cannot happen, since processUnconnectedNodes() is
  // called before
  for(auto& module : getSubmoduleListRecursive()) {
    for(auto& accessor : module->getAccessorList()) {
      if(!accessor.hasOwner()) {
        throw ChimeraTK::logic_error("The accessor '" + accessor.getName() + "' of the module '" +
            module->getName() +      // LCOV_EXCL_LINE
            "' was not connected!"); // LCOV_EXCL_LINE
      }
    }
  }
}

/*********************************************************************************************************************/

void Application::run() {
  assert(applicationName != "");

  if(testableMode) {
    if(!testFacilityRunApplicationCalled) {
      throw ChimeraTK::logic_error(
          "Testable mode enabled but Application::run() called directly. Call TestFacility::runApplication() instead.");
    }
  }

  if(runCalled) {
    throw ChimeraTK::logic_error("Application::run() has already been called before.");
  }
  runCalled = true;

  // set all initial version numbers in the modules to the same value
  for(auto& module : getSubmoduleListRecursive()) {
    if(module->getModuleType() != ModuleType::ApplicationModule) continue;
    module->setCurrentVersionNumber(getStartVersion());
  }

  // prepare the modules
  for(auto& module : getSubmoduleListRecursive()) {
    module->prepare();
  }
  for(auto& deviceModule : deviceModuleMap) {
    deviceModule.second->prepare();
  }

  // Switch life-cycle state to run
  lifeCycleState = LifeCycleState::run;

  // start the necessary threads for the FanOuts etc.
  for(auto& internalModule : internalModuleList) {
    internalModule->activate();
  }

  for(auto& deviceModule : deviceModuleMap) {
    deviceModule.second->run();
  }

  // start the threads for the modules
  for(auto& module : getSubmoduleListRecursive()) {
    module->run();
  }

  // When in testable mode, wait for all modules to report that they have reched the testable mode.
  // We have to start all module threads first because some modules might only send the initial
  // values in their main loop, and following modules need them to enter testable mode.

  // just a small helper lambda to avoid code repetition
  auto waitForTestableMode = [](EntityOwner* module) {
    while(!module->hasReachedTestableMode()) {
      Application::getInstance().testableModeUnlock("releaseForReachTestableMode");
      usleep(100);
      Application::getInstance().testableModeLock("acquireForReachTestableMode");
    }
  };

  if(Application::getInstance().isTestableModeEnabled()) {
    for(auto& internalModule : internalModuleList) {
      waitForTestableMode(internalModule.get());
    }
    for(auto& deviceModule : deviceModuleMap) {
      waitForTestableMode(deviceModule.second);
    }
    for(auto& module : getSubmoduleListRecursive()) {
      waitForTestableMode(module);
    }
  }

  // Launch circular dependency detector thread
  circularDependencyDetector.startDetectBlockedModules();
}

/*********************************************************************************************************************/

void Application::shutdown() {
  // switch life-cycle state
  lifeCycleState = LifeCycleState::shutdown;

  // first allow to run the application threads again, if we are in testable
  // mode
  if(testableMode && testableModeTestLock()) {
    testableModeUnlock("shutdown");
  }

  // deactivate the FanOuts first, since they have running threads inside
  // accessing the modules etc. (note: the modules are members of the
  // Application implementation and thus get destroyed after this destructor)
  for(auto& internalModule : internalModuleList) {
    internalModule->deactivate();
  }

  // next deactivate the modules, as they have running threads inside as well
  for(auto& module : getSubmoduleListRecursive()) {
    module->terminate();
  }

  for(auto& deviceModule : deviceModuleMap) {
    deviceModule.second->terminate();
  }

  circularDependencyDetector.terminate();

  ApplicationBase::shutdown();
}
/*********************************************************************************************************************/

void Application::generateXML() {
  assert(applicationName != "");

  // define the connections
  defineConnections();
  for(auto& module : getSubmoduleListRecursive()) {
    module->defineConnections();
  }

  // create connections for exception handling
  for(auto& devModule : deviceModuleMap) {
    devModule.second->defineConnections();
  }

  // find and handle constant nodes
  findConstantNodes();

  // also search for unconnected nodes - this is here only executed to print the
  // warnings
  processUnconnectedNodes();

  // finalise connections: decide still-undecided details, in particular for
  // control-system and device varibales, which get created "on the fly".
  finaliseNetworks();

  XMLGeneratorVisitor visitor;
  visitor.dispatch(*this);
  visitor.save(applicationName + ".xml");
}

/*********************************************************************************************************************/

VariableNetwork& Application::connect(VariableNetworkNode a, VariableNetworkNode b) {
  // if one of the nodes has the value type AnyType, set it to the type of the
  // other if both are AnyType, nothing changes.
  if(a.getValueType() == typeid(AnyType)) {
    a.setValueType(b.getValueType());
  }
  else if(b.getValueType() == typeid(AnyType)) {
    b.setValueType(a.getValueType());
  }

  // if one of the nodes has not yet a defined number of elements, set it to the
  // number of elements of the other. if both are undefined, nothing changes.
  if(a.getNumberOfElements() == 0) {
    a.setNumberOfElements(b.getNumberOfElements());
  }
  else if(b.getNumberOfElements() == 0) {
    b.setNumberOfElements(a.getNumberOfElements());
  }
  if(a.getNumberOfElements() != b.getNumberOfElements()) {
    std::stringstream what;
    what << "*** ERROR: Cannot connect array variables with difference number "
            "of elements!"
         << std::endl;
    what << "Node A:" << std::endl;
    a.dump(what);
    what << "Node B:" << std::endl;
    b.dump(what);
    throw ChimeraTK::logic_error(what.str());
  }

  // if both nodes already have an owner, we are either already done (same
  // owners) or we need to try to merge the networks
  if(a.hasOwner() && b.hasOwner()) {
    if(&(a.getOwner()) != &(b.getOwner())) {
      auto& networkToMerge = b.getOwner();
      bool success = a.getOwner().merge(networkToMerge);
      if(!success) {
        std::stringstream what;
        what << "*** ERROR: Trying to connect two nodes which are already part "
                "of different networks, and merging these"
                " networks is not possible (cannot have two non-control-system "
                "or two control-system feeders)!"
             << std::endl;
        what << "Node A:" << std::endl;
        a.dump(what);
        what << "Node B:" << std::endl;
        b.dump(what);
        what << "Owner of node A:" << std::endl;
        a.getOwner().dump("", what);
        what << "Owner of node B:" << std::endl;
        b.getOwner().dump("", what);
        throw ChimeraTK::logic_error(what.str());
      }
      for(auto n = networkList.begin(); n != networkList.end(); ++n) {
        if(&*n == &networkToMerge) {
          networkList.erase(n);
          break;
        }
      }
    }
  }
  // add b to the existing network of a
  else if(a.hasOwner()) {
    a.getOwner().addNode(b);
  }

  // add a to the existing network of b
  else if(b.hasOwner()) {
    b.getOwner().addNode(a);
  }
  // create new network
  else {
    networkList.emplace_back();
    networkList.back().addNode(a);
    networkList.back().addNode(b);
  }
  return a.getOwner();
}

/*********************************************************************************************************************/

template<typename UserType>
boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> Application::createDeviceVariable(
    VariableNetworkNode const& node) {
  auto deviceAlias = node.getDeviceAlias();
  auto registerName = node.getRegisterName();
  auto direction = node.getDirection();
  auto mode = node.getMode();
  auto nElements = node.getNumberOfElements();

  // Device opens in DeviceModule
  if(deviceMap.count(deviceAlias) == 0) {
    deviceMap[deviceAlias] = ChimeraTK::BackendFactory::getInstance().createBackend(deviceAlias);
  }
  // use wait_for_new_data mode if push update mode was requested
  // Feeding to the network means reading from a device to feed it into the network.
  ChimeraTK::AccessModeFlags flags{};
  if(mode == UpdateMode::push && direction.dir == VariableDirection::feeding) flags = {AccessMode::wait_for_new_data};

  // obtain the register accessor from the device
  auto accessor = deviceMap[deviceAlias]->getRegisterAccessor<UserType>(registerName, nElements, 0, flags);

  // Receiving accessors should be faulty after construction,
  // see data validity propagation spec 2.6.1
  if(node.getDirection().dir == VariableDirection::feeding) {
    accessor->setDataValidity(DataValidity::faulty);
  }

  // decorate push-type feeders with testable mode decorator, if needed
  if(testableMode) {
    if(mode == UpdateMode::push && direction.dir == VariableDirection::feeding) {
      auto varId = getNextVariableId();
      accessor = boost::make_shared<TestableModeAccessorDecorator<UserType>>(accessor, true, false, varId, varId);
    }
  }

  return boost::make_shared<ExceptionHandlingDecorator<UserType>>(accessor, node);
}

/*********************************************************************************************************************/

template<typename UserType>
boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> Application::createProcessVariable(
    VariableNetworkNode const& node) {
  // determine the SynchronizationDirection
  SynchronizationDirection dir;
  if(node.getDirection().withReturn) {
    dir = SynchronizationDirection::bidirectional;
  }
  else if(node.getDirection().dir == VariableDirection::feeding) {
    dir = SynchronizationDirection::controlSystemToDevice;
  }
  else {
    dir = SynchronizationDirection::deviceToControlSystem;
  }
  AccessModeFlags flags = {};
  if(node.getDirection().dir == VariableDirection::consuming) { // Application-to-controlsystem must be
                                                                // push-type
    flags = {AccessMode::wait_for_new_data};
  }
  else {
    if(node.getOwner().getConsumingNodes().size() == 1 &&
        node.getOwner().getConsumingNodes().front().getType() == NodeType::Application) {
      // exactly one consumer which is an ApplicationModule input: decide flag depending on input type
      auto consumer = node.getOwner().getConsumingNodes().front();
      if(consumer.getMode() == UpdateMode::push) flags = {AccessMode::wait_for_new_data};
    }
    else {
      // multiple consumers (or a single Device/CS consumer) result in a ThreadedFanOut which requires push-type input
      flags = {AccessMode::wait_for_new_data};
    }
  }

  // create the ProcessArray for the proper UserType
  auto pvar = _processVariableManager->createProcessArray<UserType>(dir, node.getPublicName(),
      node.getNumberOfElements(), node.getOwner().getUnit(), node.getOwner().getDescription(), {}, 3, flags);
  assert(pvar->getName() != "");

  // create variable ID
  auto varId = getNextVariableId();
  pvIdMap[pvar->getUniqueId()] = varId;

  // Decorate the process variable if testable mode is enabled and this is the receiving end of the variable (feeding
  // to the network), or a bidirectional consumer. Also don't decorate, if the mode is polling. Instead flag the
  // variable to be polling, so the TestFacility is aware of this.
  if(testableMode) {
    if(node.getDirection().dir == VariableDirection::feeding) {
      // The transfer mode of this process variable is considered to be polling,
      // if only one consumer exists and this consumer is polling. Reason:
      // mulitple consumers will result in the use of a FanOut, so the
      // communication up to the FanOut will be push-type, even if all consumers
      // are poll-type.
      /// @todo Check if this is true!
      auto mode = UpdateMode::push;
      if(node.getOwner().countConsumingNodes() == 1) {
        if(node.getOwner().getConsumingNodes().front().getMode() == UpdateMode::poll) mode = UpdateMode::poll;
      }

      if(mode != UpdateMode::poll) {
        auto pvarDec = boost::make_shared<TestableModeAccessorDecorator<UserType>>(pvar, true, false, varId, varId);
        testableMode_names[varId] = "ControlSystem:" + node.getPublicName();
        return pvarDec;
      }
      else {
        testableMode_isPollMode[varId] = true;
      }
    }
    else if(node.getDirection().withReturn) {
      // Return channels are always push. The decorator must handle only reads on the return channel, since writes into
      // the control system do not block the testable mode.
      auto pvarDec = boost::make_shared<TestableModeAccessorDecorator<UserType>>(pvar, true, false, varId, varId);
      testableMode_names[varId] = "ControlSystem:" + node.getPublicName();
      return pvarDec;
    }
  }

  // return the process variable
  return pvar;
}

/*********************************************************************************************************************/

template<typename UserType>
std::pair<boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>>,
    boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>>>
    Application::createApplicationVariable(VariableNetworkNode const& node, VariableNetworkNode const& consumer) {
  // obtain the meta data
  size_t nElements = node.getNumberOfElements();
  std::string name = node.getName();
  assert(name != "");
  AccessModeFlags flags = {};
  if(consumer.getType() != NodeType::invalid) {
    if(consumer.getMode() == UpdateMode::push) flags = {AccessMode::wait_for_new_data};
  }
  else {
    if(node.getMode() == UpdateMode::push) flags = {AccessMode::wait_for_new_data};
  }

  // create the ProcessArray for the proper UserType
  std::pair<boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>>,
      boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>>>
      pvarPair;
  if(consumer.getType() != NodeType::invalid)
    assert(node.getDirection().withReturn == consumer.getDirection().withReturn);
  if(!node.getDirection().withReturn) {
    pvarPair =
        createSynchronizedProcessArray<UserType>(nElements, name, node.getUnit(), node.getDescription(), {}, 3, flags);
  }
  else {
    pvarPair = createBidirectionalSynchronizedProcessArray<UserType>(
        nElements, name, node.getUnit(), node.getDescription(), {}, 3, flags);
  }
  assert(pvarPair.first->getName() != "");
  assert(pvarPair.second->getName() != "");

  // create variable IDs
  size_t varId = getNextVariableId();
  size_t varIdReturn;
  if(node.getDirection().withReturn) varIdReturn = getNextVariableId();

  // decorate the process variable if testable mode is enabled and mode is push-type
  if(testableMode && flags.has(AccessMode::wait_for_new_data)) {
    if(!node.getDirection().withReturn) {
      pvarPair.first =
          boost::make_shared<TestableModeAccessorDecorator<UserType>>(pvarPair.first, false, true, varId, varId);
      pvarPair.second =
          boost::make_shared<TestableModeAccessorDecorator<UserType>>(pvarPair.second, true, false, varId, varId);
    }
    else {
      pvarPair.first =
          boost::make_shared<TestableModeAccessorDecorator<UserType>>(pvarPair.first, true, true, varIdReturn, varId);
      pvarPair.second =
          boost::make_shared<TestableModeAccessorDecorator<UserType>>(pvarPair.second, true, true, varId, varIdReturn);
    }

    // put the decorators into the list
    testableMode_names[varId] = "Internal:" + node.getQualifiedName();
    if(consumer.getType() != NodeType::invalid) {
      testableMode_names[varId] += "->" + consumer.getQualifiedName();
    }
    if(node.getDirection().withReturn) testableMode_names[varIdReturn] = testableMode_names[varId] + " (return)";
  }

  // if debug mode was requested for either node, decorate both accessors
  if(debugMode_variableList.count(node.getUniqueId()) ||
      (consumer.getType() != NodeType::invalid && debugMode_variableList.count(consumer.getUniqueId()))) {
    if(consumer.getType() != NodeType::invalid) {
      assert(node.getDirection().dir == VariableDirection::feeding);
      assert(consumer.getDirection().dir == VariableDirection::consuming);
      pvarPair.first =
          boost::make_shared<DebugPrintAccessorDecorator<UserType>>(pvarPair.first, node.getQualifiedName());
      pvarPair.second =
          boost::make_shared<DebugPrintAccessorDecorator<UserType>>(pvarPair.second, consumer.getQualifiedName());
    }
    else {
      pvarPair.first =
          boost::make_shared<DebugPrintAccessorDecorator<UserType>>(pvarPair.first, node.getQualifiedName());
      pvarPair.second =
          boost::make_shared<DebugPrintAccessorDecorator<UserType>>(pvarPair.second, node.getQualifiedName());
    }
  }

  // return the pair
  return pvarPair;
}

/*********************************************************************************************************************/

void Application::makeConnections() {
  // finalise connections: decide still-undecided details, in particular for
  // control-system and device varibales, which get created "on the fly".
  finaliseNetworks();

  // apply optimisations
  // note: checks may not be run before since sometimes networks may only be
  // valid after optimisations
  optimiseConnections();

  // run checks
  checkConnections();

  // make the connections for all networks
  for(auto& network : networkList) {
    makeConnectionsForNetwork(network);
  }

  // check for circular dependencies
  for(auto& network : networkList) {
    markCircularConsumers(network);
  }
}

/*********************************************************************************************************************/

void Application::findConstantNodes() {
  for(auto& module : getSubmoduleListRecursive()) {
    for(auto& accessor : module->getAccessorList()) {
      if(accessor.getName().substr(0, 7) == "@CONST@") {
        VariableNetworkNode node(accessor);

        // must be a consumer node
        if(node.getDirection().dir != VariableDirection::consuming) {
          std::cout << "Variable name '@CONST@' found for a feeding type node:" << std::endl;
          node.dump();
          if(node.hasOwner()) {
            std::cout << "In network:" << std::endl;
            node.getOwner().dump();
          }
          throw ChimeraTK::logic_error("Variable name '@CONST@' found for a feeding type node.");
        }

        // must have no feeder, a constant feeder, or a control-system feeder
        if(node.hasOwner() && node.getOwner().hasFeedingNode() &&
            node.getOwner().getFeedingNode().getType() != NodeType::Constant &&
            node.getOwner().getFeedingNode().getType() != NodeType::ControlSystem) {
          std::cout << "Error: Variable name '@CONST@' found with wrong feeder type:" << std::endl;
          node.getOwner().getFeedingNode().dump();
          std::cout << "In network:" << std::endl;
          node.getOwner().dump();
          throw ChimeraTK::logic_error("Variable name '@CONST@' found with wrong feeder type");
        }

        if(node.hasOwner()) {
          // The @CONST@ variable names do not distinguish types, hence we can have differently typed consumers in our
          // network. To mitigate this issue, we remove the node from the existing network before creating a new one
          // below
          auto& network = node.getOwner();
          network.removeNode(node);
          // if network has no consumer left, abolish it
          if(network.countConsumingNodes() == 0) {
            // remove feeder if existing
            if(network.hasFeedingNode()) {
              auto feeder = network.getFeedingNode();
              network.removeNode(feeder);
            }
            // remove network from application
            networkList.remove(network);
          }
        }

        // Connect node with constant (in a new VariableNetwork)
        callForType(node.getValueType(), [&node](auto t) {
          using UserType = decltype(t);
          auto value = userTypeToUserType<UserType>(node.getName().substr(7));
          makeConstant(value) >> node;
        });
      }
    }
  }
}

/*********************************************************************************************************************/

void Application::finaliseNetworks() {
  // check for control system variables which should be made bidirectional
  for(auto& network : networkList) {
    size_t nBidir = network.getFeedingNode().getDirection().withReturn ? 1 : 0;
    for(auto& consumer : network.getConsumingNodes()) {
      if(consumer.getDirection().withReturn) ++nBidir;
    }
    if(nBidir != 1) {
      // only if there is exactly one node with return channel we need to guess its peer
      continue;
    }
    if(network.getFeedingNode().getType() != NodeType::ControlSystem) {
      // only a feeding control system variable can be made bidirectional
      continue;
    }
    network.getFeedingNode().setDirection({VariableDirection::feeding, true});
  }

  // check for networks which have an external trigger but should be triggered by pollling consumer
  for(auto& network : networkList) {
    if(network.getTriggerType() == VariableNetwork::TriggerType::external) {
      size_t pollingComsumers{0};
      for(auto& consumer : network.getConsumingNodes()) {
        if(consumer.getMode() == UpdateMode::poll) ++pollingComsumers;
      }
      if(pollingComsumers == 1) {
        network.getFeedingNode().removeExternalTrigger();
      }
    }
  }
}

/*********************************************************************************************************************/

void Application::optimiseConnections() {
  // list of iterators of networks to be removed from the networkList after the
  // merge operation
  std::list<VariableNetwork*> deleteNetworks;

  // search for networks with the same feeder
  for(auto it1 = networkList.begin(); it1 != networkList.end(); ++it1) {
    for(auto it2 = it1; it2 != networkList.end(); ++it2) {
      if(it1 == it2) continue;

      auto feeder1 = it1->getFeedingNode();
      auto feeder2 = it2->getFeedingNode();

      // this optimisation is only necessary for device-type nodes, since
      // application and control-system nodes will automatically create merged
      // networks when having the same feeder
      /// @todo check if this assumtion is true! control-system nodes can be
      /// created with different types, too!
      if(feeder1.getType() != NodeType::Device || feeder2.getType() != NodeType::Device) continue;

      // check if referrring to same register
      if(feeder1.getDeviceAlias() != feeder2.getDeviceAlias()) continue;
      if(feeder1.getRegisterName() != feeder2.getRegisterName()) continue;

      // check if directions are the same
      if(feeder1.getDirection() != feeder2.getDirection()) continue;

      // check if value types and number of elements are compatible
      if(feeder1.getValueType() != feeder2.getValueType()) continue;
      if(feeder1.getNumberOfElements() != feeder2.getNumberOfElements()) continue;

      // check if transfer mode is the same
      if(feeder1.getMode() != feeder2.getMode()) continue;

      // check if triggers are compatible, if present
      if(feeder1.hasExternalTrigger() != feeder2.hasExternalTrigger()) continue;
      if(feeder1.hasExternalTrigger()) {
        if(feeder1.getExternalTrigger() != feeder2.getExternalTrigger()) continue;
      }

      // everything should be compatible at this point: merge the networks. We
      // will merge the network of the outer loop into the network of the inner
      // loop, since the network of the outer loop will not be found a second
      // time in the inner loop.
      for(auto& consumer : it1->getConsumingNodes()) {
        consumer.clearOwner();
        it2->addNode(consumer);
      }

      // if trigger present, remove corresponding trigger receiver node from the
      // trigger network
      if(feeder1.hasExternalTrigger()) {
        feeder1.getExternalTrigger().getOwner().removeNodeToTrigger(it1->getFeedingNode());
      }

      // schedule the outer loop network for deletion and stop processing it
      deleteNetworks.push_back(&(*it1));
      break;
    }
  }

  // remove networks from the network list
  for(auto net : deleteNetworks) {
    networkList.remove(*net);
  }
}

/*********************************************************************************************************************/

void Application::dumpConnections(std::ostream& stream) {                                          // LCOV_EXCL_LINE
  stream << "==== List of all variable connections of the current Application =====" << std::endl; // LCOV_EXCL_LINE
  for(auto& network : networkList) {                                                               // LCOV_EXCL_LINE
    network.dump("", stream);                                                                      // LCOV_EXCL_LINE
  }                                                                                                // LCOV_EXCL_LINE
  stream << "==== List of all circlular connections in the current Application ====" << std::endl; // LCOV_EXCL_LINE
  for(auto& circularDependency : circularDependencyNetworks) {
    stream << "Circular dependency network " << circularDependency.first << " : ";
    for(auto& module : circularDependency.second) {
      stream << module->getName() << ", ";
    }
    stream << std::endl;
  }
  stream << "======================================================================" << std::endl; // LCOV_EXCL_LINE
} // LCOV_EXCL_LINE

/*********************************************************************************************************************/

void Application::dumpConnectionGraph(const std::string& fileName) {
  std::fstream file{fileName, std::ios_base::out};

  VariableNetworkGraphDumpingVisitor visitor{file};
  visitor.dispatch(*this);
}

/*********************************************************************************************************************/

void Application::dumpModuleConnectionGraph(const std::string& fileName) const {
  std::fstream file{fileName, std::ios_base::out};

  VariableNetworkModuleGraphDumpingVisitor visitor{file};
  visitor.dispatch(*this);
}

/*********************************************************************************************************************/

Application::TypedMakeConnectionCaller::TypedMakeConnectionCaller(Application& owner, VariableNetwork& network)
: _owner(owner), _network(network) {}

/*********************************************************************************************************************/

template<typename PAIR>
void Application::TypedMakeConnectionCaller::operator()(PAIR&) const {
  if(typeid(typename PAIR::first_type) != _network.getValueType()) return;
  _owner.typedMakeConnection<typename PAIR::first_type>(_network);
  done = true;
}

/*********************************************************************************************************************/

void Application::makeConnectionsForNetwork(VariableNetwork& network) {
  // if the network has been created already, do nothing
  if(network.isCreated()) return;

  // if the trigger type is external, create the trigger first
  if(network.getFeedingNode().hasExternalTrigger()) {
    VariableNetwork& dependency = network.getFeedingNode().getExternalTrigger().getOwner();
    if(!dependency.isCreated()) makeConnectionsForNetwork(dependency);
  }

  // defer actual network creation to templated function
  auto callable = TypedMakeConnectionCaller(*this, network);
  boost::fusion::for_each(ChimeraTK::userTypeMap(), std::ref(callable));
  assert(callable.done);

  // mark the network as created
  network.markCreated();
}

/*********************************************************************************************************************/

void Application::markCircularConsumers(VariableNetwork& variableNetwork) {
  for(auto& node : variableNetwork.getConsumingNodes()) {
    // A variable network is a tree-like network of VariableNetworkNodes (one feeder and one or more multiple consumers)
    // A circlular network is a list of modules (EntityOwners) which have a circular dependency
    auto circularNetwork = node.scanForCircularDepencency();
    if(circularNetwork.size() > 0) {
      auto circularNetworkHash = boost::hash_range(circularNetwork.begin(), circularNetwork.end());
      circularDependencyNetworks[circularNetworkHash] = circularNetwork;
      circularNetworkInvalidityCounters[circularNetworkHash] = 0;
    }
  }
}
/*********************************************************************************************************************/

template<typename UserType>
void Application::typedMakeConnection(VariableNetwork& network) {
  if(enableDebugMakeConnections) {
    std::cout << std::endl << "Executing typedMakeConnections for network:" << std::endl;
    network.dump("", std::cout);
    std::cout << std::endl;
  }
  try {                          // catch exceptions to add information about the failed network
    bool connectionMade = false; // to check the logic...

    size_t nNodes = network.countConsumingNodes() + 1;
    auto feeder = network.getFeedingNode();
    auto consumers = network.getConsumingNodes();
    bool useExternalTrigger = network.getTriggerType() == VariableNetwork::TriggerType::external;
    bool useFeederTrigger = network.getTriggerType() == VariableNetwork::TriggerType::feeder;
    bool constantFeeder = feeder.getType() == NodeType::Constant;

    // 1st case: the feeder requires a fixed implementation
    if(feeder.hasImplementation() && !constantFeeder) {
      if(enableDebugMakeConnections) {
        std::cout << "  Creating fixed implementation for feeder '" << feeder.getName() << "'..." << std::endl;
      }
      // Create feeding implementation.
      boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> feedingImpl;
      if(feeder.getType() == NodeType::Device) {
        feedingImpl = createDeviceVariable<UserType>(feeder);
      }
      else if(feeder.getType() == NodeType::ControlSystem) {
        feedingImpl = createProcessVariable<UserType>(feeder);
      }
      else {
        throw ChimeraTK::logic_error("Unexpected node type!"); // LCOV_EXCL_LINE (assert-like)
      }

      // if we just have two nodes, directly connect them
      if(nNodes == 2 && !useExternalTrigger) {
        if(enableDebugMakeConnections) {
          std::cout << "    Setting up direct connection without external trigger." << std::endl;
        }
        bool needsFanOut{false};
        boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> consumingImpl;

        auto consumer = consumers.front();
        if(consumer.getType() == NodeType::Application) {
          consumer.setAppAccessorImplementation(feedingImpl);

          connectionMade = true;
        }
        else if(consumer.getType() == NodeType::Device) {
          consumingImpl = createDeviceVariable<UserType>(consumer);
          // connect the Device with e.g. a ControlSystem node via a
          // ThreadedFanOut
          needsFanOut = true;
        }
        else if(consumer.getType() == NodeType::ControlSystem) {
          consumingImpl = createProcessVariable<UserType>(consumer);
          // connect the ControlSystem with e.g. a Device node via an
          // ThreadedFanOut
          needsFanOut = true;
        }
        else if(consumer.getType() == NodeType::TriggerReceiver) {
          consumer.getNodeToTrigger().getOwner().setExternalTriggerImpl(feedingImpl);
        }
        else {
          throw ChimeraTK::logic_error("Unexpected node type!"); // LCOV_EXCL_LINE (assert-like)
        }

        if(needsFanOut) {
          assert(consumingImpl != nullptr);
          auto consumerImplPair = ConsumerImplementationPairs<UserType>{{consumingImpl, consumer}};
          auto fanOut = boost::make_shared<ThreadedFanOut<UserType>>(feedingImpl, network, consumerImplPair);
          internalModuleList.push_back(fanOut);
          network.setFanOut(fanOut);
        }

        connectionMade = true;
      }
      else { /* !(nNodes == 2 && !useExternalTrigger) */
        if(enableDebugMakeConnections) {
          std::cout << "    Setting up triggered connection." << std::endl;
        }

        // create the right FanOut type
        boost::shared_ptr<FanOut<UserType>> fanOut;
        boost::shared_ptr<ConsumingFanOut<UserType>> consumingFanOut;

        // Fanouts need to know the consumers on contruction, so we collect them first
        auto consumerImplementationPairs = setConsumerImplementations<UserType>(feeder, consumers);

        if(useExternalTrigger) {
          if(enableDebugMakeConnections) {
            std::cout << "      Using external trigger." << std::endl;
          }

          // if external trigger is enabled, use externally triggered threaded
          // FanOut. Create one per external trigger impl.
          void* triggerImplId = network.getExternalTriggerImpl().get();
          auto triggerFanOut = triggerMap[triggerImplId];
          if(!triggerFanOut) {
            assert(deviceModuleMap.find(feeder.getDeviceAlias()) != deviceModuleMap.end());

            // create the trigger fan out and store it in the map and the internalModuleList
            triggerFanOut = boost::make_shared<TriggerFanOut>(
                network.getExternalTriggerImpl(), *deviceModuleMap[feeder.getDeviceAlias()], network);
            triggerMap[triggerImplId] = triggerFanOut;
            internalModuleList.push_back(triggerFanOut);
          }
          fanOut = triggerFanOut->addNetwork(feedingImpl, consumerImplementationPairs);
          network.setFanOut(fanOut);
        }
        else if(useFeederTrigger) {
          if(enableDebugMakeConnections) {
            std::cout << "      Using trigger provided by the feeder." << std::endl;
          }

          // if the trigger is provided by the pushing feeder, use the treaded
          // version of the FanOut to distribute new values immediately to all
          // consumers. Depending on whether we have a return channel or not, pick
          // the right implementation of the FanOut
          boost::shared_ptr<ThreadedFanOut<UserType>> threadedFanOut;
          if(!feeder.getDirection().withReturn) {
            threadedFanOut =
                boost::make_shared<ThreadedFanOut<UserType>>(feedingImpl, network, consumerImplementationPairs);
          }
          else {
            threadedFanOut = boost::make_shared<ThreadedFanOutWithReturn<UserType>>(
                feedingImpl, network, consumerImplementationPairs);
          }
          internalModuleList.push_back(threadedFanOut);
          fanOut = threadedFanOut;
          network.setFanOut(fanOut);
        }
        else {
          if(enableDebugMakeConnections) {
            std::cout << "      No trigger, using consuming fanout." << std::endl;
          }
          assert(network.hasApplicationConsumer()); // checkConnections should
                                                    // catch this
          consumingFanOut = boost::make_shared<ConsumingFanOut<UserType>>(feedingImpl, consumerImplementationPairs);
          fanOut = consumingFanOut;
          network.setFanOut(fanOut);

          // TODO Is this correct? we already added all consumer as slaves in the fanout  constructor.
          //      Maybe assert that we only have a single poll-type node (is there a check in checkConnections?)
          for(auto& consumer : consumers) {
            if(consumer.getMode() == UpdateMode::poll) {
              consumer.setAppAccessorImplementation<UserType>(consumingFanOut);
              break;
            }
          }
        }
        connectionMade = true;
      }
    }
    // 2nd case: the feeder does not require a fixed implementation
    else if(!constantFeeder) { /* !feeder.hasImplementation() */

      if(enableDebugMakeConnections) {
        std::cout << "  Feeder '" << feeder.getName() << "' does not require a fixed implementation." << std::endl;
      }

      // we should be left with an application feeder node
      if(feeder.getType() != NodeType::Application) {
        throw ChimeraTK::logic_error("Unexpected node type!"); // LCOV_EXCL_LINE (assert-like)
      }
      assert(!useExternalTrigger);
      // if we just have two nodes, directly connect them
      if(nNodes == 2) {
        auto consumer = consumers.front();
        if(consumer.getType() == NodeType::Application) {
          auto impls = createApplicationVariable<UserType>(feeder, consumer);
          feeder.setAppAccessorImplementation<UserType>(impls.first);
          consumer.setAppAccessorImplementation<UserType>(impls.second);
        }
        else if(consumer.getType() == NodeType::ControlSystem) {
          auto impl = createProcessVariable<UserType>(consumer);
          feeder.setAppAccessorImplementation<UserType>(impl);
        }
        else if(consumer.getType() == NodeType::Device) {
          auto impl = createDeviceVariable<UserType>(consumer);
          feeder.setAppAccessorImplementation<UserType>(impl);
        }
        else if(consumer.getType() == NodeType::TriggerReceiver) {
          auto impls = createApplicationVariable<UserType>(feeder, consumer);
          feeder.setAppAccessorImplementation<UserType>(impls.first);
          consumer.getNodeToTrigger().getOwner().setExternalTriggerImpl(impls.second);
        }
        else if(consumer.getType() == NodeType::Constant) {
          auto impl = consumer.createConstAccessor<UserType>({});
          feeder.setAppAccessorImplementation<UserType>(impl);
        }
        else {
          throw ChimeraTK::logic_error("Unexpected node type!"); // LCOV_EXCL_LINE (assert-like)
        }
        connectionMade = true;
      }
      else {
        auto consumerImplementationPairs = setConsumerImplementations<UserType>(feeder, consumers);

        // create FanOut and use it as the feeder implementation
        auto fanOut =
            boost::make_shared<FeedingFanOut<UserType>>(feeder.getName(), feeder.getUnit(), feeder.getDescription(),
                feeder.getNumberOfElements(), feeder.getDirection().withReturn, consumerImplementationPairs);
        feeder.setAppAccessorImplementation<UserType>(fanOut);
        network.setFanOut(fanOut);

        connectionMade = true;
      }
    }
    else { /* constantFeeder */

      if(enableDebugMakeConnections) {
        std::cout << "  Using constant feeder '" << feeder.getName() << "'..." << std::endl;
      }
      assert(feeder.getType() == NodeType::Constant);

      for(auto& consumer : consumers) {
        AccessModeFlags flags{};
        if(consumer.getMode() == UpdateMode::push) {
          flags = {AccessMode::wait_for_new_data};
        }

        // each consumer gets its own implementation
        auto feedingImpl = feeder.createConstAccessor<UserType>(flags);

        if(consumer.getType() == NodeType::Application) {
          if(testableMode && consumer.getMode() == UpdateMode::push) {
            auto varId = getNextVariableId();
            auto pvarDec =
                boost::make_shared<TestableModeAccessorDecorator<UserType>>(feedingImpl, true, false, varId, varId);
            testableMode_names[varId] = "Constant";
            consumer.setAppAccessorImplementation<UserType>(pvarDec);
          }
          else {
            consumer.setAppAccessorImplementation<UserType>(feedingImpl);
          }
        }
        else if(consumer.getType() == NodeType::ControlSystem) {
          auto impl = createProcessVariable<UserType>(consumer);
          impl->accessChannel(0) = feedingImpl->accessChannel(0);
          impl->write();
        }
        else if(consumer.getType() == NodeType::Device) {
          // we register the required accessor as a recovery accessor. This is just a bare RegisterAccessor without any
          // decorations directly from the backend.
          if(deviceMap.count(consumer.getDeviceAlias()) == 0) {
            deviceMap[consumer.getDeviceAlias()] =
                ChimeraTK::BackendFactory::getInstance().createBackend(consumer.getDeviceAlias());
          }
          auto impl = deviceMap[consumer.getDeviceAlias()]->getRegisterAccessor<UserType>(
              consumer.getRegisterName(), consumer.getNumberOfElements(), 0, {});
          impl->accessChannel(0) = feedingImpl->accessChannel(0);

          assert(deviceModuleMap.find(consumer.getDeviceAlias()) != deviceModuleMap.end());
          DeviceModule* devmod = deviceModuleMap[consumer.getDeviceAlias()];

          // The accessor implementation already has its data in the user buffer. We now just have to add a valid
          // version number and have a recovery accessors (RecoveryHelper to be excact) which we can register at the
          // DeviceModule. As this is a constant we don't need to change it later and don't have to store it somewere
          // else.
          devmod->addRecoveryAccessor(boost::make_shared<RecoveryHelper>(impl, VersionNumber(), devmod->writeOrder()));
        }
        else if(consumer.getType() == NodeType::TriggerReceiver) {
          throw ChimeraTK::logic_error("Using constants as triggers is not supported!");
        }
        else {
          throw ChimeraTK::logic_error("Unexpected node type!"); // LCOV_EXCL_LINE (assert-like)
        }
      }
      connectionMade = true;
    }

    if(!connectionMade) {                                                     // LCOV_EXCL_LINE (assert-like)
      throw ChimeraTK::logic_error(                                           // LCOV_EXCL_LINE (assert-like)
          "The variable network cannot be handled. Implementation missing!"); // LCOV_EXCL_LINE (assert-like)
    }                                                                         // LCOV_EXCL_LINE (assert-like)
  }
  catch(ChimeraTK::logic_error& e) {
    std::stringstream ss;
    ss << "ChimeraTK::logic_error thrown in Application::typedMakeConnection():" << std::endl
       << "  " << e.what() << std::endl
       << "For network:" << std::endl;
    network.dump("", ss);
    throw ChimeraTK::logic_error(ss.str());
  }
}

/*********************************************************************************************************************/

template<typename UserType>
std::list<std::pair<boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>>, VariableNetworkNode>> Application::
    setConsumerImplementations(VariableNetworkNode const& feeder, std::list<VariableNetworkNode> consumers) {
  std::list<std::pair<boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>>, VariableNetworkNode>>
      consumerImplPairs;

  /** Map of deviceAliases to their corresponding TriggerFanOuts */
  std::map<std::string, boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>>> triggerFanOuts;

  for(auto& consumer : consumers) {
    bool addToConsumerImplPairs{true};
    std::pair<boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>>, VariableNetworkNode> pair{
        boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>>(), consumer};

    if(consumer.getType() == NodeType::Application) {
      auto impls = createApplicationVariable<UserType>(consumer);
      consumer.setAppAccessorImplementation<UserType>(impls.second);
      pair = std::make_pair(impls.first, consumer);
    }
    else if(consumer.getType() == NodeType::ControlSystem) {
      auto impl = createProcessVariable<UserType>(consumer);
      pair = std::make_pair(impl, consumer);
    }
    else if(consumer.getType() == NodeType::Device) {
      auto impl = createDeviceVariable<UserType>(consumer);
      pair = std::make_pair(impl, consumer);
    }
    else if(consumer.getType() == NodeType::TriggerReceiver) {
      // In case we have one or more trigger receivers among our consumers, we produce one
      // consuming application variable for each device. Later this will create a TriggerFanOut for
      // each trigger consumer, i.e. one per device so one blocking device does not affect the others.

      std::string deviceAlias = consumer.getNodeToTrigger().getOwner().getFeedingNode().getDeviceAlias();
      auto triggerFanOut = triggerFanOuts[deviceAlias];
      if(triggerFanOut == nullptr) {
        // create a new process variable pair and set the sender/feeder to the fan out
        auto triggerConnection = createApplicationVariable<UserType>(feeder);
        triggerFanOut = triggerConnection.second;
        triggerFanOuts[deviceAlias] = triggerFanOut;

        assert(triggerConnection.first != nullptr);
        pair = std::make_pair(triggerConnection.first, consumer);
      }
      else {
        // We already have added a pair for this device
        addToConsumerImplPairs = false;
      }
      consumer.getNodeToTrigger().getOwner().setExternalTriggerImpl(triggerFanOut);
    }
    else {
      throw ChimeraTK::logic_error("Unexpected node type!"); // LCOV_EXCL_LINE (assert-like)
    }

    if(addToConsumerImplPairs) {
      consumerImplPairs.push_back(pair);
    }
  }

  return consumerImplPairs;
}

/*********************************************************************************************************************/

VariableNetwork& Application::createNetwork() {
  networkList.emplace_back();
  return networkList.back();
}

/*********************************************************************************************************************/

Application& Application::getInstance() {
  return dynamic_cast<Application&>(ApplicationBase::getInstance());
}

/*********************************************************************************************************************/

bool Application::canStepApplication() const {
  return (testableMode_counter != 0);
}

/*********************************************************************************************************************/

void Application::stepApplication(bool waitForDeviceInitialisation) {
  // testableMode_counter must be non-zero, otherwise there is no input for the application to process. It is also
  // sufficient if testableMode_deviceInitialisationCounter is non-zero, if waitForDeviceInitialisation == true. In that
  // case we only wait for the device initialisation to be completed.
  if(testableMode_counter == 0 && (!waitForDeviceInitialisation || testableMode_deviceInitialisationCounter == 0)) {
    throw ChimeraTK::logic_error("Application::stepApplication() called despite no input was provided "
                                 "to the application to process!");
  }
  // let the application run until it has processed all data (i.e. the semaphore
  // counter is 0)
  size_t oldCounter = 0;
  while(testableMode_counter > 0 || (waitForDeviceInitialisation && testableMode_deviceInitialisationCounter > 0)) {
    if(enableDebugTestableMode && (oldCounter != testableMode_counter)) { // LCOV_EXCL_LINE (only cout)
      std::cout << "Application::stepApplication(): testableMode_counter = " << testableMode_counter
                << std::endl;            // LCOV_EXCL_LINE (only cout)
      oldCounter = testableMode_counter; // LCOV_EXCL_LINE (only cout)
    }
    testableModeUnlock("stepApplication");
    boost::this_thread::yield();
    testableModeLock("stepApplication");
  }
}

/*********************************************************************************************************************/

void Application::testableModeLock(const std::string& name) {
  // don't do anything if testable mode is not enabled
  if(!getInstance().testableMode) return;

  // debug output if enabled (also prevent spamming the same message)
  if(getInstance().enableDebugTestableMode && getInstance().testableMode_repeatingMutexOwner == 0) { // LCOV_EXCL_LINE
                                                                                                     // (only cout)
    std::cout << "Application::testableModeLock(): Thread " << threadName() // LCOV_EXCL_LINE (only cout)
              << " tries to obtain lock for " << name << std::endl;         // LCOV_EXCL_LINE (only cout)
  }                                                                         // LCOV_EXCL_LINE (only cout)

  // if last lock was obtained repeatedly by the same thread, sleep a short time
  // before obtaining the lock to give the other threads a chance to get the
  // lock first
  if(getInstance().testableMode_repeatingMutexOwner > 0) usleep(10000);

  // obtain the lock
  auto lastSeen_lastOwner = getInstance().testableMode_lastMutexOwner;
repeatTryLock:
  auto success = getTestableModeLockObject().try_lock_for(std::chrono::seconds(30));
  if(!success) {
    auto currentLastOwner = getInstance().testableMode_lastMutexOwner;
    if(currentLastOwner != lastSeen_lastOwner) {
      lastSeen_lastOwner = currentLastOwner;
      goto repeatTryLock;
    }
    std::cout << "Application::testableModeLock(): Thread " << threadName()                       // LCOV_EXCL_LINE
              << " could not obtain lock for 30 seconds, presumably because "                     // LCOV_EXCL_LINE
              << threadName(getInstance().testableMode_lastMutexOwner) << " does not release it." // LCOV_EXCL_LINE
              << std::endl;                                                                       // LCOV_EXCL_LINE

    // throw a specialised exception to make sure whoever catches it really knows what he does...
    throw TestsStalled(); // LCOV_EXCL_LINE
  }                       // LCOV_EXCL_LINE

  // check if the last owner of the mutex was this thread, which may be a hint
  // that no other thread is waiting for the lock
  if(getInstance().testableMode_lastMutexOwner == boost::this_thread::get_id()) {
    // debug output if enabled
    if(getInstance().enableDebugTestableMode && getInstance().testableMode_repeatingMutexOwner == 0) { // LCOV_EXCL_LINE
                                                                                                       // (only cout)
      std::cout << "Application::testableModeLock(): Thread " << threadName() // LCOV_EXCL_LINE (only cout)
                << " repeatedly obtained lock successfully for " << name      // LCOV_EXCL_LINE (only cout)
                << ". Further messages will be suppressed." << std::endl;     // LCOV_EXCL_LINE (only cout)
    }                                                                         // LCOV_EXCL_LINE (only cout)
    // increase counter for stall detection
    getInstance().testableMode_repeatingMutexOwner++;

    // detect stall: if the same thread got the mutex with no other thread
    // obtaining it in between for one second, we assume no other thread is able
    // to process data at this time. The test should fail in this case
    if(getInstance().testableMode_repeatingMutexOwner > 100) {
      // print an informative message first, which lists also all variables
      // currently containing unread data.
      std::cerr << "*** Tests are stalled due to data which has been sent but not received." << std::endl;
      std::cerr << "    The following variables still contain unread values or had data loss due to a queue overflow:"
                << std::endl;
      for(auto& pair : Application::getInstance().testableMode_perVarCounter) {
        if(pair.second > 0) {
          std::cerr << "    - " << Application::getInstance().testableMode_names[pair.first] << " ["
                    << getInstance().testableMode_processVars[pair.first]->getId() << "]";
          // check if process variable still has data in the queue
          try {
            if(getInstance().testableMode_processVars[pair.first]->readNonBlocking()) {
              std::cerr << " (unread data in queue)";
            }
            else {
              std::cerr << " (data loss)";
            }
          }
          catch(std::logic_error&) {
            // if we receive a logic_error in readNonBlocking() it just means
            // another thread is waiting on a TransferFuture of this variable,
            // and we actually were not allowed to read...
            std::cerr << " (data loss)";
          }
          std::cerr << std::endl;
        }
      }
      std::cerr << "(end of list)" << std::endl;
      // Check for modules waiting for initial values (prints nothing if there are no such modules)
      getInstance().circularDependencyDetector.printWaiters();
      // throw a specialised exception to make sure whoever catches it really knows what he does...
      throw TestsStalled();
    }
  }
  else {
    // last owner of the mutex was different: reset the counter and store the
    // thread id
    getInstance().testableMode_repeatingMutexOwner = 0;
    getInstance().testableMode_lastMutexOwner = boost::this_thread::get_id();

    // debug output if enabled
    if(getInstance().enableDebugTestableMode) {                               // LCOV_EXCL_LINE (only cout)
      std::cout << "Application::testableModeLock(): Thread " << threadName() // LCOV_EXCL_LINE (only cout)
                << " obtained lock successfully for " << name << std::endl;   // LCOV_EXCL_LINE (only cout)
    }                                                                         // LCOV_EXCL_LINE (only cout)
  }
}

/*********************************************************************************************************************/

void Application::testableModeUnlock(const std::string& name) {
  if(!getInstance().testableMode) return;
  if(getInstance().enableDebugTestableMode &&
      (!getInstance().testableMode_repeatingMutexOwner                                     // LCOV_EXCL_LINE (only cout)
          || getInstance().testableMode_lastMutexOwner != boost::this_thread::get_id())) { // LCOV_EXCL_LINE (only cout)
    std::cout << "Application::testableModeUnlock(): Thread " << threadName()              // LCOV_EXCL_LINE (only cout)
              << " releases lock for " << name << std::endl;                               // LCOV_EXCL_LINE (only cout)
  }                                                                                        // LCOV_EXCL_LINE (only cout)
  getTestableModeLockObject().unlock();
}
/*********************************************************************************************************************/

void Application::setThreadName(const std::string& name) {
  std::unique_lock<std::mutex> myLock(m_threadNames);
  threadNames[boost::this_thread::get_id()] = name;
}

/*********************************************************************************************************************/

std::string Application::threadName(const boost::thread::id& threadId) {
  std::unique_lock<std::mutex> myLock(Application::getInstance().m_threadNames);
  auto& names = Application::getInstance().threadNames;
  if(names.find(threadId) == names.end()) {
    return "*UNKNOWN_THREAD*";
  }
  return names.at(threadId);
}

/*********************************************************************************************************************/

std::unique_lock<std::timed_mutex>& Application::getTestableModeLockObject() {
  // Note: due to a presumed bug in gcc (still present in gcc 7), the
  // thread_local definition must be in the cc file to prevent seeing different
  // objects in the same thread under some conditions. Another workaround for
  // this problem can be found in commit
  // dc051bfe35ce6c1ed954010559186f63646cf5d4
  thread_local std::unique_lock<std::timed_mutex> myLock(Application::testableMode_mutex, std::defer_lock);
  return myLock;
}

/*********************************************************************************************************************/

void Application::registerDeviceModule(DeviceModule* deviceModule) {
  deviceModuleMap[deviceModule->deviceAliasOrURI] = deviceModule;
}

/*********************************************************************************************************************/

void Application::unregisterDeviceModule(DeviceModule* deviceModule) {
  deviceModuleMap.erase(deviceModule->deviceAliasOrURI);
}

/*********************************************************************************************************************/

void Application::CircularDependencyDetector::registerDependencyWait(VariableNetworkNode& node) {
  assert(node.getType() == NodeType::Application);
  std::unique_lock<std::mutex> lock(_mutex);

  auto* dependent = dynamic_cast<Module*>(node.getOwningModule())->findApplicationModule();

  // register the waiting node in the map (used for the detectBlockedModules() thread). This is done also for Device
  // variables etc.
  _awaitedNodes[dependent] = node;

  // checking of direct circular dependencies is only done for Application-type feeders
  if(node.getOwner().getFeedingNode().getType() != NodeType::Application) return;
  auto* dependency = dynamic_cast<Module*>(node.getOwner().getFeedingNode().getOwningModule())->findApplicationModule();

  // If a module depends on itself, the detector would always detect a circular dependency, even if it is resolved
  // by writing initial values in prepare(). Hence we do not check anything in this case.
  if(dependent == dependency) return;

  // Register dependent-dependency relation in map
  _awaitedVariables[dependent] = node.getQualifiedName();
  _waitMap[dependent] = dependency;

  // check for circular dependencies
  auto* depdep = dependency;
  while(_waitMap.find(depdep) != _waitMap.end()) {
    auto* depdep_prev = depdep;
    depdep = _waitMap[depdep];
    if(depdep == dependent) {
      // The detected circular dependency might still resolve itself, because registerDependencyWait() is called even
      // if initial values are already present in the queues.
      for(size_t i = 0; i < 1000; ++i) {
        // give other thread a chance to read their initial value
        lock.unlock();
        usleep(10000);
        lock.lock();
        // if the module depending on an initial value for "us" is no longer waiting for us to send an initial value,
        // the circular dependency is resolved.
        if(_waitMap.find(depdep_prev) == _waitMap.end() || _waitMap[depdep] != dependent) return;
      }

      std::cerr << "*** Cirular dependency of ApplicationModules found while waiting for initial values!" << std::endl;
      std::cerr << std::endl;

      std::cerr << dependent->getQualifiedName() << " waits for " << node.getQualifiedName() << " from:" << std::endl;
      auto* depdep2 = dependency;
      while(_waitMap.find(depdep2) != _waitMap.end()) {
        auto waitsFor = _awaitedVariables[depdep2];
        std::cerr << depdep2->getQualifiedName();
        if(depdep2 == dependent) break;
        depdep2 = _waitMap[depdep2];
        std::cerr << " waits for " << waitsFor << " from:" << std::endl;
      }
      std::cerr << "." << std::endl;

      std::cerr << std::endl;
      std::cerr
          << "Please provide an initial value in the prepare() function of one of the involved ApplicationModules!"
          << std::endl;

      throw ChimeraTK::logic_error("Cirular dependency of ApplicationModules while waiting for initial values");
    }
    else {
      // Give other threads a chance to add to the wait map
      lock.unlock();
      usleep(10000);
      lock.lock();
    }
  }
}

/*********************************************************************************************************************/

void Application::CircularDependencyDetector::printWaiters() {
  if(_waitMap.size() == 0) return;
  std::cerr << "The following modules are still waiting for initial values:" << std::endl;
  for(auto& waiters : getInstance().circularDependencyDetector._waitMap) {
    std::cerr << waiters.first->getQualifiedName() << " waits for " << _awaitedVariables[waiters.first] << " from "
              << waiters.second->getQualifiedName() << std::endl;
  }
  std::cerr << "(end of list)" << std::endl;
}

/*********************************************************************************************************************/

void Application::CircularDependencyDetector::unregisterDependencyWait(VariableNetworkNode& node) {
  assert(node.getType() == NodeType::Application);
  std::lock_guard<std::mutex> lock(_mutex);
  auto* mod = dynamic_cast<Module*>(node.getOwningModule())->findApplicationModule();
  _waitMap.erase(mod);
  _awaitedVariables.erase(mod);
  _awaitedNodes.erase(mod);
}

/*********************************************************************************************************************/

void Application::CircularDependencyDetector::startDetectBlockedModules() {
  _thread = boost::thread([this] { detectBlockedModules(); });
}
/*********************************************************************************************************************/

void Application::CircularDependencyDetector::detectBlockedModules() {
  auto& app = Application::getInstance();
  while(true) {
    // wait some time to slow down this check
    boost::this_thread::sleep_for(boost::chrono::seconds(60));

    // check for modules which did not yet reach their mainLoop
    bool allModulesEnteredMainLoop = true;
    for(auto* module : app.getSubmoduleListRecursive()) {
      // Note: We are "abusing" this flag which was introduced for the testable mode. It actually just shows whether
      // the mainLoop() has benn called already (resp. will be called very soon). It is not depending on the
      // testableMode. FIXME: Rename this flag!
      if(module->hasReachedTestableMode()) {
        continue;
      }

      // Found a module which is still waiting for initial values
      allModulesEnteredMainLoop = false;

      // require lock against concurrent accesses on the maps
      std::lock_guard<std::mutex> lock(_mutex);

      // Check if module has registered a dependency wait. If not, situation will either resolve soon or module will
      // register a dependency wait soon. Both cases can be found in the next interation.
      if(_awaitedNodes.find(module) == _awaitedNodes.end()) continue;

      auto* appModule = dynamic_cast<ApplicationModule*>(module);
      if(!appModule) {
        // only ApplicationModule can have hasReachedTestableMode() == true, so it should not happen
        std::cout << "CircularDependencyDetector found non-application module: " << module->getQualifiedName()
                  << std::endl;
        continue;
      }

      // Iteratively seach for reason the module blocks
      std::function<void(const VariableNetworkNode& node)> iterativeSearch = [&](const VariableNetworkNode& node) {
        const auto& feeder = node.getOwner().getFeedingNode();
        if(feeder.getType() == NodeType::Application) {
          // fed by other ApplicationModule: check if that one is waiting, too
          auto* feedingAppModule = dynamic_cast<Module*>(feeder.getOwningModule())->findApplicationModule();
          if(feedingAppModule->hasReachedTestableMode()) {
            // the feeding module has started its mainLoop(), but not sent us an initial value
            // FIXME: There is a race condition, if the situation has right now resolved and the feeding module just
            // not yet sent the initial value. Ideally we should wait another iteration before printing warnings!
            if(_modulesWeHaveWarnedAbout.find(feedingAppModule) == _modulesWeHaveWarnedAbout.end()) {
              _modulesWeHaveWarnedAbout.insert(feedingAppModule);
              std::cout << "Note: ApplicationModule " << appModule->getQualifiedName() << " is waiting for an "
                        << "initial value, because " << feedingAppModule->getQualifiedName() << " has not yet sent one."
                        << std::endl;
            }
            return;
          }
          if(_awaitedNodes.find(feedingAppModule) == _awaitedNodes.end()) {
            // The other module is not right now waiting. It will either enter the mainLoop soon, or start waiting soon.
            // Let's just wait another iteration.
            return;
          }
          // The other module is right now waiting: continue iterative search
          iterativeSearch(_awaitedNodes.at(feedingAppModule));
        }
        else if(feeder.getType() == NodeType::Device) {
          // fed by device
          const auto& deviceName = feeder.getDeviceAlias();
          if(_devicesWeHaveWarnedAbout.find(deviceName) == _devicesWeHaveWarnedAbout.end()) {
            _devicesWeHaveWarnedAbout.insert(deviceName);
            std::cout << "Note: Still waiting for device " << deviceName << " to come up";
            auto* dm = Application::getInstance().deviceModuleMap[deviceName];
            std::unique_lock dmLock(dm->errorMutex);
            if(dm->deviceHasError) {
              std::cout << " (" << std::string(dm->deviceError._message) << ")";
            }
            std::cout << "..." << std::endl;
          }
        }
        else {
          // fed by anything else
          if(_otherThingsWeHaveWarnedAbout.find(feeder.getType()) == _otherThingsWeHaveWarnedAbout.end()) {
            _otherThingsWeHaveWarnedAbout.insert(feeder.getType());
            std::cout << "At least one ApplicationModule (" << appModule->getQualifiedName() << " is waiting for an "
                      << "initial value from NodeType " << int(feeder.getType()) << "." << std::endl;
            std::cout << "This is probably a BUG in the ChimeraTK framework." << std::endl;
            std::cout << "Network:" << std::endl;
            feeder.getOwner().dump();
          }
        }
      };
      iterativeSearch(_awaitedNodes.at(appModule));
    }

    // if all modules are in the mainLoop, stop this thread
    if(allModulesEnteredMainLoop) break;
  }

  std::cout << "All application modules are running." << std::endl;

  // free some memory
  _waitMap.clear();
  _awaitedVariables.clear();
  _awaitedNodes.clear();
  _modulesWeHaveWarnedAbout.clear();
  _devicesWeHaveWarnedAbout.clear();
  _otherThingsWeHaveWarnedAbout.clear();
}

/*********************************************************************************************************************/

void Application::CircularDependencyDetector::terminate() {
  if(_thread.joinable()) {
    _thread.interrupt();
    _thread.join();
  }
}

/*********************************************************************************************************************/

Application::CircularDependencyDetector::~CircularDependencyDetector() {
  assert(!_thread.joinable());
}

/*********************************************************************************************************************/