Skip to content
Snippets Groups Projects
FanOut.h 6.11 KiB
/*
 * FanOut.h
 *
 *  Created on: Jun 15, 2016
 *      Author: Martin Hierholzer
 */

#ifndef CHIMERATK_FAN_OUT_H
#define CHIMERATK_FAN_OUT_H

#include <ControlSystemAdapter/ProcessScalar.h>

#include "ApplicationException.h"
#include "ImplementationAdapter.h"

namespace ChimeraTK {


  /** @todo TODO This class should be split into two classes, one with a thread and the other without. The
   *  threaded version doesn't have to be a ProcessScalar! Instead it should be unified with the
   *  ImplementationAdapter class... */
  template<typename UserType>
  class FanOut : public mtca4u::ProcessScalar<UserType>, public ImplementationAdapterBase {

    public:

      /** Use this constructor if the FanOut should be a consuming implementation. */
      FanOut(boost::shared_ptr<mtca4u::ProcessVariable> feedingImpl)
      : _direction(VariableDirection::consuming)
      {
        impl = boost::dynamic_pointer_cast<mtca4u::ProcessScalar<UserType>>(feedingImpl);
        if(!impl) {
          throw ApplicationExceptionWithID<ApplicationExceptionID::illegalParameter>(
              "The FanOut has been constructed with a wrong output implementation type!");
        }
      }

      /** Use this constructor if the FanOut should be a feeding implementation. */
      FanOut()
      : impl(nullptr), _direction(VariableDirection::feeding)
      {}

      /** If activate() has been called on this FanOut, deactivate() must be called before destruction. Otherweise
       *  an assertion will be raised.
       *  Design note: stopping the thread inside the destructor may be too late, since the thread will be accessing
       *  the object while it is being destroyed already! */
      ~FanOut() {
        assert(!_thread.joinable());
      }

      /** Add a slave to the FanOut. Only sending end-points of a consuming node may be added. */
      void addSlave(boost::shared_ptr<mtca4u::ProcessVariable> slave) {
        if(!slave->isSender()) {
          throw ApplicationExceptionWithID<ApplicationExceptionID::illegalParameter>(
              "FanOut::addSlave() has been called with a receiving implementation!");
        }
        auto castedSlave = boost::dynamic_pointer_cast<mtca4u::ProcessScalar<UserType>>(slave);
        if(!castedSlave) {
          throw ApplicationExceptionWithID<ApplicationExceptionID::illegalParameter>(
              "FanOut::addSlave() has been called with a wrong input implementation type!");
        }
        if(impl == nullptr) {       // the first slave will be used as a "main" implementation, if
          impl = castedSlave;       // none was specified at construction
        }
        else {
          slaves.push_back(castedSlave);
        }
      }

      /** Add an external trigger to allow poll-type feeders to be used for push-type consumers */
      void addExternalTrigger(const boost::shared_ptr<mtca4u::ProcessVariable>& externalTriggerImpl) {
        assert(_direction == VariableDirection::consuming);
        externalTrigger = externalTriggerImpl;
        hasExternalTrigger = true;
      }

      void activate() {
        assert(_direction == VariableDirection::consuming);
        assert(!_thread.joinable());
        _thread = std::thread([this] { this->run(); });
      }

      void deactivate() {
        if(_thread.joinable()) {
          requestTerminateThread = true;
          _thread.join();
        }
        assert(!_thread.joinable());
      }

      /** Synchronise feeder and the consumers. This function is executed in the separate thread. */
      void run() {
        assert(_direction == VariableDirection::consuming);
        while(true) {
          if(hasExternalTrigger) {
            // wait for external trigger (if present)
            /// @todo TODO replace with proper blocking implementation when supported by the CSA
            while(externalTrigger->receive() == false) {
              if(requestTerminateThread) return;
              std::this_thread::yield();
            }
            // receive data
            impl->receive();
          }
          else {
            // receive data
            while(impl->receive() == false) {
              if(requestTerminateThread) return;
              std::this_thread::yield();
            }
          }
          for(auto &slave : slaves) {     // send out copies to slaves
            slave->set(impl->get());
            slave->send();
          }
        }
      }

      void set(mtca4u::ProcessScalar<UserType> const & other) {
        impl->set(other.get());
      }

      void set(UserType const & t) {
        impl->set(t);
      }

      operator UserType() const {
        return impl->get();
      }

      UserType get() const {
        return impl->get();
      }

      const std::type_info& getValueType() const {
        return typeid(UserType);
      }

      bool isReceiver() const {
        return _direction == VariableDirection::consuming;
      }

      bool isSender() const {
        return _direction == VariableDirection::feeding;
      }

      TimeStamp getTimeStamp() const {
        return impl->getTimeStamp();
      }

      bool receive() {
        bool ret = impl->receive();
        if(ret) {
          for(auto &slave : slaves) {     // send out copies to slaves
            slave->set(impl->get());
            slave->send();
          }
        }
        return ret;
      }

      bool send() {
        bool ret = true;
        for(auto &slave : slaves) {     // send out copies to slaves
          slave->set(impl->get());
          bool ret2 = slave->send();
          if(!ret2) ret = false;
        }
        bool ret2 = impl->send();
        if(!ret2) ret = false;
        return ret;
      }

    protected:

      boost::shared_ptr<mtca4u::ProcessScalar<UserType>> impl;

      std::list<boost::shared_ptr<mtca4u::ProcessScalar<UserType>>> slaves;

      VariableDirection _direction;

      bool hasExternalTrigger{false};
      boost::shared_ptr<mtca4u::ProcessVariable> externalTrigger;

      /** Thread handling the synchronisation, if needed */
      std::thread _thread;

      /** Flag to request termination of the synchronisation thread. */
      bool requestTerminateThread{false};

  };

} /* namespace ChimeraTK */

#endif /* CHIMERATK_FAN_OUT_H */