diff --git a/common/cpp/include/asapo/common/networking.h b/common/cpp/include/asapo/common/networking.h index d49d83c32a6d37b63a317fa703831b610fe02f4d..ca2c2f8e81fb6b788581e1d1cfe13cdbc816ca39 100644 --- a/common/cpp/include/asapo/common/networking.h +++ b/common/cpp/include/asapo/common/networking.h @@ -30,6 +30,7 @@ enum Opcode : uint8_t { kOpcodeAuthorize, kOpcodeTransferMetaData, kOpcodeDeleteStream, + kOpcodeGetMeta, kOpcodeCount, }; diff --git a/common/cpp/src/database/mongodb_client.cpp b/common/cpp/src/database/mongodb_client.cpp index f264d7db8543a94141ab958c24f748646460132b..d4e1de03f6f48e26ef6ff2c86194d3588575662d 100644 --- a/common/cpp/src/database/mongodb_client.cpp +++ b/common/cpp/src/database/mongodb_client.cpp @@ -203,6 +203,17 @@ Error MongoDBClient::InsertBsonDocument(const bson_p& document, bool ignore_dupl return nullptr; } +bool documentWasChanged(bson_t* reply) { + bson_iter_t iter; + bson_iter_init_find(&iter, reply, "upsertedCount"); + auto n_upsert = bson_iter_int32(&iter); + bson_iter_init_find(&iter, reply, "modifiedCount"); + auto n_mod = bson_iter_int32(&iter); + bson_iter_init_find(&iter, reply, "matchedCount"); + auto n_matched = bson_iter_int32(&iter); + return n_mod + n_upsert + n_matched > 0; +} + Error MongoDBClient::ReplaceBsonDocument(const std::string& id, const bson_p& document, bool upsert) const { bson_error_t mongo_err; @@ -210,20 +221,13 @@ Error MongoDBClient::ReplaceBsonDocument(const std::string& id, const bson_p& do bson_t* selector = BCON_NEW ("_id", BCON_UTF8(id.c_str())); bson_t reply; Error err = nullptr; - bson_iter_t iter; if (!mongoc_collection_replace_one(current_collection_, selector, document.get(), opts, &reply, &mongo_err)) { err = DBErrorTemplates::kInsertError.Generate(mongo_err.message); } - if (err == nullptr) { - bson_iter_init_find(&iter, &reply, "upsertedCount"); - auto n_upsert = bson_iter_int32(&iter); - bson_iter_init_find(&iter, &reply, "modifiedCount"); - auto n_mod = bson_iter_int32(&iter); - if (n_mod + n_upsert != 1) { - err = DBErrorTemplates::kInsertError.Generate("metadata does not exist"); - } + if (err == nullptr && !documentWasChanged(&reply)) { + err = DBErrorTemplates::kWrongInput.Generate("cannot replace: metadata does not exist"); } bson_free(opts); @@ -242,20 +246,12 @@ Error MongoDBClient::UpdateBsonDocument(const std::string& id, const bson_p& doc bson_t reply; Error err = nullptr; - bson_iter_t iter; - if (!mongoc_collection_update_one(current_collection_, selector, update, opts, &reply, &mongo_err)) { err = DBErrorTemplates::kInsertError.Generate(mongo_err.message); } - if (err == nullptr) { - bson_iter_init_find(&iter, &reply, "upsertedCount"); - auto n_upsert = bson_iter_int32(&iter); - bson_iter_init_find(&iter, &reply, "modifiedCount"); - auto n_mod = bson_iter_int32(&iter); - if (n_mod + n_upsert != 1) { - err = DBErrorTemplates::kInsertError.Generate("metadata does not exist"); - } + if (err == nullptr && !documentWasChanged(&reply)) { + err = DBErrorTemplates::kWrongInput.Generate("cannot update: metadata does not exist"); } bson_free(opts); diff --git a/producer/api/cpp/src/producer_impl.cpp b/producer/api/cpp/src/producer_impl.cpp index 3409b3d27efaafff9d48e89e1de435425c321d3a..a0f2c7477e40b883bf057697c7527e83cc5f8620 100644 --- a/producer/api/cpp/src/producer_impl.cpp +++ b/producer/api/cpp/src/producer_impl.cpp @@ -507,7 +507,13 @@ std::string ProducerImpl::GetBeamtimeMeta(uint64_t timeout_ms, Error* err) const } std::string ProducerImpl::GetMeta(const std::string& stream, uint64_t timeout_ms, Error* err) const { - return std::string(); + auto header = GenericRequestHeader{kOpcodeGetMeta, 0, 0, 0, "", stream}; + auto response = BlockingRequest(std::move(header), timeout_ms, err); + if (*err) { + return ""; + } + *err = nullptr; + return response; } } \ No newline at end of file diff --git a/producer/api/cpp/unittests/test_producer_impl.cpp b/producer/api/cpp/unittests/test_producer_impl.cpp index 1fc2dea861192734e7bb5a3f6c5d1ad6787f428a..216ac1a73ff0046adee313392c71dcc5b22391d4 100644 --- a/producer/api/cpp/unittests/test_producer_impl.cpp +++ b/producer/api/cpp/unittests/test_producer_impl.cpp @@ -501,7 +501,7 @@ TEST_F(ProducerImplTests, WaitRequestsFinished) { ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kTimeout)); } -MATCHER_P3(M_CheckGetStreamInfoRequest, op_code, source_credentials, stream, +MATCHER_P3(M_CheckGetRequest, op_code, source_credentials, stream, "Checks if a valid GenericRequestHeader was Send") { auto request = static_cast<ProducerRequest*>(arg); return ((asapo::GenericRequestHeader) (arg->header)).op_code == op_code @@ -509,9 +509,9 @@ MATCHER_P3(M_CheckGetStreamInfoRequest, op_code, source_credentials, stream, && strcmp(((asapo::GenericRequestHeader) (arg->header)).stream, stream) == 0; } -TEST_F(ProducerImplTests, GetStreamInfoMakesCorerctRequest) { +TEST_F(ProducerImplTests, GetStreamInfoMakesCorrectRequest) { producer.SetCredentials(expected_credentials); - EXPECT_CALL(mock_pull, AddRequest_t(M_CheckGetStreamInfoRequest(asapo::kOpcodeStreamInfo, + EXPECT_CALL(mock_pull, AddRequest_t(M_CheckGetRequest(asapo::kOpcodeStreamInfo, expected_credentials_str, expected_stream), true)).WillOnce( Return(nullptr)); @@ -537,9 +537,9 @@ TEST(GetStreamInfoTest, GetStreamInfoTimeout) { ASSERT_THAT(err->Explain(), HasSubstr("opcode: 4")); } -TEST_F(ProducerImplTests, GetLastStreamMakesCorerctRequest) { +TEST_F(ProducerImplTests, GetLastStreamMakesCorrectRequest) { producer.SetCredentials(expected_credentials); - EXPECT_CALL(mock_pull, AddRequest_t(M_CheckGetStreamInfoRequest(asapo::kOpcodeLastStream, + EXPECT_CALL(mock_pull, AddRequest_t(M_CheckGetRequest(asapo::kOpcodeLastStream, expected_credentials_str, ""), true)).WillOnce( Return(nullptr)); @@ -605,7 +605,7 @@ MATCHER_P4(M_CheckDeleteStreamRequest, op_code, source_credentials, stream, flag && strcmp(((asapo::GenericRequestHeader) (arg->header)).stream, stream) == 0; } -TEST_F(ProducerImplTests, DeleteStreamMakesCorerctRequest) { +TEST_F(ProducerImplTests, DeleteStreamMakesCorrectRequest) { producer.SetCredentials(expected_credentials); asapo::DeleteStreamOptions expected_options{}; expected_options.delete_meta = true; @@ -622,4 +622,29 @@ TEST_F(ProducerImplTests, DeleteStreamMakesCorerctRequest) { ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kTimeout)); } +TEST_F(ProducerImplTests, GetStreamMetaMakesCorrectRequest) { + producer.SetCredentials(expected_credentials); + EXPECT_CALL(mock_pull, AddRequest_t(M_CheckGetRequest(asapo::kOpcodeGetMeta, + expected_credentials_str, + expected_stream), true)).WillOnce( + Return(nullptr)); + + asapo::Error err; + producer.GetStreamMeta(expected_stream, 1000, &err); + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kTimeout)); +} + + +TEST_F(ProducerImplTests, GetBeamtimeMetaMakesCorrectRequest) { + producer.SetCredentials(expected_credentials); + EXPECT_CALL(mock_pull, AddRequest_t(M_CheckGetRequest(asapo::kOpcodeGetMeta, + expected_credentials_str, + ""), true)).WillOnce( + Return(nullptr)); + + asapo::Error err; + producer.GetBeamtimeMeta(1000, &err); + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kTimeout)); +} + } diff --git a/producer/api/python/asapo_producer.pxd b/producer/api/python/asapo_producer.pxd index 421e41f200f72f02fcfb4df7fdd9661d6a4d61ae..e50f762bc05d4b47cb920d36cc6e58e4a7f2d764 100644 --- a/producer/api/python/asapo_producer.pxd +++ b/producer/api/python/asapo_producer.pxd @@ -121,7 +121,8 @@ cdef extern from "asapo/asapo_producer.h" namespace "asapo" nogil: Error DeleteStream(string stream, uint64_t timeout_ms, DeleteStreamOptions options) Error SendBeamtimeMetadata(string metadata, MetaIngestMode mode, RequestCallback callback) Error SendStreamMetadata(string metadata, MetaIngestMode mode, string stream, RequestCallback callback) - + string GetStreamMeta(string stream, uint64_t timeout_ms, Error* err) + string GetBeamtimeMeta(uint64_t timeout_ms, Error* err) cdef extern from "asapo/asapo_producer.h" namespace "asapo": uint64_t kDefaultIngestMode diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in index 218358a9a59bc943e5b10993416746d963e54ea3..f681fd636d495403c55920bb580428332bfb8b27 100644 --- a/producer/api/python/asapo_producer.pyx.in +++ b/producer/api/python/asapo_producer.pyx.in @@ -300,6 +300,41 @@ cdef class PyProducer: err = self.c_producer.get().DeleteStream(b_stream,timeout_ms,opts) if err: throw_exception(err) + def get_stream_meta(self,stream = 'default', uint64_t timeout_ms = 1000): + """ + :param stream: stream name + :type stream: string + :param timeout_ms: timeout in milliseconds + :type timeout_ms: int + :raises: + AsapoWrongInputError: wrong input (authorization, ...) + AsapoTimeoutError: request not finished for a given timeout + AsapoProducerError: other errors + """ + cdef Error err + cdef string res + cdef string b_stream = _bytes(stream) + with nogil: + res = self.c_producer.get().GetStreamMeta(b_stream,timeout_ms,&err) + if err: + throw_exception(err) + return json.loads(_str(res) or 'null') + def get_beamtime_meta(self, uint64_t timeout_ms = 1000): + """ + :param timeout_ms: timeout in milliseconds + :type timeout_ms: int + :raises: + AsapoWrongInputError: wrong input (authorization, ...) + AsapoTimeoutError: request not finished for a given timeout + AsapoProducerError: other errors + """ + cdef Error err + cdef string res + with nogil: + res = self.c_producer.get().GetBeamtimeMeta(timeout_ms,&err) + if err: + throw_exception(err) + return json.loads(_str(res) or 'null') def stream_info(self, stream = 'default', uint64_t timeout_ms = 1000): """ :param stream: stream name diff --git a/receiver/CMakeLists.txt b/receiver/CMakeLists.txt index 4a18b189606b27a371224d9a12dead194b711ed8..b743d6131194fc789e4bae3c4208efe9a2d1aa83 100644 --- a/receiver/CMakeLists.txt +++ b/receiver/CMakeLists.txt @@ -22,6 +22,7 @@ set(RECEIVER_CORE_FILES src/request_handler/request_handler_receive_metadata.cpp src/request_handler/request_handler_db_check_request.cpp src/request_handler/request_handler_db_delete_stream.cpp + src/request_handler/request_handler_db_get_meta.cpp src/request_handler/request_factory.cpp src/request_handler/request_handler_db.cpp src/file_processors/write_file_processor.cpp @@ -99,6 +100,7 @@ set(TEST_SOURCE_FILES unittests/request_handler/test_request_handler_receive_data.cpp unittests/request_handler/test_request_handler_receive_metadata.cpp unittests/request_handler/test_request_handler_delete_stream.cpp + unittests/request_handler/test_request_handler_db_get_meta.cpp unittests/statistics/test_statistics_sender_influx_db.cpp unittests/statistics/test_statistics_sender_fluentd.cpp unittests/mock_receiver_config.cpp diff --git a/receiver/src/request_handler/request_factory.cpp b/receiver/src/request_handler/request_factory.cpp index 4279f485adb563261fd899f9d32cf5fbdeee89db..39a2737076ab9819947bc1ecc7afb73417d46350 100644 --- a/receiver/src/request_handler/request_factory.cpp +++ b/receiver/src/request_handler/request_factory.cpp @@ -91,6 +91,10 @@ Error RequestFactory::AddHandlersToRequest(std::unique_ptr<Request>& request, request->AddHandler(&request_handler_db_last_stream_); break; } + case Opcode::kOpcodeGetMeta: { + request->AddHandler(&request_handler_db_get_meta_); + break; + } default: return ReceiverErrorTemplates::kInvalidOpCode.Generate(); } diff --git a/receiver/src/request_handler/request_factory.h b/receiver/src/request_handler/request_factory.h index ee371d5aca70e23200079f9a6a3ce5072622cb34..a44d7efaff638a9abe2ecd633675481b6244baaf 100644 --- a/receiver/src/request_handler/request_factory.h +++ b/receiver/src/request_handler/request_factory.h @@ -7,6 +7,7 @@ #include "request_handler_db_stream_info.h" #include "request_handler_db_last_stream.h" #include "request_handler_db_delete_stream.h" +#include "request_handler_db_get_meta.h" namespace asapo { @@ -29,6 +30,7 @@ class RequestFactory { RequestHandlerDbDeleteStream request_handler_delete_stream_{kDBDataCollectionNamePrefix}; RequestHandlerDbLastStream request_handler_db_last_stream_{kDBDataCollectionNamePrefix}; RequestHandlerDbMetaWrite request_handler_db_meta_write_{kDBMetaCollectionName}; + RequestHandlerDbGetMeta request_handler_db_get_meta_{kDBMetaCollectionName}; RequestHandlerAuthorize request_handler_authorize_; RequestHandlerDbCheckRequest request_handler_db_check_{kDBDataCollectionNamePrefix};; SharedCache cache_; diff --git a/receiver/src/request_handler/request_handler_db_get_meta.cpp b/receiver/src/request_handler/request_handler_db_get_meta.cpp new file mode 100644 index 0000000000000000000000000000000000000000..b046aa1195d0eb7511983370266895c56b45245c --- /dev/null +++ b/receiver/src/request_handler/request_handler_db_get_meta.cpp @@ -0,0 +1,38 @@ +#include "request_handler_db_get_meta.h" +#include "../receiver_config.h" +#include <asapo/database/db_error.h> + +namespace asapo { + +RequestHandlerDbGetMeta::RequestHandlerDbGetMeta(std::string collection_name_prefix) : RequestHandlerDb( + std::move(collection_name_prefix)) { +} + +Error RequestHandlerDbGetMeta::ProcessRequest(Request* request) const { + if (auto err = RequestHandlerDb::ProcessRequest(request) ) { + return err; + } + + auto stream_name = request->GetStream(); + + std::string metaid = stream_name.empty() ? "bt" : "st_" + stream_name; + std::string meta; + auto err = db_client__->GetMetaFromDb(kDBMetaCollectionName, metaid, &meta); + + bool no_error = err == nullptr; + if (err == DBErrorTemplates::kNoRecord) { + no_error = true; + } + + if (no_error) { + log__->Debug(std::string{"get meta for "} + (stream_name.empty() ? "beamtime" : stream_name) + " in " + + db_name_ + " at " + GetReceiverConfig()->database_uri); + request->SetResponseMessage(meta, ResponseMessageType::kInfo); + return nullptr; + } + + return DBErrorToReceiverError(err); +} + + +} \ No newline at end of file diff --git a/receiver/src/request_handler/request_handler_db_get_meta.h b/receiver/src/request_handler/request_handler_db_get_meta.h new file mode 100644 index 0000000000000000000000000000000000000000..faeddc5f6b942a05fcae8354e19533a1469c763b --- /dev/null +++ b/receiver/src/request_handler/request_handler_db_get_meta.h @@ -0,0 +1,18 @@ +#include "request_handler_db.h" +#include "../request.h" + +#ifndef ASAPO_RECEIVER_SRC_REQUEST_HANDLER_REQUEST_HANDLER_DB_GET_META_H_ +#define ASAPO_RECEIVER_SRC_REQUEST_HANDLER_REQUEST_HANDLER_DB_GET_META_H_ + +namespace asapo { + +class RequestHandlerDbGetMeta final: public RequestHandlerDb { + public: + RequestHandlerDbGetMeta(std::string collection_name_prefix); + Error ProcessRequest(Request* request) const override; +}; + +} + + +#endif //ASAPO_RECEIVER_SRC_REQUEST_HANDLER_REQUEST_HANDLER_DB_GET_META_H_ diff --git a/receiver/unittests/request_handler/test_request_factory.cpp b/receiver/unittests/request_handler/test_request_factory.cpp index 03d8bf4495f279493a1c9b90874c4a79b0e73ae8..64029d2ee34c89dd6071bce0a13aff4730d4bc0d 100644 --- a/receiver/unittests/request_handler/test_request_factory.cpp +++ b/receiver/unittests/request_handler/test_request_factory.cpp @@ -16,6 +16,7 @@ #include "../../src/request_handler/request_handler_db_stream_info.h" #include "../../src/request_handler/request_handler_db_last_stream.h" #include "../../src/request_handler/request_handler_db_delete_stream.h" +#include "../../src/request_handler/request_handler_db_get_meta.h" #include "../../src/request_handler/request_handler_receive_data.h" #include "../../src/request_handler/request_handler_receive_metadata.h" @@ -225,5 +226,16 @@ TEST_F(FactoryTests, DeleteStreamRequest) { ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerDbDeleteStream*>(request->GetListHandlers()[1]), Ne(nullptr)); } +TEST_F(FactoryTests, GetMetamRequest) { + generic_request_header.op_code = asapo::Opcode::kOpcodeGetMeta; + auto request = factory.GenerateRequest(generic_request_header, 1, origin_uri, &err); + ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(request->GetListHandlers().size(), Eq(2)); + ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerAuthorize*>(request->GetListHandlers()[0]), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerDbGetMeta*>(request->GetListHandlers()[1]), Ne(nullptr)); +} + + + } diff --git a/receiver/unittests/request_handler/test_request_handler_db_get_meta.cpp b/receiver/unittests/request_handler/test_request_handler_db_get_meta.cpp new file mode 100644 index 0000000000000000000000000000000000000000..bfb4a89e527069cdf111d72c815539d8be11b84c --- /dev/null +++ b/receiver/unittests/request_handler/test_request_handler_db_get_meta.cpp @@ -0,0 +1,128 @@ +#include <gtest/gtest.h> +#include <gmock/gmock.h> +#include <asapo/database/db_error.h> + +#include "asapo/unittests/MockIO.h" +#include "asapo/unittests/MockDatabase.h" +#include "asapo/unittests/MockLogger.h" + +#include "../../src/receiver_error.h" +#include "../../src/request.h" +#include "../../src/request_handler/request_factory.h" +#include "../../src/request_handler/request_handler.h" +#include "../../src/request_handler/request_handler_db_get_meta.h" +#include "../../../common/cpp/src/database/mongodb_client.h" + +#include "../mock_receiver_config.h" +#include "asapo/common/data_structs.h" +#include "asapo/common/networking.h" +#include "../receiver_mocking.h" + +using asapo::MockRequest; +using asapo::MessageMeta; +using ::testing::Test; +using ::testing::Return; +using ::testing::ReturnRef; +using ::testing::_; +using ::testing::DoAll; +using ::testing::SetArgReferee; +using ::testing::Gt; +using ::testing::Eq; +using ::testing::Ne; +using ::testing::Mock; +using ::testing::NiceMock; +using ::testing::InSequence; +using ::testing::SetArgPointee; +using ::testing::AllOf; +using ::testing::HasSubstr; + + +using ::asapo::Error; +using ::asapo::ErrorInterface; +using ::asapo::FileDescriptor; +using ::asapo::SocketDescriptor; +using ::asapo::MockIO; +using asapo::Request; +using asapo::RequestHandlerDbGetMeta; +using ::asapo::GenericRequestHeader; + +using asapo::MockDatabase; +using asapo::RequestFactory; +using asapo::SetReceiverConfig; +using asapo::ReceiverConfig; + + +namespace { + +class DbMetaGetMetaTests : public Test { + public: + RequestHandlerDbGetMeta handler{asapo::kDBDataCollectionNamePrefix}; + std::unique_ptr<NiceMock<MockRequest>> mock_request; + NiceMock<MockDatabase> mock_db; + NiceMock<asapo::MockLogger> mock_logger; + ReceiverConfig config; + std::string expected_beamtime_id = "beamtime_id"; + std::string expected_data_source = "source"; + std::string expected_stream = "stream"; + std::string expected_meta = "meta"; + void SetUp() override { + GenericRequestHeader request_header; + handler.db_client__ = std::unique_ptr<asapo::Database> {&mock_db}; + handler.log__ = &mock_logger; + mock_request.reset(new NiceMock<MockRequest> {request_header, 1, "", nullptr}); + ON_CALL(*mock_request, GetBeamtimeId()).WillByDefault(ReturnRef(expected_beamtime_id)); + } + void TearDown() override { + handler.db_client__.release(); + } + void ExpectGet(bool stream, const asapo::DBErrorTemplate* errorTemplate) { + SetReceiverConfig(config, "none"); + EXPECT_CALL(*mock_request, GetDataSource()).WillOnce(ReturnRef(expected_data_source)); + if (stream) { + EXPECT_CALL(*mock_request, GetStream()).WillOnce(Return(expected_stream)); + } + + EXPECT_CALL(mock_db, Connect_t(config.database_uri, expected_beamtime_id + "_" + expected_data_source)). + WillOnce(testing::Return(nullptr)); + EXPECT_CALL(mock_db, GetMetaFromDb_t("meta", stream ? "st_" + expected_stream : "bt", _)). + WillOnce(DoAll( + SetArgPointee<2>(expected_meta), + testing::Return(errorTemplate == nullptr ? nullptr : errorTemplate->Generate().release()) + )); + if (errorTemplate == nullptr) { + EXPECT_CALL(*mock_request, SetResponseMessage(expected_meta, asapo::ResponseMessageType::kInfo)); + EXPECT_CALL(mock_logger, Debug(AllOf(HasSubstr("meta"), + HasSubstr(config.database_uri), + HasSubstr(expected_data_source), + HasSubstr(stream ? expected_stream : "beamtime"), + HasSubstr(expected_beamtime_id) + ) + ) + ); + } + + } +}; + + + +TEST_F(DbMetaGetMetaTests, GetBeamtimeMetaOk) { + ExpectGet(false, nullptr); + auto err = handler.ProcessRequest(mock_request.get()); + ASSERT_THAT(err, Eq(nullptr)); +} + + +TEST_F(DbMetaGetMetaTests, GetStreamMetaOk) { + ExpectGet(true, nullptr); + auto err = handler.ProcessRequest(mock_request.get()); + ASSERT_THAT(err, Eq(nullptr)); +} + +TEST_F(DbMetaGetMetaTests, GetStreamMetaError) { + ExpectGet(true, &asapo::DBErrorTemplates::kDBError); + auto err = handler.ProcessRequest(mock_request.get()); + ASSERT_THAT(err, Ne(nullptr)); +} + +} diff --git a/tests/automatic/consumer/consumer_api/CMakeLists.txt b/tests/automatic/consumer/consumer_api/CMakeLists.txt index d7f451aa2b93cbdb99a6d5d38fea3df90190df84..59dcf868a82655036c5109093ddaf93f3a45210d 100644 --- a/tests/automatic/consumer/consumer_api/CMakeLists.txt +++ b/tests/automatic/consumer/consumer_api/CMakeLists.txt @@ -11,6 +11,5 @@ target_link_libraries(${TARGET_NAME} test_common asapo-consumer) ################################ # Testing ################################ -add_script_test("${TARGET_NAME}" "$<TARGET_FILE:${TARGET_NAME}>" - ) +add_script_test("${TARGET_NAME}" "$<TARGET_FILE:${TARGET_NAME}>") diff --git a/tests/automatic/mongo_db/meta/meta_mongodb.cpp b/tests/automatic/mongo_db/meta/meta_mongodb.cpp index 1aaf184a3f60089dfedb53e2fdba3d193749d692..6a9f07b0c87eda088640d247e9c463698e989b78 100644 --- a/tests/automatic/mongo_db/meta/meta_mongodb.cpp +++ b/tests/automatic/mongo_db/meta/meta_mongodb.cpp @@ -75,7 +75,7 @@ int main(int argc, char* argv[]) { db.InsertMeta("meta", stream_name, reinterpret_cast<const uint8_t*>(meta.c_str()), meta.size(), mode); M_AssertEq(nullptr, err); err = db.InsertMeta("meta", "notexist", reinterpret_cast<const uint8_t*>(meta.c_str()), meta.size(), mode); - M_AssertTrue(err == asapo::DBErrorTemplates::kInsertError); + M_AssertTrue(err == asapo::DBErrorTemplates::kWrongInput); std::string meta_res; err = db.GetMetaFromDb("meta", "0", &meta_res); @@ -87,6 +87,9 @@ int main(int argc, char* argv[]) { mode.op = asapo::MetaIngestOp::kUpdate; err = db.InsertMeta("meta", stream_name, reinterpret_cast<const uint8_t*>(mod_meta.c_str()), mod_meta.size(), mode); M_AssertEq(nullptr, err); + err = db.InsertMeta("meta", stream_name, reinterpret_cast<const uint8_t*>(mod_meta.c_str()), mod_meta.size(), mode); + M_AssertEq(nullptr, err); + err = db.GetMetaFromDb("meta", stream_name, &meta_res); M_AssertEq(nullptr, err); M_AssertEq(expected_meta, meta_res); diff --git a/tests/automatic/producer/cpp_api/CMakeLists.txt b/tests/automatic/producer/cpp_api/CMakeLists.txt index 0e19465b12a982927702a79369f69fb11978f44e..e4143f91e86755a0531e3678491c4a5f5e3b35d8 100644 --- a/tests/automatic/producer/cpp_api/CMakeLists.txt +++ b/tests/automatic/producer/cpp_api/CMakeLists.txt @@ -11,6 +11,5 @@ target_link_libraries(${TARGET_NAME} test_common asapo-producer) ################################ # Testing ################################ -add_script_test("${TARGET_NAME}" "$<TARGET_FILE:${TARGET_NAME}>" - ) +add_script_test("${TARGET_NAME}" "$<TARGET_FILE:${TARGET_NAME}>") diff --git a/tests/automatic/producer/cpp_api/check_linux.sh b/tests/automatic/producer/cpp_api/check_linux.sh index b3c4f6f846fc1374fb742ac211f0a93faa49c13f..7d6d7dbc2b4daf4f4890fddbc62e2c69b36711f5 100644 --- a/tests/automatic/producer/cpp_api/check_linux.sh +++ b/tests/automatic/producer/cpp_api/check_linux.sh @@ -23,4 +23,4 @@ mkdir -p ${receiver_folder} echo test > file1 -$1 "127.0.0.1:8400" $data_source $beamtime_id +$@ "127.0.0.1:8400" $data_source $beamtime_id diff --git a/tests/automatic/producer/cpp_api/producer_api.cpp b/tests/automatic/producer/cpp_api/producer_api.cpp index 2b687bc4d41a66dbc7787b04b7f0a375c8c00401..d5e5982ab635c69bbbd2a55283465ab48396e0bd 100644 --- a/tests/automatic/producer/cpp_api/producer_api.cpp +++ b/tests/automatic/producer/cpp_api/producer_api.cpp @@ -23,6 +23,21 @@ Args GetArgs(int argc, char* argv[]) { return Args{server, source, beamtime}; } +void TestMeta(const std::unique_ptr<asapo::Producer>& producer) { + asapo::Error err; + std::string meta = R"({"data":"test","embedded":{"edata":2}})"; + producer->SendBeamtimeMetadata(meta, asapo::MetaIngestMode{asapo::MetaIngestOp::kInsert, true}, nullptr); + producer->WaitRequestsFinished(5000); + auto meta_received = producer->GetBeamtimeMeta(5000, &err); + M_AssertTrue(meta_received == meta); + std::string meta_update = R"({"embedded":{"edata":3}})"; + std::string meta_updated = R"({"data":"test","embedded":{"edata":3}})"; + producer->SendBeamtimeMetadata(meta_update, asapo::MetaIngestMode{asapo::MetaIngestOp::kUpdate, false}, nullptr); + producer->WaitRequestsFinished(5000); + meta_received = producer->GetBeamtimeMeta(5000, &err); + M_AssertTrue(meta_received == meta_updated); +} + void Test(const std::unique_ptr<asapo::Producer>& producer) { asapo::MessageMeta fi; @@ -37,9 +52,11 @@ void Test(const std::unique_ptr<asapo::Producer>& producer) { M_AssertTrue(!server.empty(), "server version"); - producer->GetStreamInfo("default",5000,&err); + TestMeta(producer); + + producer->GetStreamInfo("default", 5000, &err); if (err) { - printf("%s\n",err->Explain().c_str()); + printf("%s\n", err->Explain().c_str()); } M_AssertTrue(err == nullptr, "stream info"); @@ -51,7 +68,7 @@ std::unique_ptr<asapo::Producer> CreateProducer(const Args& args) { auto producer = asapo::Producer::Create(args.server, 2, asapo::RequestHandlerType::kTcp, asapo::SourceCredentials{asapo::SourceType::kProcessed, args.beamtime, - "", args.source, ""}, 60000, &err); + "", args.source, ""}, 60000, &err); if (err) { std::cerr << "Cannot start producer. ProducerError: " << err << std::endl; exit(EXIT_FAILURE); @@ -66,7 +83,7 @@ std::unique_ptr<asapo::Producer> CreateProducer(const Args& args) { void TestAll(const Args& args) { asapo::Error err; auto producer = CreateProducer(args); - if (producer==nullptr) { + if (producer == nullptr) { std::cout << "Error CreateProducer: " << err << std::endl; exit(EXIT_FAILURE); } diff --git a/tests/automatic/producer/python_api/check_linux.sh b/tests/automatic/producer/python_api/check_linux.sh index c86f6bd6ae66428eb24dcc37703577a70953d1a3..51552b6eb0ce43d9eb1fd742872b1ab1859d0c95 100644 --- a/tests/automatic/producer/python_api/check_linux.sh +++ b/tests/automatic/producer/python_api/check_linux.sh @@ -27,14 +27,19 @@ echo test > file1 $1 $3 $data_source $beamtime_id "127.0.0.1:8400" &> out || cat out cat out -echo count successfully send, expect 15 -cat out | grep "successfuly sent" | wc -l | tee /dev/stderr | grep 15 +echo count successfully send, expect 17 +cat out | grep "successfuly sent" | wc -l | tee /dev/stderr | grep 17 +echo count wrong input, expect 11 +cat out | grep "wrong input" | wc -l | tee /dev/stderr | grep 11 + +echo count wrong json, expect 2 +cat out | grep "JSON parse error" | wc -l | tee /dev/stderr | grep 2 echo count same id, expect 4 cat out | grep "already have record with same id" | wc -l | tee /dev/stderr | grep 4 echo count duplicates, expect 6 cat out | grep "duplicate" | wc -l | tee /dev/stderr | grep 6 -echo count data in callback, expect 3 -cat out | grep "'data':" | wc -l | tee /dev/stderr | grep 3 +echo count data in callback, expect 6 +cat out | grep "'data':" | wc -l | tee /dev/stderr | grep 6 echo check found local io error cat out | grep "local i/o error" cat out | grep "Finished successfully" diff --git a/tests/automatic/producer/python_api/check_windows.bat b/tests/automatic/producer/python_api/check_windows.bat index 6dbbf441a3e80626d3e7405915e3bf21a49b7fcb..c854be8786882fa78123896faabca55d1054942c 100644 --- a/tests/automatic/producer/python_api/check_windows.bat +++ b/tests/automatic/producer/python_api/check_windows.bat @@ -18,7 +18,7 @@ set PYTHONPATH=%2 type out set NUM=0 for /F %%N in ('find /C "successfuly sent" ^< "out"') do set NUM=%%N -echo %NUM% | findstr 15 || goto error +echo %NUM% | findstr 17 || goto error for /F %%N in ('find /C "} wrong input: Bad request: already have record with same id" ^< "out"') do set NUM=%%N echo %NUM% | findstr 2 || goto error diff --git a/tests/automatic/producer/python_api/producer_api.py b/tests/automatic/producer/python_api/producer_api.py index 64e30bc439739f6e32d9481271c002739d9f717a..6f4bad1586c4f404f4c98afa3ba1157fa5efe18e 100644 --- a/tests/automatic/producer/python_api/producer_api.py +++ b/tests/automatic/producer/python_api/producer_api.py @@ -143,6 +143,11 @@ producer.send_file(1, local_path="./file1", exposed_path="processed/" + data_sou user_meta='{"test_key1":"test_val"}', ingest_mode=asapo_producer.INGEST_MODE_TRANSFER_DATA | asapo_producer.INGEST_MODE_STORE_IN_FILESYSTEM,callback=callback) +producer.send_beamtime_meta('{"data":"bt_meta"}', callback = callback) +producer.send_stream_meta('{"data":"st_meta"}',stream = 'stream', callback = callback) +producer.send_stream_meta('bla',stream = 'stream', callback = callback) + + producer.wait_requests_finished(50000) n = producer.get_requests_queue_size() assert_eq(n, 0, "requests in queue") @@ -174,6 +179,14 @@ producer.send_stream_finished_flag("stream/test $", 2, next_stream = "next_strea producer.wait_requests_finished(10000) +#check meta +bt_meta = producer.get_beamtime_meta() +stream_meta = producer.get_stream_meta(stream = 'stream') +assert_eq(bt_meta['data'], 'bt_meta', "beamtime meta") +assert_eq(stream_meta['data'], 'st_meta', "stream meta") +no_meta = producer.get_stream_meta(stream = 'notexist') +assert_eq(no_meta, None, "no meta") + #stream infos info = producer.stream_info() assert_eq(info['lastId'], 10, "stream_info last id")