diff --git a/include/ArrayAccessor.h b/include/ArrayAccessor.h index fb943c3185d6e22d212b5af7299a81bb3f81d259..c82e8263636b10bb348e338edf36312dd343c84a 100644 --- a/include/ArrayAccessor.h +++ b/include/ArrayAccessor.h @@ -32,10 +32,8 @@ namespace ChimeraTK { void read() { if(Accessor<UserType>::_mode == UpdateMode::push) { - while(impl->readNonBlocking() == false) { /// @todo TODO proper blocking implementation - boost::this_thread::yield(); - boost::this_thread::interruption_point(); - } + impl->read(); + boost::this_thread::interruption_point(); } else { /// @todo TODO empty the queue to always receive the latest value diff --git a/include/FanOut.h b/include/FanOut.h index 75df2b9b879920340d59678c2bac7c63f2da55f3..e17f9e1180ef81190dd5c794d117c9afe84e55c6 100644 --- a/include/FanOut.h +++ b/include/FanOut.h @@ -110,34 +110,22 @@ namespace ChimeraTK { void run() { assert(_direction == VariableDirection::consuming); while(true) { - boost::this_thread::yield(); boost::this_thread::interruption_point(); if(hasExternalTrigger) { // wait for external trigger (if present) - /// @todo TODO replace with proper blocking implementation when supported by the CSA - while(externalTrigger->readNonBlocking() == false) { - boost::this_thread::yield(); - boost::this_thread::interruption_point(); - } + externalTrigger->read(); // receive data impl->readNonBlocking(); } else { // receive data - while(impl->readNonBlocking() == false) { - boost::this_thread::yield(); - boost::this_thread::interruption_point(); - } + impl->read(); } - boost::this_thread::yield(); - boost::this_thread::interruption_point(); for(auto &slave : slaves) { // send out copies to slaves // do not send copy if no data is expected (e.g. trigger) if(slave->getNumberOfSamples() != 0) { slave->accessChannel(0) = impl->accessChannel(0); } - boost::this_thread::yield(); - boost::this_thread::interruption_point(); slave->write(); } } @@ -180,7 +168,15 @@ namespace ChimeraTK { } void read() { - throw std::logic_error("Blocking read is not supported by process array."); + impl->read(); + mtca4u::NDRegisterAccessor<UserType>::buffer_2D[0].swap(impl->accessChannel(0)); + for(auto &slave : slaves) { // send out copies to slaves + // do not send copy if no data is expected (e.g. trigger) + if(slave->getNumberOfSamples() != 0) { + slave->accessChannel(0) = mtca4u::NDRegisterAccessor<UserType>::buffer_2D[0]; + } + slave->write(); + } } bool readNonBlocking() { diff --git a/include/ImplementationAdapter.h b/include/ImplementationAdapter.h index be78ad8617af36ea4f5ad4ccff5291b81c5076b6..71969af575f16d8d901b42b3f989c201beca07cf 100644 --- a/include/ImplementationAdapter.h +++ b/include/ImplementationAdapter.h @@ -51,10 +51,7 @@ namespace ChimeraTK { /** Synchronise sender and receiver. This function is executed in the separate thread. */ void run() { while(true) { - while(!_receiver->readNonBlocking()) { - boost::this_thread::yield(); - boost::this_thread::interruption_point(); - } + _receiver->read(); _sender->accessChannel(0) = _receiver->accessChannel(0); _sender->write(); } diff --git a/include/ScalarAccessor.h b/include/ScalarAccessor.h index 08e458090052f99363d82025d6f3ab0f4083fd6d..84cdbd2ef31cef66ef81e7bb9c9a1473c6761fd1 100644 --- a/include/ScalarAccessor.h +++ b/include/ScalarAccessor.h @@ -32,10 +32,8 @@ namespace ChimeraTK { void read() { if(Accessor<UserType>::_mode == UpdateMode::push) { - while(impl->readNonBlocking() == false) { /// @todo TODO proper blocking implementation - boost::this_thread::yield(); - boost::this_thread::interruption_point(); - } + impl->read(); + boost::this_thread::interruption_point(); } else { /// @todo TODO empty the queue to always receive the latest value