Newer
Older
/*
* TriggerFanOut.h
*
* Created on: Jun 15, 2016
* Author: Martin Hierholzer
*/
#ifndef CHIMERATK_TRIGGER_FAN_OUT_H
#define CHIMERATK_TRIGGER_FAN_OUT_H
#include <ChimeraTK/NDRegisterAccessor.h>
#include <ChimeraTK/SupportedUserTypes.h>
#include <ChimeraTK/TransferGroup.h>
Martin Christoph Hierholzer
committed
#include "Application.h"
#include "FeedingFanOut.h"
#include "InternalModule.h"
#include "Profiler.h"
Martin Christoph Hierholzer
committed
#include "DeviceModule.h"
constexpr useconds_t DeviceOpenTimeout = 500;
namespace ChimeraTK {
/** InternalModule which waits for a trigger, then reads a number of variables
* and distributes each of them to any number of slaves. */
class TriggerFanOut : public InternalModule {
public:
Martin Christoph Hierholzer
committed
TriggerFanOut(const boost::shared_ptr<ChimeraTK::TransferElement>& externalTriggerImpl, DeviceModule& deviceModule,
VariableNetwork& network)
Martin Christoph Hierholzer
committed
: externalTrigger(externalTriggerImpl), _deviceModule(deviceModule), _network(network) {}
~TriggerFanOut() { deactivate(); }
void activate() override {
assert(!_thread.joinable());
_thread = boost::thread([this] { this->run(); });
Martin Christoph Hierholzer
committed
// Wait until the thread has launched and acquired and released the testable mode lock at least once.
if(Application::getInstance().isTestableModeEnabled()) {
while(!testableModeReached) {
Application::getInstance().testableModeUnlock("releaseForReachTestableMode");
usleep(100);
Application::getInstance().testableModeLock("acquireForReachTestableMode");
}
}
void deactivate() override {
if(_thread.joinable()) {
_thread.interrupt();
externalTrigger->interrupt();
_thread.join();
}
assert(!_thread.joinable());
/** Add a new network the TriggerFanOut. The network is defined by its feeding
* node. This function will return the corresponding FeedingFanOut, to which
* all slaves have to be added. */
template<typename UserType>
boost::shared_ptr<FeedingFanOut<UserType>> addNetwork(
boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> feedingNode) {
assert(feedingNode.get() != nullptr);
transferGroup.addAccessor(feedingNode);
auto feedingFanOut = boost::make_shared<FeedingFanOut<UserType>>(feedingNode->getName(), feedingNode->getUnit(),
feedingNode->getDescription(), feedingNode->getNumberOfSamples(),
false); // in TriggerFanOuts we cannot have return channels
boost::fusion::at_key<UserType>(fanOutMap.table)[feedingNode] = feedingFanOut;
return feedingFanOut;
}
/** Synchronise feeder and the consumers. This function is executed in the
* separate thread. */
void run() {
Application::registerThread("TrFO" + externalTrigger->getName());
Application::testableModeLock("start");
Martin Christoph Hierholzer
committed
testableModeReached = true;
ChimeraTK::VersionNumber version = Application::getInstance().getStartVersion();
Martin Christoph Hierholzer
committed
// If trigger gets an initial value pushed, read it (otherwise we would trigger twice at application start)
auto hasInitialValue = _network.getFeedingNode().getExternalTrigger().hasInitialValue();
if(hasInitialValue == VariableNetworkNode::InitialValueMode::Push) {
externalTrigger->read();
version = externalTrigger->getVersionNumber();
Martin Christoph Hierholzer
committed
}
Martin Christoph Hierholzer
committed
// receive data. We need to catch exceptions here, since the ExceptionHandlingDecorator cannot do this for us
// inside a TransferGroup, if the exception is thrown inside doReadTransfer() (as it is directly called on the
// lowest-level TransferElements inside the group).
auto lastValidity = DataValidity::ok;
Martin Christoph Hierholzer
committed
retry:
try {
if(!_deviceModule.device.isOpened()) {
Application::getInstance().testableModeUnlock("waitForDeviceOpen");
boost::this_thread::sleep(boost::posix_time::millisec(DeviceOpenTimeout));
Application::getInstance().testableModeLock("waitForDeviceOpen");
goto retry;
}
Martin Christoph Hierholzer
committed
transferGroup.read();
}
catch(ChimeraTK::runtime_error& e) {
// send the data to the consumers
if(lastValidity == DataValidity::ok) {
lastValidity = DataValidity::faulty;
boost::fusion::for_each(fanOutMap.table, SendDataToConsumers(version, lastValidity));
}
// _deviceModule.reportException(e.what());
_deviceModule.reportException(e.what());
_deviceModule.waitForRecovery();
Martin Christoph Hierholzer
committed
goto retry;
}
// send the version number to the consumers
boost::fusion::for_each(fanOutMap.table, SendDataToConsumers(version));
Martin Christoph Hierholzer
committed
// wait for external trigger (exception handling is done here by the decorator)
boost::this_thread::interruption_point();
Profiler::stopMeasurement();
externalTrigger->read();
Profiler::startMeasurement();
boost::this_thread::interruption_point();
version = externalTrigger->getVersionNumber();
protected:
/** Functor class to send data to the consumers, suitable for
* boost::fusion::for_each(). */
struct SendDataToConsumers {
SendDataToConsumers(VersionNumber version, DataValidity validity = DataValidity::ok)
: _version(version), _validity(validity) {}
template<typename PAIR>
void operator()(PAIR& pair) const {
auto theMap = pair.second; // map of feeder to FeedingFanOut (i.e. part of
// the fanOutMap)
// iterate over all feeder/FeedingFanOut pairs
for(auto& network : theMap) {
auto feeder = network.first;
auto fanOut = network.second;
fanOut->setDataValidity(_validity);
fanOut->accessChannel(0).swap(feeder->accessChannel(0));
Martin Christoph Hierholzer
committed
bool dataLoss = fanOut->writeDestructively(_version);
if(dataLoss) Application::incrementDataLossCounter();
// no need to swap back since we don't need the data
}
}
/** TransferElement acting as our trigger */
boost::shared_ptr<ChimeraTK::TransferElement> externalTrigger;
/** Map of the feeding NDRegisterAccessor to the corresponding FeedingFanOut
* for each UserType */
template<typename UserType>
using FanOutMap = std::map<boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>>,
boost::shared_ptr<FeedingFanOut<UserType>>>;
TemplateUserTypeMap<FanOutMap> fanOutMap;
/** TransferGroup containing all feeders NDRegisterAccessors */
ChimeraTK::TransferGroup transferGroup;
/** Thread handling the synchronisation, if needed */
boost::thread _thread;
Martin Christoph Hierholzer
committed
/** The DeviceModule of the feeder. Required for exception handling */
DeviceModule& _deviceModule;
Martin Christoph Hierholzer
committed
/** Reference to VariableNetwork which is being realised by this FanOut. **/
VariableNetwork& _network;
} /* namespace ChimeraTK */
#endif /* CHIMERATK_TRIGGER_FAN_OUT_H */