// SPDX-FileCopyrightText: Deutsches Elektronen-Synchrotron DESY, MSK, ChimeraTK Project <chimeratk-support@desy.de>
// SPDX-License-Identifier: LGPL-3.0-or-later
#pragma once
#include "Application.h"
#include "FanOut.h"
#include "InternalModule.h"
#include <ChimeraTK/NDRegisterAccessor.h>
#include <ChimeraTK/ReadAnyGroup.h>
namespace ChimeraTK {
/********************************************************************************************************************/
/**
 * FanOut implementation with an internal thread which waits for new data which
 * is read from the given feeding implementation and distributed to any number
 * of slaves.
 */
template<typename UserType>
class ThreadedFanOut : public FanOut<UserType>, public InternalModule {
 public:
  /** Create the fan-out for the given feeder and register every entry of
   *  consumerImplementationPairs as a slave. */
  ThreadedFanOut(boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> feedingImpl,
      ConsumerImplementationPairs<UserType> const& consumerImplementationPairs);

  /** Calls deactivate(), so the internal thread is stopped before the object is destroyed. */
  ~ThreadedFanOut();

  /** Launch the internal thread, which executes run(). */
  void activate() override;

  /** Interrupt and join the internal thread if it is running. */
  void deactivate() override;

  /** Synchronise feeder and the consumers. This function is executed in the
   *  separate thread. */
  virtual void run();

  /** Called by run() before it enters its distribution loop. Returns the version number reported by the
   *  feeding implementation. */
  VersionNumber readInitialValues();

 protected:
  /** Thread handling the synchronisation, if needed */
  boost::thread _thread;

  /** Reference to VariableNetwork which is being realised by this FanOut. **/
  // VariableNetwork& _network;
};
/********************************************************************************************************************/
/** Same as ThreadedFanOut but with return channel */
template<typename UserType>
class ThreadedFanOutWithReturn : public ThreadedFanOut<UserType> {
 public:
  /** Forwards both arguments to ThreadedFanOut and then passes every consumer pair to addSlave(). */
  ThreadedFanOutWithReturn(boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> feedingImpl,
      ConsumerImplementationPairs<UserType> const& consumerImplementationPairs);

  /** Set the accessor which is used as the return channel. */
  void setReturnChannelSlave(boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> returnChannelSlave);

  /** Remembers the slave as return channel if the consumer's direction has withReturn set. */
  void addSlave(
      boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> slave, VariableNetworkNode& consumer) override;

  /** Thread function: distributes feeder data to the slaves and handles updates from the return channel. */
  void run() override;

 protected:
  /** Thread handling the synchronisation, if needed */
  // NOTE(review): this shadows ThreadedFanOut::_thread and is not touched by any member function defined in
  // this header - confirm whether it is needed.
  boost::thread _thread;

  /** Slave whose updates are handled as the return channel in run(). */
  boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> _returnChannelSlave;

  // using ThreadedFanOut<UserType>::_network;
  using ThreadedFanOut<UserType>::readInitialValues;
  using EntityOwner::_testableModeReached;
};
/********************************************************************************************************************/
/********************************************************************************************************************/
/**
 * Construct the fan-out around the feeding implementation and attach every
 * entry of consumerImplementationPairs as a slave.
 */
template<typename UserType>
ThreadedFanOut<UserType>::ThreadedFanOut(boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> feedingImpl,
    ConsumerImplementationPairs<UserType> const& consumerImplementationPairs)
: FanOut<UserType>(feedingImpl) /*, _network(network)*/ {
  // The feeder is required to support wait_for_new_data (checked only when assertions are enabled).
  assert(feedingImpl->getAccessModeFlags().has(AccessMode::wait_for_new_data));

  // Iterate by value on purpose: addSlave() takes the consumer node by non-const reference, which cannot bind
  // to an element of the const container.
  for(auto consumerPair : consumerImplementationPairs) {
    FanOut<UserType>::addSlave(consumerPair.first, consumerPair.second);
  }
}
/********************************************************************************************************************/
/**
 * Stop the internal thread on destruction. A ChimeraTK::logic_error raised
 * while deactivating is printed to stderr and the process exits with status 1,
 * so the exception never leaves the destructor.
 */
template<typename UserType>
ThreadedFanOut<UserType>::~ThreadedFanOut() {
  try {
    this->deactivate();
  }
  catch(const ChimeraTK::logic_error& error) {
    std::cerr << error.what() << std::endl;
    std::exit(1);
  }
}
/********************************************************************************************************************/
/**
 * Start the internal thread, which executes run(). Must not be called while
 * the thread is already running (asserted).
 */
template<typename UserType>
void ThreadedFanOut<UserType>::activate() {
  assert(!_thread.joinable());

  // The thread calls run() through this, so an override in a derived class is executed.
  auto threadBody = [this] { this->run(); };
  _thread = boost::thread(threadBody);
}
/********************************************************************************************************************/
/**
 * Stop the internal thread if it is running: interrupt the thread, interrupt
 * the FanOut and join. Does nothing if no thread is running.
 */
template<typename UserType>
void ThreadedFanOut<UserType>::deactivate() {
  try {
    if(!_thread.joinable()) {
      // nothing to stop
      return;
    }
    // Request interruption of the thread, then call FanOut::interrupt() before joining.
    _thread.interrupt();
    FanOut<UserType>::interrupt();
    _thread.join();
    assert(!_thread.joinable());
  }
  catch(boost::thread_resource_error&) {
    // treated as a programming error
    assert(false);
  }
}
/********************************************************************************************************************/
/**
 * Thread function: after the initial value has been obtained, alternate
 * forever between distributing the feeder's current value to all slaves and
 * reading the next value from the feeder. The loop is only left through a
 * boost thread interruption.
 */
template<typename UserType>
void ThreadedFanOut<UserType>::run() {
  auto& feeder = FanOut<UserType>::_impl;

  Application::registerThread("ThFO" + feeder->getName());
  Application::getInstance().getTestableMode().lock("start");
  _testableModeReached = true;

  ChimeraTK::VersionNumber version{nullptr};
  version = readInitialValues();

  for(;;) {
    // distribute the current feeder value to every slave
    boost::this_thread::interruption_point();
    const auto validity = feeder->dataValidity();
    for(auto& slave : FanOut<UserType>::_slaves) {
      // slaves with zero samples (e.g. triggers) get no data copy, but are still written
      const bool expectsData = slave->getNumberOfSamples() != 0;
      if(expectsData) {
        slave->accessChannel(0) = feeder->accessChannel(0);
      }
      slave->setDataValidity(validity);
      if(slave->writeDestructively(version)) {
        Application::incrementDataLossCounter(slave->getName());
      }
    }

    // fetch the next value; its version number is used for the next distribution round
    boost::this_thread::interruption_point();
    feeder->read();
    version = feeder->getVersionNumber();
  }
}
/********************************************************************************************************************/
/**
 * Obtain the initial value from the feeding implementation.
 *
 * The testable-mode lock is released before the read and re-acquired
 * afterwards, unless testLock() returns true.
 *
 * @return version number reported by the feeding implementation.
 */
template<typename UserType>
VersionNumber ThreadedFanOut<UserType>::readInitialValues() {
  Application::getInstance().getTestableMode().unlock("readInitialValues");

  // NOTE(review): this read was absent from the damaged listing. It is restored here because the
  // unlock/re-lock pair only makes sense around a waiting call - confirm against version control.
  FanOut<UserType>::_impl->read();

  if(!Application::getInstance().getTestableMode().testLock()) {
    Application::getInstance().getTestableMode().lock("readInitialValues");
  }

  // Return on every path; previously the only return statement was inside the conditional.
  return FanOut<UserType>::_impl->getVersionNumber();
}
/********************************************************************************************************************/
/********************************************************************************************************************/
/**
 * Construct the fan-out with return channel. The base class constructor
 * registers all consumers as slaves; afterwards each consumer pair is passed
 * to this class' addSlave().
 */
template<typename UserType>
ThreadedFanOutWithReturn<UserType>::ThreadedFanOutWithReturn(
    boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> feedingImpl,
    ConsumerImplementationPairs<UserType> const& consumerImplementationPairs)
: ThreadedFanOut<UserType>(feedingImpl, consumerImplementationPairs) {
  // Copy each element: addSlave() takes the consumer node by non-const reference.
  for(auto el : consumerImplementationPairs) {
    // TODO Calling a virtual in the constructor seems odd,
    //      but works because we want this version's implementation
    addSlave(el.first, el.second);
  }
}
/********************************************************************************************************************/
/**
 * Set the accessor used as return channel, replacing any previously stored one.
 */
template<typename UserType>
void ThreadedFanOutWithReturn<UserType>::setReturnChannelSlave(
    boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> returnChannelSlave) {
  // The parameter is already a private copy, so exchange it with the member; the previously stored pointer
  // is released when the parameter is destroyed.
  _returnChannelSlave.swap(returnChannelSlave);
}
/********************************************************************************************************************/
/**
 * Remember the given slave as return channel if the consumer's direction has
 * withReturn set. The slave is not added to the slave list here. At most one
 * return channel is allowed (asserted).
 */
template<typename UserType>
void ThreadedFanOutWithReturn<UserType>::addSlave(
    boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> slave, VariableNetworkNode& consumer) {
  // TODO Adding slaves is currently by done by the ThreadedFanOut base class.
  //      Refactor constructors and addSlaves for all FanOuts?
  // FanOut<UserType>::addSlave(slave, consumer);

  if(!consumer.getDirection().withReturn) {
    // consumer has no return channel: nothing to record
    return;
  }

  assert(_returnChannelSlave == nullptr);
  _returnChannelSlave = slave;
}
/********************************************************************************************************************/
/**
 * Thread function for the fan-out with return channel.
 *
 * After the initial value has been obtained, the loop alternates forever
 * between distributing the feeder's buffer to the slaves and waiting for an
 * update from either the feeder or the return channel slave. The loop is only
 * left through a boost thread interruption.
 */
template<typename UserType>
void ThreadedFanOutWithReturn<UserType>::run() {
  Application::registerThread("ThFO" + FanOut<UserType>::_impl->getName());
  Application::getInstance().getTestableMode().lock("start");
  _testableModeReached = true;

  // ID of the element which delivered the most recent update (default-constructed before the first readAny())
  TransferElementID var;
  ChimeraTK::VersionNumber version{nullptr};

  version = readInitialValues();

  ReadAnyGroup group({FanOut<UserType>::_impl, _returnChannelSlave});
  while(true) {
    // send out copies to slaves
    // (loop header restored: the body uses "slave" and "continue", mirroring ThreadedFanOut::run())
    for(auto& slave : FanOut<UserType>::_slaves) {
      // do not feed back value returnChannelSlave if it was received from it
      if(slave->getId() == var) {
        continue;
      }
      // do not send copy if no data is expected (e.g. trigger)
      if(slave->getNumberOfSamples() != 0) {
        slave->accessChannel(0) = FanOut<UserType>::_impl->accessChannel(0);
      }
      bool dataLoss = slave->writeDestructively(version);
      if(dataLoss) {
        Application::incrementDataLossCounter(slave->getName());
      }
    }

    // receive data
    boost::this_thread::interruption_point();
    var = group.readAny();
    boost::this_thread::interruption_point();

    // if the update came through the return channel, return it to the feeder
    if(var == _returnChannelSlave->getId()) {
      FanOut<UserType>::_impl->accessChannel(0).swap(_returnChannelSlave->accessChannel(0));
      // keep the newer of the current version and the version of the returned value
      if(version < _returnChannelSlave->getVersionNumber()) {
        version = _returnChannelSlave->getVersionNumber();
      }
      // NOTE(review): the write to the feeder and the else branch were missing from the damaged listing.
      // Without them the version computed above was overwritten immediately and nothing was returned to
      // the feeder - confirm against version control.
      FanOut<UserType>::_impl->write(version);
    }
    else {
      version = FanOut<UserType>::_impl->getVersionNumber();
    }
  }
}
/********************************************************************************************************************/
} /* namespace ChimeraTK */