Newer
Older
}
else if(consumer.getType() == NodeType::ControlSystem) {
Martin Christoph Hierholzer
committed
auto impl = createProcessVariable<UserType>(consumer);
impl->accessChannel(0) = feedingImpl->accessChannel(0);
impl->write();
}
else if(consumer.getType() == NodeType::Device) {
Martin Killenberg
committed
// we register the required accessor as a recovery accessor. This is just a bare RegisterAccessor without any decorations directly from the backend.
if(deviceMap.count(consumer.getDeviceAlias()) == 0) {
deviceMap[consumer.getDeviceAlias()] =
ChimeraTK::BackendFactory::getInstance().createBackend(consumer.getDeviceAlias());
}
auto impl = deviceMap[consumer.getDeviceAlias()]->getRegisterAccessor<UserType>(
consumer.getRegisterName(), consumer.getNumberOfElements(), 0, {});
impl->accessChannel(0) = feedingImpl->accessChannel(0);
// find the right DeviceModule for this alias name
DeviceModule* devmod = nullptr;
for(auto& dm : deviceModuleList) {
if(dm->deviceAliasOrURI == consumer.getDeviceAlias()) {
devmod = dm;
break;
}
}
assert(devmod != nullptr);
Martin Killenberg
committed
Martin Killenberg
committed
// The accessor implementation already has its data in the user buffer. We now just have to add a valid version number
// and have a recovery accessor (RecoveryHelper to be exact) which we can register at the DeviceModule.
// As this is a constant we don't need to change it later and don't have to store it somewhere else.
devmod->addRecoveryAccessor(boost::make_shared<RecoveryHelper>(impl, VersionNumber()));
}
else if(consumer.getType() == NodeType::TriggerReceiver) {
throw ChimeraTK::logic_error("Using constants as triggers is not supported!");
}
else {
throw ChimeraTK::logic_error("Unexpected node type!"); // LCOV_EXCL_LINE (assert-like)
Martin Christoph Hierholzer
committed
}
}
connectionMade = true;
Martin Christoph Hierholzer
committed
}
if(!connectionMade) { // LCOV_EXCL_LINE (assert-like)
throw ChimeraTK::logic_error( // LCOV_EXCL_LINE (assert-like)
"The variable network cannot be handled. Implementation missing!"); // LCOV_EXCL_LINE (assert-like)
} // LCOV_EXCL_LINE (assert-like)
catch(ChimeraTK::logic_error& e) {
std::stringstream ss;
ss << "ChimeraTK::logic_error thrown in Application::typedMakeConnection() for network:" << std::endl;
network.dump("", ss);
ss << e.what();
throw ChimeraTK::logic_error(ss.str());
Martin Christoph Hierholzer
committed
}
Martin Christoph Hierholzer
committed
}
/*********************************************************************************************************************/
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
/**
 * Attach every node in @p consumers to @p fanOut, creating the implementation matching the consumer's node type.
 *
 * @param fanOut           FanOut which receives the created implementations as slaves.
 * @param feeder           Feeding node of the network; used to create the variable pairs for trigger receivers.
 * @param consumers        Consumer nodes to be connected (taken by value).
 * @param consumingFanOut  Optional ConsumingFanOut. If set, it is handed to the first poll-mode application consumer
 *                         encountered and then released, so it is used for at most one consumer.
 * @throws ChimeraTK::logic_error if a consumer has a node type which is not handled here.
 */
