Skip to content
Snippets Groups Projects
Commit fefe1d19 authored by Sergey Yakubov
Browse files

start converting integration tests into nomad

parent 0cdc350b
No related branches found
No related tags found
No related merge requests found
Showing with 84 additions and 23 deletions
......@@ -36,7 +36,7 @@ Error RapidJson::LazyInitialize()const noexcept {
return nullptr;
}
asapo::Error RapidJson::CheckValueType(const std::string& name, ValueType type, const Value* val) const {
bool res = false;
switch (type) {
case ValueType::kObject:
......@@ -56,7 +56,7 @@ asapo::Error CheckValueType(const std::string& name, ValueType type, const Value
break;
}
if (!res) {
return TextError("wrong type: " + name + " in: " + json_);
}
return nullptr;
......
......@@ -33,6 +33,7 @@ class RapidJson {
std::string json_;
mutable bool initialized_ = false;
Error LazyInitialize() const noexcept;
Error CheckValueType(const std::string& name, ValueType type, const rapidjson::Value* val) const;
Error embedded_error_ = nullptr;
asapo::Error GetValuePointer(const std::string& name, ValueType type, rapidjson::Value** val)const noexcept;
......
job "receiver" {
  datacenters = ["dc1"]
  type        = "service"

  group "group" {
    count = 1

    task "service" {
      driver = "raw_exec"

      config {
        command = "/bin/bash"
        # `exec` replaces the shell with the receiver binary so that the
        # template stanza's change_signal (SIGHUP) is delivered to the
        # receiver itself rather than to an intermediate bash process.
        # NOTE(review): anything after `exec` never runs, so the original
        # trailing `&& rm -rf files` cleanup was unreachable dead code and
        # has been removed — the test harness cleans up `files` instead.
        # `mkdir -p` keeps a task restart from failing when `files` exists.
        args = ["-c", "mkdir -p files && exec ${RECEIVER_DIR}/${RECEIVER_NAME} receiver.json"]
      }

      resources {
        cpu    = 500 # MHz
        memory = 256 # MB
        network {
          # Dynamically allocated TCP port; exposed to the config template
          # below as the NOMAD_PORT_recv environment variable.
          port "recv" {}
        }
      }

      # Register the receiver in Consul so discovery can find it; a TCP
      # check on the dynamic port reports liveness.
      service {
        name = "receiver"
        port = "recv"
        check {
          name           = "alive"
          type           = "tcp"
          interval       = "10s"
          timeout        = "2s"
          initial_status = "passing"
        }
      }

      # Render receiver.json next to the binary; on re-render (e.g. port
      # reassignment) signal the receiver to reload instead of restarting.
      template {
        source        = "${CMAKE_CURRENT_BINARY_DIR}/receiver.json.tpl"
        destination   = "receiver.json"
        change_mode   = "signal"
        change_signal = "SIGHUP"
      }
    }
  }
}
......@@ -10,7 +10,7 @@ enum class ProducerErrorType {
kConnectionNotReady,
kFileTooLarge,
kFileIdAlreadyInUse,
kInternalServerError,
kCannotSendDataToReceivers,
kRequestPoolIsFull
};
......@@ -74,8 +74,8 @@ auto const kFileIdAlreadyInUse = ProducerErrorTemplate {
"File already in use", ProducerErrorType::kFileIdAlreadyInUse
};
auto const kInternalServerError = ProducerErrorTemplate {
"Internal server error", ProducerErrorType::kInternalServerError
};
auto const kCannotSendDataToReceivers = ProducerErrorTemplate {
......
......@@ -42,7 +42,6 @@ Error ReceiverDiscoveryService::UpdateFromEndpoint(ReceiversList* list, uint64_t
if (code != HttpCode::OK) {
return TextError(responce);
}
return ParseResponse(responce, list, max_connections);
}
......@@ -59,6 +58,7 @@ void ReceiverDiscoveryService::ThreadHandler() {
lock.lock();
continue;
}
log__->Debug("got receivers from " + endpoint_ );
lock.lock();
max_connections_ = max_connections;
uri_list_ = uris;
......
......@@ -56,7 +56,7 @@ Error RequestHandlerTcp::ReceiveResponse(const std::string& receiver_address) {
if(sendDataResponse.error_code == kNetErrorFileIdAlreadyInUse) {
return ProducerErrorTemplates::kFileIdAlreadyInUse.Generate();
}
return ProducerErrorTemplates::kInternalServerError.Generate();
}
return nullptr;
}
......@@ -69,6 +69,7 @@ Error RequestHandlerTcp::TrySendToReceiver(const Request* request, const std::st
err = ReceiveResponse(receiver_address);
if (err) {
log__->Debug("cannot send data to " + receiver_address + ": " + err->Explain());
return err;
}
......@@ -121,6 +122,7 @@ Error RequestHandlerTcp::ProcessRequestUnlocked(const Request* request) {
if (err != nullptr && err != ProducerErrorTemplates::kFileIdAlreadyInUse) {
io__->CloseSocket(sd_, nullptr);
sd_ = kDisconnectedSocketDescriptor;
log__->Debug("disconnected from " + receiver_uri);
continue;
}
......
......@@ -49,7 +49,9 @@ void RequestPool::ProcessRequest(const std::unique_ptr<RequestHandler>& request_
thread_info->lock.lock();
request_handler->TearDownProcessingRequestLocked(err);
if (err) {
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
PutRequestBackToQueue(std::move(request));
condition_.notify_all();
}
}
......@@ -58,11 +60,11 @@ void RequestPool::ThreadHandler(uint64_t id) {
thread_info.lock = std::unique_lock<std::mutex>(mutex_);
auto request_handler = request_handler_factory__->NewRequestHandler(id, &shared_counter_);
do {
auto do_work = condition_.wait_for(thread_info.lock, std::chrono::milliseconds(100), [this, &request_handler] {
return (CanProcessRequest(request_handler) || quit_);
});
//after wait, we own the lock
if (!quit_ && do_work) {
ProcessRequest(request_handler, &thread_info);
};
} while (!quit_);
......
......@@ -80,7 +80,7 @@ TEST(RequestPool, Constructor) {
ASSERT_THAT(dynamic_cast<const asapo::AbstractLogger*>(pool.log__), Ne(nullptr));
}
TEST_F(RequestPoolTests, AddRequestDoesNotGoFurtherWhenNotReady) {
EXPECT_CALL(*mock_request_handler, ReadyProcessRequest()).Times(AtLeast(1)).WillRepeatedly(Return(false));
EXPECT_CALL(*mock_request_handler, PrepareProcessingRequestLocked()).Times(0);
......
......@@ -3,7 +3,10 @@ set(TARGET_NAME receiver)
################################
# Testing
################################
configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/receiver.json receiver.json COPYONLY)
get_target_property(RECEIVER_DIR receiver-bin BINARY_DIR)
get_target_property(RECEIVER_NAME receiver-bin OUTPUT_NAME)
configure_file(${CMAKE_SOURCE_DIR}/config/nomad/receiver.nmd.in receiver.nmd)
configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/receiver.json.tpl receiver.json.tpl COPYONLY)
configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/discovery_settings.json discovery.json COPYONLY)
add_script_test("${TARGET_NAME}-monitoring" "$<TARGET_FILE:dummy-data-producer> $<TARGET_FILE:receiver-bin> $<TARGET_PROPERTY:asapo-discovery,EXENAME>" nomem)
......@@ -10,23 +10,21 @@ trap Cleanup EXIT
Cleanup() {
echo cleanup
influx -execute "drop database ${database_name}"
# receiver now runs under nomad, so it is stopped via nomad instead of kill
nomad stop receiver
kill $discoveryid
rm -rf files
echo "db.dropDatabase()" | mongo ${mongo_database_name}
}
influx -execute "create database ${database_name}"
nohup $3 -config discovery.json &>/dev/null &
discoveryid=`echo $!`
nomad run receiver.nmd
sleep 0.3
mkdir files
sleep 1
$1 localhost:5006 100 112 4 0
......
{
"MaxConnections": 32,
"Endpoints": ["localhost:4200"],
"Mode": "consul",
"Port":5006,
"LogLevel":"debug"
}
\ No newline at end of file
{
"MonitorDbAddress":"localhost:8086",
"MonitorDbName": "db_test",
"BrokerDbAddress":"localhost:27017",
"BrokerDbName": "test_run",
"ListenPort": {{ env "NOMAD_PORT_recv" }},
"WriteToDisk":true,
"WriteToDb":true,
"LogLevel" : "info"
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment