Skip to content
Snippets Groups Projects
ThreadedFanOut.h 9.93 KiB
Newer Older
// SPDX-FileCopyrightText: Deutsches Elektronen-Synchrotron DESY, MSK, ChimeraTK Project <chimeratk-support@desy.de>
// SPDX-License-Identifier: LGPL-3.0-or-later
#pragma once
#include "FanOut.h"
#include "InternalModule.h"
#include "ReverseRecoveryDecorator.h"
#include <ChimeraTK/NDRegisterAccessor.h>
#include <ChimeraTK/ReadAnyGroup.h>
#include <ChimeraTK/SupportedUserTypes.h>
#include <ChimeraTK/SystemTags.h>

#include <boost/smart_ptr/shared_ptr.hpp>

#include <string>
  /********************************************************************************************************************/

  /** FanOut implementation with an internal thread which waits for new data which
   * is read from the given feeding implementation and distributed to any number
   * of slaves. */
  template<typename UserType>
  class ThreadedFanOut : public FanOut<UserType>, public InternalModule {
   public:
    ThreadedFanOut(boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> feedingImpl,
        ConsumerImplementationPairs<UserType> const& consumerImplementationPairs);
    ~ThreadedFanOut() override;
    void activate() override;
    void deactivate() override;

    /** Synchronise feeder and the consumers. This function is executed in the
     * separate thread. */
    VersionNumber readInitialValues();

   protected:
    /** Thread handling the synchronisation, if needed */
    boost::thread _thread;

    /** Reference to VariableNetwork which is being realised by this FanOut. **/
    // VariableNetwork& _network;
  };

  /********************************************************************************************************************/

  /** Same as ThreadedFanOut but with return channel */
  template<typename UserType>
  class ThreadedFanOutWithReturn : public ThreadedFanOut<UserType> {
   public:
    ThreadedFanOutWithReturn(boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> feedingImpl,
        ConsumerImplementationPairs<UserType> const& consumerImplementationPairs);
        boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> slave, VariableNetworkNode& consumer) override;
   protected:
    /** Thread handling the synchronisation, if needed */
    boost::thread _thread;
    boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> _initialValueProvider;
    std::vector<boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>>> _inputChannels;
    // using ThreadedFanOut<UserType>::_network;
    using ThreadedFanOut<UserType>::readInitialValues;
    using EntityOwner::_testableModeReached;
  /********************************************************************************************************************/
  /********************************************************************************************************************/

  template<typename UserType>
  ThreadedFanOut<UserType>::ThreadedFanOut(boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> feedingImpl,
      ConsumerImplementationPairs<UserType> const& consumerImplementationPairs)
  : FanOut<UserType>(feedingImpl) /*, _network(network)*/ {
    assert(feedingImpl->getAccessModeFlags().has(AccessMode::wait_for_new_data));
    for(auto el : consumerImplementationPairs) {
      FanOut<UserType>::addSlave(el.first, el.second);
    }
  }

  /********************************************************************************************************************/

  template<typename UserType>
  ThreadedFanOut<UserType>::~ThreadedFanOut() {
    try {
      deactivate();
    }
    catch(ChimeraTK::logic_error& e) {
      std::cerr << e.what() << std::endl;
      std::exit(1);
    }
  }

  /********************************************************************************************************************/

  template<typename UserType>
  void ThreadedFanOut<UserType>::activate() {
    if(this->_disabled) {
      return;
    }
    assert(!_thread.joinable());
    _thread = boost::thread([this] { this->run(); });
  }

  /********************************************************************************************************************/

  template<typename UserType>
  void ThreadedFanOut<UserType>::deactivate() {
    try {
      if(_thread.joinable()) {
        _thread.interrupt();
        FanOut<UserType>::interrupt();
        _thread.join();
      }
      assert(!_thread.joinable());
    }
    catch(boost::thread_resource_error& e) {
      assert(false);
    }
  }

  /********************************************************************************************************************/

  template<typename UserType>
  void ThreadedFanOut<UserType>::run() {
    Application::registerThread("ThFO" + FanOut<UserType>::_impl->getName());
Jens Georg's avatar
Jens Georg committed
    Application::getInstance().getTestableMode().lock("start");
    _testableModeReached = true;

    ChimeraTK::VersionNumber version{nullptr};
    version = readInitialValues();
    while(true) {
      // send out copies to slaves
      boost::this_thread::interruption_point();
      auto validity = FanOut<UserType>::_impl->dataValidity();
      for(auto& slave : FanOut<UserType>::_slaves) {
        // do not send copy if no data is expected (e.g. trigger)
        if(slave->getNumberOfSamples() != 0) {
          slave->accessChannel(0) = FanOut<UserType>::_impl->accessChannel(0);
        }
        slave->setDataValidity(validity);
        bool dataLoss = slave->writeDestructively(version);
        if(dataLoss) {
          Application::incrementDataLossCounter(slave->getName());
        }
      }
      // receive data
      boost::this_thread::interruption_point();
      FanOut<UserType>::_impl->read();
      version = FanOut<UserType>::_impl->getVersionNumber();
    }
  }

  /********************************************************************************************************************/

  template<typename UserType>
  VersionNumber ThreadedFanOut<UserType>::readInitialValues() {
Jens Georg's avatar
Jens Georg committed
    Application::getInstance().getTestableMode().unlock("readInitialValues");
    FanOut<UserType>::_impl->read();
Jens Georg's avatar
Jens Georg committed
    if(!Application::getInstance().getTestableMode().testLock()) {
      Application::getInstance().getTestableMode().lock("readInitialValues");
    return FanOut<UserType>::_impl->getVersionNumber();
  }

  /********************************************************************************************************************/
  /********************************************************************************************************************/

  template<typename UserType>
  ThreadedFanOutWithReturn<UserType>::ThreadedFanOutWithReturn(
      boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> feedingImpl,
      ConsumerImplementationPairs<UserType> const& consumerImplementationPairs)
  : ThreadedFanOut<UserType>(feedingImpl, consumerImplementationPairs) {
    _inputChannels.push_back(feedingImpl);
    // By default, we take the initial value from the feeder
    _initialValueProvider = feedingImpl;
    for(auto el : consumerImplementationPairs) {
      ThreadedFanOutWithReturn<UserType>::addSlave(el.first, el.second);
    }
  }

  /********************************************************************************************************************/

  template<typename UserType>
  void ThreadedFanOutWithReturn<UserType>::addSlave(
      boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> slave, VariableNetworkNode& consumer) {
    // TODO Adding slaves is currently by done by the ThreadedFanOut base class constructor.
    //      Refactor constructors and addSlaves for all FanOuts?
    // FanOut<UserType>::addSlave(slave, consumer);

    if(consumer.getTags().contains(ChimeraTK::SystemTags::reverseRecovery)) {
      _initialValueProvider = slave;
      // FIXME: Do we need to check here that there is only one reverse recovery accessor
    }

    if(consumer.getDirection().withReturn) {
      _inputChannels.push_back(slave);
    }
  }

  /********************************************************************************************************************/

  template<typename UserType>
  void ThreadedFanOutWithReturn<UserType>::run() {
    Application::registerThread("ThFO" + FanOut<UserType>::_impl->getName());
Jens Georg's avatar
Jens Georg committed
    Application::getInstance().getTestableMode().lock("start");
    _testableModeReached = true;
    std::map<TransferElementID, boost::shared_ptr<NDRegisterAccessor<UserType>>> accessors;
    for(auto& acc : FanOut<UserType>::_slaves) {
      accessors[acc->getId()] = acc;
    }
    accessors[FanOut<UserType>::_impl->getId()] = FanOut<UserType>::_impl;

    // For reading the initial value, swap out _impl, because readInitialValues()
    // operates on it
    std::swap(FanOut<UserType>::_impl, _initialValueProvider);
    TransferElementID changedVariable = FanOut<UserType>::_impl->getId();
    VersionNumber version{nullptr};

    version = readInitialValues();
    std::swap(FanOut<UserType>::_impl, _initialValueProvider);
    ReadAnyGroup group(_inputChannels.begin(), _inputChannels.end());

      // send out copies to all receivers (slaves and return channel of feeding node)
      for(auto& [id, accessor] : accessors) {
        // do not feed back value to the accessor we got it from
        if(id == changedVariable) {
          continue;
        }
        // do not send copy if no data is expected (e.g. trigger)
        if(accessor->getNumberOfSamples() != 0) {
          accessor->accessChannel(0) = accessors[changedVariable]->accessChannel(0);

        bool dataLoss = accessor->writeDestructively(version);
        if(dataLoss) {
          Application::incrementDataLossCounter(accessor->getName());
      // receive data
      boost::this_thread::interruption_point();
      changedVariable = group.readAny();
      boost::this_thread::interruption_point();

      version = accessors[changedVariable]->getVersionNumber();
    }
  }

  /********************************************************************************************************************/