template<typename UserType>
void Application::addConsumersToFanout(boost::shared_ptr<FanOut<UserType>> fanOut, VariableNetworkNode& feeder,
    std::list<VariableNetworkNode> consumers, boost::shared_ptr<ConsumingFanOut<UserType>> consumingFanOut) {
  // If there are trigger receivers among the consumers, one consuming application variable is produced per device.
  // Later this will create a TriggerFanOut for each trigger consumer, i.e. one per device, so one blocking device
  // does not affect the others. The map is keyed by the device alias.
  std::map<std::string, boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>>> triggerImplByAlias;

  for(auto& consumer : consumers) {
    const auto nodeType = consumer.getType();

    if(nodeType == NodeType::Application) {
      const bool handOverConsumingFanOut = consumingFanOut && consumer.getMode() == UpdateMode::poll;
      if(!handOverConsumingFanOut) {
        // ordinary case: create a variable pair, one end goes to the FanOut and the other end to the application
        auto variablePair = createApplicationVariable<UserType>(consumer);
        fanOut->addSlave(variablePair.first, consumer);
        consumer.setAppAccessorImplementation<UserType>(variablePair.second);
        continue;
      }
      // the poll-mode consumer uses the ConsumingFanOut directly; drop our reference so it is not handed out twice
      consumer.setAppAccessorImplementation<UserType>(consumingFanOut);
      consumingFanOut.reset();
      continue;
    }

    if(nodeType == NodeType::ControlSystem) {
      auto processVariable = createProcessVariable<UserType>(consumer);
      fanOut->addSlave(processVariable, consumer);
      continue;
    }

    if(nodeType == NodeType::Device) {
      auto deviceVariable = createDeviceVariable<UserType>(consumer.getDeviceAlias(), consumer.getRegisterName(),
          {VariableDirection::consuming, false}, consumer.getMode(), consumer.getNumberOfElements());
      fanOut->addSlave(deviceVariable, consumer);
      continue;
    }

    if(nodeType == NodeType::TriggerReceiver) {
      std::string deviceAlias = consumer.getNodeToTrigger().getOwner().getFeedingNode().getDeviceAlias();
      // operator[] default-constructs an empty shared pointer for an alias not seen before
      auto& triggerImpl = triggerImplByAlias[deviceAlias];
      if(!triggerImpl) {
        // first trigger receiver for this device: create a new variable pair and add its sending end to the FanOut
        auto triggerConnection = createApplicationVariable<UserType>(feeder);
        triggerImpl = triggerConnection.second;
        fanOut->addSlave(triggerConnection.first, consumer);
      }
      consumer.getNodeToTrigger().getOwner().setExternalTriggerImpl(triggerImpl);
      continue;
    }

    throw ChimeraTK::logic_error("Unexpected node type!"); // LCOV_EXCL_LINE (assert-like)
  }
}
/*********************************************************************************************************************/
/** Append a new, default-constructed VariableNetwork to networkList and return a reference to it. */
VariableNetwork& Application::createNetwork() {
  networkList.emplace_back();
  return networkList.back();
}
Martin Christoph Hierholzer
committed
/*********************************************************************************************************************/
/**
 * Return the instance provided by ApplicationBase::getInstance() as an Application.
 *
 * The conversion is a dynamic_cast to a reference, so std::bad_cast is thrown if that instance is not an
 * Application.
 */
