diff --git a/common/cpp/src/json_parser/rapid_json.cpp b/common/cpp/src/json_parser/rapid_json.cpp
index cf47a597c5bd54015523a1d6ecdf13e1236bb1f4..31fcd58424b7eb29e80262398695d1431651ef13 100644
--- a/common/cpp/src/json_parser/rapid_json.cpp
+++ b/common/cpp/src/json_parser/rapid_json.cpp
@@ -36,7 +36,7 @@ Error RapidJson::LazyInitialize()const noexcept {
     return nullptr;
 }
 
-asapo::Error CheckValueType(const std::string& name, ValueType type, const Value* val) {
+asapo::Error RapidJson::CheckValueType(const std::string& name, ValueType type, const Value* val) const {
     bool res = false;
     switch (type) {
     case ValueType::kObject:
@@ -56,7 +56,7 @@ asapo::Error CheckValueType(const std::string& name, ValueType type, const Value
         break;
     }
     if (!res) {
-        return TextError("wrong type: " + name);
+        return TextError("wrong type: " + name + " in: " + json_);
     }
 
     return nullptr;
diff --git a/common/cpp/src/json_parser/rapid_json.h b/common/cpp/src/json_parser/rapid_json.h
index bb23bd730c9295c3b3ed55a282ebb0df3b4d11f3..ca0c0b054ae31b9d0444f0d20c29ceb3cbe743c9 100644
--- a/common/cpp/src/json_parser/rapid_json.h
+++ b/common/cpp/src/json_parser/rapid_json.h
@@ -33,6 +33,7 @@ class RapidJson {
     std::string json_;
     mutable bool initialized_ = false;
     Error LazyInitialize() const noexcept;
+    Error CheckValueType(const std::string& name, ValueType type, const rapidjson::Value* val) const;
     Error embedded_error_ = nullptr;
 
     asapo::Error GetValuePointer(const std::string& name, ValueType type, rapidjson::Value** val)const noexcept;
diff --git a/config/nomad/receiver.nmd.in b/config/nomad/receiver.nmd.in
new file mode 100644
index 0000000000000000000000000000000000000000..1d9c1675e1d1b247e1b962533e207445a79b6e2a
--- /dev/null
+++ b/config/nomad/receiver.nmd.in
@@ -0,0 +1,46 @@
+job "receiver" {
+  datacenters = ["dc1"]
+
+  type = "service"
+
+  group "group" {
+    count = 1
+
+    task "service" {
+      driver = "raw_exec"
+
+      config {
+        command = "/bin/bash",
+        args = ["-c", "mkdir files && exec ${RECEIVER_DIR}/${RECEIVER_NAME} receiver.json && rm -rf files"]
+      }
+
+      resources {
+        cpu = 500 # 500 MHz
+        memory = 256 # 256MB
+        network {
+          port "recv" {}
+        }
+      }
+
+      service {
+        name = "receiver"
+        port = "recv"
+        check {
+          name = "alive"
+          type = "tcp"
+          interval = "10s"
+          timeout = "2s"
+          initial_status = "passing"
+        }
+      }
+
+      template {
+        source = "${CMAKE_CURRENT_BINARY_DIR}/receiver.json.tpl"
+        destination = "receiver.json"
+        change_mode = "signal"
+        change_signal = "SIGHUP"
+      }
+
+    }
+  }
+}
diff --git a/producer/api/include/producer/producer_error.h b/producer/api/include/producer/producer_error.h
index a7d93435cbd27da4cf1c07a14c6a8a87867f0817..5a0641b7dd8c7ad1ee1159fee85fee954ebbfa5e 100644
--- a/producer/api/include/producer/producer_error.h
+++ b/producer/api/include/producer/producer_error.h
@@ -10,7 +10,7 @@ enum class ProducerErrorType {
     kConnectionNotReady,
     kFileTooLarge,
     kFileIdAlreadyInUse,
-    kUnknownServerError,
+    kInternalServerError,
     kCannotSendDataToReceivers,
     kRequestPoolIsFull
 };
@@ -74,8 +74,8 @@ auto const kFileIdAlreadyInUse = ProducerErrorTemplate {
     "File already in use", ProducerErrorType::kFileIdAlreadyInUse
 };
 
-auto const kUnknownServerError = ProducerErrorTemplate {
-    "Unknown server error", ProducerErrorType::kUnknownServerError
+auto const kInternalServerError = ProducerErrorTemplate {
+    "Internal server error", ProducerErrorType::kInternalServerError
 };
 
 auto const kCannotSendDataToReceivers = ProducerErrorTemplate {
diff --git a/producer/api/src/receiver_discovery_service.cpp b/producer/api/src/receiver_discovery_service.cpp
index 564313444e9d84f8d6ccd04c24e6a6da8057edde..62e6a7a354000acfec9166351f3c728279def75b 100644
--- a/producer/api/src/receiver_discovery_service.cpp
+++ b/producer/api/src/receiver_discovery_service.cpp
@@ -42,7 +42,6 @@ Error ReceiverDiscoveryService::UpdateFromEndpoint(ReceiversList* list, uint64_t
     if (code != HttpCode::OK) {
        return TextError(responce);
    }
-
    return ParseResponse(responce, list, max_connections);
 }
 
@@ -59,6 +58,7 @@ void ReceiverDiscoveryService::ThreadHandler() {
            lock.lock();
            continue;
        }
+       log__->Debug("got receivers from " + endpoint_ );
        lock.lock();
        max_connections_ = max_connections;
        uri_list_ = uris;
diff --git a/producer/api/src/request_handler_tcp.cpp b/producer/api/src/request_handler_tcp.cpp
index fbdd3022543eb1401923438b5aa0610c1b0dc26f..b382e001086d47b25b6fd21fb18ac3b26639bf19 100644
--- a/producer/api/src/request_handler_tcp.cpp
+++ b/producer/api/src/request_handler_tcp.cpp
@@ -56,7 +56,7 @@ Error RequestHandlerTcp::ReceiveResponse(const std::string& receiver_address) {
         if(sendDataResponse.error_code == kNetErrorFileIdAlreadyInUse) {
             return ProducerErrorTemplates::kFileIdAlreadyInUse.Generate();
         }
-        return ProducerErrorTemplates::kUnknownServerError.Generate();
+        return ProducerErrorTemplates::kInternalServerError.Generate();
     }
     return nullptr;
 }
@@ -69,6 +69,7 @@ Error RequestHandlerTcp::TrySendToReceiver(const Request* request, const std::st
 
     err = ReceiveResponse(receiver_address);
     if (err) {
+        log__->Debug("cannot send data to " + receiver_address + ": " + err->Explain());
         return err;
     }
 
@@ -121,6 +122,7 @@ Error RequestHandlerTcp::ProcessRequestUnlocked(const Request* request) {
         if (err != nullptr && err != ProducerErrorTemplates::kFileIdAlreadyInUse) {
             io__->CloseSocket(sd_, nullptr);
             sd_ = kDisconnectedSocketDescriptor;
+            log__->Debug("disconnected from " + receiver_uri);
             continue;
         }
 
diff --git a/producer/api/src/request_pool.cpp b/producer/api/src/request_pool.cpp
index 1825a0a1f78ba3e83cf1f31b9d304362f072b0d0..a5598feed7ae120b7a2be1edc632f90342db5079 100644
--- a/producer/api/src/request_pool.cpp
+++ b/producer/api/src/request_pool.cpp
@@ -49,7 +49,9 @@ void RequestPool::ProcessRequest(const std::unique_ptr<RequestHandler>& request_
     thread_info->lock.lock();
     request_handler->TearDownProcessingRequestLocked(err);
     if (err) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
         PutRequestBackToQueue(std::move(request));
+        condition_.notify_all();
     }
 }
 
@@ -58,11 +60,11 @@ void RequestPool::ThreadHandler(uint64_t id) {
     thread_info.lock = std::unique_lock<std::mutex>(mutex_);
     auto request_handler = request_handler_factory__->NewRequestHandler(id, &shared_counter_);
     do {
-        condition_.wait(thread_info.lock, [this, &request_handler] {
+        auto do_work = condition_.wait_for(thread_info.lock, std::chrono::milliseconds(100), [this, &request_handler] {
             return (CanProcessRequest(request_handler) || quit_);
         });
         //after wait, we own the lock
-        if (!quit_) {
+        if (!quit_ && do_work) {
             ProcessRequest(request_handler, &thread_info);
         };
     } while (!quit_);
diff --git a/producer/api/unittests/test_request_pool.cpp b/producer/api/unittests/test_request_pool.cpp
index 5b2c8f3bb02b5cda3ad32d5c39f615fcb8c952df..64a0b70e6d0b1066e01e586ba58ccefb5fc97535 100644
--- a/producer/api/unittests/test_request_pool.cpp
+++ b/producer/api/unittests/test_request_pool.cpp
@@ -80,7 +80,7 @@ TEST(RequestPool, Constructor) {
     ASSERT_THAT(dynamic_cast<const asapo::AbstractLogger*>(pool.log__), Ne(nullptr));
 }
 
-TEST_F(RequestPoolTests, AddRequestDoesGoFurtherWhenNotReady) {
+TEST_F(RequestPoolTests, AddRequestDoesNotGoFurtherWhenNotReady) {
 
     EXPECT_CALL(*mock_request_handler, ReadyProcessRequest()).Times(AtLeast(1)).WillRepeatedly(Return(false));
     EXPECT_CALL(*mock_request_handler, PrepareProcessingRequestLocked()).Times(0);
diff --git a/tests/automatic/producer_receiver/check_monitoring/CMakeLists.txt b/tests/automatic/producer_receiver/check_monitoring/CMakeLists.txt
index c7be857c54734dc844fd4c272785f30222e3daaa..5b43f59df51f2c66fdb2e11682b0c1dcb6250601 100644
--- a/tests/automatic/producer_receiver/check_monitoring/CMakeLists.txt
+++ b/tests/automatic/producer_receiver/check_monitoring/CMakeLists.txt
@@ -3,7 +3,10 @@ set(TARGET_NAME receiver)
 ################################
 # Testing
 ################################
-configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/receiver.json receiver.json COPYONLY)
+get_target_property(RECEIVER_DIR receiver-bin BINARY_DIR)
+get_target_property(RECEIVER_NAME receiver-bin OUTPUT_NAME)
+
+configure_file(${CMAKE_SOURCE_DIR}/config/nomad/receiver.nmd.in receiver.nmd)
+configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/receiver.json.tpl receiver.json.tpl COPYONLY)
 configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/discovery_settings.json discovery.json COPYONLY)
-add_script_test("${TARGET_NAME}-monitoring" "$<TARGET_FILE:dummy-data-producer> $<TARGET_FILE:receiver-bin> $<TARGET_PROPERTY:asapo-discovery,EXENAME>" nomem
-        )
+add_script_test("${TARGET_NAME}-monitoring" "$<TARGET_FILE:dummy-data-producer> $<TARGET_FILE:receiver-bin> $<TARGET_PROPERTY:asapo-discovery,EXENAME>" nomem)
diff --git a/tests/automatic/producer_receiver/check_monitoring/check_linux.sh b/tests/automatic/producer_receiver/check_monitoring/check_linux.sh
index 16cc1bdc99abec345ae904ce998b58b9e8a9de7e..39597e47a943435d3f9c6a728d51aab33da7aef2 100644
--- a/tests/automatic/producer_receiver/check_monitoring/check_linux.sh
+++ b/tests/automatic/producer_receiver/check_monitoring/check_linux.sh
@@ -10,23 +10,21 @@ trap Cleanup EXIT
 Cleanup() {
     echo cleanup
     influx -execute "drop database ${database_name}"
-    kill $receiverid
+#    kill $receiverid
+    nomad stop receiver
     kill $discoveryid
-    rm -rf files
     echo "db.dropDatabase()" | mongo ${mongo_database_name}
 }
 
 influx -execute "create database ${database_name}"
 
-nohup $3 -config discovery.json &>/dev/null &
-discoveryid=`echo $!`
+nomad run receiver.nmd
 
 sleep 0.3
 
-nohup $2 receiver.json &>/dev/null &
-sleep 0.3
-receiverid=`echo $!`
+nohup $3 -config discovery.json &>/dev/null &
+discoveryid=`echo $!`
 
-mkdir files
+sleep 1
 
 $1 localhost:5006 100 112 4 0
diff --git a/tests/automatic/settings/discovery_settings.json b/tests/automatic/settings/discovery_settings.json
index 379b5bfa0d48e5c7882fdd58fd0fd9f8dd2c2133..bcd04b3e01ad8cfba4a2c1fc824a232f81418276 100644
--- a/tests/automatic/settings/discovery_settings.json
+++ b/tests/automatic/settings/discovery_settings.json
@@ -1,7 +1,6 @@
 {
   "MaxConnections": 32,
-  "Endpoints": ["localhost:4200"],
-  "Mode": "static",
+  "Mode": "consul",
   "Port":5006,
   "LogLevel":"debug"
 }
\ No newline at end of file
diff --git a/tests/automatic/settings/receiver.json.tpl b/tests/automatic/settings/receiver.json.tpl
new file mode 100644
index 0000000000000000000000000000000000000000..0d0abd37e77fb3767c0aede509d8fb2bdb21831b
--- /dev/null
+++ b/tests/automatic/settings/receiver.json.tpl
@@ -0,0 +1,10 @@
+{
+  "MonitorDbAddress":"localhost:8086",
+  "MonitorDbName": "db_test",
+  "BrokerDbAddress":"localhost:27017",
+  "BrokerDbName": "test_run",
+  "ListenPort": {{ env "NOMAD_PORT_recv" }},
+  "WriteToDisk":true,
+  "WriteToDb":true,
+  "LogLevel" : "info"
+}