diff --git a/CHANGELOG.md b/CHANGELOG.md index 63ce33ba3a76efbe11c99b6a89a21ef50aa505f4..d1ec067540715c6af03bff6c007ccf3055327864 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ ## 20.06.0 (unreleased) FEATURES * implemented acknowledeges - one can acknowledge a data tuple, get last acknowledged tuple id, get list of unacknowledged tuple ids +* implement getting substream info (contains last id) by producer client (not need to have consumer client) IMPROVEMENTS * change behavior when trying to get data from a substream that does not exist - return EndOfStream instead of WrongInput diff --git a/common/cpp/include/database/database.h b/common/cpp/include/database/database.h index 32fa67929c24156e34614c5da8ef8b7c80427d30..f283db72935854bcb66afce86ce6ce379d51e0f1 100644 --- a/common/cpp/include/database/database.h +++ b/common/cpp/include/database/database.h @@ -23,7 +23,7 @@ class Database { virtual Error GetById(const std::string& collection, uint64_t id, FileInfo* file) const = 0; virtual Error GetDataSetById(const std::string& collection, uint64_t set_id, uint64_t id, FileInfo* file) const = 0; - + virtual Error GetStreamInfo(const std::string& collection, StreamInfo* info) const = 0; virtual ~Database() = default; }; diff --git a/common/cpp/include/unittests/MockDatabase.h b/common/cpp/include/unittests/MockDatabase.h index b1ac72561ec8b715297f80c1ca4be39474068692..a06d0446a553e5b112f71fe90517cc0116110e43 100644 --- a/common/cpp/include/unittests/MockDatabase.h +++ b/common/cpp/include/unittests/MockDatabase.h @@ -51,7 +51,15 @@ class MockDatabase : public Database { MOCK_CONST_METHOD4(GetSetById_t, ErrorInterface * (const std::string&, uint64_t set_id, uint64_t id, FileInfo*)); - // stuff to test db destructor is called and avoid "uninteresting call" messages + + Error GetStreamInfo(const std::string& collection, StreamInfo* info) const override { + return Error{GetStreamInfo_t(collection, info)}; + } + + MOCK_CONST_METHOD2(GetStreamInfo_t, ErrorInterface * (const std::string&, StreamInfo*)); + + + // stuff to test db destructor is called and avoid "uninteresting call" messages MOCK_METHOD0(Die, void()); virtual ~MockDatabase() override { if (check_destructor) diff --git a/common/cpp/src/database/mongodb_client.cpp b/common/cpp/src/database/mongodb_client.cpp index 0330c14df2bfdd893b8adac710bde98d6ece407f..7399f31ee154fe6c4e01ec96101a5799704faae7 100644 --- a/common/cpp/src/database/mongodb_client.cpp +++ b/common/cpp/src/database/mongodb_client.cpp @@ -1,3 +1,4 @@ +#include <json_parser/json_parser.h> #include "mongodb_client.h" #include "mongodb_client.h" #include "database/db_error.h" @@ -271,7 +272,7 @@ Error MongoDBClient::InsertAsSubset(const std::string& collection, const FileInf return err; } -Error MongoDBClient::GetRecordFromDb(const std::string& collection, uint64_t id, std::string* res) const { +Error MongoDBClient::GetRecordFromDb(const std::string& collection, uint64_t id, bool ignore_id_return_last, std::string* res) const { if (!connected_) { return DBErrorTemplates::kNotConnected.Generate(); } @@ -287,8 +288,14 @@ Error MongoDBClient::GetRecordFromDb(const std::string& collection, uint64_t id, const bson_t* doc; char* str; - filter = BCON_NEW ("_id", BCON_INT64 (id)); - opts = BCON_NEW ("limit", BCON_INT64 (1)); + if (!ignore_id_return_last) { + filter = BCON_NEW ("_id", BCON_INT64 (id)); + opts = BCON_NEW ("limit", BCON_INT64 (1)); + + } else { + filter = BCON_NEW (NULL); + opts = BCON_NEW ("limit", BCON_INT64 (1), "sort", "{", "_id", BCON_INT64 (-1), "}"); + } cursor = mongoc_collection_find_with_opts (current_collection_, filter, opts, NULL); @@ -318,7 +325,7 @@ Error MongoDBClient::GetRecordFromDb(const std::string& collection, uint64_t id, Error MongoDBClient::GetById(const std::string& collection, uint64_t id, FileInfo* file) const { std::string record_str; - auto err = GetRecordFromDb(collection, id, &record_str); + auto err = GetRecordFromDb(collection, id, false, &record_str); if (err) { return err; } @@ -331,7 +338,7 @@ Error MongoDBClient::GetById(const std::string& collection, uint64_t id, FileInf Error MongoDBClient::GetDataSetById(const std::string& collection, uint64_t set_id, uint64_t id, FileInfo* file) const { std::string record_str; - auto err = GetRecordFromDb(collection, set_id, &record_str); + auto err = GetRecordFromDb(collection, set_id, false, &record_str); if (err) { return err; } @@ -352,4 +359,27 @@ Error MongoDBClient::GetDataSetById(const std::string& collection, uint64_t set_ } +Error StreamInfoFromDbResponse(std::string record_str,StreamInfo* info) { + auto parser = JsonStringParser(std::move(record_str)); + Error parse_err = parser.GetUInt64("_id", &(info->last_id)); + if (parse_err) { + info->last_id = 0; + return DBErrorTemplates::kJsonParseError.Generate("cannot parse mongodb response: " + parse_err->Explain()); + } + return nullptr; +} + +Error MongoDBClient::GetStreamInfo(const std::string &collection, StreamInfo* info) const { + std::string record_str; + auto err = GetRecordFromDb(collection, 0,true, &record_str); + if (err) { + info->last_id = 0; + if (err == DBErrorTemplates::kNoRecord) { + return nullptr; + } + return err; + } + return StreamInfoFromDbResponse(std::move(record_str),info); +} + } diff --git a/common/cpp/src/database/mongodb_client.h b/common/cpp/src/database/mongodb_client.h index 75303a459761908cb8e76e55075c12322919c5bd..090a932eaa05b96ab1e9c99db1b9d4145396a8a3 100644 --- a/common/cpp/src/database/mongodb_client.h +++ b/common/cpp/src/database/mongodb_client.h @@ -43,6 +43,7 @@ class MongoDBClient final : public Database { Error Upsert(const std::string& collection, uint64_t id, const uint8_t* data, uint64_t size) const override; Error GetById(const std::string& collection, uint64_t id, FileInfo* file) const override; Error GetDataSetById(const std::string& collection, uint64_t set_id, uint64_t id, FileInfo* file) const override; + Error GetStreamInfo(const std::string& collection, StreamInfo* info) const override; ~MongoDBClient() override; private: mongoc_client_t* client_{nullptr}; @@ -60,8 +61,7 @@ class MongoDBClient final : public Database { Error InsertBsonDocument(const bson_p& document, bool ignore_duplicates) const; Error UpdateBsonDocument(uint64_t id, const bson_p& document, bool upsert) const; Error AddBsonDocumentToArray(bson_t* query, bson_t* update, bool ignore_duplicates) const; - Error GetRecordFromDb(const std::string& collection, uint64_t id, std::string* res) const; - + Error GetRecordFromDb(const std::string& collection, uint64_t id, bool ignore_id_return_last, std::string* res) const; }; } diff --git a/examples/pipeline/in_to_out/in_to_out.cpp b/examples/pipeline/in_to_out/in_to_out.cpp index 3721cfd5d790a1afb08559bb43b3db80fa1e2a31..e9e794cb660b8ed78b6ca5b6cc5f8b8bbe188ce8 100644 --- a/examples/pipeline/in_to_out/in_to_out.cpp +++ b/examples/pipeline/in_to_out/in_to_out.cpp @@ -38,7 +38,7 @@ struct Args { bool transfer_data; }; -void ProcessAfterSend(asapo::GenericRequestHeader header, asapo::Error err) { +void ProcessAfterSend(asapo::RequestCallbackPayload payload, asapo::Error err) { if (err && err != asapo::ProducerErrorTemplates::kServerWarning) { std::cerr << "Data was not successfully send: " << err << std::endl; return; diff --git a/examples/producer/dummy-data-producer/dummy_data_producer.cpp b/examples/producer/dummy-data-producer/dummy_data_producer.cpp index 97be3a35fe8f3e18801dad8c45ac8c7547edfab1..25f6e18851512689427c7fe321e9ce3db3d93500 100644 --- a/examples/producer/dummy-data-producer/dummy_data_producer.cpp +++ b/examples/producer/dummy-data-producer/dummy_data_producer.cpp @@ -98,7 +98,7 @@ void ProcessCommandArguments(int argc, char* argv[], Args* args) { } } -void ProcessAfterSend(asapo::GenericRequestHeader header, asapo::Error err) { +void ProcessAfterSend(asapo::RequestCallbackPayload payload, asapo::Error err) { mutex.lock(); iterations_remained--; if (err) { @@ -109,7 +109,7 @@ void ProcessAfterSend(asapo::GenericRequestHeader header, asapo::Error err) { mutex.unlock(); } -void ProcessAfterMetaDataSend(asapo::GenericRequestHeader header, asapo::Error err) { +void ProcessAfterMetaDataSend(asapo::RequestCallbackPayload payload, asapo::Error err) { mutex.lock(); iterations_remained--; if (err) { diff --git a/producer/api/cpp/include/producer/common.h b/producer/api/cpp/include/producer/common.h index ba32d692575ca65598aa19ba2ab8c971c45820e4..289f5099922f0eb2bedb905d1454be40ebb3a579 100644 --- a/producer/api/cpp/include/producer/common.h +++ b/producer/api/cpp/include/producer/common.h @@ -11,7 +11,13 @@ namespace asapo { const uint8_t kMaxProcessingThreads = 32; -using RequestCallback = std::function<void(GenericRequestHeader, Error)>; + +struct RequestCallbackPayload { + GenericRequestHeader original_header; + std::string response; +}; + +using RequestCallback = std::function<void(RequestCallbackPayload, Error)>; enum class RequestHandlerType { kTcp, diff --git a/producer/api/cpp/src/producer_impl.cpp b/producer/api/cpp/src/producer_impl.cpp index 0d1bfcddb0e0c98c7b02be427f249a92f87b0dbf..50760be369e229de6cd2cc5f9fa223efe4be270e 100644 --- a/producer/api/cpp/src/producer_impl.cpp +++ b/producer/api/cpp/src/producer_impl.cpp @@ -261,22 +261,22 @@ Error ProducerImpl::SendFile(const EventHeader& event_header, } -using RequestCallbackWithPromise = void (*)(std::shared_ptr<std::promise<StreamInfoResult>>, GenericRequestHeader header, Error err); +using RequestCallbackWithPromise = void (*)(std::shared_ptr<std::promise<StreamInfoResult>>, RequestCallbackPayload header, Error err); RequestCallback unwrap_callback(RequestCallbackWithPromise callback, std::unique_ptr<std::promise<StreamInfoResult>> promise) { auto shared_promise = std::shared_ptr<std::promise<StreamInfoResult>>(std::move(promise)); - RequestCallback wrapper = [ = ](GenericRequestHeader header, Error err) -> void { - callback(shared_promise, std::move(header), std::move(err)); + RequestCallback wrapper = [ = ](RequestCallbackPayload payload, Error err) -> void { + callback(shared_promise, std::move(payload), std::move(err)); }; return wrapper; } -void ActivatePromise(std::shared_ptr<std::promise<StreamInfoResult>> promise, GenericRequestHeader header, Error err) { +void ActivatePromise(std::shared_ptr<std::promise<StreamInfoResult>> promise, RequestCallbackPayload payload, Error err) { StreamInfoResult res; if (err == nullptr) { - auto ok = res.sinfo.SetFromJson(header.message); + auto ok = res.sinfo.SetFromJson(payload.response); res.err=ok?nullptr:ProducerErrorTemplates::kInternalServerError.Generate( - std::string("cannot read JSON string from ")+header.message).release(); + std::string("cannot read JSON string from server response: ")+payload.response).release(); } else { res.err=err.release(); } @@ -318,7 +318,7 @@ StreamInfo ProducerImpl::GetStreamInfo(std::string substream, uint64_t timeout_s return StreamInfo{}; } - return GetInfroFromCallback(&promiseResult,timeout_sec+2,err); // we gove two more sec for request to exit by timeout + return GetInfroFromCallback(&promiseResult,timeout_sec+2,err); // we give two more sec for request to exit by timeout } diff --git a/producer/api/cpp/src/request_handler_filesystem.cpp b/producer/api/cpp/src/request_handler_filesystem.cpp index 5e1437d25082d95613ff5251ca17f05e93fec78d..1e77c7de82f124ff01a688e8f7e81cf95f45a623 100644 --- a/producer/api/cpp/src/request_handler_filesystem.cpp +++ b/producer/api/cpp/src/request_handler_filesystem.cpp @@ -31,7 +31,7 @@ bool RequestHandlerFilesystem::ProcessRequestUnlocked(GenericRequest* request, b err = io__->WriteDataToFile(destination_folder_, request->header.message, (uint8_t*)producer_request->data.get(), (size_t)request->header.data_size, true, true); if (producer_request->callback) { - producer_request->callback(request->header, std::move(err)); + producer_request->callback(RequestCallbackPayload{request->header,""}, std::move(err)); } *retry = false; return true; diff --git a/producer/api/cpp/src/request_handler_tcp.cpp b/producer/api/cpp/src/request_handler_tcp.cpp index 4a94f578d96bfcbaba0d45e4788d415936fc2140..f395d63e91f003df6e0b8f1d50194e35096ef40e 100644 --- a/producer/api/cpp/src/request_handler_tcp.cpp +++ b/producer/api/cpp/src/request_handler_tcp.cpp @@ -19,7 +19,7 @@ Error RequestHandlerTcp::Authorize(const std::string& source_credentials) { if(err) { return err; } - return ReceiveResponse(header); + return ReceiveResponse(header, nullptr); } @@ -75,7 +75,7 @@ Error RequestHandlerTcp::SendRequestContent(const ProducerRequest* request) { return nullptr; } -Error RequestHandlerTcp::ReceiveResponse(const GenericRequestHeader& request_header) { +Error RequestHandlerTcp::ReceiveResponse(const GenericRequestHeader& request_header,std::string* response) { Error err; SendDataResponse sendDataResponse; io__->Receive(sd_, &sendDataResponse, sizeof(sendDataResponse), &err); @@ -104,6 +104,9 @@ Error RequestHandlerTcp::ReceiveResponse(const GenericRequestHeader& request_hea return res_err; } case kNetErrorNoError : + if (response) { + *response = sendDataResponse.message; + } return nullptr; default: auto res_err = ProducerErrorTemplates::kInternalServerError.Generate(); @@ -112,13 +115,13 @@ Error RequestHandlerTcp::ReceiveResponse(const GenericRequestHeader& request_hea } } -Error RequestHandlerTcp::TrySendToReceiver(const ProducerRequest* request) { +Error RequestHandlerTcp::TrySendToReceiver(const ProducerRequest* request,std::string* response) { auto err = SendRequestContent(request); if (err) { return err; } - err = ReceiveResponse(request->header); + err = ReceiveResponse(request->header,response); if (err == nullptr || err == ProducerErrorTemplates::kServerWarning) { log__->Debug("successfully sent data, opcode: " + std::to_string(request->header.op_code) + ", id: " + std::to_string(request->header.data_id) + " to " + connected_receiver_uri_); @@ -211,9 +214,9 @@ bool RequestHandlerTcp::ProcessErrorFromReceiver(const Error& error, } -void RequestHandlerTcp::ProcessRequestCallback(Error err, ProducerRequest* request, bool* retry) { +void RequestHandlerTcp::ProcessRequestCallback(Error err, ProducerRequest* request,std::string response, bool* retry) { if (request->callback) { - request->callback(request->header, std::move(err)); + request->callback(RequestCallbackPayload{request->header,std::move(response)}, std::move(err)); } *retry = false; } @@ -224,21 +227,22 @@ bool RequestHandlerTcp::SendDataToOneOfTheReceivers(ProducerRequest* request, bo if (Disconnected()) { auto err = ConnectToReceiver(request->source_credentials, receiver_uri); if (err == ProducerErrorTemplates::kWrongInput) { - ProcessRequestCallback(std::move(err), request, retry); + ProcessRequestCallback(std::move(err), request, "", retry); return false; } else { if (err != nullptr ) continue; } } - auto err = TrySendToReceiver(request); + std::string response; + auto err = TrySendToReceiver(request,&response); bool server_error_can_retry = ProcessErrorFromReceiver(err, request, receiver_uri); if (server_error_can_retry) { continue; } bool success = err && err != ProducerErrorTemplates::kServerWarning ? false : true; - ProcessRequestCallback(std::move(err), request, retry); + ProcessRequestCallback(std::move(err), request, response, retry); return success; } log__->Warning("put back to the queue, request opcode: " + std::to_string(request->header.op_code) + @@ -254,7 +258,7 @@ bool RequestHandlerTcp::ProcessRequestUnlocked(GenericRequest* request, bool* re auto err = producer_request->UpdateDataSizeFromFileIfNeeded(io__.get()); if (err) { if (producer_request->callback) { - producer_request->callback(producer_request->header, std::move(err)); + producer_request->callback(RequestCallbackPayload{producer_request->header}, std::move(err)); } *retry = false; return false; @@ -297,7 +301,7 @@ void RequestHandlerTcp::ProcessRequestTimeout(GenericRequest* request) { auto err = ProducerErrorTemplates::kTimeout.Generate(err_string); if (producer_request->callback) { - producer_request->callback(request->header, std::move(err)); + producer_request->callback(RequestCallbackPayload{request->header,""}, std::move(err)); } } diff --git a/producer/api/cpp/src/request_handler_tcp.h b/producer/api/cpp/src/request_handler_tcp.h index fa2d39b7110c989b66899b5b6c7b4f9889065ff0..38841d6c8da80376de94454974522a65b2e26f5b 100644 --- a/producer/api/cpp/src/request_handler_tcp.h +++ b/producer/api/cpp/src/request_handler_tcp.h @@ -34,8 +34,8 @@ class RequestHandlerTcp: public RequestHandler { Error ConnectToReceiver(const std::string& source_credentials, const std::string& receiver_address); bool SendDataToOneOfTheReceivers(ProducerRequest* request, bool* retry); Error SendRequestContent(const ProducerRequest* request); - Error ReceiveResponse(const GenericRequestHeader& request_header); - Error TrySendToReceiver(const ProducerRequest* request); + Error ReceiveResponse(const GenericRequestHeader& request_header,std::string* response); + Error TrySendToReceiver(const ProducerRequest* request,std::string* response); SocketDescriptor sd_{kDisconnectedSocketDescriptor}; void UpdateIfNewConnection(); bool UpdateReceiversList(); @@ -50,7 +50,7 @@ class RequestHandlerTcp: public RequestHandler { bool ProcessErrorFromReceiver(const Error& error, const ProducerRequest* request, const std::string& receiver_uri); ReceiversList receivers_list_; system_clock::time_point last_receivers_uri_update_; - void ProcessRequestCallback(Error err, ProducerRequest* request, bool* retry); + void ProcessRequestCallback(Error err, ProducerRequest* request, std::string response, bool* retry); uint64_t thread_id_; uint64_t* ncurrent_connections_; std::string connected_receiver_uri_; diff --git a/producer/api/cpp/unittests/test_request_handler_filesystem.cpp b/producer/api/cpp/unittests/test_request_handler_filesystem.cpp index 3b6eb7cf7f856277c53a75013acce9a52637063b..321b60e4d330380666af87182dbf70ff1dc24f1c 100644 --- a/producer/api/cpp/unittests/test_request_handler_filesystem.cpp +++ b/producer/api/cpp/unittests/test_request_handler_filesystem.cpp @@ -57,10 +57,13 @@ class RequestHandlerFilesystemTests : public testing::Test { expected_meta_size, expected_file_name}; bool called = false; asapo::GenericRequestHeader callback_header; - asapo::ProducerRequest request{"", header, nullptr, "", "", [this](asapo::GenericRequestHeader header, asapo::Error err) { + std::string callback_response; + + asapo::ProducerRequest request{"", header, nullptr, "", "", [this](asapo::RequestCallbackPayload payload, asapo::Error err) { called = true; callback_err = std::move(err); - callback_header = header; + callback_header = payload.original_header; + callback_response = payload.response; }, true, 0}; asapo::ProducerRequest request_nocallback{"", header, nullptr, "", "", nullptr, true, 0}; diff --git a/producer/api/cpp/unittests/test_request_handler_tcp.cpp b/producer/api/cpp/unittests/test_request_handler_tcp.cpp index c3cd919951130a4a12673f67d7ca6a351013e06e..784c25cc5c2b42519d59ade138bcfd301f3b5e6a 100644 --- a/producer/api/cpp/unittests/test_request_handler_tcp.cpp +++ b/producer/api/cpp/unittests/test_request_handler_tcp.cpp @@ -57,7 +57,9 @@ class RequestHandlerTcpTests : public testing::Test { uint64_t expected_meta_size = 4; std::string expected_metadata = "meta"; std::string expected_warning = "warning"; - char expected_file_name[asapo::kMaxMessageSize] = "test_name"; + std::string expected_response = "response"; + + char expected_file_name[asapo::kMaxMessageSize] = "test_name"; char expected_beamtime_id[asapo::kMaxMessageSize] = "test_beamtime_id"; char expected_substream[asapo::kMaxMessageSize] = "test_substream"; @@ -70,20 +72,22 @@ class RequestHandlerTcpTests : public testing::Test { expected_file_name, expected_substream}; bool callback_called = false; asapo::GenericRequestHeader callback_header; + std::string callback_response; - - asapo::ProducerRequest request{expected_beamtime_id, header, nullptr, expected_metadata, "", [this](asapo::GenericRequestHeader header, asapo::Error err) { + asapo::ProducerRequest request{expected_beamtime_id, header, nullptr, expected_metadata, "", [this](asapo::RequestCallbackPayload payload, asapo::Error err) { callback_called = true; callback_err = std::move(err); - callback_header = header; + callback_header = payload.original_header; + callback_response = payload.response; }, true, 0}; std::string expected_origin_fullpath = std::string("origin/") + expected_file_name; asapo::ProducerRequest request_filesend{expected_beamtime_id, header_fromfile, nullptr, expected_metadata, - expected_origin_fullpath, [this](asapo::GenericRequestHeader header, asapo::Error err) { + expected_origin_fullpath, [this](asapo::RequestCallbackPayload payload, asapo::Error err) { callback_called = true; callback_err = std::move(err); - callback_header = header; + callback_header = payload.original_header; + callback_response = payload.response; }, true, 0}; @@ -847,7 +851,7 @@ TEST_F(RequestHandlerTcpTests, FileRequestOK) { ExpectOKSendHeader(true); ExpectOKSendMetaData(true); ExpectOKSendFile(true); - ExpectOKReceive(); + ExpectOKReceive(true,asapo::kNetErrorNoError,expected_response); request_handler.PrepareProcessingRequestLocked(); @@ -855,6 +859,7 @@ TEST_F(RequestHandlerTcpTests, FileRequestOK) { ASSERT_THAT(success, Eq(true)); ASSERT_THAT(callback_called, Eq(true)); ASSERT_THAT(callback_err, Eq(nullptr)); + ASSERT_THAT(callback_response, Eq(expected_response)); ASSERT_THAT(retry, Eq(false)); } @@ -865,7 +870,8 @@ TEST_F(RequestHandlerTcpTests, SendOK) { ExpectOKConnect(true); ExpectOKAuthorize(true); ExpectOKSendAll(true); - ExpectOKReceive(); + ExpectOKReceive(true,asapo::kNetErrorNoError,expected_response); + request_handler.PrepareProcessingRequestLocked(); auto success = request_handler.ProcessRequestUnlocked(&request, &retry); @@ -877,6 +883,7 @@ TEST_F(RequestHandlerTcpTests, SendOK) { ASSERT_THAT(callback_header.data_size, Eq(header.data_size)); ASSERT_THAT(callback_header.op_code, Eq(header.op_code)); ASSERT_THAT(callback_header.data_id, Eq(header.data_id)); + ASSERT_THAT(callback_response, Eq(expected_response)); ASSERT_THAT(std::string{callback_header.message}, Eq(std::string{header.message})); } diff --git a/producer/api/python/asapo_producer.pxd b/producer/api/python/asapo_producer.pxd index b862d98eb95cc040249d5c8198bbae3d0531bdba..61e5f6beaec0893b7cca06a3581279825e4ab29f 100644 --- a/producer/api/python/asapo_producer.pxd +++ b/producer/api/python/asapo_producer.pxd @@ -25,7 +25,9 @@ cdef extern from "asapo_producer.h" namespace "asapo": cdef extern from "asapo_producer.h" namespace "asapo": cppclass FileData: unique_ptr[uint8_t[]] release() - + cppclass StreamInfo: + string Json() + bool SetFromJson(string json_str) cdef extern from "asapo_producer.h" namespace "asapo": cppclass RequestHandlerType: @@ -71,6 +73,10 @@ cdef extern from "asapo_producer.h" namespace "asapo": cdef extern from "asapo_producer.h" namespace "asapo": struct GenericRequestHeader: string Json() + struct RequestCallbackPayload: + GenericRequestHeader original_header + string response + cdef extern from "asapo_producer.h" namespace "asapo": cppclass RequestCallback: @@ -97,6 +103,7 @@ cdef extern from "asapo_producer.h" namespace "asapo" nogil: uint64_t GetRequestsQueueSize() Error WaitRequestsFinished(uint64_t timeout_ms) Error SendSubstreamFinishedFlag(string substream, uint64_t last_id, string next_substream, RequestCallback callback) + StreamInfo GetStreamInfo(string substream, uint64_t timeout_ms, Error* err) cdef extern from "asapo_producer.h" namespace "asapo": uint64_t kDefaultIngestMode diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in index bca43cb22bbf981ebd851bbe0c2288f9ce9933d3..db3f83dc3d77b3cbe4e84a8115a7d25a684a4e26 100644 --- a/producer/api/python/asapo_producer.pyx.in +++ b/producer/api/python/asapo_producer.pyx.in @@ -169,7 +169,7 @@ cdef class PyProducer: :param ingest_mode: ingest mode flag :type ingest_mode: int :param callback: callback function, default None - :type callback: callback(info,err), where info - json string with event header that was used to send data, err - error string or None + :type callback: callback(info,err), where info - json string with event header that was used to send data and response, err - error string or None :raises: AsapoWrongInputError: wrong input (authorization, meta, ...) AsapoProducerError: actually should not happen @@ -188,7 +188,7 @@ cdef class PyProducer: :param next_substream: name of the next substream or None :type substream: string :param callback: callback function, default None - :type callback: callback(info,err), where info - json string with event header that was used to send data, err - error string or None + :type callback: callback(info,err), where info - json string with event header that was used to send data and response, err - error string or None :raises: AsapoWrongInputError: wrong input (authorization, meta, ...) AsapoProducerError: actually should not happen @@ -198,6 +198,25 @@ cdef class PyProducer: if err: throw_exception(err) + def stream_info(self, substream = 'default', uint64_t timeout_ms = 1000): + """ + :param substream: substream name + :type substream: string + :param timeout_ms: timeout in milliseconds + :type timeout_ms: int + :raises: + AsapoWrongInputError: wrong input (authorization, meta, ...) + AsapoTimeoutError: request not finished for a given timeout + AsapoProducerError: other errors + """ + cdef Error err + cdef StreamInfo info + cdef string b_substream = _bytes(substream) + with nogil: + info = self.c_producer.get().GetStreamInfo(b_substream,timeout_ms,&err) + if err: + throw_exception(err) + return json.loads(_str(info.Json())) def send_file(self, uint64_t id, local_path, exposed_path, user_meta=None, subset=None, substream = "default", ingest_mode = DEFAULT_INGEST_MODE, callback=None): """ @@ -216,7 +235,7 @@ cdef class PyProducer: :param ingest_mode: ingest mode flag :type ingest_mode: int :param callback: callback function, default None - :type callback: callback(info,err), where info - json string with event header that was used to send data, err - error string or None + :type callback: callback(info,err), where info - json string with event header that was used to send data and response, err - error string or None :raises: AsapoWrongInputError: wrong input (authorization, meta, ...) AsapoLocalIOError: problems reading file to send @@ -250,10 +269,11 @@ cdef class PyProducer: if err: throw_exception(err) return - cdef void c_callback_python(self,py_callback, GenericRequestHeader header, Error& err): + cdef void c_callback_python(self,py_callback, RequestCallbackPayload payload, Error& err): if py_callback != None: - info_str = _str(header.Json()) + info_str = _str(payload.original_header.Json()) info = json.loads(info_str) + info['server_response'] = payload.response if err: py_err = python_exception_from_error(err) else: @@ -261,21 +281,21 @@ cdef class PyProducer: py_callback(info,py_err) Py_XDECREF(<PyObject*>py_callback) - cdef void c_callback(self,py_callback, GenericRequestHeader header, Error err) with gil: - self.c_callback_python(py_callback,header,err) + cdef void c_callback(self,py_callback, RequestCallbackPayload payload, Error err) with gil: + self.c_callback_python(py_callback,payload,err) - cdef void c_callback_ndarr(self,py_callback,nd_array,GenericRequestHeader header, Error err) with gil: + cdef void c_callback_ndarr(self,py_callback,nd_array,RequestCallbackPayload payload, Error err) with gil: if nd_array is not None: if nd_array.base is not None: Py_XDECREF(<PyObject*>nd_array.base) else: Py_XDECREF(<PyObject*>nd_array) - self.c_callback_python(py_callback,header,err) + self.c_callback_python(py_callback,payload,err) - cdef void c_callback_bytesaddr(self,py_callback,bytes_array,GenericRequestHeader header, Error err) with gil: + cdef void c_callback_bytesaddr(self,py_callback,bytes_array,RequestCallbackPayload payload, Error err) with gil: if bytes_array is not None: Py_XDECREF(<PyObject*>bytes_array) - self.c_callback_python(py_callback,header,err) + self.c_callback_python(py_callback,payload,err) def cleanup(self): with nogil: if self.c_producer.get() is not NULL: diff --git a/producer/api/python/asapo_wrappers.h b/producer/api/python/asapo_wrappers.h index 2bb1c2e4082de80d2698956b56077ab4389e21dc..5c9929da5127287fffb422869e37e4547bdd4647 100644 --- a/producer/api/python/asapo_wrappers.h +++ b/producer/api/python/asapo_wrappers.h @@ -13,23 +13,23 @@ inline std::string GetErrorString(asapo::Error* err) { return ""; } -using RequestCallbackCython = void (*)(void*, void*, GenericRequestHeader header, Error err); -using RequestCallbackCythonMemory = void (*)(void*, void*, void*, GenericRequestHeader header, Error err); +using RequestCallbackCython = void (*)(void*, void*, RequestCallbackPayload payload , Error err); +using RequestCallbackCythonMemory = void (*)(void*, void*, void*, RequestCallbackPayload payload, Error err); RequestCallback unwrap_callback(RequestCallbackCython callback, void* c_self, void* py_func) { if (py_func == NULL) { return nullptr; } - RequestCallback wrapper = [ = ](GenericRequestHeader header, Error err) -> void { - callback(c_self, py_func, std::move(header), std::move(err)); + RequestCallback wrapper = [ = ](RequestCallbackPayload payload, Error err) -> void { + callback(c_self, py_func, std::move(payload), std::move(err)); }; return wrapper; } RequestCallback unwrap_callback_with_memory(RequestCallbackCythonMemory callback, void* c_self, void* py_func, void* nd_array) { - RequestCallback wrapper = [ = ](GenericRequestHeader header, Error err) -> void { - callback(c_self, py_func, nd_array, std::move(header), std::move(err)); + RequestCallback wrapper = [ = ](RequestCallbackPayload payload, Error err) -> void { + callback(c_self, py_func, nd_array, std::move(payload), std::move(err)); }; return wrapper; } diff --git a/producer/event_monitor_producer/src/main_eventmon.cpp b/producer/event_monitor_producer/src/main_eventmon.cpp index ff57a2dc3f568b0b69d7e66c275df96d957c26f0..0b599676e70f8c89034ba31d7f7020421cd35c81 100644 --- a/producer/event_monitor_producer/src/main_eventmon.cpp +++ b/producer/event_monitor_producer/src/main_eventmon.cpp @@ -50,7 +50,7 @@ std::unique_ptr<Producer> CreateProducer() { } -void ProcessAfterSend(asapo::GenericRequestHeader header, asapo::Error err) { +void ProcessAfterSend(asapo::RequestCallbackPayload payload, asapo::Error err) { if (err) { const auto logger = asapo::GetDefaultEventMonLogger(); logger->Error("data was not successfully send: " + err->Explain()); @@ -60,7 +60,7 @@ void ProcessAfterSend(asapo::GenericRequestHeader header, asapo::Error err) { if (!config->remove_after_send) { return; } - std::string fname = config->root_monitored_folder + asapo::kPathSeparator + header.message; + std::string fname = config->root_monitored_folder + asapo::kPathSeparator + payload.original_header.message; auto error = io->RemoveFile(fname); if (error) { const auto logger = asapo::GetDefaultEventMonLogger(); diff --git a/receiver/CMakeLists.txt b/receiver/CMakeLists.txt index f7340a3684242816e620c7f1719317414f77daf3..bbf62d388e76b126bf2e8ded4247c7cfa3a7cba5 100644 --- a/receiver/CMakeLists.txt +++ b/receiver/CMakeLists.txt @@ -17,6 +17,7 @@ set(RECEIVER_CORE_FILES src/request_handler/request_handler_receive_data.cpp src/request_handler/request_handler_authorize.cpp src/request_handler/request_handler_db_meta_write.cpp + src/request_handler/request_handler_db_stream_info.cpp src/request_handler/request_handler_receive_metadata.cpp src/request_handler/request_handler_db_check_request.cpp src/request_handler/request_factory.cpp @@ -84,6 +85,7 @@ set(TEST_SOURCE_FILES unittests/request_handler/test_request_handler_db_writer.cpp unittests/request_handler/test_request_handler_db_check_request.cpp unittests/request_handler/test_request_handler_db_meta_writer.cpp + unittests/request_handler/test_request_handler_db_stream_info.cpp unittests/request_handler/test_request_handler_db.cpp unittests/request_handler/test_request_handler_authorizer.cpp unittests/request_handler/test_request_handler_receive_data.cpp diff --git a/receiver/src/request.cpp b/receiver/src/request.cpp index 8b9f507720bd75237bc5e1dcace5cc1397de3ff4..8ca9d9b4c017c58e8063a427acc837af872ad1ad 100644 --- a/receiver/src/request.cpp +++ b/receiver/src/request.cpp @@ -177,12 +177,17 @@ void Request::SetAlreadyProcessedFlag() { already_processed_ = true; } -void Request::SetWarningMessage(std::string message) { - warning_message_ = std::move(message); +void Request::SetResponseMessage(std::string message, ResponseMessageType type) { + response_message_ = std::move(message); + response_message_type_ = type; } -const std::string& Request::GetWarningMessage() const { - return warning_message_; +const std::string& Request::GetResponseMessage() const { + return response_message_; +} + +const ResponseMessageType Request::GetResponseMessageType() const { + return response_message_type_; } Error Request::CheckForDuplicates() { diff --git a/receiver/src/request.h b/receiver/src/request.h index 75e4d39a8574c25b98a6e57f41b66fba129e1439..22e80ecfee3cc0a87af391a92cabbbb67c1e19f1 100644 --- a/receiver/src/request.h +++ b/receiver/src/request.h @@ -23,6 +23,11 @@ namespace asapo { using RequestHandlerList = std::vector<const ReceiverRequestHandler*>; +enum class ResponseMessageType { + kWarning, + kInfo +}; + class Request { public: VIRTUAL Error Handle(ReceiverStatistics*); @@ -66,8 +71,9 @@ class Request { VIRTUAL uint64_t GetSlotId() const; VIRTUAL bool WasAlreadyProcessed() const; VIRTUAL void SetAlreadyProcessedFlag(); - VIRTUAL void SetWarningMessage(std::string message); - VIRTUAL const std::string& GetWarningMessage() const; + VIRTUAL void SetResponseMessage(std::string message, ResponseMessageType type); + VIRTUAL const ResponseMessageType GetResponseMessageType() const; + VIRTUAL const std::string& GetResponseMessage() const; VIRTUAL Error CheckForDuplicates(); private: const GenericRequestHeader request_header_; @@ -84,7 +90,8 @@ class Request { std::string metadata_; CacheMeta* slot_meta_ = nullptr; bool already_processed_ = false; - std::string warning_message_; + std::string response_message_; + ResponseMessageType response_message_type_; const RequestHandlerDbCheckRequest* check_duplicate_request_handler_; }; diff --git a/receiver/src/request_handler/request_factory.cpp b/receiver/src/request_handler/request_factory.cpp index 80885791671c4eb8e404146c8ee604f64f7f20a1..1d51fe84b4688b3e135b39ec14a5193fa4c11889 100644 --- a/receiver/src/request_handler/request_factory.cpp +++ b/receiver/src/request_handler/request_factory.cpp @@ -77,6 +77,10 @@ Error RequestFactory::AddHandlersToRequest(std::unique_ptr<Request>& request, // do nothing break; } + case Opcode::kOpcodeStreamInfo: { + request->AddHandler(&request_handler_db_stream_info_); + break; + } default: return ReceiverErrorTemplates::kInvalidOpCode.Generate(); } diff --git a/receiver/src/request_handler/request_factory.h b/receiver/src/request_handler/request_factory.h index 6c246e6ada0ea607e2090524343fda57883797f9..0ac1a3af274ffc2d34ad371a1c20d5fc7fabb332 100644 --- a/receiver/src/request_handler/request_factory.h +++ b/receiver/src/request_handler/request_factory.h @@ -4,6 +4,7 @@ #include "../request.h" #include "../file_processors/write_file_processor.h" #include "../file_processors/receive_file_processor.h" +#include "request_handler_db_stream_info.h" namespace asapo { @@ -22,6 +23,7 @@ class RequestFactory { RequestHandlerReceiveData request_handler_receivedata_; RequestHandlerReceiveMetaData request_handler_receive_metadata_; RequestHandlerDbWrite request_handler_dbwrite_{kDBDataCollectionNamePrefix}; + RequestHandlerDbStreamInfo request_handler_db_stream_info_{kDBDataCollectionNamePrefix}; RequestHandlerDbMetaWrite request_handler_db_meta_write_{kDBMetaCollectionName}; RequestHandlerAuthorize request_handler_authorize_; RequestHandlerDbCheckRequest request_handler_db_check_{kDBDataCollectionNamePrefix};; diff --git a/receiver/src/request_handler/request_handler_db_stream_info.cpp b/receiver/src/request_handler/request_handler_db_stream_info.cpp new file mode 100644 index 0000000000000000000000000000000000000000..1c2695f422f93fbdb576b00df236e60dcf71aa05 --- /dev/null +++ b/receiver/src/request_handler/request_handler_db_stream_info.cpp @@ -0,0 +1,28 @@ +#include "request_handler_db_stream_info.h" +#include "../receiver_config.h" + +namespace asapo { + +RequestHandlerDbStreamInfo::RequestHandlerDbStreamInfo(std::string collection_name_prefix) + : RequestHandlerDb(std::move(collection_name_prefix)) { + +} + + +Error RequestHandlerDbStreamInfo::ProcessRequest(Request* request) const { + if (auto err = RequestHandlerDb::ProcessRequest(request) ) { + return err; + } + + auto col_name = collection_name_prefix_ + "_" + request->GetSubstream(); + StreamInfo info; + auto err = db_client__->GetStreamInfo(col_name,&info); + if (!err) { + log__->Debug(std::string{"get stream info from "} + col_name + " in " + + db_name_ + " at " + GetReceiverConfig()->database_uri); + request->SetResponseMessage(info.Json(),ResponseMessageType::kInfo); + } + return err; +} + +} \ No newline at end of file diff --git a/receiver/src/request_handler/request_handler_db_stream_info.h b/receiver/src/request_handler/request_handler_db_stream_info.h new file mode 100644 index 0000000000000000000000000000000000000000..5ec17c9dd69a56ef303369d67354bbc5c153e958 --- /dev/null +++ b/receiver/src/request_handler/request_handler_db_stream_info.h @@ -0,0 +1,17 @@ +#ifndef ASAPO_REQUEST_HANDLER_DB_STREAM_INFO_H +#define ASAPO_REQUEST_HANDLER_DB_STREAM_INFO_H + +#include "request_handler_db.h" +#include "../request.h" + +namespace asapo { + +class RequestHandlerDbStreamInfo final: public RequestHandlerDb { + public: + RequestHandlerDbStreamInfo(std::string collection_name_prefix); + Error ProcessRequest(Request* request) const override; +}; + +} + +#endif //ASAPO_REQUEST_HANDLER_DB_STREAM_INFO_H diff --git a/receiver/src/request_handler/request_handler_db_write.cpp b/receiver/src/request_handler/request_handler_db_write.cpp index 3b0fddedd1af52eb66e0e4c53bad2a352e004d10..5570dad6a2dc8868fa3a05d051f3add7f92fd4c3 100644 --- a/receiver/src/request_handler/request_handler_db_write.cpp +++ b/receiver/src/request_handler/request_handler_db_write.cpp @@ -38,7 +38,7 @@ Error RequestHandlerDbWrite::ProcessDuplicateRecordSituation(Request* request) c auto check_err = request->CheckForDuplicates(); if (check_err == ReceiverErrorTemplates::kWarningDuplicatedRequest) { std::string warn_str = "ignoring duplicate record for id " + std::to_string(request->GetDataID()); - request->SetWarningMessage(warn_str); + request->SetResponseMessage(warn_str,ResponseMessageType::kWarning); log__->Warning(warn_str); return nullptr; } diff --git a/receiver/src/request_handler/request_handler_file_process.cpp b/receiver/src/request_handler/request_handler_file_process.cpp index 63af6bd33f5076b1d8a658a2c988e4c8ea99908b..47bf3dd21cdfe469f21326acca90bf401556a641 100644 --- a/receiver/src/request_handler/request_handler_file_process.cpp +++ b/receiver/src/request_handler/request_handler_file_process.cpp @@ -20,14 +20,14 @@ Error RequestHandlerFileProcess::ProcessRequest(Request* request) const { Error RequestHandlerFileProcess::ProcessFileExistSituation(Request* request) const { auto err_duplicate = request->CheckForDuplicates(); if (err_duplicate == nullptr) { - request->SetWarningMessage("file has been overwritten"); + request->SetResponseMessage("file has been overwritten",ResponseMessageType::kWarning); log__->Warning(std::string("overwriting file " ) + request->GetOfflinePath() + kPathSeparator + request->GetFileName()); return file_processor_->ProcessFile(request, true); } if (err_duplicate == ReceiverErrorTemplates::kWarningDuplicatedRequest) { request->SetAlreadyProcessedFlag(); - request->SetWarningMessage("duplicated request, ignored"); + request->SetResponseMessage("duplicated request, ignored",ResponseMessageType::kWarning); log__->Warning("duplicated request, id: " + std::to_string(request->GetDataID())); return nullptr; } diff --git a/receiver/src/request_handler/requests_dispatcher.cpp b/receiver/src/request_handler/requests_dispatcher.cpp index 0412e543c383322b8b7990d6f606b2a909ac0af7..460e82861bfe20ceb7dd8d9e90a96aee6b64790a 100644 --- a/receiver/src/request_handler/requests_dispatcher.cpp +++ b/receiver/src/request_handler/requests_dispatcher.cpp @@ -38,9 +38,11 @@ GenericNetworkResponse RequestsDispatcher::CreateResponseToRequest(const std::un if (handle_error) { strncpy(generic_response.message, handle_error->Explain().c_str(), kMaxMessageSize); } - if (request->GetWarningMessage().size() > 0) { - generic_response.error_code = kNetErrorWarning; - strncpy(generic_response.message, request->GetWarningMessage().c_str(), kMaxMessageSize); + if (request->GetResponseMessage().size() > 0) { + if (request->GetResponseMessageType()==ResponseMessageType::kWarning) { + generic_response.error_code = kNetErrorWarning; + } + strncpy(generic_response.message, request->GetResponseMessage().c_str(), kMaxMessageSize); } return generic_response; } diff --git a/receiver/unittests/mock_receiver_config.cpp b/receiver/unittests/mock_receiver_config.cpp index aacffd49cda8d9559c164a9ac35bb81f45f876e6..6a837eb31ce193d8bd1bfaca86a118f20e6a7451 100644 --- a/receiver/unittests/mock_receiver_config.cpp +++ b/receiver/unittests/mock_receiver_config.cpp @@ -72,7 +72,6 @@ Error SetReceiverConfig (const ReceiverConfig& config, std::string error_field) testing::Return(config_string) ); - printf("%s\n", config_string.c_str()); auto err = config_factory.SetConfig("fname"); config_factory.io__.release(); diff --git a/receiver/unittests/receiver_mocking.h b/receiver/unittests/receiver_mocking.h index f2c6dc6ae60dacff77d241f8424f0ef3543f606e..8aebb8a3e3a2cbcee95023881582dd5864cd8a9c 100644 --- a/receiver/unittests/receiver_mocking.h +++ b/receiver/unittests/receiver_mocking.h @@ -80,8 +80,8 @@ class MockRequest: public Request { MOCK_CONST_METHOD0(GetOnlinePath, const std::string & ()); MOCK_CONST_METHOD0(GetOfflinePath, const std::string & ()); - const asapo::CustomRequestData& GetCustomData() const override { - return (asapo::CustomRequestData&) * GetCustomData_t(); + const CustomRequestData& GetCustomData() const override { + return (CustomRequestData&) * GetCustomData_t(); }; MOCK_CONST_METHOD0(GetCustomData_t, const uint64_t* ()); @@ -94,9 +94,13 @@ class MockRequest: public Request { MOCK_CONST_METHOD0(WasAlreadyProcessed, bool()); MOCK_METHOD0(SetAlreadyProcessedFlag, void()); - MOCK_METHOD1(SetWarningMessage, void(std::string)); - MOCK_CONST_METHOD0(GetWarningMessage, const std::string & ()); + MOCK_METHOD2(SetResponseMessage, void(std::string,ResponseMessageType)); + MOCK_CONST_METHOD0(GetResponseMessage, const std::string & ()); + MOCK_CONST_METHOD0(GetResponseMessageType_t, ResponseMessageType ()); + const ResponseMessageType GetResponseMessageType() const override { + return GetResponseMessageType_t(); + }; Error CheckForDuplicates() override { return Error{CheckForDuplicates_t()}; diff --git a/receiver/unittests/request_handler/test_request_factory.cpp b/receiver/unittests/request_handler/test_request_factory.cpp index 2dab899f9420d202233ec9919581133d6d1b50d4..49a76ae672dc3b6e0ea83ea257a9acdf40916bb4 100644 --- a/receiver/unittests/request_handler/test_request_factory.cpp +++ b/receiver/unittests/request_handler/test_request_factory.cpp @@ -13,6 +13,8 @@ #include "../../src/request_handler/request_handler_file_process.h" #include "../../src/request_handler/request_handler_db_write.h" #include "../../src/request_handler/request_handler_authorize.h" +#include "../../src/request_handler/request_handler_db_stream_info.h" + #include "../../src/request_handler/request_handler_receive_data.h" #include "../../src/request_handler/request_handler_receive_metadata.h" @@ -221,5 +223,14 @@ TEST_F(FactoryTests, DonNotGenerateRequestIfIngestModeIsWrong) { ASSERT_THAT(err, Eq(asapo::ReceiverErrorTemplates::kBadRequest)); } +TEST_F(FactoryTests, StreamInfoRequest) { + generic_request_header.op_code = asapo::Opcode::kOpcodeStreamInfo; + auto request = factory.GenerateRequest(generic_request_header, 1, origin_uri, &err); + ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(request->GetListHandlers().size(), Eq(2)); + ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerAuthorize*>(request->GetListHandlers()[0]), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerDbStreamInfo*>(request->GetListHandlers()[1]), Ne(nullptr)); +} + } diff --git a/receiver/unittests/request_handler/test_request_handler_db_check_request.cpp b/receiver/unittests/request_handler/test_request_handler_db_check_request.cpp index 544c4e351bba84cce7d26abf9295d85ba1c36003..0b2cbbcdf668335e75d3c66a80634a238448e573 100644 --- a/receiver/unittests/request_handler/test_request_handler_db_check_request.cpp +++ b/receiver/unittests/request_handler/test_request_handler_db_check_request.cpp @@ -153,7 +153,6 @@ void DbCheckRequestHandlerTests::ExpectRequestParams(asapo::Opcode op_code, cons ; } - if (expect_compare) { EXPECT_CALL(*mock_request, GetDataSize()) .WillOnce(Return(expected_file_size)) @@ -168,13 +167,10 @@ void DbCheckRequestHandlerTests::ExpectRequestParams(asapo::Opcode op_code, cons ; } - EXPECT_CALL(*mock_request, GetSubstream()) .WillOnce(Return(expected_substream)) ; - - EXPECT_CALL(*mock_request, GetDataID()) .WillOnce(Return(expected_id)) ; diff --git a/receiver/unittests/request_handler/test_request_handler_db_stream_info.cpp b/receiver/unittests/request_handler/test_request_handler_db_stream_info.cpp new file mode 100644 index 0000000000000000000000000000000000000000..fa78df3cd4c35c9e967f1c404022d556a17166ab --- /dev/null +++ b/receiver/unittests/request_handler/test_request_handler_db_stream_info.cpp @@ -0,0 +1,122 @@ +#include <gtest/gtest.h> +#include <gmock/gmock.h> + +#include "unittests/MockIO.h" +#include "unittests/MockDatabase.h" +#include "unittests/MockLogger.h" + +#include "../../src/receiver_error.h" +#include "../../src/request.h" +#include "../../src/request_handler/request_factory.h" +#include "../../src/request_handler/request_handler.h" +#include "../../src/request_handler/request_handler_db_stream_info.h" +#include "../../../common/cpp/src/database/mongodb_client.h" + +#include "../mock_receiver_config.h" +#include "common/data_structs.h" +#include "common/networking.h" +#include "../receiver_mocking.h" + +using asapo::MockRequest; +using asapo::FileInfo; +using ::testing::Test; +using ::testing::Return; +using ::testing::ReturnRef; +using ::testing::_; +using ::testing::DoAll; +using ::testing::SetArgReferee; +using ::testing::Gt; +using ::testing::Eq; +using ::testing::Ne; +using ::testing::Mock; +using ::testing::NiceMock; +using ::testing::InSequence; +using ::testing::SetArgPointee; +using ::testing::AllOf; +using ::testing::HasSubstr; + + +using ::asapo::Error; +using ::asapo::ErrorInterface; +using ::asapo::FileDescriptor; +using ::asapo::SocketDescriptor; +using ::asapo::MockIO; +using asapo::Request; +using asapo::RequestHandlerDbStreamInfo; +using ::asapo::GenericRequestHeader; + +using asapo::MockDatabase; +using asapo::RequestFactory; +using asapo::SetReceiverConfig; +using asapo::ReceiverConfig; + + +namespace { + +class DbMetaStreamInfoTests : public Test { + public: + std::string expected_substream = "substream"; + std::string expected_collection_name = std::string(asapo::kDBDataCollectionNamePrefix) + "_" + expected_substream; + RequestHandlerDbStreamInfo handler{asapo::kDBDataCollectionNamePrefix}; + std::unique_ptr<NiceMock<MockRequest>> mock_request; + NiceMock<MockDatabase> mock_db; + NiceMock<asapo::MockLogger> mock_logger; + ReceiverConfig config; + std::string expected_beamtime_id = "beamtime_id"; + std::string expected_stream = "stream"; + std::string info_str = R"({"lastId":10})"; + const uint8_t* expected_info_str = reinterpret_cast<const uint8_t*>(info_str.c_str()); + asapo::StreamInfo expected_stream_info; + void SetUp() override { + GenericRequestHeader request_header; + expected_stream_info.last_id = 10; + request_header.data_id = 0; + handler.db_client__ = std::unique_ptr<asapo::Database> {&mock_db}; + handler.log__ = &mock_logger; + mock_request.reset(new NiceMock<MockRequest> {request_header, 1, "", nullptr}); + ON_CALL(*mock_request, GetBeamtimeId()).WillByDefault(ReturnRef(expected_beamtime_id)); + } + void TearDown() override { + handler.db_client__.release(); + } +}; + +TEST_F(DbMetaStreamInfoTests, CallsUpdate) { + SetReceiverConfig(config, "none"); + + EXPECT_CALL(*mock_request, GetBeamtimeId()) + .WillOnce(ReturnRef(expected_beamtime_id)) + ; + + EXPECT_CALL(*mock_request, GetStream()) + .WillOnce(ReturnRef(expected_stream)) + ; + + EXPECT_CALL(*mock_request, GetSubstream()) + .WillOnce(Return(expected_substream)) + ; + + EXPECT_CALL(mock_db, Connect_t(config.database_uri, expected_beamtime_id + "_" + expected_stream)). + WillOnce(testing::Return(nullptr)); + + + EXPECT_CALL(mock_db, GetStreamInfo_t(expected_collection_name, _)). + WillOnce(DoAll( + SetArgPointee<1>(expected_stream_info), + testing::Return(nullptr) + )); + + EXPECT_CALL(*mock_request, SetResponseMessage(info_str,asapo::ResponseMessageType::kInfo)); + + EXPECT_CALL(mock_logger, Debug(AllOf(HasSubstr("get stream info"), + HasSubstr(config.database_uri), + HasSubstr(expected_beamtime_id), + HasSubstr(expected_collection_name) + ) + ) + ); + + handler.ProcessRequest(mock_request.get()); +} + +} diff --git a/receiver/unittests/request_handler/test_request_handler_db_writer.cpp b/receiver/unittests/request_handler/test_request_handler_db_writer.cpp index 4be9c89b3940769327d5c24ba9f29bc6048fb840..703095754b840d050155bed3a4043b1508cfcc48 100644 --- a/receiver/unittests/request_handler/test_request_handler_db_writer.cpp +++ b/receiver/unittests/request_handler/test_request_handler_db_writer.cpp @@ -254,7 +254,7 @@ TEST_F(DbWriterHandlerTests, SkipIfWasAlreadyProcessed) { TEST_F(DbWriterHandlerTests, DuplicatedRequest_SameRecord) { ExpectDuplicatedID(); - EXPECT_CALL(*mock_request, SetWarningMessage(HasSubstr("duplicate record"))); + EXPECT_CALL(*mock_request, SetResponseMessage(HasSubstr("duplicate record"),asapo::ResponseMessageType::kWarning)); EXPECT_CALL(*mock_request, CheckForDuplicates_t()) .WillOnce( Return(asapo::ReceiverErrorTemplates::kWarningDuplicatedRequest.Generate().release()) diff --git a/receiver/unittests/request_handler/test_request_handler_file_process.cpp b/receiver/unittests/request_handler/test_request_handler_file_process.cpp index a570c579fa7f8e980f118c97c03c7e3264df1103..11e32e3708c1006ed55b1cc65d25de03009f073c 100644 --- a/receiver/unittests/request_handler/test_request_handler_file_process.cpp +++ b/receiver/unittests/request_handler/test_request_handler_file_process.cpp @@ -81,7 +81,7 @@ void FileWriteHandlerTests::ExpecFileProcess(const asapo::SimpleErrorTemplate* e } TEST_F(FileWriteHandlerTests, FileAlreadyExists_NoRecordInDb) { - EXPECT_CALL(*mock_request, SetWarningMessage(HasSubstr("overwritten"))); + EXPECT_CALL(*mock_request, SetResponseMessage(HasSubstr("overwritten"),asapo::ResponseMessageType::kWarning)); EXPECT_CALL(*mock_request, CheckForDuplicates_t()) .WillOnce( Return(nullptr) @@ -106,7 +106,7 @@ TEST_F(FileWriteHandlerTests, FileAlreadyExists_NoRecordInDb) { TEST_F(FileWriteHandlerTests, FileAlreadyExists_DuplicatedRecordInDb) { - EXPECT_CALL(*mock_request, SetWarningMessage(HasSubstr("ignore"))); + EXPECT_CALL(*mock_request, SetResponseMessage(HasSubstr("ignore"),asapo::ResponseMessageType::kWarning)); EXPECT_CALL(*mock_request, SetAlreadyProcessedFlag()); EXPECT_CALL(mock_logger, Warning(HasSubstr("duplicated"))); EXPECT_CALL(*mock_request, GetDataID()).WillOnce(Return(1)); diff --git a/receiver/unittests/request_handler/test_requests_dispatcher.cpp b/receiver/unittests/request_handler/test_requests_dispatcher.cpp index 05201bc4e88eb9b562fb5e72d0d2a99a91e7ccee..f3513ddfb80662de2b68ee0eea9a93e445359050 100644 --- a/receiver/unittests/request_handler/test_requests_dispatcher.cpp +++ b/receiver/unittests/request_handler/test_requests_dispatcher.cpp @@ -262,7 +262,7 @@ TEST_F(RequestsDispatcherTests, OkProcessRequestSendOK) { TEST_F(RequestsDispatcherTests, ProcessRequestReturnsOkWithWarning) { MockHandleRequest(0); MockSendResponse(&response, false); - request->SetWarningMessage("duplicate"); + request->SetResponseMessage("duplicate",asapo::ResponseMessageType::kWarning); auto err = dispatcher->ProcessRequest(request); @@ -271,6 +271,18 @@ TEST_F(RequestsDispatcherTests, ProcessRequestReturnsOkWithWarning) { ASSERT_THAT(std::string(response.message), HasSubstr(std::string("duplicate"))); } +TEST_F(RequestsDispatcherTests, ProcessRequestReturnsOkWithInfo) { + MockHandleRequest(0); + MockSendResponse(&response, false); + request->SetResponseMessage("some info",asapo::ResponseMessageType::kInfo); + + auto err = dispatcher->ProcessRequest(request); + + ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(response.error_code, Eq(asapo::kNetErrorNoError)); + ASSERT_THAT(std::string(response.message), HasSubstr(std::string("some info"))); +} + TEST_F(RequestsDispatcherTests, ProcessRequestReturnsAuthorizationFailure) { MockHandleRequest(1, asapo::ReceiverErrorTemplates::kAuthorizationFailure.Generate()); MockSendResponse(&response, false); diff --git a/receiver/unittests/test_request.cpp b/receiver/unittests/test_request.cpp index bbeaf801e277b0691e46b081ee6d78a74cba7e2d..2a25efbdeb3b8aa528fd4c59d5a11da1b8205bd3 100644 --- a/receiver/unittests/test_request.cpp +++ b/receiver/unittests/test_request.cpp @@ -257,11 +257,20 @@ TEST_F(RequestTests, RequestTests_SetGetBeamtimeYear_Test) { } TEST_F(RequestTests, SetGetWarningMessage) { - request->SetWarningMessage("warn"); + request->SetResponseMessage("warn",asapo::ResponseMessageType::kWarning); + + ASSERT_THAT(request->GetResponseMessage(), "warn"); + ASSERT_THAT(request->GetResponseMessageType(), asapo::ResponseMessageType::kWarning); - ASSERT_THAT(request->GetWarningMessage(), "warn"); } +TEST_F(RequestTests, SetGetInfossage) { + request->SetResponseMessage("info",asapo::ResponseMessageType::kInfo); + + ASSERT_THAT(request->GetResponseMessage(), "info"); + ASSERT_THAT(request->GetResponseMessageType(), asapo::ResponseMessageType::kInfo); + +} TEST_F(RequestTests, SetGetOverwriteAllowed) { request->SetAlreadyProcessedFlag(); diff --git a/tests/automatic/full_chain/send_recv_substreams/send_recv_substreams.cpp b/tests/automatic/full_chain/send_recv_substreams/send_recv_substreams.cpp index 333d139fde158448629714a472e9c1ea631c94b1..2862a87bf0f148a140b1197e0877cd1008298df1 100644 --- a/tests/automatic/full_chain/send_recv_substreams/send_recv_substreams.cpp +++ b/tests/automatic/full_chain/send_recv_substreams/send_recv_substreams.cpp @@ -26,7 +26,7 @@ struct Args { std::string token; }; -void ProcessAfterSend(asapo::GenericRequestHeader header, asapo::Error err) { +void ProcessAfterSend(asapo::RequestCallbackPayload payload, asapo::Error err) { if (err) { std::cerr << "Data was not successfully send: " << err << std::endl; return; diff --git a/tests/automatic/mongo_db/insert_retrieve/insert_retrieve_mongodb.cpp b/tests/automatic/mongo_db/insert_retrieve/insert_retrieve_mongodb.cpp index 8cf88e458b71793b6745a747a4c12a40e05f05f0..289362011cd1a42d51af187adcdf82ab06af7439 100644 --- a/tests/automatic/mongo_db/insert_retrieve/insert_retrieve_mongodb.cpp +++ b/tests/automatic/mongo_db/insert_retrieve/insert_retrieve_mongodb.cpp @@ -64,6 +64,11 @@ int main(int argc, char* argv[]) { M_AssertEq(nullptr, err); err = db_new.GetById("test", 0, &fi_db); Assert(err, "No record"); + + asapo::StreamInfo info; + err = db.GetStreamInfo("test",&info); + M_AssertEq(nullptr, err); + M_AssertEq(fi.id,info.last_id); } return 0; diff --git a/tests/automatic/producer/beamtime_metadata/beamtime_metadata.cpp b/tests/automatic/producer/beamtime_metadata/beamtime_metadata.cpp index 4a80557b916315c261946ec9e3147bb887e2986c..5e00ea491ea6a11a9e999003a37fae3d42e9381e 100644 --- a/tests/automatic/producer/beamtime_metadata/beamtime_metadata.cpp +++ b/tests/automatic/producer/beamtime_metadata/beamtime_metadata.cpp @@ -43,7 +43,7 @@ void ProcessCommandArguments(int argc, char* argv[], Args* args) { } } -void ProcessAfterSend(asapo::GenericRequestHeader header, asapo::Error err) { +void ProcessAfterSend(asapo::RequestCallbackPayload payload, asapo::Error err) { if (err) { std::cerr << "metadata was not successfully send: " << err << std::endl; return; diff --git a/tests/automatic/producer/python_api/check_linux.sh b/tests/automatic/producer/python_api/check_linux.sh index 6110cbc43e223615111ca0fc7f84d724e3ebaa98..657146af910c638faee761df87059dafe35d27fc 100644 --- a/tests/automatic/producer/python_api/check_linux.sh +++ b/tests/automatic/producer/python_api/check_linux.sh @@ -39,9 +39,10 @@ echo test > file1 sleep 1 -$1 $3 $stream $beamtime_id "127.0.0.1:8400" > out || cat out +$1 $3 $stream $beamtime_id "127.0.0.1:8400" &> out || cat out cat out cat out | grep "successfuly sent" | wc -l | grep 11 cat out | grep "local i/o error" cat out | grep "already have record with same id" | wc -l | grep 4 cat out | grep "duplicate" | wc -l | grep 4 +cat out | grep "Finished successfully" diff --git a/tests/automatic/producer/python_api/check_windows.bat b/tests/automatic/producer/python_api/check_windows.bat index 352cbfd3c653ae66f805c44a67ccd45e18a686a2..2cf597afe76a2d35934e81c47dc3cb08c133fbc6 100644 --- a/tests/automatic/producer/python_api/check_windows.bat +++ b/tests/automatic/producer/python_api/check_windows.bat @@ -33,6 +33,8 @@ echo %NUM% | findstr 1 || goto error for /F %%N in ('find /C "} server warning: duplicated request" ^< "out"') do set NUM=%%N echo %NUM% | findstr 1 || goto error +findstr /I /L /C:"Finished successfully" out || goto :error + goto :clean :error diff --git a/tests/automatic/producer/python_api/producer_api.py b/tests/automatic/producer/python_api/producer_api.py index 92df017b55c35bea28983349957b760a51cea122..6bf9c82a27a75ae544552cc41bb54fb2b707a326 100644 --- a/tests/automatic/producer/python_api/producer_api.py +++ b/tests/automatic/producer/python_api/producer_api.py @@ -14,6 +14,13 @@ endpoint = sys.argv[3] token = "" nthreads = 8 +def assert_eq(val,expected,name): + print ("asserting eq for "+name) + if val != expected: + print ("error at "+name) + print ('val: ', val,' expected: ',expected) + sys.exit(1) + def callback(header,err): lock.acquire() # to print if isinstance(err,asapo_producer.AsapoServerWarning): @@ -95,14 +102,17 @@ producer.send_data(6, stream+"/"+"file8",None, #send to another substream producer.send_data(1, stream+"/"+"file9",None, - ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, substream="substream", callback = callback) - + ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, substream="stream", callback = callback) producer.wait_requests_finished(50000) n = producer.get_requests_queue_size() -if n!=0: - print("number of remaining requests should be zero, got ",n) - sys.exit(1) +assert_eq(n,0,"requests in queue") + +info = producer.stream_info() +assert_eq(info['lastId'],10,"last id") + +info = producer.stream_info('stream') +assert_eq(info['lastId'],1,"last id from different substream") # create with error @@ -115,5 +125,7 @@ else: sys.exit(1) +print ('Finished successfully') +