Skip to content
Snippets Groups Projects
Commit 88f81380 authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

fix thread unsafe calls in io

parent e511a295
No related branches found
No related tags found
No related merge requests found
Showing
with 81 additions and 22 deletions
......@@ -18,6 +18,7 @@ enum class IOErrorType {
kAddressAlreadyInUse,
kConnectionRefused,
kConnectionResetByPeer,
kUnreachableNetwork,
kTimeout,
kFileAlreadyExists,
kNoSpaceLeft,
......@@ -51,6 +52,11 @@ auto const kBadFileNumber = IOErrorTemplate {
auto const kResourceTemporarilyUnavailable = IOErrorTemplate {
"Resource temporarily unavailable", IOErrorType::kResourceTemporarilyUnavailable
};
auto const kUnreachableNetwork = IOErrorTemplate {
"Network is unreachable", IOErrorType::kUnreachableNetwork
};
auto const kPermissionDenied = IOErrorTemplate {
"Permission denied", IOErrorType::kPermissionDenied
};
......
......@@ -236,14 +236,15 @@ Error MongoDBClient::InsertAsSubset(const FileInfo& file,
if (err) {
return err;
}
auto query = BCON_NEW ("$and","[","{","_id", BCON_INT64(subset_id),"}","{","images._id","{","$ne",BCON_INT64(file.id),"}","}","]");
auto query = BCON_NEW ("$and", "[", "{", "_id", BCON_INT64(subset_id), "}", "{", "images._id", "{", "$ne",
BCON_INT64(file.id), "}", "}", "]");
auto update = BCON_NEW ("$setOnInsert", "{",
"size", BCON_INT64 (subset_size),
"}",
"$addToSet", "{",
"images", BCON_DOCUMENT(document.get()), "}");
err = AddBsonDocumentToArray(query, update,ignore_duplicates);
err = AddBsonDocumentToArray(query, update, ignore_duplicates);
bson_destroy (query);
bson_destroy (update);
......
#include <request/request_pool.h>
#include <request/request_pool.h>
#include "request/request_pool.h"
namespace asapo {
......
......@@ -290,10 +290,13 @@ std::unique_ptr<sockaddr_in> SystemIO::BuildSockaddrIn(const std::string& addres
std::string host;
uint16_t port = 0;
std::tie(host, port) = *hostname_port_tuple;
host = ResolveHostnameToIp(host, err);
if (*err != nullptr) {
return nullptr;
}
// this is not thread safe call we should not resolve hostname here - we actually already have ip in address.
// todo: remove this
// host = ResolveHostnameToIp(host, err);
// if (*err != nullptr) {
// return nullptr;
// }
short family = AddressFamilyToPosixFamily(AddressFamilies::INET);
if (family == -1) {
......
......@@ -35,6 +35,8 @@ Error GetLastErrorFromErrno() {
return IOErrorTemplates::kBadFileNumber.Generate();
case EAGAIN:
return IOErrorTemplates::kResourceTemporarilyUnavailable.Generate();
case ENETUNREACH:
return IOErrorTemplates::kUnreachableNetwork.Generate();
case ENOENT:
case ENOTDIR:
return IOErrorTemplates::kFileNotFound.Generate();
......
......@@ -50,7 +50,7 @@ RUN cd /var/run/asapo asapo && terraform init
COPY asapo-* /usr/bin/
COPY *.sh /tmp/asapo_runscripts/
COPY *.sh asapo_overwrite_vars.tfvars /tmp/asapo_runscripts/
COPY *.py /etc/asapo/
COPY *.hcl.tpl /etc/asapo/
......
......@@ -71,7 +71,7 @@ void ProcessCommandArguments(int argc, char* argv[], Args* args) {
std::cout <<
"Usage: " << argv[0] <<
" <destination> <beamtime_id[%<stream>%<token>]> <number_of_byte> <iterations> <nthreads>"
" <mode 0 -t tcp, 1 - filesystem> <timeout (sec)> [n images in set (default 1)]"
" <mode x0 -t tcp, x1 - filesystem, 0x - write files, 1x - do not write files> <timeout (sec)> [n images in set (default 1)]"
<< std::endl;
exit(EXIT_FAILURE);
}
......@@ -127,7 +127,7 @@ asapo::FileData CreateMemoryBuffer(size_t size) {
bool SendDummyData(asapo::Producer* producer, size_t number_of_byte, uint64_t iterations, uint64_t images_in_set,
const std::string& stream) {
const std::string& stream, bool write_files) {
asapo::Error err;
if (iterations > 0) { // send wrong meta, for negative integration tests
......@@ -149,7 +149,8 @@ bool SendDummyData(asapo::Producer* producer, size_t number_of_byte, uint64_t it
}
event_header.user_metadata = std::move(meta);
if (images_in_set == 1) {
auto err = producer->SendData(event_header, std::move(buffer), asapo::kDefaultIngestMode, &ProcessAfterSend);
auto err = producer->SendData(event_header, std::move(buffer), write_files ? asapo::kDefaultIngestMode :
asapo::kTransferData, &ProcessAfterSend);
if (err) {
std::cerr << "Cannot send file: " << err << std::endl;
return false;
......@@ -165,7 +166,8 @@ bool SendDummyData(asapo::Producer* producer, size_t number_of_byte, uint64_t it
event_header.file_name = stream + "/" + event_header.file_name;
}
event_header.user_metadata = meta;
auto err = producer->SendData(event_header, std::move(buffer), asapo::kDefaultIngestMode, &ProcessAfterSend);
auto err = producer->SendData(event_header, std::move(buffer), write_files ? asapo::kDefaultIngestMode :
asapo::kTransferData, &ProcessAfterSend);
if (err) {
std::cerr << "Cannot send file: " << err << std::endl;
return false;
......@@ -179,7 +181,7 @@ bool SendDummyData(asapo::Producer* producer, size_t number_of_byte, uint64_t it
std::unique_ptr<asapo::Producer> CreateProducer(const Args& args) {
asapo::Error err;
auto producer = asapo::Producer::Create(args.receiver_address, args.nthreads,
args.mode == 0 ? asapo::RequestHandlerType::kTcp : asapo::RequestHandlerType::kFilesystem,
args.mode % 10 == 0 ? asapo::RequestHandlerType::kTcp : asapo::RequestHandlerType::kFilesystem,
asapo::SourceCredentials{args.beamtime_id, args.stream, args.token }, &err);
if(err) {
std::cerr << "Cannot start producer. ProducerError: " << err << std::endl;
......@@ -231,7 +233,8 @@ int main (int argc, char* argv[]) {
system_clock::time_point start_time = system_clock::now();
if(!SendDummyData(producer.get(), args.number_of_bytes, args.iterations, args.images_in_set, args.stream)) {
if(!SendDummyData(producer.get(), args.number_of_bytes, args.iterations, args.images_in_set, args.stream,
args.mode / 10 == 0)) {
return EXIT_FAILURE;
}
......
......@@ -46,7 +46,7 @@ int main(int argc, char* argv[]) {
fi.source = "host:1234";
fi.id = 1;
uint64_t subset_size=2;
uint64_t subset_size = 2;
if (args.keyword != "Notconnected") {
db.Connect("127.0.0.1", "data", "test");
......
......@@ -158,7 +158,7 @@ int main(int argc, char* argv[]) {
std::cout << "[META] Check unknown host" << std::endl;
io->CreateAndConnectIPTCPSocket("some-host-that-might-not-exists.aa:1234", &err);
if(asapo::IOErrorTemplates::kUnableToResolveHostname != err) {
if(err != asapo::IOErrorTemplates::kUnreachableNetwork) {
ExitIfErrIsNotOk(&err, 303);
}
......
{
"Mode": "static",
"Receiver": {
"StaticEndpoints": [
"localhost:22001"
],
"MaxConnections": 32
},
"Broker": {
"StaticEndpoint": "localhost:5005"
},
"Mongo": {
"StaticEndpoint": "127.0.0.1:27017"
},
"Port": 5900,
"LogLevel":"debug"
}
......@@ -9,6 +9,9 @@
"Broker": {
"StaticEndpoint": "localhost:5005"
},
"Mongo": {
"StaticEndpoint": "127.0.0.1:27017"
},
"Port": {{ env "NOMAD_PORT_discovery" }},
"LogLevel":"debug"
}
......
job "nginx_kill" {
datacenters = ["dc1"]
type = "batch"
group "nginx_kill" {
count = 1
task "nginx_kill" {
driver = "raw_exec"
config {
command = "killall",
args = ["nginx"]
}
}
}
}
......@@ -2,13 +2,13 @@
beamtime_id=asapo_test
nfiles=10
nfiles=1000
timeout=100
fsize=100
mode=0 #tcp
nthreads=4
fsize=10000
mode=10 #tcp & no file write
nthreads=32
exec=/home/yakubov/projects/asapo/cmake-build-debug/examples/producer/dummy-data-producer/dummy-data-producer
$exec localhost:8400 ${beamtime_id} $fsize $nfiles $nthreads $mode $timeout
\ No newline at end of file
$exec localhost:8400 ${beamtime_id} $fsize $nfiles $nthreads $mode $timeout
#!/usr/bin/env bash
exec=/home/yakubov/projects/asapo/cmake-build-debug/receiver/receiver
$exec receiver.json
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment