Skip to content
Snippets Groups Projects
Commit 4ba5548e authored by Martin Christoph Hierholzer's avatar Martin Christoph Hierholzer
Browse files

split off the FeedingFanOut from the FanOut

parent 47e828b5
No related branches found
No related tags found
No related merge requests found
......@@ -15,7 +15,7 @@
namespace ChimeraTK {
template<typename UserType>
class FanOut : public mtca4u::NDRegisterAccessor<UserType>, public InternalModule {
......@@ -23,7 +23,6 @@ namespace ChimeraTK {
/** Use this constructor if the FanOut should be a consuming implementation. */
FanOut(boost::shared_ptr<mtca4u::NDRegisterAccessor<UserType>> feedingImpl)
: _direction(VariableDirection::consuming)
{
impl = boost::dynamic_pointer_cast<mtca4u::NDRegisterAccessor<UserType>>(feedingImpl);
if(!impl) {
......@@ -36,11 +35,6 @@ namespace ChimeraTK {
}
}
/** Use this constructor if the FanOut should be a feeding implementation. */
FanOut()
: impl(nullptr), _direction(VariableDirection::feeding)
{}
/** If activate() has been called on this FanOut, deactivate() must be called before destruction. Otherweise
* an assertion will be raised.
* Design note: stopping the thread inside the destructor may be too late, since the thread will be accessing
......@@ -84,13 +78,11 @@ namespace ChimeraTK {
/** Add an external trigger to allow poll-type feeders to be used for push-type consumers */
void addExternalTrigger(const boost::shared_ptr<mtca4u::TransferElement>& externalTriggerImpl) {
assert(_direction == VariableDirection::consuming);
externalTrigger = externalTriggerImpl;
hasExternalTrigger = true;
}
void activate() override {
assert(_direction == VariableDirection::consuming);
assert(!_thread.joinable());
_thread = boost::thread([this] { this->run(); });
}
......@@ -105,7 +97,6 @@ namespace ChimeraTK {
/** Synchronise feeder and the consumers. This function is executed in the separate thread. */
void run() {
assert(_direction == VariableDirection::consuming);
while(true) {
boost::this_thread::interruption_point();
if(hasExternalTrigger) {
......@@ -128,36 +119,16 @@ namespace ChimeraTK {
}
}
void set(mtca4u::NDRegisterAccessor<UserType> const & other) {
impl->set(other.get());
}
void set(UserType const & t) {
impl->set(t);
}
operator UserType() const {
return impl->get();
}
UserType get() const {
return impl->get();
}
const std::type_info& getValueType() const override {
return typeid(UserType);
}
bool isReadable() const override {
return _direction == VariableDirection::consuming;
return false;
}
bool isReadOnly() const override {
return _direction == VariableDirection::consuming;
return false;
}
bool isWriteable() const override {
return _direction == VariableDirection::feeding;
return true;
}
TimeStamp getTimeStamp() const override {
......@@ -184,17 +155,7 @@ namespace ChimeraTK {
}
void write() override {
for(auto &slave : slaves) { // send out copies to slaves
// do not send copy if no data is expected (e.g. trigger)
if(slave->getNumberOfSamples() != 0) {
slave->accessChannel(0) = mtca4u::NDRegisterAccessor<UserType>::buffer_2D[0];
}
slave->write();
}
impl->accessChannel(0).swap(mtca4u::NDRegisterAccessor<UserType>::buffer_2D[0]);
impl->write();
impl->accessChannel(0).swap(mtca4u::NDRegisterAccessor<UserType>::buffer_2D[0]);
return;
throw std::logic_error("Write operation called on read-only variable.");
}
bool isSameRegister(const boost::shared_ptr<const mtca4u::TransferElement>& e) const override {
......@@ -216,8 +177,6 @@ namespace ChimeraTK {
std::list<boost::shared_ptr<mtca4u::NDRegisterAccessor<UserType>>> slaves;
VariableDirection _direction;
bool hasExternalTrigger{false};
boost::shared_ptr<mtca4u::TransferElement> externalTrigger;
......
/*
* FeedingFanOut.h
*
* Created on: Jun 15, 2016
* Author: Martin Hierholzer
*/
#ifndef CHIMERATK_FEEDING_FAN_OUT_H
#define CHIMERATK_FEEDING_FAN_OUT_H
#include <mtca4u/NDRegisterAccessor.h>
#include "ApplicationException.h"
namespace ChimeraTK {
template<typename UserType>
class FeedingFanOut : public mtca4u::NDRegisterAccessor<UserType> {
public:
/** Add a slave to the FanOut. Only sending end-points of a consuming node may be added. */
void addSlave(boost::shared_ptr<mtca4u::NDRegisterAccessor<UserType>> slave) {
if(!slave->isWriteable()) {
throw ApplicationExceptionWithID<ApplicationExceptionID::illegalParameter>(
"FeedingFanOut::addSlave() has been called with a receiving implementation!");
}
if(slaves.size() == 0) { // first slave: initialise buffers @todo TODO FIXME first slave could be a trigger receiver!
mtca4u::NDRegisterAccessor<UserType>::buffer_2D.resize( slave->getNumberOfChannels() );
for(size_t i=0; i<slave->getNumberOfChannels(); i++) {
mtca4u::NDRegisterAccessor<UserType>::buffer_2D[i].resize( slave->getNumberOfSamples() );
}
}
else {
// check if array shape is compatible, unless the receiver is a trigger node, so no data is expected
if( slave->getNumberOfSamples() != 0 &&
( slave->getNumberOfChannels() != slaves.front()->getNumberOfChannels() ||
slave->getNumberOfSamples() != slaves.front()->getNumberOfSamples() ) ) {
std::string what = "FeedingFanOut::addSlave(): Trying to add a slave '";
what += slave->getName();
what += "' with incompatible array shape! Name of first slave: ";
what += slaves.front()->getName();
throw ApplicationExceptionWithID<ApplicationExceptionID::illegalParameter>(what.c_str());
}
}
slaves.push_back(slave);
}
bool isReadable() const override {
return false;
}
bool isReadOnly() const override {
return false;
}
bool isWriteable() const override {
return true;
}
void doReadTransfer() override {
throw std::logic_error("Read operation called on write-only variable.");
}
bool doReadTransferNonBlocking() override {
throw std::logic_error("Read operation called on write-only variable.");
}
void postRead() override {
throw std::logic_error("Read operation called on write-only variable.");
}
void write() override {
boost::shared_ptr<mtca4u::NDRegisterAccessor<UserType>> firstSlave; // will have the data for the other slaves after swapping
for(auto &slave : slaves) { // send out copies to slaves
if(slave->getNumberOfSamples() != 0) { // do not send copy if no data is expected (e.g. trigger)
if(!firstSlave) { // in case of first slave, swap instead of copy
firstSlave = slave;
firstSlave->accessChannel(0).swap(mtca4u::NDRegisterAccessor<UserType>::buffer_2D[0]);
}
else { // not the first slave: copy the data from the first slave
slave->accessChannel(0) = firstSlave->accessChannel(0);
}
}
slave->write();
}
// swap back the data from the first slave so we still have it available
if(firstSlave) {
firstSlave->accessChannel(0).swap(mtca4u::NDRegisterAccessor<UserType>::buffer_2D[0]);
}
return;
}
bool isSameRegister(const boost::shared_ptr<const mtca4u::TransferElement>& e) const override {
// only true if the very instance of the transfer element is the same
return e.get() == this;
}
std::vector<boost::shared_ptr<mtca4u::TransferElement> > getHardwareAccessingElements() override {
return { boost::enable_shared_from_this<mtca4u::TransferElement>::shared_from_this() };
}
void replaceTransferElement(boost::shared_ptr<mtca4u::TransferElement>) override {
// You can't replace anything here. Just do nothing.
}
protected:
std::list<boost::shared_ptr<mtca4u::NDRegisterAccessor<UserType>>> slaves;
};
} /* namespace ChimeraTK */
#endif /* CHIMERATK_FEEDING_FAN_OUT_H */
......@@ -64,6 +64,9 @@ namespace ChimeraTK {
/** Return list of consuming nodes */
std::list<VariableNetworkNode> getConsumingNodes() const;
/** Check whether the network has a consuming application node */
bool hasApplicationConsumer() const;
/** Dump the network structure to std::cout. The optional linePrefix will be prepended to all lines. */
void dump(const std::string& linePrefix="") const;
......
......@@ -20,6 +20,7 @@
#include "Accessor.h"
#include "DeviceAccessor.h"
#include "FanOut.h"
#include "FeedingFanOut.h"
#include "VariableNetworkNode.h"
#include "ScalarAccessor.h"
#include "ArrayAccessor.h"
......@@ -436,27 +437,38 @@ void Application::typedMakeConnection(VariableNetwork &network) {
}
}
else { /* !(nNodes == 2 && !useExternalTrigger) */
// create FanOut
auto fanOut = boost::make_shared<FanOut<UserType>>(feedingImpl);
// use FanOut as implementation for the first application consumer node, add all others as slaves
// @todo TODO need a more sophisticated logic to take care of the UpdateMode
bool isFirst = true;
// if external trigger is enabled, add it to FanOut
// flag needed when the ConsumerFanOut is used
bool mayUseAsImplementation = false;
// create the right FanOut type
boost::shared_ptr<FanOut<UserType>> fanOut;
if(useExternalTrigger) {
isFirst = false; // don't use the FanOut as an implementation if we have an external trigger
// if external trigger is enabled, use externally triggered threaded FanOut
fanOut = boost::make_shared<FanOut<UserType>>(feedingImpl);
fanOut->addExternalTrigger(network.getExternalTriggerImpl());
internalModuleList.push_back(fanOut);
}
// if the trigger is provided by the pushing feeder, use the treaded version of the FanOut to distribute
// new values immediately to all consumers.
else if(useFeederTrigger) {
isFirst = false;
// if the trigger is provided by the pushing feeder, use the treaded version of the FanOut to distribute
// new values immediately to all consumers.
fanOut = boost::make_shared<FanOut<UserType>>(feedingImpl);
internalModuleList.push_back(fanOut);
}
else {
if(!network.hasApplicationConsumer()) {
throw ApplicationExceptionWithID<ApplicationExceptionID::illegalParameter>("No application node in the network but no trigger!");
}
mayUseAsImplementation = true;
fanOut = boost::make_shared<FanOut<UserType>>(feedingImpl);
}
// add all consumers to the FanOut
for(auto &consumer : consumers) {
if(consumer.getType() == NodeType::Application) {
if(isFirst) {
if(mayUseAsImplementation) {
consumer.getAppAccessor().useProcessVariable(fanOut);
isFirst = false;
mayUseAsImplementation = false;
}
else {
auto impls = createApplicationVariable<UserType>(consumer.getNumberOfElements(), consumer.getAppAccessor().getName());
......@@ -482,9 +494,6 @@ void Application::typedMakeConnection(VariableNetwork &network) {
throw ApplicationExceptionWithID<ApplicationExceptionID::illegalParameter>("Unexpected node type!");
}
}
if(isFirst || useExternalTrigger || useFeederTrigger) { // FanOut wasn't used as implementation: store in list to keep it alive
internalModuleList.push_back(fanOut);
}
connectionMade = true;
}
}
......@@ -532,7 +541,7 @@ void Application::typedMakeConnection(VariableNetwork &network) {
}
else {
// create FanOut and use it as the feeder implementation
auto fanOut = boost::make_shared<FanOut<UserType>>();
auto fanOut = boost::make_shared<FeedingFanOut<UserType>>();
feeder.getAppAccessor().useProcessVariable(fanOut);
for(auto &consumer : consumers) {
......
......@@ -221,4 +221,13 @@ namespace ChimeraTK {
return consumers;
}
/*********************************************************************************************************************/
bool VariableNetwork::hasApplicationConsumer() const {
for(auto &n : nodeList) {
if(n.getDirection() == VariableDirection::consuming && n.getType() == NodeType::Application) return true;
}
return false;
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment