Newer
Older
Martin Christoph Hierholzer
committed
/*
* ThreadedFanOut.h
*
* Created on: Jun 15, 2016
* Author: Martin Hierholzer
*/
#ifndef CHIMERATK_THREADED_FAN_OUT_H
#define CHIMERATK_THREADED_FAN_OUT_H
Martin Christoph Hierholzer
committed
#include "Application.h"
Martin Christoph Hierholzer
committed
#include "FanOut.h"
#include "InternalModule.h"
#include <ChimeraTK/NDRegisterAccessor.h>
#include <ChimeraTK/ReadAnyGroup.h>
Martin Christoph Hierholzer
committed
namespace ChimeraTK {
/** FanOut implementation with an internal thread which waits for new data which
* is read from the given feeding implementation and distributed to any number
* of slaves. */
template<typename UserType>
class ThreadedFanOut : public FanOut<UserType>, public InternalModule {
public:
ThreadedFanOut(boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> feedingImpl, VariableNetwork& network,
ConsumerImplementationPairs<UserType> const& consumerImplementationPairs)
: FanOut<UserType>(feedingImpl), _network(network) {
Martin Christoph Hierholzer
committed
assert(feedingImpl->getAccessModeFlags().has(AccessMode::wait_for_new_data));
for(auto el : consumerImplementationPairs) {
FanOut<UserType>::addSlave(el.first, el.second);
}
}
~ThreadedFanOut() { deactivate(); }
void activate() override {
if(this->_disabled) return;
assert(!_thread.joinable());
_thread = boost::thread([this] { this->run(); });
void deactivate() override {
if(_thread.joinable()) {
_thread.interrupt();
_thread.join();
}
assert(!_thread.joinable());
}
/** Synchronise feeder and the consumers. This function is executed in the
* separate thread. */
virtual void run() {
Application::registerThread("ThFO" + FanOut<UserType>::impl->getName());
Application::testableModeLock("start");
Martin Christoph Hierholzer
committed
testableModeReached = true;
Martin Christoph Hierholzer
committed
ChimeraTK::VersionNumber version{nullptr};
Martin Christoph Hierholzer
committed
version = readInitialValues();
Martin Christoph Hierholzer
committed
while(true) {
// send out copies to slaves
Profiler::startMeasurement();
boost::this_thread::interruption_point();
Martin Christoph Hierholzer
committed
auto validity = FanOut<UserType>::impl->dataValidity();
for(auto& slave : FanOut<UserType>::slaves) {
// do not send copy if no data is expected (e.g. trigger)
if(slave->getNumberOfSamples() != 0) {
slave->accessChannel(0) = FanOut<UserType>::impl->accessChannel(0);
}
Martin Christoph Hierholzer
committed
slave->setDataValidity(validity);
Martin Christoph Hierholzer
committed
bool dataLoss = slave->writeDestructively(version);
if(dataLoss) Application::incrementDataLossCounter(slave->getName());
Martin Christoph Hierholzer
committed
}
Martin Christoph Hierholzer
committed
// receive data
boost::this_thread::interruption_point();
Profiler::stopMeasurement();
FanOut<UserType>::impl->read();
Martin Christoph Hierholzer
committed
version = FanOut<UserType>::impl->getVersionNumber();
}
}
Martin Christoph Hierholzer
committed
VersionNumber readInitialValues() {
Application::testableModeUnlock("readInitialValues");
FanOut<UserType>::impl->read();
if(!Application::testableModeTestLock()) {
Application::testableModeLock("readInitialValues");
}
return FanOut<UserType>::impl->getVersionNumber();
protected:
/** Thread handling the synchronisation, if needed */
boost::thread _thread;
Martin Christoph Hierholzer
committed
/** Reference to VariableNetwork which is being realised by this FanOut. **/
VariableNetwork& _network;
};
/********************************************************************************************************************/
/** Same as ThreadedFanOut but with return channel */
template<typename UserType>
class ThreadedFanOutWithReturn : public ThreadedFanOut<UserType> {
public:
ThreadedFanOutWithReturn(boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> feedingImpl,
VariableNetwork& network, ConsumerImplementationPairs<UserType> const& consumerImplementationPairs)
: ThreadedFanOut<UserType>(feedingImpl, network, consumerImplementationPairs) {
for(auto el : consumerImplementationPairs) {
// TODO Calling a virtual in the constructor seems odd,
// but works because we want this version's implementation
addSlave(el.first, el.second);
}
}
void setReturnChannelSlave(boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> returnChannelSlave) {
_returnChannelSlave = returnChannelSlave;
Martin Christoph Hierholzer
committed
void addSlave(
boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> slave, VariableNetworkNode& consumer) override {
// TODO Adding slaves is currently by done by the ThreadedFanOut base class.
// Refactor constructors and addSlaves for all FanOuts?
// FanOut<UserType>::addSlave(slave, consumer);
if(consumer.getDirection().withReturn) {
assert(_returnChannelSlave == nullptr);
_returnChannelSlave = slave;
}
void run() override {
Application::registerThread("ThFO" + FanOut<UserType>::impl->getName());
Application::testableModeLock("start");
Martin Christoph Hierholzer
committed
testableModeReached = true;
Martin Christoph Hierholzer
committed
TransferElementID var;
Martin Christoph Hierholzer
committed
ChimeraTK::VersionNumber version{nullptr};
Martin Christoph Hierholzer
committed
Martin Christoph Hierholzer
committed
version = readInitialValues();
ReadAnyGroup group({FanOut<UserType>::impl, _returnChannelSlave});
while(true) {
// send out copies to slaves
for(auto& slave : FanOut<UserType>::slaves) {
// do not feed back value returnChannelSlave if it was received from it
if(slave->getId() == var) continue;
// do not send copy if no data is expected (e.g. trigger)
if(slave->getNumberOfSamples() != 0) {
slave->accessChannel(0) = FanOut<UserType>::impl->accessChannel(0);
}
Martin Christoph Hierholzer
committed
bool dataLoss = slave->writeDestructively(version);
if(dataLoss) Application::incrementDataLossCounter(slave->getName());
Martin Christoph Hierholzer
committed
// receive data
boost::this_thread::interruption_point();
Profiler::stopMeasurement();
var = group.readAny();
Profiler::startMeasurement();
boost::this_thread::interruption_point();
// if the update came through the return channel, return it to the feeder
if(var == _returnChannelSlave->getId()) {
FanOut<UserType>::impl->accessChannel(0).swap(_returnChannelSlave->accessChannel(0));
Martin Christoph Hierholzer
committed
if(version < _returnChannelSlave->getVersionNumber()) {
version = _returnChannelSlave->getVersionNumber();
}
FanOut<UserType>::impl->write(version);
}
else {
version = FanOut<UserType>::impl->getVersionNumber();
Martin Christoph Hierholzer
committed
}
protected:
/** Thread handling the synchronisation, if needed */
boost::thread _thread;
boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> _returnChannelSlave;
Martin Christoph Hierholzer
committed
using ThreadedFanOut<UserType>::_network;
using ThreadedFanOut<UserType>::readInitialValues;
Martin Christoph Hierholzer
committed
using EntityOwner::testableModeReached;
Martin Christoph Hierholzer
committed
} /* namespace ChimeraTK */
#endif /* CHIMERATK_THREADED_FAN_OUT_H */