Application& Application::getInstance() {
  return dynamic_cast<Application&>(ApplicationBase::getInstance());
}
Martin Christoph Hierholzer
committed
/*********************************************************************************************************************/
void Application::stepApplication(bool waitForDeviceInitialisation) {
// testableMode_counter must be non-zero, otherwise there is no input for the application to process. It is also
// sufficient if testableMode_deviceInitialisationCounter is non-zero, if waitForDeviceInitialisation == true. In that
// case we only wait for the device initialisation to be completed.
if(testableMode_counter == 0 && (!waitForDeviceInitialisation || testableMode_deviceInitialisationCounter == 0)) {
throw ChimeraTK::logic_error("Application::stepApplication() called despite no input was provided "
"to the application to process!");
Martin Christoph Hierholzer
committed
}
// let the application run until it has processed all data (i.e. the semaphore
// counter is 0)
Martin Christoph Hierholzer
committed
size_t oldCounter = 0;
while(testableMode_counter > 0 || (waitForDeviceInitialisation && testableMode_deviceInitialisationCounter > 0)) {
if(enableDebugTestableMode && (oldCounter != testableMode_counter)) { // LCOV_EXCL_LINE (only cout)
std::cout << "Application::stepApplication(): testableMode_counter = " << testableMode_counter
<< std::endl; // LCOV_EXCL_LINE (only cout)
oldCounter = testableMode_counter; // LCOV_EXCL_LINE (only cout)
Martin Christoph Hierholzer
committed
}
Martin Christoph Hierholzer
committed
testableModeUnlock("stepApplication");
Martin Christoph Hierholzer
committed
boost::this_thread::yield();
Martin Christoph Hierholzer
committed
testableModeLock("stepApplication");
Martin Christoph Hierholzer
committed
}
}
/*********************************************************************************************************************/
void Application::testableModeLock(const std::string& name) {
Martin Christoph Hierholzer
committed
// don't do anything if testable mode is not enabled
if(!getInstance().testableMode) return;
Martin Christoph Hierholzer
committed
// debug output if enabled (also prevent spamming the same message)
if(getInstance().enableDebugTestableMode && getInstance().testableMode_repeatingMutexOwner == 0) { // LCOV_EXCL_LINE
// (only cout)
std::cout << "Application::testableModeLock(): Thread " << threadName() // LCOV_EXCL_LINE (only cout)
<< " tries to obtain lock for " << name << std::endl; // LCOV_EXCL_LINE (only cout)
} // LCOV_EXCL_LINE (only cout)
// if last lock was obtained repeatedly by the same thread, sleep a short time
// before obtaining the lock to give the other threads a chance to get the
// lock first
if(getInstance().testableMode_repeatingMutexOwner > 0) usleep(10000);
Martin Christoph Hierholzer
committed
// obtain the lock
Martin Christoph Hierholzer
committed
getTestableModeLockObject().lock();
// check if the last owner of the mutex was this thread, which may be a hint
// that no other thread is waiting for the lock
if(getInstance().testableMode_lastMutexOwner == std::this_thread::get_id()) {
Martin Christoph Hierholzer
committed
// debug output if enabled
if(getInstance().enableDebugTestableMode && getInstance().testableMode_repeatingMutexOwner == 0) { // LCOV_EXCL_LINE
// (only cout)
std::cout << "Application::testableModeLock(): Thread " << threadName() // LCOV_EXCL_LINE (only cout)
<< " repeatedly obtained lock successfully for " << name // LCOV_EXCL_LINE (only cout)
<< ". Further messages will be suppressed." << std::endl; // LCOV_EXCL_LINE (only cout)
} // LCOV_EXCL_LINE (only cout)
Martin Christoph Hierholzer
committed
// increase counter for stall detection
Martin Christoph Hierholzer
committed
getInstance().testableMode_repeatingMutexOwner++;
// detect stall: if the same thread got the mutex with no other thread
// obtaining it in between for one second, we assume no other thread is able
// to process data at this time. The test should fail in this case
if(getInstance().testableMode_repeatingMutexOwner > 100) {
// print an informative message first, which lists also all variables
// currently containing unread data.
std::cout << "*** Tests are stalled due to data which has been sent but "
"not received."
<< std::endl;
std::cout << " The following variables still contain unread values or "
"had data loss due to a queue overflow:"
for(auto& pair : Application::getInstance().testableMode_perVarCounter) {
if(pair.second > 0) {
std::cout << " - " << Application::getInstance().testableMode_names[pair.first] << " ["
<< getInstance().testableMode_processVars[pair.first]->getId() << "]";
Martin Christoph Hierholzer
committed
// check if process variable still has data in the queue
try {
if(getInstance().testableMode_processVars[pair.first]->readNonBlocking()) {
std::cout << " (unread data in queue)";
std::cout << " (data loss)";
}
// if we receive a logic_error in readNonBlocking() it just means
// another thread is waiting on a TransferFuture of this variable,
// and we actually were not allowed to read...
Martin Christoph Hierholzer
committed
std::cout << " (data loss)";
}
std::cout << std::endl;
Martin Christoph Hierholzer
committed
}
}
Martin Christoph Hierholzer
committed
std::cout << "(end of list)" << std::endl;
// throw a specialised exception to make sure whoever catches it really
// knows what he does...
Martin Christoph Hierholzer
committed
throw TestsStalled();
// getInstance().testableMode_counter = 0;
// for(auto &pair : Application::getInstance().testableMode_perVarCounter)
// pair.second = 0;
Martin Christoph Hierholzer
committed
}
// last owner of the mutex was different: reset the counter and store the
// thread id
Martin Christoph Hierholzer
committed
getInstance().testableMode_repeatingMutexOwner = 0;
Martin Christoph Hierholzer
committed
getInstance().testableMode_lastMutexOwner = std::this_thread::get_id();
Martin Christoph Hierholzer
committed
// debug output if enabled
if(getInstance().enableDebugTestableMode) { // LCOV_EXCL_LINE (only cout)
std::cout << "Application::testableModeLock(): Thread " << threadName() // LCOV_EXCL_LINE (only cout)
<< " obtained lock successfully for " << name << std::endl; // LCOV_EXCL_LINE (only cout)
} // LCOV_EXCL_LINE (only cout)
Martin Christoph Hierholzer
committed
}
}
/*********************************************************************************************************************/
void Application::testableModeUnlock(const std::string& name) {
if(!getInstance().testableMode) return;
if(getInstance().enableDebugTestableMode &&
(!getInstance().testableMode_repeatingMutexOwner // LCOV_EXCL_LINE (only cout)
|| getInstance().testableMode_lastMutexOwner != std::this_thread::get_id())) { // LCOV_EXCL_LINE (only cout)
std::cout << "Application::testableModeUnlock(): Thread " << threadName() // LCOV_EXCL_LINE (only cout)
<< " releases lock for " << name << std::endl; // LCOV_EXCL_LINE (only cout)
} // LCOV_EXCL_LINE (only cout)
Martin Christoph Hierholzer
committed
getTestableModeLockObject().unlock();
}
Martin Christoph Hierholzer
committed
/*********************************************************************************************************************/
/**
 * Return a reference to the calling thread's name string. The string is thread_local and initialised to
 * "**UNNAMED**"; since a non-const reference is returned, callers can modify it.
 */
std::string& Application::threadName() {
  // Note: due to a presumed bug in gcc (still present in gcc 7), the
  // thread_local definition must be in the cc file to prevent seeing different
  // objects in the same thread under some conditions. Another workaround for
  // this problem can be found in commit
  // dc051bfe35ce6c1ed954010559186f63646cf5d4
  thread_local std::string name{"**UNNAMED**"};
  return name;
}
/*********************************************************************************************************************/
/**
 * Return the calling thread's lock object for Application::testableMode_mutex.
 *
 * The std::unique_lock is thread_local and constructed with std::defer_lock, i.e. the mutex is not locked when the
 * object is created.
 */
std::unique_lock<std::mutex>& Application::getTestableModeLockObject() {
  // Note: due to a presumed bug in gcc (still present in gcc 7), the
  // thread_local definition must be in the cc file to prevent seeing different
  // objects in the same thread under some conditions. Another workaround for
  // this problem can be found in commit
  // dc051bfe35ce6c1ed954010559186f63646cf5d4
  thread_local std::unique_lock<std::mutex> myLock(Application::testableMode_mutex, std::defer_lock);
  return myLock;
}
/*********************************************************************************************************************/
/** Append the given DeviceModule pointer to deviceModuleList. */
void Application::registerDeviceModule(DeviceModule* deviceModule) {
  deviceModuleList.emplace_back(deviceModule);
}
/*********************************************************************************************************************/
/** Remove every entry equal to the given DeviceModule pointer from deviceModuleList. */
void Application::unregisterDeviceModule(DeviceModule* deviceModule) {
  deviceModuleList.remove_if([deviceModule](DeviceModule* registered) { return registered == deviceModule; });
}