diff --git a/common/cpp/include/asapo/common/data_structs.h b/common/cpp/include/asapo/common/data_structs.h index 7c06ccc691df4c9a640f521971d54e73d99183b3..7b06404dd6ed1b6115635d5e594a2d971d607eb9 100644 --- a/common/cpp/include/asapo/common/data_structs.h +++ b/common/cpp/include/asapo/common/data_structs.h @@ -40,6 +40,7 @@ class MessageMeta { uint64_t buf_id{0}; std::string stream; // might be "unknownStream" for older datasets uint64_t dataset_substream{0}; + uint64_t ingest_mode{0}; std::string Json() const; bool SetFromJson(const std::string &json_string); std::string FullName(const std::string &base_path) const; @@ -173,6 +174,7 @@ enum IngestModeFlags : uint64_t { }; const uint64_t kDefaultIngestMode = kTransferData | kStoreInFilesystem | kStoreInDatabase; +const uint64_t kCacheOnlyIngestMode = kTransferData | kStoreInDatabase; enum class MetaIngestOp : uint64_t { kInsert = 1, diff --git a/common/cpp/src/data_structs/data_structs.cpp b/common/cpp/src/data_structs/data_structs.cpp index ed19d974894be4bdec461a88119071fd519f0b1b..e405a65d3f943a519a8e0cfd0046d2964a5179f7 100644 --- a/common/cpp/src/data_structs/data_structs.cpp +++ b/common/cpp/src/data_structs/data_structs.cpp @@ -72,6 +72,7 @@ std::string MessageMeta::Json() const { "\"buf_id\":" + std::to_string(buf_id_int) + "," "\"stream\":\"" + stream + "\"," "\"dataset_substream\":" + std::to_string(dataset_substream) + "," + "\"ingest_mode\":" + std::to_string(ingest_mode) + "," "\"meta\":" + (metadata.size() == 0 ? 
std::string("{}") : metadata) + "}"; return s; @@ -138,8 +139,8 @@ bool MessageMeta::SetFromJson(const std::string& json_string) { *this = old; return false; } - -//ignore error if meta not found + //ignore error if ingest_mode not found + parser.GetUInt64("ingest_mode", &ingest_mode); return true; } diff --git a/common/cpp/unittests/data_structs/test_data_structs.cpp b/common/cpp/unittests/data_structs/test_data_structs.cpp index 2fcd83ea150cc2bc8e2c20fe83d04ce2084b1c50..30a24a831651e9f8b0274d272d8346cd5b20894f 100644 --- a/common/cpp/unittests/data_structs/test_data_structs.cpp +++ b/common/cpp/unittests/data_structs/test_data_structs.cpp @@ -33,6 +33,7 @@ MessageMeta PrepareMessageMeta(bool includeNewStreamField = true) { message_meta.dataset_substream = 3; message_meta.name = std::string("folder") + asapo::kPathSeparator + "test"; message_meta.source = "host:1234"; + message_meta.ingest_mode = asapo::kDefaultIngestMode; message_meta.buf_id = big_uint; message_meta.timestamp = std::chrono::time_point<std::chrono::system_clock>(std::chrono::milliseconds(1)); message_meta.metadata = "{\"bla\":10}"; @@ -56,10 +57,10 @@ TEST(MessageMetaTests, CorrectConvertToJson) { std::string json = message_meta.Json(); if (asapo::kPathSeparator == '/') { ASSERT_THAT(json, Eq( - R"({"_id":1,"size":100,"name":"folder/test","timestamp":1000000,"source":"host:1234","buf_id":-1,"stream":"testStream","dataset_substream":3,"meta":{"bla":10}})")); + R"({"_id":1,"size":100,"name":"folder/test","timestamp":1000000,"source":"host:1234","buf_id":-1,"stream":"testStream","dataset_substream":3,"ingest_mode":13,"meta":{"bla":10}})")); } else { ASSERT_THAT(json, Eq( - R"({"_id":1,"size":100,"name":"folder\\test","timestamp":1000000,"source":"host:1234","buf_id":-1,"stream":"testStream","dataset_substream":3,"meta":{"bla":10}})")); + 
R"({"_id":1,"size":100,"name":"folder\\test","timestamp":1000000,"source":"host:1234","buf_id":-1,"stream":"testStream","dataset_substream":3,"ingest_mode":13,"meta":{"bla":10}})")); } } diff --git a/consumer/api/c/include/asapo/consumer_c.h b/consumer/api/c/include/asapo/consumer_c.h index 01a2018f7db793d9fba3e93addc7a2d9644aa808..80c1f841bcf17b09888fcd49a6f6208f3af2810e 100644 --- a/consumer/api/c/include/asapo/consumer_c.h +++ b/consumer/api/c/include/asapo/consumer_c.h @@ -40,6 +40,7 @@ enum AsapoConsumerErrorType { kWrongInput, kPartialData, kUnsupportedClient, + kDataNotInCache, kUnknownError }; diff --git a/consumer/api/cpp/include/asapo/consumer/consumer_error.h b/consumer/api/cpp/include/asapo/consumer/consumer_error.h index e524e1c1e68d7a56d42fa3358942a1b3852b00bd..89320d151f48a44d2c1c5546262086249f7aceca 100644 --- a/consumer/api/cpp/include/asapo/consumer/consumer_error.h +++ b/consumer/api/cpp/include/asapo/consumer/consumer_error.h @@ -15,7 +15,8 @@ enum class ConsumerErrorType { kLocalIOError, kWrongInput, kPartialData, - kUnsupportedClient + kUnsupportedClient, + kDataNotInCache, }; using ConsumerErrorTemplate = ServiceErrorTemplate<ConsumerErrorType>; @@ -45,6 +46,10 @@ auto const kLocalIOError = ConsumerErrorTemplate { "local i/o error", ConsumerErrorType::kLocalIOError }; +auto const kDataNotInCache = ConsumerErrorTemplate { + "data not in cache", ConsumerErrorType::kDataNotInCache +}; + auto const kStreamFinished = ConsumerErrorTemplate { "stream finished", ConsumerErrorType::kStreamFinished }; diff --git a/consumer/api/cpp/src/consumer_c_glue.cpp b/consumer/api/cpp/src/consumer_c_glue.cpp index 240bfcb90a8b4d3d414457fb00eae39b3a7109d8..1fb7f6dee01f71bc3ecf89b594d8b45b228c7072 100644 --- a/consumer/api/cpp/src/consumer_c_glue.cpp +++ b/consumer/api/cpp/src/consumer_c_glue.cpp @@ -79,7 +79,8 @@ extern "C" { kLocalIOError == asapo::ConsumerErrorType::kLocalIOError&& kWrongInput == asapo::ConsumerErrorType::kWrongInput&& kPartialData == 
asapo::ConsumerErrorType::kPartialData&& - kUnsupportedClient == asapo::ConsumerErrorType::kUnsupportedClient, + kUnsupportedClient == asapo::ConsumerErrorType::kUnsupportedClient && + kDataNotInCache == asapo::ConsumerErrorType::kDataNotInCache, "incompatible bit reps between c++ and c for asapo::ConsumerErrorType"); static_assert(kAllStreams == asapo::StreamFilter::kAllStreams&& kFinishedStreams == asapo::StreamFilter::kFinishedStreams&& diff --git a/consumer/api/cpp/src/consumer_impl.cpp b/consumer/api/cpp/src/consumer_impl.cpp index beb12a9fddf4ee4ec8a4123562cfbb834495deb6..76f536cbff432ae4d78b6f493b970b357b526587 100644 --- a/consumer/api/cpp/src/consumer_impl.cpp +++ b/consumer/api/cpp/src/consumer_impl.cpp @@ -472,6 +472,10 @@ Error ConsumerImpl::GetDataFromFile(MessageMeta* info, MessageData* data) { return nullptr; } +bool DataCanBeOnDisk(const MessageMeta* info) { + return info->ingest_mode ==0 || (info->ingest_mode & IngestModeFlags::kStoreInFilesystem); +} + Error ConsumerImpl::RetrieveData(MessageMeta* info, MessageData* data) { if (data == nullptr || info == nullptr) { return ConsumerErrorTemplates::kWrongInput.Generate("pointers are empty"); @@ -485,6 +489,10 @@ Error ConsumerImpl::RetrieveData(MessageMeta* info, MessageData* data) { } } + if (!DataCanBeOnDisk(info)) { + return ConsumerErrorTemplates::kDataNotInCache.Generate(); + } + if (has_filesystem_) { return GetDataFromFile(info, data); } @@ -500,6 +508,7 @@ Error ConsumerImpl::GetDataIfNeeded(MessageMeta* info, MessageData* data) { return RetrieveData(info, data); } + bool ConsumerImpl::DataCanBeInBuffer(const MessageMeta* info) { return info->buf_id > 0; } diff --git a/consumer/api/cpp/unittests/test_consumer_impl.cpp b/consumer/api/cpp/unittests/test_consumer_impl.cpp index 2fee68271f47094771aeaccf8711beac0f11089d..b8afdab41eed763659166da1d46acb4f6359dd95 100644 --- a/consumer/api/cpp/unittests/test_consumer_impl.cpp +++ b/consumer/api/cpp/unittests/test_consumer_impl.cpp @@ -676,6 +676,20 
@@ TEST_F(ConsumerImplTests, GetMessageCallsReadFromFileIfCannotReadFromCache) { ASSERT_THAT(info.buf_id, Eq(0)); } +TEST_F(ConsumerImplTests, GetMessageReturnsErrorIfCannotReadFromCache) { + MockGetBrokerUri(); + auto to_send = CreateFI(); + to_send.ingest_mode = asapo::kCacheOnlyIngestMode; + to_send.buf_id = 0; + auto json = to_send.Json(); + MockGet(json); + + MessageData data; + + auto err = consumer->GetNext(expected_group_id, &info, &data, expected_stream); + ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kDataNotInCache)); +} + TEST_F(ConsumerImplTests, GetMessageCallsReadFromFileIfZeroBufId) { MockGetBrokerUri(); auto to_send = CreateFI(0); diff --git a/consumer/api/python/asapo_consumer.pxd b/consumer/api/python/asapo_consumer.pxd index 1306b95b8896e6026a7c0e1a114e1ea00d8e2083..140e8950eef5ed830f4a9e218792baa059809a92 100644 --- a/consumer/api/python/asapo_consumer.pxd +++ b/consumer/api/python/asapo_consumer.pxd @@ -115,6 +115,7 @@ cdef extern from "asapo/asapo_consumer.h" namespace "asapo": ErrorTemplateInterface kWrongInput "asapo::ConsumerErrorTemplates::kWrongInput" ErrorTemplateInterface kPartialData "asapo::ConsumerErrorTemplates::kPartialData" ErrorTemplateInterface kUnsupportedClient "asapo::ConsumerErrorTemplates::kUnsupportedClient" + ErrorTemplateInterface kDataNotInCache "asapo::ConsumerErrorTemplates::kDataNotInCache" cdef cppclass ConsumerErrorData: diff --git a/consumer/api/python/asapo_consumer.pyx.in b/consumer/api/python/asapo_consumer.pyx.in index 268554ba6a9c4ce92eb9780c72d87d3d71d9ee07..de490719b58ed31df7d570ad3b226b060498c57d 100644 --- a/consumer/api/python/asapo_consumer.pyx.in +++ b/consumer/api/python/asapo_consumer.pyx.in @@ -50,6 +50,9 @@ class AsapoLocalIOError(AsapoConsumerError): class AsapoUnsupportedClientError(AsapoConsumerError): pass +class AsapoDataNotInCacheError(AsapoConsumerError): + pass + class AsapoStreamFinishedError(AsapoConsumerError): def __init__(self,message,id_max=None,next_stream=None): 
AsapoConsumerError.__init__(self,message) @@ -105,6 +108,8 @@ cdef throw_exception(Error& err, res = None): raise AsapoInterruptedTransactionError(error_string) elif err == kUnsupportedClient: raise AsapoUnsupportedClientError(error_string) + elif err == kDataNotInCache: + raise AsapoDataNotInCacheError(error_string) else: raise AsapoConsumerError(error_string) diff --git a/producer/api/python/asapo_producer.pxd b/producer/api/python/asapo_producer.pxd index 359758a8ad1265dcfd6f7d45daffabc2b8645ad6..00898e56192f2cda4e36aebfc1f226aa3aac4996 100644 --- a/producer/api/python/asapo_producer.pxd +++ b/producer/api/python/asapo_producer.pxd @@ -131,6 +131,7 @@ cdef extern from "asapo/asapo_producer.h" namespace "asapo" nogil: cdef extern from "asapo/asapo_producer.h" namespace "asapo": uint64_t kDefaultIngestMode + uint64_t kCacheOnlyIngestMode enum IngestModeFlags: kTransferData kTransferMetaDataOnly diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in index f41e7024a0625e768a1c0215933a90ae3f5e1d34..34670d1df9b1c240260cd44e64bf34edf59283cb 100644 --- a/producer/api/python/asapo_producer.pyx.in +++ b/producer/api/python/asapo_producer.pyx.in @@ -13,6 +13,7 @@ import atexit np.import_array() DEFAULT_INGEST_MODE = kDefaultIngestMode +CACHE_ONLY_INGEST_MODE = kCacheOnlyIngestMode INGEST_MODE_TRANSFER_DATA = kTransferData INGEST_MODE_TRANSFER_METADATA_ONLY = kTransferMetaDataOnly INGEST_MODE_STORE_IN_FILESYSTEM = kStoreInFilesystem diff --git a/receiver/src/request.cpp b/receiver/src/request.cpp index 35ee43bd5c7aec3aeee7f2c26c6e3e82f6005a5f..67dc7c50d3f2eaaefcc81142972978b9a0b02317 100644 --- a/receiver/src/request.cpp +++ b/receiver/src/request.cpp @@ -260,4 +260,8 @@ const std::string& Request::GetOriginHost() const { return origin_host_; } +uint64_t Request::GetIngestMode() const { + return request_header_.custom_data[kPosIngestMode]; +} + } diff --git a/receiver/src/request.h b/receiver/src/request.h index 
4d1cf79c19d9538c087431592fbe20c69218135b..007fa9564695bc6e3238bbbdf23eded84394cd14 100644 --- a/receiver/src/request.h +++ b/receiver/src/request.h @@ -45,7 +45,7 @@ class Request { ASAPO_VIRTUAL void* GetData() const; ASAPO_VIRTUAL Opcode GetOpCode() const; ASAPO_VIRTUAL const char* GetMessage() const; - + ASAPO_VIRTUAL uint64_t GetIngestMode() const; ASAPO_VIRTUAL const std::string& GetProducerInstanceId() const; ASAPO_VIRTUAL void SetProducerInstanceId(std::string producer_instance_id); ASAPO_VIRTUAL const std::string& GetPipelineStepId() const; diff --git a/receiver/src/request_handler/request_handler_db_write.cpp b/receiver/src/request_handler/request_handler_db_write.cpp index c4d4529d30369c88144ae9b1808310d630d25f05..e715287b026d4e698d968bb5d116841912aba7f4 100644 --- a/receiver/src/request_handler/request_handler_db_write.cpp +++ b/receiver/src/request_handler/request_handler_db_write.cpp @@ -78,6 +78,7 @@ MessageMeta RequestHandlerDbWrite::PrepareMessageMeta(const Request* request) co message_meta.name = request->GetFileName(); message_meta.size = request->GetDataSize(); message_meta.id = request->GetDataID(); + message_meta.ingest_mode = request->GetIngestMode(); message_meta.buf_id = request->GetSlotId(); message_meta.stream = request->GetStream(); message_meta.source = GetReceiverConfig()->dataserver.advertise_uri; diff --git a/receiver/unittests/receiver_mocking.h b/receiver/unittests/receiver_mocking.h index f47ddde586e44e6568fbb186f649d563ddfc2be8..37ba7cbd7d498e58c7b53c701a628e3659385ff0 100644 --- a/receiver/unittests/receiver_mocking.h +++ b/receiver/unittests/receiver_mocking.h @@ -92,7 +92,7 @@ class MockRequest: public Request { MOCK_METHOD(const std::string &, GetBeamline, (), (const, override)); MOCK_METHOD(asapo::Opcode, GetOpCode, (), (const, override)); MOCK_METHOD(asapo::SocketDescriptor, GetSocket, (), (const, override)); - + MOCK_METHOD(uint64_t, GetIngestMode, (), (const, override)); MOCK_METHOD(const std::string &, GetOnlinePath, (), 
(const, override)); MOCK_METHOD(const std::string &, GetOfflinePath, (), (const, override)); diff --git a/receiver/unittests/request_handler/test_request_handler_db_writer.cpp b/receiver/unittests/request_handler/test_request_handler_db_writer.cpp index 22a8558f73b477be773881e653396912edc8a17f..d1f5aaa8c7bb9029a94a8faeee802898916fa912 100644 --- a/receiver/unittests/request_handler/test_request_handler_db_writer.cpp +++ b/receiver/unittests/request_handler/test_request_handler_db_writer.cpp @@ -43,6 +43,7 @@ class DbWriterHandlerTests : public Test { NiceMock<asapo::MockLogger> mock_logger; ReceiverConfig config; std::string expected_beamtime_id = "beamtime_id"; + uint64_t expected_ingest_mode = asapo::kDefaultIngestMode; std::string expected_default_source = "detector"; std::string expected_data_source = "source"; std::string expected_host_ip = "127.0.0.1"; @@ -89,6 +90,7 @@ MATCHER_P(CompareMessageMeta, file, "") { if (arg.dataset_substream != file.dataset_substream) return false; if (arg.name != file.name) return false; if (arg.stream != file.stream) return false; + if (arg.ingest_mode != file.ingest_mode) return false; if (arg.id != file.id) return false; if (arg.metadata != file.metadata) return false; @@ -114,6 +116,9 @@ void DbWriterHandlerTests::ExpectRequestParams(asapo::Opcode op_code, const std: .WillRepeatedly(ReturnRef(data_source)) ; + EXPECT_CALL(*mock_request, GetIngestMode()) + .WillRepeatedly(Return(expected_ingest_mode)) + ; EXPECT_CALL(*mock_request, GetSlotId()) .WillOnce(Return(expected_buf_id)) @@ -137,7 +142,6 @@ void DbWriterHandlerTests::ExpectRequestParams(asapo::Opcode op_code, const std: .WillRepeatedly(Return(expected_stream)) ; - EXPECT_CALL(*mock_request, GetMetaData()) .WillOnce(ReturnRef(expected_metadata)) ; @@ -165,6 +169,7 @@ MessageMeta DbWriterHandlerTests::PrepareMessageMeta(bool substream) { message_meta.size = expected_file_size; message_meta.name = expected_file_name; message_meta.id = expected_id; + 
message_meta.ingest_mode = expected_ingest_mode; if (substream) { message_meta.dataset_substream = expected_substream; } diff --git a/tests/automatic/full_chain/CMakeLists.txt b/tests/automatic/full_chain/CMakeLists.txt index c1e1208856930c9d0e938b1a919b8b9f9f62319e..062d66f2787db7db6241f13be52952ee48556630 100644 --- a/tests/automatic/full_chain/CMakeLists.txt +++ b/tests/automatic/full_chain/CMakeLists.txt @@ -2,6 +2,7 @@ add_subdirectory(simple_chain) if (BUILD_PYTHON) add_subdirectory(simple_chain_usermeta_python) add_subdirectory(send_recv_streams_python) + add_subdirectory(simple_chain_cache_only_python) endif() add_subdirectory(simple_chain_metadata) add_subdirectory(two_beamlines) diff --git a/tests/automatic/full_chain/simple_chain_cache_only_python/CMakeLists.txt b/tests/automatic/full_chain/simple_chain_cache_only_python/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..62fa23900444fbbc1fd67d109392373254ebf884 --- /dev/null +++ b/tests/automatic/full_chain/simple_chain_cache_only_python/CMakeLists.txt @@ -0,0 +1,21 @@ +set(TARGET_NAME simple-chaine-chache-only) + + +################################ +# Testing +################################ +if (UNIX) + get_target_property(PYTHON_LIBS_PRODUCER python-lib-producer BINARY_DIR) + get_target_property(PYTHON_LIBS_CONSUMER python-lib-consumer BINARY_DIR) +else() + get_target_property(PYTHON_LIBS_PRODUCER asapo_producer BINARY_DIR) + get_target_property(PYTHON_LIBS_CONSUMER asapo_consumer BINARY_DIR) +endif() + + +file(TO_NATIVE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/transfer.py TEST_SCRIPT ) + +add_script_test("${TARGET_NAME}-python" "${Python_EXECUTABLE} ${PYTHON_LIBS_PRODUCER} ${PYTHON_LIBS_CONSUMER} ${TEST_SCRIPT} " nomem) + + + diff --git a/tests/automatic/full_chain/simple_chain_cache_only_python/check_linux.sh b/tests/automatic/full_chain/simple_chain_cache_only_python/check_linux.sh new file mode 100644 index 
0000000000000000000000000000000000000000..652785038ac33e6bbd1a59e6663e99b738a0602f
--- /dev/null
+++ b/tests/automatic/full_chain/simple_chain_cache_only_python/check_linux.sh
@@ -0,0 +1,30 @@
+#!/usr/bin/env bash
+#
+# Runs the cache-only ingest-mode round-trip test.
+# Positional arguments, as used below:
+#   $1 - interpreter/command to run
+#   $2 - first directory prepended to PYTHONPATH
+#   $3 - second directory prepended to PYTHONPATH
+#   $4 - test script handed to $1
+
+set -e
+
+trap Cleanup EXIT
+
+database_name=db_test
+data_source=python
+beamtime_id=asapo_test
+
+# Drops the test database and statistics series; invoked on every exit via the trap.
+Cleanup() {
+    echo cleanup
+    echo "db.dropDatabase()" | mongo ${beamtime_id}_${data_source}
+    influx -database ${database_name} -execute "drop series from statistics, RequestsRate"
+}
+
+export PYTHONPATH=$2:$3:${PYTHONPATH}
+
+# Under `set -e` a failing run would abort before `cat out` and hide the
+# captured output, so dump it explicitly and keep the original exit status.
+$1 $4 $data_source $beamtime_id "127.0.0.1:8400" $ASAPO_TEST_RW_TOKEN &> out || { rc=$?; cat out; exit $rc; }
+cat out
diff --git a/tests/automatic/full_chain/simple_chain_cache_only_python/check_windows.bat b/tests/automatic/full_chain/simple_chain_cache_only_python/check_windows.bat
new file mode 100644
index 0000000000000000000000000000000000000000..caced92dd1472c4733ccbbf35cf37f37b67c8a23
--- /dev/null
+++ b/tests/automatic/full_chain/simple_chain_cache_only_python/check_windows.bat
@@ -0,0 +1,23 @@
+REM Runs the cache-only ingest-mode round-trip test.
+REM Arguments as used below: %1 command to run, %2 and %3 PYTHONPATH entries, %4 test script
+SET mongo_exe="c:\Program Files\MongoDB\Server\4.2\bin\mongo.exe"
+SET beamtime_id=asapo_test
+SET data_source=python
+
+REM PYTHONPATH entries are separated by ';' on Windows (':' belongs to drive letters)
+set PYTHONPATH=%2;%3
+
+"%1" "%4" %data_source% %beamtime_id% "127.0.0.1:8400" %ASAPO_TEST_RW_TOKEN% > out || goto :error
+type out
+
+goto :clean
+
+:error
+type out
+call :clean
+exit /b 1
+
+:clean
+echo db.dropDatabase() | %mongo_exe% %beamtime_id%_%data_source%
+
+
diff --git a/tests/automatic/full_chain/simple_chain_cache_only_python/transfer.py b/tests/automatic/full_chain/simple_chain_cache_only_python/transfer.py
new file mode 100644
index 0000000000000000000000000000000000000000..29f7f1fe8c5f25acd861d2c7212e040ef98ec40e
--- /dev/null
+++ b/tests/automatic/full_chain/simple_chain_cache_only_python/transfer.py
@@ -0,0 +1,60 @@
+"""Full-chain test for the cache-only ingest mode.
+
+Sends one message with CACHE_ONLY_INGEST_MODE, reads it back through the
+consumer, then zeroes buf_id in the metadata and expects retrieve_data to
+raise AsapoDataNotInCacheError.
+
+Usage: transfer.py <data_source> <beamtime> <endpoint> <token>
+"""
+from __future__ import print_function
+
+import asapo_producer
+import asapo_consumer
+import sys
+
+
+data_source = sys.argv[1]
+beamtime = sys.argv[2]
+endpoint = sys.argv[3]
+
+token = sys.argv[4]
+nthreads = 8
+
+def callback(payload, err):
+    """Producer send callback: reports the outcome of one request."""
+    if isinstance(err, asapo_producer.AsapoServerWarning):
+        print("successfuly sent, but with warning from server: ", payload, err)
+    elif err is not None:
+        print("could not sent: ", payload, err)
+    else:
+        print("successfuly sent: ", payload)
+
+producer = asapo_producer.create_producer(endpoint,'processed', beamtime, 'auto', data_source, token, nthreads, 6000)
+producer.set_log_level("debug")
+
+# send data
+producer.send(1, "processed/file6", b"hello",
+              ingest_mode=asapo_producer.CACHE_ONLY_INGEST_MODE, callback=callback)
+
+producer.wait_requests_finished(50000)
+
+consumer = asapo_consumer.create_consumer(endpoint, 'auto', False, beamtime, data_source, token, 5000)
+group_id = consumer.generate_group_id()
+_,meta = consumer.get_next(group_id, meta_only = True)
+data = consumer.retrieve_data(meta)
+text_data = data.tobytes().decode("utf-8")
+print (meta,text_data)
+
+# the payload read back must be exactly what was sent
+if text_data != "hello":
+    sys.exit(1)
+
+meta['buf_id'] = 0 # buf_id 0 makes the consumer skip the cache
+try:
+    data = consumer.retrieve_data(meta)
+except asapo_consumer.AsapoDataNotInCacheError as err:
+    print(err)
+else:
+    sys.exit(1)
+
+print('Finished successfully')