diff --git a/common/cpp/include/common/networking.h b/common/cpp/include/common/networking.h index 1f7959723e082384115f0080157a5d16238e1235..c4b4eecedcc6c466ff7173fcd290f5260508e33d 100644 --- a/common/cpp/include/common/networking.h +++ b/common/cpp/include/common/networking.h @@ -23,6 +23,7 @@ enum Opcode : uint8_t { kOpcodeTransferData, kOpcodeTransferSubsetData, kOpcodeStreamInfo, + kOpcodeLastStream, kOpcodeGetBufferData, kOpcodeAuthorize, kOpcodeTransferMetaData, diff --git a/common/cpp/include/database/database.h b/common/cpp/include/database/database.h index f283db72935854bcb66afce86ce6ce379d51e0f1..506e26c52abe3fafe6b32d7f7fc8f428b146aef7 100644 --- a/common/cpp/include/database/database.h +++ b/common/cpp/include/database/database.h @@ -24,6 +24,7 @@ class Database { virtual Error GetById(const std::string& collection, uint64_t id, FileInfo* file) const = 0; virtual Error GetDataSetById(const std::string& collection, uint64_t set_id, uint64_t id, FileInfo* file) const = 0; virtual Error GetStreamInfo(const std::string& collection, StreamInfo* info) const = 0; + virtual Error GetLastStream(StreamInfo* info) const = 0; virtual ~Database() = default; }; diff --git a/common/cpp/include/unittests/MockDatabase.h b/common/cpp/include/unittests/MockDatabase.h index eca63353034e5ae1c8203072416d17d865af67b0..4e6137041e6462ba94bc70b0b264b3fa85fb3a25 100644 --- a/common/cpp/include/unittests/MockDatabase.h +++ b/common/cpp/include/unittests/MockDatabase.h @@ -58,6 +58,12 @@ class MockDatabase : public Database { MOCK_CONST_METHOD2(GetStreamInfo_t, ErrorInterface * (const std::string&, StreamInfo*)); + Error GetLastStream(StreamInfo* info) const override { + return Error{GetLastStream_t(info)}; + } + + MOCK_CONST_METHOD1(GetLastStream_t, ErrorInterface * (StreamInfo*)); + // stuff to test db destructor is called and avoid "uninteresting call" messages MOCK_METHOD0(Die, void()); diff --git a/common/cpp/src/database/mongodb_client.cpp b/common/cpp/src/database/mongodb_client.cpp index 5b78a0bf98a5b0269e10ed019fe36ba36a9187a8..895380f25c58b51dd3e02253174bf824473ddfb5 100644 --- a/common/cpp/src/database/mongodb_client.cpp +++ b/common/cpp/src/database/mongodb_client.cpp @@ -406,4 +406,46 @@ Error MongoDBClient::GetStreamInfo(const std::string& collection, StreamInfo* in return StreamInfoFromDbResponse(last_record_str,earliest_record_str, info); } +Error MongoDBClient::GetLastStream(StreamInfo* info) const { + if (!connected_) { + return DBErrorTemplates::kNotConnected.Generate(); + } + + mongoc_database_t *database; + bson_t* opts; + char **strv; + bson_error_t error; + + database = mongoc_client_get_database(client_, database_name_.c_str()); + opts = BCON_NEW ("nameOnly", BCON_BOOL(true)); + info->timestamp = std::chrono::system_clock::from_time_t(0); + if ((strv = mongoc_database_get_collection_names_with_opts ( + database, opts, &error))) { + for (auto i = 0; strv[i]; i++) + { + std::string stream_name{strv[i]}; + std::string prefix = std::string(kDBDataCollectionNamePrefix)+"_"; + if (stream_name.rfind(prefix,0)==0) { + std::string record_str; + auto err = GetRecordFromDb(stream_name, 0, GetRecordMode::kLast, &record_str); + StreamInfo next_info; + StreamInfoFromDbResponse(record_str,record_str,&next_info); + next_info.name = stream_name.erase(0,prefix.size()); + if (next_info.timestamp > info->timestamp) { + *info = next_info; + } + } + } + bson_strfreev (strv); + } else { + fprintf (stderr, "Command failed: %s\n", error.message); + } + + fprintf (stderr, "stream: %s \n", info->Json(true).c_str()); + + bson_destroy (opts); + mongoc_database_destroy (database); + return asapo::Error(); +} + } diff --git a/common/cpp/src/database/mongodb_client.h b/common/cpp/src/database/mongodb_client.h index 974f0d4e6690565d75ce45a284e0ea180623f3cf..6fd61eefd6c2fdcecc4f42a240c109a45ea3eabe 100644 --- a/common/cpp/src/database/mongodb_client.h +++ b/common/cpp/src/database/mongodb_client.h @@ -50,6 +50,7 @@ class MongoDBClient final : public Database { Error GetById(const std::string& collection, uint64_t id, FileInfo* file) const override; Error GetDataSetById(const std::string& collection, uint64_t set_id, uint64_t id, FileInfo* file) const override; Error GetStreamInfo(const std::string& collection, StreamInfo* info) const override; + Error GetLastStream(StreamInfo* info) const override; ~MongoDBClient() override; private: mongoc_client_t* client_{nullptr}; diff --git a/producer/api/cpp/include/producer/producer.h b/producer/api/cpp/include/producer/producer.h index c6292ea08106c8f68997c6c81f17d8ad4047f4b5..253b2f3ea8e8c9dd9a2ea3799df8980b7c741733 100644 --- a/producer/api/cpp/include/producer/producer.h +++ b/producer/api/cpp/include/producer/producer.h @@ -27,10 +27,19 @@ class Producer { //! Get substream information from receiver /*! \param substream (optional) - substream + \param timeout_sec - operation timeout in seconds \return StreamInfo - a structure with substream information */ - virtual StreamInfo GetStreamInfo(std::string substream, uint64_t timeout_ms, Error* err) const = 0; - virtual StreamInfo GetStreamInfo(uint64_t timeout_ms, Error* err) const = 0; + virtual StreamInfo GetStreamInfo(std::string substream, uint64_t timeout_sec, Error* err) const = 0; + virtual StreamInfo GetStreamInfo(uint64_t timeout_sec, Error* err) const = 0; + + //! Get substream that has the newest ingested data + /*! + \param timeout_ms - operation timeout in seconds + \return StreamInfo - a structure with substream information + */ + virtual StreamInfo GetLastSubstream(uint64_t timeout_sec, Error* err) const = 0; + //! Sends data to the receiver /*! diff --git a/producer/api/cpp/src/producer_impl.cpp b/producer/api/cpp/src/producer_impl.cpp index 27f6ab00eaa52c1ce424ce0a4a21885f4f26b5f0..98adb7e921f15ae17d83547a64c4207f13f96914 100644 --- a/producer/api/cpp/src/producer_impl.cpp +++ b/producer/api/cpp/src/producer_impl.cpp @@ -288,7 +288,7 @@ void ActivatePromise(std::shared_ptr<std::promise<StreamInfoResult>> promise, Re } catch(...) {} } -StreamInfo GetInfroFromCallback(std::future<StreamInfoResult>* promiseResult, uint64_t timeout_sec, Error* err) { +StreamInfo GetInfoFromCallback(std::future<StreamInfoResult>* promiseResult, uint64_t timeout_sec, Error* err) { try { auto status = promiseResult->wait_for(std::chrono::milliseconds(timeout_sec * 1000)); if (status == std::future_status::ready) { @@ -306,27 +306,43 @@ StreamInfo GetInfroFromCallback(std::future<StreamInfoResult>* promiseResult, ui return StreamInfo{}; } -StreamInfo ProducerImpl::GetStreamInfo(std::string substream, uint64_t timeout_sec, Error* err) const { - GenericRequestHeader request_header{kOpcodeStreamInfo, 0, 0, 0, "", substream}; + +GenericRequestHeader CreateRequestHeaderFromOp(StreamRequestOp op,std::string substream) { + switch (op) { + case StreamRequestOp::kStreamInfo: + return GenericRequestHeader{kOpcodeStreamInfo, 0, 0, 0, "", substream}; + case StreamRequestOp::kLastStream: + return GenericRequestHeader{kOpcodeLastStream, 0, 0, 0, "", ""}; + } +} + +StreamInfo ProducerImpl::StreamRequest(StreamRequestOp op,std::string substream, uint64_t timeout_sec, Error* err) const { + auto header = CreateRequestHeaderFromOp(op,substream); std::unique_ptr<std::promise<StreamInfoResult>> promise {new std::promise<StreamInfoResult>}; std::future<StreamInfoResult> promiseResult = promise->get_future(); - *err = request_pool__->AddRequest(std::unique_ptr<ProducerRequest> {new ProducerRequest{source_cred_string_, std::move(request_header), - nullptr, "", "", - unwrap_callback(ActivatePromise, std::move(promise)), true, - timeout_sec * 1000} + *err = request_pool__->AddRequest(std::unique_ptr<ProducerRequest> {new ProducerRequest{source_cred_string_, std::move(header), + nullptr, "", "", + unwrap_callback(ActivatePromise, std::move(promise)), true, + timeout_sec * 1000} }, true); if (*err) { return StreamInfo{}; } + return GetInfoFromCallback(&promiseResult, timeout_sec + 2, + err); // we give two more sec for request to exit by timeout +} - return GetInfroFromCallback(&promiseResult, timeout_sec + 2, - err); // we give two more sec for request to exit by timeout - +StreamInfo ProducerImpl::GetStreamInfo(std::string substream, uint64_t timeout_sec, Error* err) const { + return StreamRequest(StreamRequestOp::kStreamInfo,substream,timeout_sec,err); } StreamInfo ProducerImpl::GetStreamInfo(uint64_t timeout_sec, Error* err) const { return GetStreamInfo(kDefaultSubstream, timeout_sec, err); } +StreamInfo ProducerImpl::GetLastSubstream(uint64_t timeout_sec, Error* err) const { + return StreamRequest(StreamRequestOp::kLastStream,"",timeout_sec,err); +} + } \ No newline at end of file diff --git a/producer/api/cpp/src/producer_impl.h b/producer/api/cpp/src/producer_impl.h index c5f0583f2a80dd06256f4d6b0b98c6742f796a65..676349c7268765e8719d4c6925bc8dcec8a11a45 100644 --- a/producer/api/cpp/src/producer_impl.h +++ b/producer/api/cpp/src/producer_impl.h @@ -12,6 +12,11 @@ namespace asapo { +enum class StreamRequestOp { + kStreamInfo, + kLastStream +}; + class ProducerImpl : public Producer { private: // important to create it before request_pool__ @@ -29,6 +34,7 @@ class ProducerImpl : public Producer { StreamInfo GetStreamInfo(std::string substream, uint64_t timeout_sec, Error* err) const override ; StreamInfo GetStreamInfo(uint64_t timeout_sec, Error* err) const override; + StreamInfo GetLastSubstream(uint64_t timeout_sec, Error* err) const override; void SetLogLevel(LogLevel level) override; void EnableLocalLog(bool enable) override; @@ -60,6 +66,7 @@ class ProducerImpl : public Producer { Error WaitRequestsFinished(uint64_t timeout_ms) override; private: + StreamInfo StreamRequest(StreamRequestOp op, std::string substream, uint64_t timeout_sec, Error* err) const; Error Send(const EventHeader& event_header, std::string substream, FileData data, std::string full_path, uint64_t ingest_mode, RequestCallback callback, bool manage_data_memory); diff --git a/producer/api/cpp/unittests/test_producer_impl.cpp b/producer/api/cpp/unittests/test_producer_impl.cpp index 0baff125416d89528e490eeb30fb39a7db053c91..248d4cda47c6432fcc87d90d19306dcaccc9eab7 100644 --- a/producer/api/cpp/unittests/test_producer_impl.cpp +++ b/producer/api/cpp/unittests/test_producer_impl.cpp @@ -466,7 +466,7 @@ TEST_F(ProducerImplTests, GetStreamInfoMakesCorerctRequest) { } -TEST(GetStreamInfoTest, GetStreamInfo) { +TEST(GetStreamInfoTest, GetStreamInfoTimeout) { asapo::ProducerImpl producer1{"", 1, 10, asapo::RequestHandlerType::kTcp}; asapo::Error err; auto sinfo = producer1.GetStreamInfo(5, &err); @@ -475,4 +475,17 @@ TEST(GetStreamInfoTest, GetStreamInfo) { ASSERT_THAT(err->Explain(), HasSubstr("opcode: 4")); } +TEST_F(ProducerImplTests, GetLastStreamMakesCorerctRequest) { + producer.SetCredentials(expected_credentials); + EXPECT_CALL(mock_pull, AddRequest_t(M_CheckGetSubstreamInfoRequest(asapo::kOpcodeLastStream, + expected_credentials_str, + ""), true)).WillOnce( + Return(nullptr)); + + asapo::Error err; + producer.GetLastSubstream(1, &err); + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kTimeout)); +} + + } diff --git a/receiver/CMakeLists.txt b/receiver/CMakeLists.txt index 9e33088349d0df74ace1396226b7b21ac971931c..6cde4bbaaf5c8307f43086785ff665e552ebc261 100644 --- a/receiver/CMakeLists.txt +++ b/receiver/CMakeLists.txt @@ -18,6 +18,7 @@ set(RECEIVER_CORE_FILES src/request_handler/request_handler_authorize.cpp src/request_handler/request_handler_db_meta_write.cpp src/request_handler/request_handler_db_stream_info.cpp + src/request_handler/request_handler_db_last_stream.cpp src/request_handler/request_handler_receive_metadata.cpp src/request_handler/request_handler_db_check_request.cpp src/request_handler/request_factory.cpp @@ -91,6 +92,7 @@ set(TEST_SOURCE_FILES unittests/request_handler/test_request_handler_db_check_request.cpp unittests/request_handler/test_request_handler_db_meta_writer.cpp unittests/request_handler/test_request_handler_db_stream_info.cpp + unittests/request_handler/test_request_handler_db_last_stream.cpp unittests/request_handler/test_request_handler_db.cpp unittests/request_handler/test_request_handler_authorizer.cpp unittests/request_handler/test_request_handler_receive_data.cpp diff --git a/receiver/src/request_handler/request_factory.cpp b/receiver/src/request_handler/request_factory.cpp index 1d51fe84b4688b3e135b39ec14a5193fa4c11889..7e1d3a4c5f61dc61d18a41bf6ed68e11b361847b 100644 --- a/receiver/src/request_handler/request_factory.cpp +++ b/receiver/src/request_handler/request_factory.cpp @@ -4,21 +4,21 @@ namespace asapo { -bool NeedFileWriteHandler (const GenericRequestHeader& request_header) { +bool NeedFileWriteHandler(const GenericRequestHeader &request_header) { return GetReceiverConfig()->write_to_disk && - (request_header.custom_data[kPosIngestMode] & IngestModeFlags::kStoreInFilesystem); + (request_header.custom_data[kPosIngestMode] & IngestModeFlags::kStoreInFilesystem); } -bool NeedDbHandler (const GenericRequestHeader& request_header) { +bool NeedDbHandler(const GenericRequestHeader &request_header) { return GetReceiverConfig()->write_to_db; } -bool RequestFactory::ReceiveDirectToFile(const GenericRequestHeader& request_header) const { +bool RequestFactory::ReceiveDirectToFile(const GenericRequestHeader &request_header) const { return request_header.data_size > GetReceiverConfig()->receive_to_disk_threshold_mb * 1024 * 1024; } -Error RequestFactory::AddReceiveWriteHandlers(std::unique_ptr<Request>& request, - const GenericRequestHeader& request_header) const { +Error RequestFactory::AddReceiveWriteHandlers(std::unique_ptr<Request> &request, + const GenericRequestHeader &request_header) const { if (ReceiveDirectToFile(request_header)) { return AddReceiveDirectToFileHandler(request, request_header); } else { @@ -27,73 +27,78 @@ Error RequestFactory::AddReceiveWriteHandlers(std::unique_ptr<Request>& request, } } -void RequestFactory::AddReceiveViaBufferHandlers(std::unique_ptr<Request>& request, - const GenericRequestHeader& request_header) const { +void RequestFactory::AddReceiveViaBufferHandlers(std::unique_ptr<Request> &request, + const GenericRequestHeader &request_header) const { request->AddHandler(&request_handler_receivedata_); if (NeedFileWriteHandler(request_header)) { request->AddHandler(&request_handler_filewrite_); } } -Error RequestFactory::AddReceiveDirectToFileHandler(std::unique_ptr<Request>& request, - const GenericRequestHeader& request_header) const { +Error RequestFactory::AddReceiveDirectToFileHandler(std::unique_ptr<Request> &request, + const GenericRequestHeader &request_header) const { if (!GetReceiverConfig()->write_to_disk) { return ReceiverErrorTemplates::kInternalServerError.Generate("reciever does not support writing to disk"); } - if (! (request_header.custom_data[kPosIngestMode] & kStoreInFilesystem)) { - return ReceiverErrorTemplates::kBadRequest.Generate("ingest mode should include kStoreInFilesystem for large files "); + if (!(request_header.custom_data[kPosIngestMode] & kStoreInFilesystem)) { + return ReceiverErrorTemplates::kBadRequest.Generate( + "ingest mode should include kStoreInFilesystem for large files "); } request->AddHandler(&request_handler_filereceive_); return nullptr; } -Error RequestFactory::AddHandlersToRequest(std::unique_ptr<Request>& request, - const GenericRequestHeader& request_header) const { +Error RequestFactory::AddHandlersToRequest(std::unique_ptr<Request> &request, + const GenericRequestHeader &request_header) const { request->AddHandler(&request_handler_authorize_); switch (request_header.op_code) { - case Opcode::kOpcodeTransferData: - case Opcode::kOpcodeTransferSubsetData: { - request->AddHandler(&request_handler_receive_metadata_); - auto err = AddReceiveWriteHandlers(request, request_header); - if (err) { - return err; + case Opcode::kOpcodeTransferData: + case Opcode::kOpcodeTransferSubsetData: { + request->AddHandler(&request_handler_receive_metadata_); + auto err = AddReceiveWriteHandlers(request, request_header); + if (err) { + return err; + } + if (NeedDbHandler(request_header)) { + request->AddHandler(&request_handler_dbwrite_); + } + break; } - if (NeedDbHandler(request_header)) { - request->AddHandler(&request_handler_dbwrite_); + case Opcode::kOpcodeTransferMetaData: { + if (NeedDbHandler(request_header)) { + request->AddHandler(&request_handler_receivedata_); + request->AddHandler(&request_handler_db_meta_write_); + } else { + return ReceiverErrorTemplates::kInternalServerError.Generate( + "reciever does not support writing to database"); + } + break; } - break; - } - case Opcode::kOpcodeTransferMetaData: { - if (NeedDbHandler(request_header)) { - request->AddHandler(&request_handler_receivedata_); - request->AddHandler(&request_handler_db_meta_write_); - } else { - return ReceiverErrorTemplates::kInternalServerError.Generate("reciever does not support writing to database"); + case Opcode::kOpcodeAuthorize: { + // do nothing + break; } - break; - } - case Opcode::kOpcodeAuthorize: { - // do nothing - break; - } - case Opcode::kOpcodeStreamInfo: { - request->AddHandler(&request_handler_db_stream_info_); - break; - } - default: - return ReceiverErrorTemplates::kInvalidOpCode.Generate(); + case Opcode::kOpcodeStreamInfo: { + request->AddHandler(&request_handler_db_stream_info_); + break; + } + case Opcode::kOpcodeLastStream: { + request->AddHandler(&request_handler_db_last_stream_); + break; + } + default:return ReceiverErrorTemplates::kInvalidOpCode.Generate(); } return nullptr; } -std::unique_ptr<Request> RequestFactory::GenerateRequest(const GenericRequestHeader& - request_header, SocketDescriptor socket_fd, std::string origin_uri, - Error* err) const noexcept { - auto request = std::unique_ptr<Request> {new Request{request_header, socket_fd, std::move(origin_uri), cache_.get(), - &request_handler_db_check_} +std::unique_ptr<Request> RequestFactory::GenerateRequest(const GenericRequestHeader & +request_header, SocketDescriptor socket_fd, std::string origin_uri, + Error* err) const noexcept { + auto request = std::unique_ptr<Request>{new Request{request_header, socket_fd, std::move(origin_uri), cache_.get(), + &request_handler_db_check_} }; *err = AddHandlersToRequest(request, request_header); if (*err) { @@ -102,10 +107,8 @@ std::unique_ptr<Request> RequestFactory::GenerateRequest(const GenericRequestHea return request; } - -RequestFactory::RequestFactory(SharedCache cache): cache_{cache} { +RequestFactory::RequestFactory(SharedCache cache) : cache_{cache} { } - } diff --git a/receiver/src/request_handler/request_factory.h b/receiver/src/request_handler/request_factory.h index 0ac1a3af274ffc2d34ad371a1c20d5fc7fabb332..fa6d1a53aa3b67576c1e254dd62d34634b923e76 100644 --- a/receiver/src/request_handler/request_factory.h +++ b/receiver/src/request_handler/request_factory.h @@ -5,6 +5,7 @@ #include "../file_processors/write_file_processor.h" #include "../file_processors/receive_file_processor.h" #include "request_handler_db_stream_info.h" +#include "request_handler_db_last_stream.h" namespace asapo { @@ -24,6 +25,7 @@ class RequestFactory { RequestHandlerReceiveMetaData request_handler_receive_metadata_; RequestHandlerDbWrite request_handler_dbwrite_{kDBDataCollectionNamePrefix}; RequestHandlerDbStreamInfo request_handler_db_stream_info_{kDBDataCollectionNamePrefix}; + RequestHandlerDbLastStream request_handler_db_last_stream_{kDBDataCollectionNamePrefix}; RequestHandlerDbMetaWrite request_handler_db_meta_write_{kDBMetaCollectionName}; RequestHandlerAuthorize request_handler_authorize_; RequestHandlerDbCheckRequest request_handler_db_check_{kDBDataCollectionNamePrefix};; diff --git a/receiver/src/request_handler/request_handler_db_last_stream.cpp b/receiver/src/request_handler/request_handler_db_last_stream.cpp new file mode 100644 index 0000000000000000000000000000000000000000..ed5978b6e7f62a7241369b356824813795453ece --- /dev/null +++ b/receiver/src/request_handler/request_handler_db_last_stream.cpp @@ -0,0 +1,28 @@ +#include "request_handler_db_last_stream.h" +#include "../receiver_config.h" + + +namespace asapo { + +RequestHandlerDbLastStream::RequestHandlerDbLastStream(std::string collection_name_prefix) + : RequestHandlerDb(std::move(collection_name_prefix)) { + +} + + +Error RequestHandlerDbLastStream::ProcessRequest(Request* request) const { + if (auto err = RequestHandlerDb::ProcessRequest(request) ) { + return err; + } + + StreamInfo info; + auto err = db_client__->GetLastStream(&info); + if (!err) { + log__->Debug(std::string{"get last stream "} + " in " + + db_name_ + " at " + GetReceiverConfig()->database_uri); + request->SetResponseMessage(info.Json(true), ResponseMessageType::kInfo); + } + return err; +} + +} \ No newline at end of file diff --git a/receiver/src/request_handler/request_handler_db_last_stream.h b/receiver/src/request_handler/request_handler_db_last_stream.h new file mode 100644 index 0000000000000000000000000000000000000000..7673ed58f8419712d7777e4ab226ca556cafc29b --- /dev/null +++ b/receiver/src/request_handler/request_handler_db_last_stream.h @@ -0,0 +1,17 @@ +#ifndef ASAPO_REQUEST_HANDLER_DB_LAST_STREAM_H +#define ASAPO_REQUEST_HANDLER_DB_LAST_STREAM_H + +#include "request_handler_db.h" +#include "../request.h" + +namespace asapo { + +class RequestHandlerDbLastStream final : public RequestHandlerDb { + public: + RequestHandlerDbLastStream(std::string collection_name_prefix); + Error ProcessRequest(Request* request) const override; +}; + +} + +#endif //ASAPO_REQUEST_HANDLER_DB_LAST_STREAM_H diff --git a/receiver/unittests/request_handler/test_request_factory.cpp b/receiver/unittests/request_handler/test_request_factory.cpp index 49a76ae672dc3b6e0ea83ea257a9acdf40916bb4..e1cb5b8aef43cf015c5448f8351cc92d894f58e9 100644 --- a/receiver/unittests/request_handler/test_request_factory.cpp +++ b/receiver/unittests/request_handler/test_request_factory.cpp @@ -14,6 +14,7 @@ #include "../../src/request_handler/request_handler_db_write.h" #include "../../src/request_handler/request_handler_authorize.h" #include "../../src/request_handler/request_handler_db_stream_info.h" +#include "../../src/request_handler/request_handler_db_last_stream.h" #include "../../src/request_handler/request_handler_receive_data.h" #include "../../src/request_handler/request_handler_receive_metadata.h" @@ -232,5 +233,13 @@ TEST_F(FactoryTests, StreamInfoRequest) { ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerDbStreamInfo*>(request->GetListHandlers()[1]), Ne(nullptr)); } +TEST_F(FactoryTests, LastStreamRequest) { + generic_request_header.op_code = asapo::Opcode::kOpcodeLastStream; + auto request = factory.GenerateRequest(generic_request_header, 1, origin_uri, &err); + ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(request->GetListHandlers().size(), Eq(2)); + ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerAuthorize*>(request->GetListHandlers()[0]), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerDbLastStream*>(request->GetListHandlers()[1]), Ne(nullptr)); +} } diff --git a/receiver/unittests/request_handler/test_request_handler_db_last_stream.cpp b/receiver/unittests/request_handler/test_request_handler_db_last_stream.cpp new file mode 100644 index 0000000000000000000000000000000000000000..077c66c16429c0b8a2da1952bc64d3f4071d0c38 --- /dev/null +++ b/receiver/unittests/request_handler/test_request_handler_db_last_stream.cpp @@ -0,0 +1,115 @@ +#include <gtest/gtest.h> +#include <gmock/gmock.h> + +#include "unittests/MockIO.h" +#include "unittests/MockDatabase.h" +#include "unittests/MockLogger.h" + +#include "../../src/receiver_error.h" +#include "../../src/request.h" +#include "../../src/request_handler/request_factory.h" +#include "../../src/request_handler/request_handler.h" +#include "../../src/request_handler/request_handler_db_last_stream.h" +#include "../../../common/cpp/src/database/mongodb_client.h" + +#include "../mock_receiver_config.h" +#include "common/data_structs.h" +#include "common/networking.h" +#include "../receiver_mocking.h" + +using asapo::MockRequest; +using asapo::FileInfo; +using ::testing::Test; +using ::testing::Return; +using ::testing::ReturnRef; +using ::testing::_; +using ::testing::DoAll; +using ::testing::SetArgReferee; +using ::testing::Gt; +using ::testing::Eq; +using ::testing::Ne; +using ::testing::Mock; +using ::testing::NiceMock; +using ::testing::InSequence; +using ::testing::SetArgPointee; +using ::testing::AllOf; +using ::testing::HasSubstr; + + +using ::asapo::Error; +using ::asapo::ErrorInterface; +using ::asapo::FileDescriptor; +using ::asapo::SocketDescriptor; +using ::asapo::MockIO; +using asapo::Request; +using asapo::RequestHandlerDbLastStream; +using ::asapo::GenericRequestHeader; + +using asapo::MockDatabase; +using asapo::RequestFactory; +using asapo::SetReceiverConfig; +using asapo::ReceiverConfig; + + +namespace { + +class DbMetaLastStreamTests : public Test { + public: + std::string expectedlaststream = "substream"; + RequestHandlerDbLastStream handler{asapo::kDBDataCollectionNamePrefix}; + std::unique_ptr<NiceMock<MockRequest>> mock_request; + NiceMock<MockDatabase> mock_db; + NiceMock<asapo::MockLogger> mock_logger; + ReceiverConfig config; + std::string expected_beamtime_id = "beamtime_id"; + std::string expected_stream = "stream"; + std::string info_str = R"({"lastId":10,"name":"substream","timestamp":1000000})"; + asapo::StreamInfo expected_stream_info; + void SetUp() override { + GenericRequestHeader request_header; + expected_stream_info.last_id = 10; + expected_stream_info.name = expectedlaststream; + expected_stream_info.timestamp = std::chrono::time_point<std::chrono::system_clock>(std::chrono::milliseconds(1)); + request_header.data_id = 0; + handler.db_client__ = std::unique_ptr<asapo::Database> {&mock_db}; + handler.log__ = &mock_logger; + mock_request.reset(new NiceMock<MockRequest> {request_header, 1, "", nullptr}); + ON_CALL(*mock_request, GetBeamtimeId()).WillByDefault(ReturnRef(expected_beamtime_id)); + } + void TearDown() override { + handler.db_client__.release(); + } +}; + +TEST_F(DbMetaLastStreamTests, CallsUpdate) { + SetReceiverConfig(config, "none"); + + EXPECT_CALL(*mock_request, GetBeamtimeId()) + .WillOnce(ReturnRef(expected_beamtime_id)) + ; + + EXPECT_CALL(*mock_request, GetStream()).WillOnce(ReturnRef(expected_stream)); + + EXPECT_CALL(mock_db, Connect_t(config.database_uri, expected_beamtime_id + "_" + expected_stream)). + WillOnce(testing::Return(nullptr)); + + + EXPECT_CALL(mock_db, GetLastStream_t(_)). + WillOnce(DoAll( + SetArgPointee<0>(expected_stream_info), + testing::Return(nullptr) + )); + + EXPECT_CALL(*mock_request, SetResponseMessage(info_str, asapo::ResponseMessageType::kInfo)); + + EXPECT_CALL(mock_logger, Debug(AllOf(HasSubstr("get last stream"), + HasSubstr(config.database_uri), + HasSubstr(expected_beamtime_id) + ) + ) + ); + + handler.ProcessRequest(mock_request.get()); +} + +} diff --git a/tests/automatic/mongo_db/insert_retrieve/insert_retrieve_mongodb.cpp b/tests/automatic/mongo_db/insert_retrieve/insert_retrieve_mongodb.cpp index f36e4de35d3302b77b34a64d9a409db58e093533..bcbca88cb4d1aa9455799bf73ca7379bf002de72 100644 --- a/tests/automatic/mongo_db/insert_retrieve/insert_retrieve_mongodb.cpp +++ b/tests/automatic/mongo_db/insert_retrieve/insert_retrieve_mongodb.cpp @@ -1,5 +1,6 @@ #include <iostream> #include <chrono> +#include <thread> #include "../../../common/cpp/src/database/mongodb_client.h" #include "testing.h" @@ -46,29 +47,41 @@ int main(int argc, char* argv[]) { db.Connect("127.0.0.1", "data"); } - auto err = db.Insert("test", fi, false); + auto err = db.Insert("data_test", fi, false); if (args.keyword == "DuplicateID") { Assert(err, "OK"); - err = db.Insert("test", fi, false); + err = db.Insert("data_test", fi, false); } + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + auto fi1 = fi; + fi1.id = 123; + fi1.timestamp = std::chrono::system_clock::now(); + db.Insert("data_test1", fi, false); + db.Insert("data_test1", fi1, false); + Assert(err, args.keyword); if (args.keyword == "OK") { // check retrieve asapo::FileInfo fi_db; asapo::MongoDBClient db_new; db_new.Connect("127.0.0.1", "data"); - err = db_new.GetById("test", fi.id, &fi_db); + err = db_new.GetById("data_test", fi.id, &fi_db); M_AssertTrue(fi_db == fi, "get record from db"); M_AssertEq(nullptr, err); - err = db_new.GetById("test", 0, &fi_db); + err = db_new.GetById("data_test", 0, &fi_db); Assert(err, "No record"); asapo::StreamInfo info; - err = db.GetStreamInfo("test", &info); + err = db.GetStreamInfo("data_test", &info); M_AssertEq(nullptr, err); M_AssertEq(fi.id, info.last_id); + + err = db.GetLastStream(&info); + M_AssertEq(nullptr, err); + M_AssertEq(fi1.id, info.last_id); + M_AssertEq("test1",info.name); } return 0;