// ThreadedFanOut.h
// SPDX-FileCopyrightText: Deutsches Elektronen-Synchrotron DESY, MSK, ChimeraTK Project <chimeratk-support@desy.de>
// SPDX-License-Identifier: LGPL-3.0-or-later
#pragma once
#include "FanOut.h"
#include "InternalModule.h"

#include <ChimeraTK/NDRegisterAccessor.h>
#include <ChimeraTK/ReadAnyGroup.h>

  /********************************************************************************************************************/

  /**
   * FanOut implementation with an internal thread which waits for new data which
   * is read from the given feeding implementation and distributed to any number
   * of slaves.
   */
  template<typename UserType>
  class ThreadedFanOut : public FanOut<UserType>, public InternalModule {
   public:
    /**
     * Create the fan out for the given feeder and register every entry of consumerImplementationPairs as a slave.
     * The feeder is expected to have AccessMode::wait_for_new_data (checked by an assertion in the constructor).
     */
    ThreadedFanOut(boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> feedingImpl,
        ConsumerImplementationPairs<UserType> const& consumerImplementationPairs);

    /** Calls deactivate() so the internal thread is stopped before the object goes away. */
    ~ThreadedFanOut() override;

    /** Start the internal thread executing run(). Does nothing if the fan out is disabled. */
    void activate() override;

    /** Interrupt and join the internal thread, if it is running. */
    void deactivate() override;

    /** Synchronise feeder and the consumers. This function is executed in the
     * separate thread. */
    virtual void run();

    /**
     * Read the first value from the feeder and return its version number. The testable mode lock is released
     * around the read.
     */
    VersionNumber readInitialValues();

   protected:
    /** Thread handling the synchronisation, if needed */
    boost::thread _thread;

    // Reference to VariableNetwork which is being realised by this FanOut (currently disabled):
    // VariableNetwork& _network;
  };

  /********************************************************************************************************************/

  /** Same as ThreadedFanOut but with return channel */
  template<typename UserType>
  class ThreadedFanOutWithReturn : public ThreadedFanOut<UserType> {
   public:
    /**
     * Create the fan out for the given feeder and consumers. The consumer whose direction has the withReturn flag
     * set is remembered as the return channel slave.
     */
    ThreadedFanOutWithReturn(boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> feedingImpl,
        ConsumerImplementationPairs<UserType> const& consumerImplementationPairs);

    /** Explicitly set the slave whose updates are written back to the feeder. */
    void setReturnChannelSlave(boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> returnChannelSlave);

    /**
     * Remember the given slave as return channel slave if the consumer's direction has the withReturn flag set.
     * The slave itself is not added to the slave list here.
     */
    void addSlave(
        boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> slave, VariableNetworkNode& consumer) override;

    /** Thread function: distribute feeder values to the slaves and pass return channel updates back to the feeder. */
    void run() override;

   protected:
    /** Thread handling the synchronisation, if needed */
    // NOTE(review): this shadows ThreadedFanOut::_thread. The thread started by ThreadedFanOut::activate() lives in
    // the base class member; nothing visible in this file uses this one. Kept to leave the class layout unchanged.
    boost::thread _thread;

    /** Slave whose updates are sent back to the feeder. */
    boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> _returnChannelSlave;

    // using ThreadedFanOut<UserType>::_network;
    using ThreadedFanOut<UserType>::readInitialValues;
    using EntityOwner::_testableModeReached;
  };
  /********************************************************************************************************************/
  /********************************************************************************************************************/

  /**
   * Hand the feeder to the FanOut base class and register each entry of consumerImplementationPairs as a slave.
   * The feeder must provide AccessMode::wait_for_new_data; this is only verified through an assertion.
   */
  template<typename UserType>
  ThreadedFanOut<UserType>::ThreadedFanOut(boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> feedingImpl,
      ConsumerImplementationPairs<UserType> const& consumerImplementationPairs)
  : FanOut<UserType>(feedingImpl) {
    assert(feedingImpl->getAccessModeFlags().has(AccessMode::wait_for_new_data));

    // Each pair is deliberately copied: the container is const, while addSlave() is handed the consumer node from
    // a mutable copy.
    for(auto consumerPair : consumerImplementationPairs) {
      FanOut<UserType>::addSlave(consumerPair.first, consumerPair.second);
    }
  }

  /********************************************************************************************************************/

  /**
   * Stop the internal thread. A ChimeraTK::logic_error raised while doing so cannot be propagated out of the
   * destructor, so its message is printed to stderr and the process is terminated with exit code 1.
   */
  template<typename UserType>
  ThreadedFanOut<UserType>::~ThreadedFanOut() {
    try {
      deactivate();
    }
    catch(const ChimeraTK::logic_error& error) {
      std::cerr << error.what() << std::endl;
      std::exit(1);
    }
  }

  /********************************************************************************************************************/

  /**
   * Launch the internal thread, which executes run(). A disabled fan out is left untouched. The thread must not be
   * running already (asserted).
   */
  template<typename UserType>
  void ThreadedFanOut<UserType>::activate() {
    if(!this->_disabled) {
      assert(!_thread.joinable());
      _thread = boost::thread([this] { this->run(); });
    }
  }

  /********************************************************************************************************************/

  /**
   * Stop the internal thread if it is running: request interruption of the thread, call FanOut::interrupt() and
   * then wait for the thread to finish. A boost::thread_resource_error is not expected here and only triggers an
   * assertion.
   */
  template<typename UserType>
  void ThreadedFanOut<UserType>::deactivate() {
    try {
      const bool threadIsRunning = _thread.joinable();
      if(threadIsRunning) {
        // Order matters: flag the interruption first, then call FanOut::interrupt(), then join.
        _thread.interrupt();
        FanOut<UserType>::interrupt();
        _thread.join();
      }
      assert(!_thread.joinable());
    }
    catch(const boost::thread_resource_error&) {
      assert(false);
    }
  }

  /********************************************************************************************************************/

  /**
   * Thread function of the fan out. After registering the thread and acquiring the testable mode lock, the initial
   * value is read from the feeder. Then, in an endless loop, the current feeder value is distributed to all slaves
   * and the next value is read from the feeder. The loop is only left through the boost thread interruption
   * points.
   */
  template<typename UserType>
  void ThreadedFanOut<UserType>::run() {
    Application::registerThread("ThFO" + FanOut<UserType>::_impl->getName());
    Application::getInstance().getTestableMode().lock("start");
    _testableModeReached = true;

    ChimeraTK::VersionNumber version = readInitialValues();
    while(true) {
      // send out copies to slaves
      boost::this_thread::interruption_point();
      auto validity = FanOut<UserType>::_impl->dataValidity();
      for(auto& slave : FanOut<UserType>::_slaves) {
        // do not send copy if no data is expected (e.g. trigger)
        if(slave->getNumberOfSamples() != 0) {
          slave->accessChannel(0) = FanOut<UserType>::_impl->accessChannel(0);
        }
        // the slave carries the same data validity as the feeder
        slave->setDataValidity(validity);
        bool dataLoss = slave->writeDestructively(version);
        if(dataLoss) {
          Application::incrementDataLossCounter(slave->getName());
        }
      }
      // receive data
      boost::this_thread::interruption_point();
      FanOut<UserType>::_impl->read();
      version = FanOut<UserType>::_impl->getVersionNumber();
    }
  }

  /********************************************************************************************************************/

  /**
   * Read the first value from the feeder and return the version number of that value.
   *
   * The testable mode lock is released before the read and acquired again afterwards, unless testLock() already
   * returns true at that point.
   */
  template<typename UserType>
  VersionNumber ThreadedFanOut<UserType>::readInitialValues() {
    Application::getInstance().getTestableMode().unlock("readInitialValues");
    FanOut<UserType>::_impl->read();
    if(!Application::getInstance().getTestableMode().testLock()) {
      Application::getInstance().getTestableMode().lock("readInitialValues");
    }
    return FanOut<UserType>::_impl->getVersionNumber();
  }

  /********************************************************************************************************************/
  /********************************************************************************************************************/

  /**
   * Construct the fan out through the ThreadedFanOut constructor and afterwards pass every consumer through this
   * class' addSlave(), which picks up the return channel slave.
   */
  template<typename UserType>
  ThreadedFanOutWithReturn<UserType>::ThreadedFanOutWithReturn(
      boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> feedingImpl,
      ConsumerImplementationPairs<UserType> const& consumerImplementationPairs)
  : ThreadedFanOut<UserType>(feedingImpl, consumerImplementationPairs) {
    for(auto consumerPair : consumerImplementationPairs) {
      // TODO Calling a virtual in the constructor seems odd,
      //      but works because we want this version's implementation
      this->addSlave(consumerPair.first, consumerPair.second);
    }
  }

  /********************************************************************************************************************/

  /** Store the given accessor as the return channel slave, replacing any previously stored one. */
  template<typename UserType>
  void ThreadedFanOutWithReturn<UserType>::setReturnChannelSlave(
      boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> returnChannelSlave) {
    this->_returnChannelSlave = returnChannelSlave;
  }

  /********************************************************************************************************************/

  /**
   * Remember the slave as return channel slave if the consumer's direction has the withReturn flag set; otherwise
   * do nothing. Only one return channel slave may be registered this way (asserted).
   */
  template<typename UserType>
  void ThreadedFanOutWithReturn<UserType>::addSlave(
      boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> slave, VariableNetworkNode& consumer) {
    // TODO Adding slaves is currently by done by the ThreadedFanOut base class.
    //      Refactor constructors and addSlaves for all FanOuts?
    // FanOut<UserType>::addSlave(slave, consumer);
    if(!consumer.getDirection().withReturn) {
      return;
    }
    assert(_returnChannelSlave == nullptr);
    _returnChannelSlave = slave;
  }

  /********************************************************************************************************************/

  /**
   * Thread function of the fan out with return channel. After registering the thread and acquiring the testable
   * mode lock, the initial value is read from the feeder. Then, in an endless loop, the current value is sent to
   * the slaves and the thread waits for an update from either the feeder or the return channel slave. Updates from
   * the return channel slave are written back to the feeder. The loop is only left through the boost thread
   * interruption points.
   */
  template<typename UserType>
  void ThreadedFanOutWithReturn<UserType>::run() {
    Application::registerThread("ThFO" + FanOut<UserType>::_impl->getName());
    Application::getInstance().getTestableMode().lock("start");
    _testableModeReached = true;

    // ID of the accessor which delivered the most recent update (default-constructed before the first update)
    TransferElementID var;

    ChimeraTK::VersionNumber version = readInitialValues();

    // NOTE(review): _returnChannelSlave is dereferenced unconditionally below, so it has to be set (through
    // addSlave() or setReturnChannelSlave()) before this thread is started.
    ReadAnyGroup group({FanOut<UserType>::_impl, _returnChannelSlave});
    while(true) {
      // send out copies to slaves
      for(auto& slave : FanOut<UserType>::_slaves) {
        // do not feed back value returnChannelSlave if it was received from it
        if(slave->getId() == var) {
          continue;
        }
        // do not send copy if no data is expected (e.g. trigger)
        if(slave->getNumberOfSamples() != 0) {
          slave->accessChannel(0) = FanOut<UserType>::_impl->accessChannel(0);
        }
        bool dataLoss = slave->writeDestructively(version);
        if(dataLoss) {
          Application::incrementDataLossCounter(slave->getName());
        }
      }
      // receive data
      boost::this_thread::interruption_point();
      var = group.readAny();
      boost::this_thread::interruption_point();
      // if the update came through the return channel, return it to the feeder
      if(var == _returnChannelSlave->getId()) {
        FanOut<UserType>::_impl->accessChannel(0).swap(_returnChannelSlave->accessChannel(0));
        // write with the newer of the current version and the return channel slave's version
        if(version < _returnChannelSlave->getVersionNumber()) {
          version = _returnChannelSlave->getVersionNumber();
        }
        FanOut<UserType>::_impl->write(version);
        version = FanOut<UserType>::_impl->getVersionNumber();
      }
    }
  }

  /********************************************************************************************************************/