-
Martin Christoph Hierholzer authored
- extended the trigger tests - removed some debug output
Martin Christoph Hierholzer authored- extended the trigger tests - removed some debug output
FanOut.h 6.11 KiB
/*
* FanOut.h
*
* Created on: Jun 15, 2016
* Author: Martin Hierholzer
*/
#ifndef CHIMERATK_FAN_OUT_H
#define CHIMERATK_FAN_OUT_H
#include <ControlSystemAdapter/ProcessScalar.h>
#include "ApplicationException.h"
#include "ImplementationAdapter.h"
namespace ChimeraTK {
/** @todo TODO This class should be split into two classes, one with a thread and the other without. The
* threaded version doesn't have to be a ProcessScalar! Instead it should be unified with the
* ImplementationAdapter class... */
template<typename UserType>
class FanOut : public mtca4u::ProcessScalar<UserType>, public ImplementationAdapterBase {
public:
/** Use this constructor if the FanOut should be a consuming implementation. */
FanOut(boost::shared_ptr<mtca4u::ProcessVariable> feedingImpl)
: _direction(VariableDirection::consuming)
{
impl = boost::dynamic_pointer_cast<mtca4u::ProcessScalar<UserType>>(feedingImpl);
if(!impl) {
throw ApplicationExceptionWithID<ApplicationExceptionID::illegalParameter>(
"The FanOut has been constructed with a wrong output implementation type!");
}
}
/** Use this constructor if the FanOut should be a feeding implementation. */
FanOut()
: impl(nullptr), _direction(VariableDirection::feeding)
{}
/** If activate() has been called on this FanOut, deactivate() must be called before destruction. Otherweise
* an assertion will be raised.
* Design note: stopping the thread inside the destructor may be too late, since the thread will be accessing
* the object while it is being destroyed already! */
~FanOut() {
assert(!_thread.joinable());
}
/** Add a slave to the FanOut. Only sending end-points of a consuming node may be added. */
void addSlave(boost::shared_ptr<mtca4u::ProcessVariable> slave) {
if(!slave->isSender()) {
throw ApplicationExceptionWithID<ApplicationExceptionID::illegalParameter>(
"FanOut::addSlave() has been called with a receiving implementation!");
}
auto castedSlave = boost::dynamic_pointer_cast<mtca4u::ProcessScalar<UserType>>(slave);
if(!castedSlave) {
throw ApplicationExceptionWithID<ApplicationExceptionID::illegalParameter>(
"FanOut::addSlave() has been called with a wrong input implementation type!");
}
if(impl == nullptr) { // the first slave will be used as a "main" implementation, if
impl = castedSlave; // none was specified at construction
}
else {
slaves.push_back(castedSlave);
}
}
/** Add an external trigger to allow poll-type feeders to be used for push-type consumers */
void addExternalTrigger(const boost::shared_ptr<mtca4u::ProcessVariable>& externalTriggerImpl) {
assert(_direction == VariableDirection::consuming);
externalTrigger = externalTriggerImpl;
hasExternalTrigger = true;
}
void activate() {
assert(_direction == VariableDirection::consuming);
assert(!_thread.joinable());
_thread = std::thread([this] { this->run(); });
}
void deactivate() {
if(_thread.joinable()) {
requestTerminateThread = true;
_thread.join();
}
assert(!_thread.joinable());
}
/** Synchronise feeder and the consumers. This function is executed in the separate thread. */
void run() {
assert(_direction == VariableDirection::consuming);
while(true) {
if(hasExternalTrigger) {
// wait for external trigger (if present)
/// @todo TODO replace with proper blocking implementation when supported by the CSA
while(externalTrigger->receive() == false) {
if(requestTerminateThread) return;
std::this_thread::yield();
}
// receive data
impl->receive();
}
else {
// receive data
while(impl->receive() == false) {
if(requestTerminateThread) return;
std::this_thread::yield();
}
}
for(auto &slave : slaves) { // send out copies to slaves
slave->set(impl->get());
slave->send();
}
}
}
void set(mtca4u::ProcessScalar<UserType> const & other) {
impl->set(other.get());
}
void set(UserType const & t) {
impl->set(t);
}
operator UserType() const {
return impl->get();
}
UserType get() const {
return impl->get();
}
const std::type_info& getValueType() const {
return typeid(UserType);
}
bool isReceiver() const {
return _direction == VariableDirection::consuming;
}
bool isSender() const {
return _direction == VariableDirection::feeding;
}
TimeStamp getTimeStamp() const {
return impl->getTimeStamp();
}
bool receive() {
bool ret = impl->receive();
if(ret) {
for(auto &slave : slaves) { // send out copies to slaves
slave->set(impl->get());
slave->send();
}
}
return ret;
}
bool send() {
bool ret = true;
for(auto &slave : slaves) { // send out copies to slaves
slave->set(impl->get());
bool ret2 = slave->send();
if(!ret2) ret = false;
}
bool ret2 = impl->send();
if(!ret2) ret = false;
return ret;
}
protected:
boost::shared_ptr<mtca4u::ProcessScalar<UserType>> impl;
std::list<boost::shared_ptr<mtca4u::ProcessScalar<UserType>>> slaves;
VariableDirection _direction;
bool hasExternalTrigger{false};
boost::shared_ptr<mtca4u::ProcessVariable> externalTrigger;
/** Thread handling the synchronisation, if needed */
std::thread _thread;
/** Flag to request termination of the synchronisation thread. */
bool requestTerminateThread{false};
};
} /* namespace ChimeraTK */
#endif /* CHIMERATK_FAN_OUT_H */