Skip to content
Snippets Groups Projects
Application.cc 56.7 KiB
Newer Older
        }
        else if(consumer.getType() == NodeType::ControlSystem) {
          auto impl = createProcessVariable<UserType>(consumer);
          impl->accessChannel(0) = feedingImpl->accessChannel(0);
          impl->write();
        }
        else if(consumer.getType() == NodeType::Device) {
          // we register the required accessor as a recovery accessor. This is just a bare RegisterAccessor without any decorations directly from the backend.
          if(deviceMap.count(consumer.getDeviceAlias()) == 0) {
            deviceMap[consumer.getDeviceAlias()] =
                ChimeraTK::BackendFactory::getInstance().createBackend(consumer.getDeviceAlias());
          }
          auto impl = deviceMap[consumer.getDeviceAlias()]->getRegisterAccessor<UserType>(
              consumer.getRegisterName(), consumer.getNumberOfElements(), 0, {});
          impl->accessChannel(0) = feedingImpl->accessChannel(0);
          // find the right DeviceModule for this alias name
          DeviceModule* devmod = nullptr;
          for(auto& dm : deviceModuleList) {
            if(dm->deviceAliasOrURI == consumer.getDeviceAlias()) {
              devmod = dm;
              break;
            }
          }
          assert(devmod != nullptr);
          // The accessor implementation already has its data in the user buffer. We now just have to add a valid version number
          // and have a recovery accessors (RecoveryHelper to be excact) which we can register at the DeviceModule.
          // As this is a constant we don't need to change it later and don't have to store it somewere else.
          devmod->addRecoveryAccessor(boost::make_shared<RecoveryHelper>(impl, VersionNumber()));
        }
        else if(consumer.getType() == NodeType::TriggerReceiver) {
          throw ChimeraTK::logic_error("Using constants as triggers is not supported!");
        }
        else {
          throw ChimeraTK::logic_error("Unexpected node type!"); // LCOV_EXCL_LINE (assert-like)

    if(!connectionMade) {                                                     // LCOV_EXCL_LINE (assert-like)
      throw ChimeraTK::logic_error(                                           // LCOV_EXCL_LINE (assert-like)
          "The variable network cannot be handled. Implementation missing!"); // LCOV_EXCL_LINE (assert-like)
    }                                                                         // LCOV_EXCL_LINE (assert-like)
  catch(ChimeraTK::logic_error& e) {
    std::stringstream ss;
    ss << "ChimeraTK::logic_error thrown in Application::typedMakeConnection() for network:" << std::endl;
    network.dump("", ss);
    ss << e.what();
    throw ChimeraTK::logic_error(ss.str());
}

/*********************************************************************************************************************/


template<typename UserType>
void Application::addConsumersToFanout(boost::shared_ptr<FanOut<UserType>> fanOut, VariableNetworkNode& feeder, std::list<VariableNetworkNode> consumers, boost::shared_ptr<ConsumingFanOut<UserType>> consumingFanOut) {
  // In case we have one or more trigger receivers among our consumers, we
  // produce one consuming application variable for each device. Later this will create a TriggerFanOut for
  // each trigger consumer, i.e. one per device so one blocking device does not affect the others.
  /** Map of deviceAliases to their corresponding TriggerFanOuts. */
  std::map<std::string, boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>>> triggerFanOuts;

  // add all consumers to the FanOut
  for(auto& consumer : consumers) {
    if(consumer.getType() == NodeType::Application) {
      if(consumingFanOut && consumer.getMode() == UpdateMode::poll) {
        consumer.setAppAccessorImplementation<UserType>(consumingFanOut);
        consumingFanOut.reset();
      }
      else {
        auto impls = createApplicationVariable<UserType>(consumer);
        fanOut->addSlave(impls.first, consumer);
        consumer.setAppAccessorImplementation<UserType>(impls.second);
      }
    }
    else if(consumer.getType() == NodeType::ControlSystem) {
      auto impl = createProcessVariable<UserType>(consumer);
      fanOut->addSlave(impl, consumer);
    }
    else if(consumer.getType() == NodeType::Device) {
      auto impl = createDeviceVariable<UserType>(consumer.getDeviceAlias(), consumer.getRegisterName(),
          {VariableDirection::consuming, false}, consumer.getMode(), consumer.getNumberOfElements());
      fanOut->addSlave(impl, consumer);
    }
    else if(consumer.getType() == NodeType::TriggerReceiver) {
      std::string deviceAlias = consumer.getNodeToTrigger().getOwner().getFeedingNode().getDeviceAlias();
      auto triggerFanOut = triggerFanOuts[deviceAlias];
      if(!triggerFanOut) { // triggerFanOut is a shared pointer, which evaluates false if default constructed.
        // create a new process variable pair and set the sender/feeder to the fan out
        auto triggerConnection = createApplicationVariable<UserType>(feeder);
        triggerFanOut = triggerConnection.second;
        triggerFanOuts[deviceAlias] = triggerFanOut;
        fanOut->addSlave(triggerConnection.first, consumer);
      }
      consumer.getNodeToTrigger().getOwner().setExternalTriggerImpl(triggerFanOut);
    }
    else {
      throw ChimeraTK::logic_error("Unexpected node type!"); // LCOV_EXCL_LINE (assert-like)
    }
  }
};

/*********************************************************************************************************************/

VariableNetwork& Application::createNetwork() {
  networkList.emplace_back();
  return networkList.back();
}

/*********************************************************************************************************************/

Application& Application::getInstance() {
  return dynamic_cast<Application&>(ApplicationBase::getInstance());

/*********************************************************************************************************************/

void Application::stepApplication(bool waitForDeviceInitialisation) {
  // testableMode_counter must be non-zero, otherwise there is no input for the application to process. It is also
  // sufficient if testableMode_deviceInitialisationCounter is non-zero, if waitForDeviceInitialisation == true. In that
  // case we only wait for the device initialisation to be completed.
  if(testableMode_counter == 0 && (!waitForDeviceInitialisation || testableMode_deviceInitialisationCounter == 0)) {
    throw ChimeraTK::logic_error("Application::stepApplication() called despite no input was provided "
                                 "to the application to process!");
  // let the application run until it has processed all data (i.e. the semaphore
  // counter is 0)
  while(testableMode_counter > 0 || (waitForDeviceInitialisation && testableMode_deviceInitialisationCounter > 0)) {
    if(enableDebugTestableMode && (oldCounter != testableMode_counter)) { // LCOV_EXCL_LINE (only cout)
      std::cout << "Application::stepApplication(): testableMode_counter = " << testableMode_counter
                << std::endl;            // LCOV_EXCL_LINE (only cout)
      oldCounter = testableMode_counter; // LCOV_EXCL_LINE (only cout)
  }
}

/*********************************************************************************************************************/

void Application::testableModeLock(const std::string& name) {
  // don't do anything if testable mode is not enabled
  if(!getInstance().testableMode) return;
  // debug output if enabled (also prevent spamming the same message)
  if(getInstance().enableDebugTestableMode && getInstance().testableMode_repeatingMutexOwner == 0) { // LCOV_EXCL_LINE
                                                                                                     // (only cout)
    std::cout << "Application::testableModeLock(): Thread " << threadName() // LCOV_EXCL_LINE (only cout)
              << " tries to obtain lock for " << name << std::endl;         // LCOV_EXCL_LINE (only cout)
  }                                                                         // LCOV_EXCL_LINE (only cout)

  // if last lock was obtained repeatedly by the same thread, sleep a short time
  // before obtaining the lock to give the other threads a chance to get the
  // lock first
  if(getInstance().testableMode_repeatingMutexOwner > 0) usleep(10000);
  // check if the last owner of the mutex was this thread, which may be a hint
  // that no other thread is waiting for the lock
  if(getInstance().testableMode_lastMutexOwner == std::this_thread::get_id()) {
    if(getInstance().enableDebugTestableMode && getInstance().testableMode_repeatingMutexOwner == 0) { // LCOV_EXCL_LINE
                                                                                                       // (only cout)
      std::cout << "Application::testableModeLock(): Thread " << threadName() // LCOV_EXCL_LINE (only cout)
                << " repeatedly obtained lock successfully for " << name      // LCOV_EXCL_LINE (only cout)
                << ". Further messages will be suppressed." << std::endl;     // LCOV_EXCL_LINE (only cout)
    }                                                                         // LCOV_EXCL_LINE (only cout)
    getInstance().testableMode_repeatingMutexOwner++;
    // detect stall: if the same thread got the mutex with no other thread
    // obtaining it in between for one second, we assume no other thread is able
    // to process data at this time. The test should fail in this case
    if(getInstance().testableMode_repeatingMutexOwner > 100) {
      // print an informative message first, which lists also all variables
      // currently containing unread data.
      std::cout << "*** Tests are stalled due to data which has been sent but "
                   "not received."
                << std::endl;
      std::cout << "    The following variables still contain unread values or "
                   "had data loss due to a queue overflow:"
                << std::endl;
      for(auto& pair : Application::getInstance().testableMode_perVarCounter) {
        if(pair.second > 0) {
          std::cout << "    - " << Application::getInstance().testableMode_names[pair.first] << " ["
                    << getInstance().testableMode_processVars[pair.first]->getId() << "]";
            if(getInstance().testableMode_processVars[pair.first]->readNonBlocking()) {
              std::cout << " (unread data in queue)";
          }
          catch(std::logic_error&) {
            // if we receive a logic_error in readNonBlocking() it just means
            // another thread is waiting on a TransferFuture of this variable,
            // and we actually were not allowed to read...
      // throw a specialised exception to make sure whoever catches it really
      // knows what he does...
      // getInstance().testableMode_counter = 0;
      // for(auto &pair : Application::getInstance().testableMode_perVarCounter)
      // pair.second = 0;
    // last owner of the mutex was different: reset the counter and store the
    // thread id
    getInstance().testableMode_repeatingMutexOwner = 0;
    getInstance().testableMode_lastMutexOwner = std::this_thread::get_id();
    if(getInstance().enableDebugTestableMode) {                               // LCOV_EXCL_LINE (only cout)
      std::cout << "Application::testableModeLock(): Thread " << threadName() // LCOV_EXCL_LINE (only cout)
                << " obtained lock successfully for " << name << std::endl;   // LCOV_EXCL_LINE (only cout)
    }                                                                         // LCOV_EXCL_LINE (only cout)
  }
}

/*********************************************************************************************************************/

void Application::testableModeUnlock(const std::string& name) {
  if(!getInstance().testableMode) return;
  if(getInstance().enableDebugTestableMode &&
      (!getInstance().testableMode_repeatingMutexOwner                                   // LCOV_EXCL_LINE (only cout)
          || getInstance().testableMode_lastMutexOwner != std::this_thread::get_id())) { // LCOV_EXCL_LINE (only cout)
    std::cout << "Application::testableModeUnlock(): Thread " << threadName()            // LCOV_EXCL_LINE (only cout)
              << " releases lock for " << name << std::endl;                             // LCOV_EXCL_LINE (only cout)
  }                                                                                      // LCOV_EXCL_LINE (only cout)
/*********************************************************************************************************************/

std::string& Application::threadName() {
  // Note: due to a presumed bug in gcc (still present in gcc 7), the
  // thread_local definition must be in the cc file to prevent seeing different
  // objects in the same thread under some conditions. Another workaround for
  // this problem can be found in commit
  // dc051bfe35ce6c1ed954010559186f63646cf5d4
  thread_local std::string name{"**UNNAMED**"};
  return name;
}

/*********************************************************************************************************************/

std::unique_lock<std::mutex>& Application::getTestableModeLockObject() {
  // Note: due to a presumed bug in gcc (still present in gcc 7), the
  // thread_local definition must be in the cc file to prevent seeing different
  // objects in the same thread under some conditions. Another workaround for
  // this problem can be found in commit
  // dc051bfe35ce6c1ed954010559186f63646cf5d4
  thread_local std::unique_lock<std::mutex> myLock(Application::testableMode_mutex, std::defer_lock);
  return myLock;

/*********************************************************************************************************************/
void Application::registerDeviceModule(DeviceModule* deviceModule) {
  deviceModuleList.push_back(deviceModule);
}

/*********************************************************************************************************************/
void Application::unregisterDeviceModule(DeviceModule* deviceModule) {
  deviceModuleList.remove(deviceModule);
}