Skip to content
Snippets Groups Projects
Commit c2d9280f authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

access discovery service via nginx

parent 6d963e22
Branches
Tags
No related merge requests found
Showing
with 51 additions and 37 deletions
......@@ -14,5 +14,8 @@ function(prepare_asapo)
configure_file(${CMAKE_SOURCE_DIR}/config/nomad/broker.nmd.in broker.nmd @ONLY)
configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/discovery_settings.json.tpl discovery.json.tpl COPYONLY)
configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/broker_settings.json.tpl broker.json.tpl COPYONLY)
configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/nginx.conf.tpl nginx.conf.tpl COPYONLY)
configure_file(${CMAKE_SOURCE_DIR}/config/nomad/nginx.nmd.in nginx.nmd @ONLY)
endfunction()
......@@ -21,14 +21,11 @@ job "nginx" {
}
task "nginx" {
driver = "docker"
driver = "raw_exec"
config {
image = "nginx:latest"
volumes = ["/home/yakubov/projects/asapo/config/nomad/nginx.conf:/etc/nginx/nginx.conf","local/:/logs" ]
network_mode = "host"
dns_servers = ["127.0.0.1"]
dns_search_domains=["consul"]
command = "nginx",
args = ["-c","${NOMAD_TASK_DIR}/nginx.conf"]
}
resources {
......@@ -37,7 +34,7 @@ job "nginx" {
network {
mbits = 10
port "nginx" {
static = 8080
static = 8400
}
}
}
......@@ -52,6 +49,14 @@ job "nginx" {
timeout = "2s"
}
}
template {
source = "@WORK_DIR@/nginx.conf.tpl"
destination = "local/nginx.conf"
change_mode = "restart"
}
}
}
}
\ No newline at end of file
{
"Name": "discovery",
"Address": "127.0.0.1",
"Port": 5006,
"Check": {
"HTTP": "http://localhost:5006/receivers",
"Interval": "15s"
}
}
......@@ -23,7 +23,5 @@ func ReadConfig(fname string) (log.Level, error) {
if err := settings.Validate(); err != nil {
return log.FatalLevel,err
}
return log.LevelFromString(settings.LogLevel)
}
......@@ -9,9 +9,11 @@
namespace asapo {
const std::string ReceiverDiscoveryService::kServiceEndpointSuffix = "/discovery/receivers";
ReceiverDiscoveryService::ReceiverDiscoveryService(std::string endpoint, uint64_t update_frequency_ms): httpclient__{DefaultHttpClient()},
log__{GetDefaultProducerLogger()},
endpoint_{std::move(endpoint) + "/receivers"}, update_frequency_ms_{update_frequency_ms} {
endpoint_{std::move(endpoint) + kServiceEndpointSuffix}, update_frequency_ms_{update_frequency_ms} {
}
......
......@@ -28,6 +28,7 @@ class ReceiverDiscoveryService {
std::unique_ptr<HttpClient> httpclient__;
AbstractLogger* log__;
private:
static const std::string kServiceEndpointSuffix;
void ThreadHandler();
Error UpdateFromEndpoint(ReceiversList* list, uint64_t* max_connections);
Error ParseResponse(const std::string& responce, ReceiversList* list, uint64_t* max_connections);
......
......@@ -70,7 +70,7 @@ Error RequestHandlerTcp::TrySendToReceiver(const Request* request) {
}
log__->Debug(std::string("successfully sent data ") + " id: " + std::to_string(request->header.data_id) + " to " +
connected_receiver_uri_);
connected_receiver_uri_);
return nullptr;
}
......@@ -94,7 +94,7 @@ bool RequestHandlerTcp::UpdateReceiversList() {
bool RequestHandlerTcp::TimeToUpdateReceiverList() {
uint64_t elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>( high_resolution_clock::now() -
last_receivers_uri_update_).count();
last_receivers_uri_update_).count();
return elapsed_ms > discovery_service__->UpdateFrequency();
}
......
......@@ -48,7 +48,7 @@ class ReceiversStatusTests : public Test {
NiceMock<asapo::MockLogger> mock_logger;
NiceMock<MockHttpClient>* mock_http_client;
std::string expected_endpoint{"endpoint/receivers"};
std::string expected_endpoint{"endpoint/discovery/receivers"};
ReceiverDiscoveryService status{"endpoint", 20};
void SetUp() override {
......
......@@ -13,6 +13,7 @@ receiver_folder=/tmp/asapo/receiver/files
Cleanup() {
echo cleanup
rm -rf ${receiver_folder}
nomad stop nginx
nomad stop receiver
nomad stop discovery
nomad stop broker
......@@ -24,6 +25,7 @@ Cleanup() {
influx -execute "create database ${monitor_database_name}"
echo "db.${broker_database_name}.insert({dummy:1})" | mongo ${broker_database_name}
nomad run nginx.nmd
nomad run receiver.nmd
nomad run discovery.nmd
nomad run broker.nmd
......@@ -32,7 +34,7 @@ sleep 1
#producer
mkdir -p ${receiver_folder}
$1 localhost:5006 100 1000 4 0 &
$1 localhost:8400 100 1000 4 0 &
#producerid=`echo $!`
......
......@@ -7,12 +7,13 @@ echo db.%broker_database_name%.insert({dummy:1}) | %mongo_exe% %broker_database_
c:\opt\consul\nomad run receiver.nmd
c:\opt\consul\nomad run discovery.nmd
c:\opt\consul\nomad run broker.nmd
c:\opt\consul\nomad run nginx.nmd
ping 1.0.0.0 -n 10 -w 100 > nul
REM producer
mkdir %receiver_folder%
start /B "" "%1" localhost:5006 100 1000 4 0
start /B "" "%1" localhost:8400 100 1000 4 0
ping 1.0.0.0 -n 1 -w 100 > nul
REM worker
......@@ -30,6 +31,7 @@ exit /b 1
c:\opt\consul\nomad stop receiver
c:\opt\consul\nomad stop discovery
c:\opt\consul\nomad stop broker
c:\opt\consul\nomad stop nginx
rmdir /S /Q %receiver_folder%
echo db.dropDatabase() | %mongo_exe% %broker_database_name%
......
......@@ -12,6 +12,7 @@ Cleanup() {
influx -execute "drop database ${database_name}"
nomad stop receiver
nomad stop discovery
nomad stop nginx
echo "db.dropDatabase()" | mongo ${mongo_database_name}
rm -rf ${receiver_folder}
}
......@@ -22,10 +23,11 @@ influx -execute "create database ${database_name}"
nomad run receiver.nmd
nomad run discovery.nmd
nomad run nginx.nmd
sleep 1
$1 localhost:5006 100 112 4 0
$1 localhost:8400 100 112 4 0
sleep 1
......
......@@ -13,6 +13,7 @@ Cleanup() {
rm -rf ${receiver_folder}
nomad stop receiver
nomad stop discovery
nomad stop nginx
echo "db.dropDatabase()" | mongo ${mongo_database_name}
influx -execute "drop database ${database_name}"
}
......@@ -20,12 +21,13 @@ Cleanup() {
influx -execute "create database ${database_name}"
echo "db.${mongo_database_name}.insert({dummy:1})" | mongo ${mongo_database_name}
nomad run nginx.nmd
nomad run receiver.nmd
nomad run discovery.nmd
mkdir -p ${receiver_folder}
$1 localhost:5006 100 1 1 0
$1 localhost:8400 100 1 1 0
ls -ln ${receiver_folder}/1.bin | awk '{ print $5 }'| grep 102400
......@@ -7,12 +7,13 @@ echo db.%database_name%.insert({dummy:1})" | %mongo_exe% %database_name%
c:\opt\consul\nomad run receiver.nmd
c:\opt\consul\nomad run discovery.nmd
c:\opt\consul\nomad run nginx.nmd
ping 1.0.0.0 -n 1 -w 100 > nul
mkdir %receiver_folder%
%1 localhost:5006 100 1 1 0
%1 localhost:8400 100 1 1 0
ping 1.0.0.0 -n 1 -w 100 > nul
......@@ -28,6 +29,7 @@ exit /b 1
:clean
c:\opt\consul\nomad stop receiver
c:\opt\consul\nomad stop discovery
c:\opt\consul\nomad stop nginx
rmdir /S /Q %receiver_folder%
echo db.dropDatabase() | %mongo_exe% %database_name%
......
......@@ -13,10 +13,14 @@ http {
# keepalive_timeout 0;
# keepalive_timeout 65;
resolver 127.0.0.1:8600 valid=1s;
server {
listen 8080;
location /discovery/ {
proxy_pass http://discovery.service.consul:5006/;
listen 8400;
set $discovery_endpoint discovery.service.asapo;
location /discovery/ {
rewrite ^/discovery(/.*) $1 break;
proxy_pass http://$discovery_endpoint:5006$uri;
}
}
}
......@@ -56,7 +56,7 @@ std::string GetIDFromJson(const std::string& json_string, Error* err) {
return std::to_string(id);
}
void ServerDataBroker::ProcessServerError(Error* err,const std::string& response,std::string* redirect_uri) {
void ServerDataBroker::ProcessServerError(Error* err, const std::string& response, std::string* redirect_uri) {
if ((*err)->GetErrorType() != asapo::ErrorType::kEndOfFile) {
(*err)->Append(response);
return;
......@@ -69,11 +69,11 @@ void ServerDataBroker::ProcessServerError(Error* err,const std::string& response
*redirect_uri = server_uri_ + "/database/" + source_name_ + "/" + id;
}
}
*err=nullptr;
*err = nullptr;
return;
}
Error ServerDataBroker::ProcessRequest(std::string* response,std::string request_uri) {
Error ServerDataBroker::ProcessRequest(std::string* response, std::string request_uri) {
Error err;
HttpCode code;
*response = httpclient__->Get(request_uri, &code, &err);
......@@ -88,12 +88,12 @@ Error ServerDataBroker::GetFileInfoFromServer(FileInfo* info, const std::string&
uint64_t elapsed_ms = 0;
std::string response;
while (true) {
auto err = ProcessRequest(&response,request_uri);
auto err = ProcessRequest(&response, request_uri);
if (err == nullptr) {
break;
}
ProcessServerError(&err,response,&request_uri);
ProcessServerError(&err, response, &request_uri);
if (err != nullptr) {
return err;
}
......
......@@ -20,8 +20,8 @@ class ServerDataBroker final : public asapo::DataBroker {
std::unique_ptr<HttpClient> httpclient__;
private:
Error GetFileInfoFromServer(FileInfo* info, const std::string& operation);
void ProcessServerError(Error* err,const std::string& response,std::string* redirect_uri);
Error ProcessRequest(std::string* response,std::string request_uri);
void ProcessServerError(Error* err, const std::string& response, std::string* redirect_uri);
Error ProcessRequest(std::string* response, std::string request_uri);
std::string server_uri_;
std::string source_name_;
uint64_t timeout_ms_ = 0;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment