Skip to content
Snippets Groups Projects
FanOut.h 6.88 KiB
/*
 * FanOut.h
 *
 *  Created on: Jun 15, 2016
 *      Author: Martin Hierholzer
 */

#ifndef CHIMERATK_FAN_OUT_H
#define CHIMERATK_FAN_OUT_H

#include <mtca4u/NDRegisterAccessor.h>

#include "ApplicationException.h"
#include "InternalModule.h"

namespace ChimeraTK {

  
  template<typename UserType>
  class FanOut : public mtca4u::NDRegisterAccessor<UserType>, public InternalModule {

    public:

      /** Use this constructor if the FanOut should be a consuming implementation. */
      FanOut(boost::shared_ptr<mtca4u::NDRegisterAccessor<UserType>> feedingImpl)
      {
        impl = boost::dynamic_pointer_cast<mtca4u::NDRegisterAccessor<UserType>>(feedingImpl);
        if(!impl) {
          throw ApplicationExceptionWithID<ApplicationExceptionID::illegalParameter>(
              "The FanOut has been constructed with a wrong output implementation type!");
        }
        mtca4u::NDRegisterAccessor<UserType>::buffer_2D.resize( impl->getNumberOfChannels() );
        for(size_t i=0; i<impl->getNumberOfChannels(); i++) {
          mtca4u::NDRegisterAccessor<UserType>::buffer_2D[i].resize( impl->getNumberOfSamples() );
        }
      }

      /** If activate() has been called on this FanOut, deactivate() must be called before destruction. Otherweise
       *  an assertion will be raised.
       *  Design note: stopping the thread inside the destructor may be too late, since the thread will be accessing
       *  the object while it is being destroyed already! */
      ~FanOut() {
        assert(!_thread.joinable());
      }

      /** Add a slave to the FanOut. Only sending end-points of a consuming node may be added. */
      void addSlave(boost::shared_ptr<mtca4u::NDRegisterAccessor<UserType>> slave) {
        if(!slave->isWriteable()) {
          throw ApplicationExceptionWithID<ApplicationExceptionID::illegalParameter>(
              "FanOut::addSlave() has been called with a receiving implementation!");
        }
        auto castedSlave = boost::dynamic_pointer_cast<mtca4u::NDRegisterAccessor<UserType>>(slave);
        if(!castedSlave) {
          throw ApplicationExceptionWithID<ApplicationExceptionID::illegalParameter>(
              "FanOut::addSlave() has been called with a wrong input implementation type!");
        }
        if(impl == nullptr) {       // the first slave will be used as a "main" implementation, if
          impl = castedSlave;       // none was specified at construction
          mtca4u::NDRegisterAccessor<UserType>::buffer_2D.resize( impl->getNumberOfChannels() );
          for(size_t i=0; i<impl->getNumberOfChannels(); i++) {
            mtca4u::NDRegisterAccessor<UserType>::buffer_2D[i].resize( impl->getNumberOfSamples() );
          }
        }
        else {
          // check if array shape is compatible, unless the receiver is a trigger node, so no data is expected
          if( castedSlave->getNumberOfSamples() != 0 && 
              ( castedSlave->getNumberOfChannels() != impl->getNumberOfChannels() ||
                castedSlave->getNumberOfSamples() != impl->getNumberOfSamples()      ) ) {
            std::string what = "FanOut::addSlave(): Trying to add a slave '";
            what += castedSlave->getName();
            what += "' with incompatible array shape! Name of master: ";
            what += impl->getName();
            throw ApplicationExceptionWithID<ApplicationExceptionID::illegalParameter>(what.c_str());
          }
          slaves.push_back(castedSlave);
        }
      }
        
      /** Add an external trigger to allow poll-type feeders to be used for push-type consumers */
      void addExternalTrigger(const boost::shared_ptr<mtca4u::TransferElement>& externalTriggerImpl) {
        externalTrigger = externalTriggerImpl;
        hasExternalTrigger = true;
      }

      void activate() override {
        assert(!_thread.joinable());
        _thread = boost::thread([this] { this->run(); });
      }

      void deactivate() override {
        if(_thread.joinable()) {
          _thread.interrupt();
          _thread.join();
        }
        assert(!_thread.joinable());
      }

      /** Synchronise feeder and the consumers. This function is executed in the separate thread. */
      void run() {
        while(true) {
          boost::this_thread::interruption_point();
          if(hasExternalTrigger) {
            // wait for external trigger (if present)
            externalTrigger->read();
            // receive data
            impl->readNonBlocking();
          }
          else {
            // receive data
            impl->read();
          }
          for(auto &slave : slaves) {     // send out copies to slaves
            // do not send copy if no data is expected (e.g. trigger)
            if(slave->getNumberOfSamples() != 0) {
              slave->accessChannel(0) = impl->accessChannel(0);
            }
            slave->write();
          }
        }
      }

      bool isReadable() const override {
        return false;
      }
      
      bool isReadOnly() const override {
        return false;
      }
      
      bool isWriteable() const override {
        return true;
      }

      TimeStamp getTimeStamp() const override {
        return impl->getTimeStamp();
      }
      
      void doReadTransfer() override {
        impl->read();
      }
      
      bool doReadTransferNonBlocking() override {
        return impl->readNonBlocking();
      }
      
      void postRead() override {
        mtca4u::NDRegisterAccessor<UserType>::buffer_2D[0].swap(impl->accessChannel(0));
        for(auto &slave : slaves) {     // send out copies to slaves
          // do not send copy if no data is expected (e.g. trigger)
          if(slave->getNumberOfSamples() != 0) {
            slave->accessChannel(0) = mtca4u::NDRegisterAccessor<UserType>::buffer_2D[0];
          }
          slave->write();
        }
      }

      void write() override {
        throw std::logic_error("Write operation called on read-only variable.");
      }
      
      bool isSameRegister(const boost::shared_ptr<const mtca4u::TransferElement>& e) const override {
        // only true if the very instance of the transfer element is the same
        return e.get() == this;
      }
      
      std::vector<boost::shared_ptr<mtca4u::TransferElement> > getHardwareAccessingElements() override {
        return { boost::enable_shared_from_this<mtca4u::TransferElement>::shared_from_this() };
      }
      
      void replaceTransferElement(boost::shared_ptr<mtca4u::TransferElement>) override {
        // You can't replace anything here. Just do nothing.
      }
      
    protected:

      boost::shared_ptr<mtca4u::NDRegisterAccessor<UserType>> impl;

      std::list<boost::shared_ptr<mtca4u::NDRegisterAccessor<UserType>>> slaves;

      bool hasExternalTrigger{false};
      boost::shared_ptr<mtca4u::TransferElement> externalTrigger;

      /** Thread handling the synchronisation, if needed */
      boost::thread _thread;

  };

} /* namespace ChimeraTK */

#endif /* CHIMERATK_FAN_OUT_H */