// SPDX-FileCopyrightText: Deutsches Elektronen-Synchrotron DESY, MSK, ChimeraTK Project <chimeratk-support@desy.de> // SPDX-License-Identifier: LGPL-3.0-or-later #pragma once #include "Application.h" #include "FanOut.h" #include "InternalModule.h" #include <ChimeraTK/NDRegisterAccessor.h> #include <ChimeraTK/ReadAnyGroup.h> namespace ChimeraTK { /********************************************************************************************************************/ /** FanOut implementation with an internal thread which waits for new data which * is read from the given feeding implementation and distributed to any number * of slaves. */ template<typename UserType> class ThreadedFanOut : public FanOut<UserType>, public InternalModule { public: ThreadedFanOut(boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> feedingImpl, ConsumerImplementationPairs<UserType> const& consumerImplementationPairs); ~ThreadedFanOut() override; void activate() override; void deactivate() override; /** Synchronise feeder and the consumers. This function is executed in the * separate thread. */ virtual void run(); VersionNumber readInitialValues(); protected: /** Thread handling the synchronisation, if needed */ boost::thread _thread; /** Reference to VariableNetwork which is being realised by this FanOut. **/ // VariableNetwork& _network; }; /********************************************************************************************************************/ /** Same as ThreadedFanOut but with return channel */ template<typename UserType> class ThreadedFanOutWithReturn : public ThreadedFanOut<UserType> { public: ThreadedFanOutWithReturn(boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> feedingImpl, ConsumerImplementationPairs<UserType> const& consumerImplementationPairs); void setReturnChannelSlave(boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> returnChannelSlave); void addSlave( boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> slave, VariableNetworkNode& consumer) override; void run() override; protected: /** Thread handling the synchronisation, if needed */ boost::thread _thread; boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> _returnChannelSlave; // using ThreadedFanOut<UserType>::_network; using ThreadedFanOut<UserType>::readInitialValues; using EntityOwner::_testableModeReached; }; /********************************************************************************************************************/ /********************************************************************************************************************/ template<typename UserType> ThreadedFanOut<UserType>::ThreadedFanOut(boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> feedingImpl, ConsumerImplementationPairs<UserType> const& consumerImplementationPairs) : FanOut<UserType>(feedingImpl) /*, _network(network)*/ { assert(feedingImpl->getAccessModeFlags().has(AccessMode::wait_for_new_data)); for(auto el : consumerImplementationPairs) { FanOut<UserType>::addSlave(el.first, el.second); } } /********************************************************************************************************************/ template<typename UserType> ThreadedFanOut<UserType>::~ThreadedFanOut() { try { deactivate(); } catch(ChimeraTK::logic_error& e) { std::cerr << e.what() << std::endl; std::exit(1); } } /********************************************************************************************************************/ template<typename UserType> void ThreadedFanOut<UserType>::activate() { if(this->_disabled) { return; } assert(!_thread.joinable()); _thread = boost::thread([this] { this->run(); }); } /********************************************************************************************************************/ template<typename UserType> void ThreadedFanOut<UserType>::deactivate() { try { if(_thread.joinable()) { _thread.interrupt(); FanOut<UserType>::interrupt(); _thread.join(); } assert(!_thread.joinable()); } catch(boost::thread_resource_error& e) { assert(false); } } /********************************************************************************************************************/ template<typename UserType> void ThreadedFanOut<UserType>::run() { Application::registerThread("ThFO" + FanOut<UserType>::_impl->getName()); Application::getInstance().getTestableMode().lock("start"); _testableModeReached = true; ChimeraTK::VersionNumber version{nullptr}; version = readInitialValues(); while(true) { // send out copies to slaves boost::this_thread::interruption_point(); auto validity = FanOut<UserType>::_impl->dataValidity(); for(auto& slave : FanOut<UserType>::_slaves) { // do not send copy if no data is expected (e.g. trigger) if(slave->getNumberOfSamples() != 0) { slave->accessChannel(0) = FanOut<UserType>::_impl->accessChannel(0); } slave->setDataValidity(validity); bool dataLoss = slave->writeDestructively(version); if(dataLoss) { Application::incrementDataLossCounter(slave->getName()); } } // receive data boost::this_thread::interruption_point(); FanOut<UserType>::_impl->read(); version = FanOut<UserType>::_impl->getVersionNumber(); } } /********************************************************************************************************************/ template<typename UserType> VersionNumber ThreadedFanOut<UserType>::readInitialValues() { Application::getInstance().getTestableMode().unlock("readInitialValues"); FanOut<UserType>::_impl->read(); if(!Application::getInstance().getTestableMode().testLock()) { Application::getInstance().getTestableMode().lock("readInitialValues"); } return FanOut<UserType>::_impl->getVersionNumber(); } /********************************************************************************************************************/ /********************************************************************************************************************/ template<typename UserType> ThreadedFanOutWithReturn<UserType>::ThreadedFanOutWithReturn( boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> feedingImpl, ConsumerImplementationPairs<UserType> const& consumerImplementationPairs) : ThreadedFanOut<UserType>(feedingImpl, consumerImplementationPairs) { for(auto el : consumerImplementationPairs) { // TODO Calling a virtual in the constructor seems odd, // but works because we want this version's implementation addSlave(el.first, el.second); } } /********************************************************************************************************************/ template<typename UserType> void ThreadedFanOutWithReturn<UserType>::setReturnChannelSlave( boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> returnChannelSlave) { _returnChannelSlave = returnChannelSlave; } /********************************************************************************************************************/ template<typename UserType> void ThreadedFanOutWithReturn<UserType>::addSlave( boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> slave, VariableNetworkNode& consumer) { // TODO Adding slaves is currently by done by the ThreadedFanOut base class. // Refactor constructors and addSlaves for all FanOuts? // FanOut<UserType>::addSlave(slave, consumer); if(consumer.getDirection().withReturn) { assert(_returnChannelSlave == nullptr); _returnChannelSlave = slave; } } /********************************************************************************************************************/ template<typename UserType> void ThreadedFanOutWithReturn<UserType>::run() { Application::registerThread("ThFO" + FanOut<UserType>::_impl->getName()); Application::getInstance().getTestableMode().lock("start"); _testableModeReached = true; TransferElementID var; ChimeraTK::VersionNumber version{nullptr}; version = readInitialValues(); ReadAnyGroup group({FanOut<UserType>::_impl, _returnChannelSlave}); while(true) { // send out copies to slaves for(auto& slave : FanOut<UserType>::_slaves) { // do not feed back value returnChannelSlave if it was received from it if(slave->getId() == var) { continue; } // do not send copy if no data is expected (e.g. trigger) if(slave->getNumberOfSamples() != 0) { slave->accessChannel(0) = FanOut<UserType>::_impl->accessChannel(0); } bool dataLoss = slave->writeDestructively(version); if(dataLoss) { Application::incrementDataLossCounter(slave->getName()); } } // receive data boost::this_thread::interruption_point(); var = group.readAny(); boost::this_thread::interruption_point(); // if the update came through the return channel, return it to the feeder if(var == _returnChannelSlave->getId()) { FanOut<UserType>::_impl->accessChannel(0).swap(_returnChannelSlave->accessChannel(0)); if(version < _returnChannelSlave->getVersionNumber()) { version = _returnChannelSlave->getVersionNumber(); } FanOut<UserType>::_impl->write(version); } else { version = FanOut<UserType>::_impl->getVersionNumber(); } } } /********************************************************************************************************************/ } /* namespace ChimeraTK */