diff --git a/include/DoocsProcessArray.h b/include/DoocsProcessArray.h index 18432ae5681ded16b7072e726034eb30a4957cc4..f6c84906217278322b9bf376f20b140cef4f3a95 100644 --- a/include/DoocsProcessArray.h +++ b/include/DoocsProcessArray.h @@ -7,6 +7,8 @@ #include <ChimeraTK/ControlSystemAdapter/ControlSystemSynchronizationUtility.h> #include <ChimeraTK/ControlSystemAdapter/ProcessVariableListener.h> #include <ChimeraTK/OneDRegisterAccessor.h> +#include <ChimeraTK/ScalarRegisterAccessor.h> // needed for the macro pulse number +#include <ChimeraTK/DataConsistencyGroup.h> #include "DoocsUpdater.h" #include "splitStringAtFirstSlash.h" @@ -20,11 +22,12 @@ namespace ChimeraTK { public: DoocsProcessArray(EqFct* eqFct, std::string const& doocsPropertyName, boost::shared_ptr<ChimeraTK::NDRegisterAccessor<DOOCS_PRIMITIVE_T>> const& processArray, DoocsUpdater& updater) - : DOOCS_T(doocsPropertyName.c_str(), processArray->getNumberOfSamples(), eqFct), _processArray(processArray) { + : DOOCS_T(doocsPropertyName.c_str(), processArray->getNumberOfSamples(), eqFct), _processArray(processArray), _doocsUpdater(updater), _eqFct(eqFct){ if(processArray->isReadable()) { updater.addVariable(ChimeraTK::OneDRegisterAccessor<DOOCS_PRIMITIVE_T>(processArray), eqFct, - std::bind(&DoocsProcessArray<DOOCS_T, DOOCS_PRIMITIVE_T>::updateDoocsBuffer, this)); + std::bind(&DoocsProcessArray<DOOCS_T, DOOCS_PRIMITIVE_T>::updateDoocsBuffer, this, processArray->getId())); + _consistencyGroup.add(processArray); } // Check if the array length exceeds the maximum allowed length by DOOCS. @@ -62,7 +65,15 @@ namespace ChimeraTK { } } - void updateDoocsBuffer() { + void updateDoocsBuffer(TransferElementID transferElementId) { + // FIXME: A first implementation is checking the data consistency here. Later this should be + // before calling this function because calling this function through a function pointer is + // comparatively expensive. + // Only check the consistency group if there is a macro pulse number associated. + if (_macroPulseNumberSource && !_consistencyGroup.update(transferElementId)){ + return; + } + // Note: we already own the location lock by specification of the // DoocsUpdater auto& processVector = _processArray->accessChannel(0); @@ -92,12 +103,20 @@ namespace ChimeraTK { void publishZeroMQ() { publishZMQ = true; } void setMacroPulseNumberSource(boost::shared_ptr<ChimeraTK::NDRegisterAccessor<int64_t>> macroPulseNumberSource) { - _macroPulseNumberSource = macroPulseNumberSource; + if(_processArray->isReadable()) { + _macroPulseNumberSource = macroPulseNumberSource; + _consistencyGroup.add(macroPulseNumberSource); + _doocsUpdater.addVariable(ChimeraTK::ScalarRegisterAccessor<int64_t>(macroPulseNumberSource), _eqFct, + std::bind(&DoocsProcessArray<DOOCS_T, DOOCS_PRIMITIVE_T>::updateDoocsBuffer, this, macroPulseNumberSource->getId())); + } } protected: boost::shared_ptr<ChimeraTK::NDRegisterAccessor<DOOCS_PRIMITIVE_T>> _processArray; boost::shared_ptr<ChimeraTK::NDRegisterAccessor<int64_t>> _macroPulseNumberSource; + DataConsistencyGroup _consistencyGroup; + DoocsUpdater & _doocsUpdater; // store the reference to the updater. We need it when adding the macro pulse number + EqFct * _eqFct; // We need it when adding the macro pulse number bool publishZMQ{false}; // Internal function which copies the content from the DOOCS container into diff --git a/tests/src/serverTestZeroMQ.cpp b/tests/src/serverTestZeroMQ.cpp index 1203998c9b7781c0a1390ac1170e697f8f48e84f..78575e6746f8d409608582dab9315a17571f1a86 100644 --- a/tests/src/serverTestZeroMQ.cpp +++ b/tests/src/serverTestZeroMQ.cpp @@ -128,6 +128,9 @@ BOOST_AUTO_TEST_CASE(testScalar) { // First send, then wait. We assume that after 10 ms the event has been received once the ZMQ mechanism is up and running DoocsServerTestHelper::doocsSet<uint32_t>("//UINT/TO_DEVICE_SCALAR", expectedValue); referenceTestApplication.runMainLoopOnce(); + // FIXME: This timeout is essential so everything has been received and the next + // dataReceived really is false. It is a potential source of timing problems / + // reace conditions in this test. usleep(10000); if(++counter > 1000) break; } @@ -206,6 +209,7 @@ BOOST_AUTO_TEST_CASE(testArray) { // Add additional delay for the ZMQ system to come up usleep(2000000); + referenceTestApplication.versionNumber = ChimeraTK::VersionNumber(); int macroPulseNumber = 99999; DoocsServerTestHelper::doocsSet<int>("//INT/TO_DEVICE_SCALAR", macroPulseNumber); @@ -217,9 +221,13 @@ BOOST_AUTO_TEST_CASE(testArray) { size_t counter = 0; dataReceived = false; while(!dataReceived) { - usleep(1000); + DoocsServerTestHelper::doocsSet<int32_t>("//UINT/TO_DEVICE_ARRAY", expectedArrayValue); referenceTestApplication.runMainLoopOnce(); - if(++counter > 10000) break; + // FIXME: This timeout is essential so everything has been received and the next + // dataReceived really is false. It is a potential source of timing problems / + // reace conditions in this test. + usleep(10000); + if(++counter > 1000) break; } BOOST_CHECK(dataReceived == true); { @@ -229,15 +237,30 @@ BOOST_AUTO_TEST_CASE(testArray) { for(size_t k = 0; k < 10; ++k) BOOST_CHECK_EQUAL(received.get_int(k), expectedArrayValue[k]); } - // Trigger another update, the last one was eaten by the wait for startup above - DoocsServerTestHelper::doocsSet<int>("//INT/TO_DEVICE_SCALAR", macroPulseNumber); - DoocsServerTestHelper::doocsSet<int>("//UINT/TO_DEVICE_ARRAY", expectedArrayValue); - - // From now on, each update should be received. + // From now on, each consistent update should be received. + // Make sure consistent receiving is happening whether the macro pulse number is send first or second. + std::array<bool,10> sendMacroPulseFirst = {true, true, true, false, false, false, true, false, true, false}; for(size_t i = 0; i < 10; ++i) { - dataReceived = false; + referenceTestApplication.versionNumber = ChimeraTK::VersionNumber(); + --macroPulseNumber; + expectedArrayValue[1] = 100 + i; + if (sendMacroPulseFirst[i]) { + DoocsServerTestHelper::doocsSet<int32_t>("//INT/TO_DEVICE_SCALAR", macroPulseNumber); + }else{// send the value first + DoocsServerTestHelper::doocsSet<int32_t>("//UINT/TO_DEVICE_ARRAY", expectedArrayValue); + } + + // nothing must be received, no consistent set yet + dataReceived = false; referenceTestApplication.runMainLoopOnce(); usleep(10000); BOOST_CHECK(dataReceived == false); + + // now send the variable which has not been send yet + if (sendMacroPulseFirst[i]) { + DoocsServerTestHelper::doocsSet<int32_t>("//UINT/TO_DEVICE_ARRAY", expectedArrayValue); + }else{ + DoocsServerTestHelper::doocsSet<int32_t>("//INT/TO_DEVICE_SCALAR", macroPulseNumber); + } referenceTestApplication.runMainLoopOnce(); CHECK_WITH_TIMEOUT(dataReceived == true); { @@ -251,10 +274,6 @@ BOOST_AUTO_TEST_CASE(testArray) { BOOST_CHECK_EQUAL(receivedInfo.sec, secs); BOOST_CHECK_EQUAL(receivedInfo.usec, usecs); BOOST_CHECK_EQUAL(receivedInfo.ident, macroPulseNumber); - --macroPulseNumber; - DoocsServerTestHelper::doocsSet<int>("//INT/TO_DEVICE_SCALAR", macroPulseNumber); - expectedArrayValue[1] = 100 + i; - DoocsServerTestHelper::doocsSet<int32_t>("//UINT/TO_DEVICE_ARRAY", expectedArrayValue); } }