Skip to content
Snippets Groups Projects
Commit a4737bcd authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

refactor producer

parent b78ce272
No related branches found
No related tags found
No related merge requests found
#include <iostream>
#include <chrono>
#include <vector>
#include <tuple>
#include <mutex>
#include <thread>
......
......@@ -47,6 +47,13 @@ Error ReceiverDiscoveryService::UpdateFromEndpoint(ReceiversList* list, uint64_t
}
void ReceiverDiscoveryService::LogUriList(const ReceiversList& uris) {
std::string s;
s = std::accumulate(std::begin(uris), std::end(uris), s);
log__->Debug("got receivers from " + endpoint_ + ":" + s);
}
void ReceiverDiscoveryService::ThreadHandler() {
std::unique_lock<std::mutex> lock(mutex_);
do {
......@@ -59,9 +66,7 @@ void ReceiverDiscoveryService::ThreadHandler() {
lock.lock();
continue;
}
std::string s;
s = std::accumulate(std::begin(uris), std::end(uris), s);
log__->Debug("got receivers from " + endpoint_ + ":" + s);
LogUriList(uris);
lock.lock();
max_connections_ = max_connections;
uri_list_ = uris;
......
......@@ -31,6 +31,7 @@ class ReceiverDiscoveryService {
void ThreadHandler();
Error UpdateFromEndpoint(ReceiversList* list, uint64_t* max_connections);
Error ParseResponse(const std::string& responce, ReceiversList* list, uint64_t* max_connections);
void LogUriList(const ReceiversList& uris);
std::string endpoint_;
std::thread thread_;
std::condition_variable condition_;
......
......@@ -7,15 +7,12 @@
namespace asapo {
RequestHandlerFilesystem::RequestHandlerFilesystem(std::string destination_folder, uint64_t thread_id):
io__{GenerateDefaultIO()}, log__{GetDefaultProducerLogger()}, destination_folder_{std::move(destination_folder)},
thread_id_{thread_id} {
}
Error RequestHandlerFilesystem::ProcessRequestUnlocked(const Request* request) {
std::string fullpath = destination_folder_ + "/" + request->header.file_name + ".bin";
auto err = io__->WriteDataToFile(fullpath, (uint8_t*)request->data, request->header.data_size);
......
......@@ -22,10 +22,11 @@ Error RequestHandlerTcp::ConnectToReceiver(const std::string& receiver_address)
return err;
}
log__->Info("connected to receiver at " + receiver_address);
connected_receiver_uri_ = receiver_address;
return nullptr;
}
Error RequestHandlerTcp::SendHeaderAndData(const Request* request, const std::string& receiver_address) {
Error RequestHandlerTcp::SendHeaderAndData(const Request* request) {
Error io_error;
io__->Send(sd_, &(request->header), sizeof(request->header), &io_error);
if(io_error) {
......@@ -40,7 +41,7 @@ Error RequestHandlerTcp::SendHeaderAndData(const Request* request, const std::st
return nullptr;
}
Error RequestHandlerTcp::ReceiveResponse(const std::string& receiver_address) {
Error RequestHandlerTcp::ReceiveResponse() {
Error err;
SendDataResponse sendDataResponse;
io__->Receive(sd_, &sendDataResponse, sizeof(sendDataResponse), &err);
......@@ -57,69 +58,93 @@ Error RequestHandlerTcp::ReceiveResponse(const std::string& receiver_address) {
return nullptr;
}
Error RequestHandlerTcp::TrySendToReceiver(const Request* request, const std::string& receiver_address) {
auto err = SendHeaderAndData(request, receiver_address);
Error RequestHandlerTcp::TrySendToReceiver(const Request* request) {
auto err = SendHeaderAndData(request);
if (err) {
return err;
}
err = ReceiveResponse(receiver_address);
err = ReceiveResponse();
if (err) {
return err;
}
log__->Debug(std::string("successfully sent data ") + " id: " + std::to_string(request->header.data_id) + " to " +
receiver_address);
connected_receiver_uri_);
return nullptr;
}
void RequestHandlerTcp::UpdateReceiversUriIfNewConnection() {
if (sd_ != kDisconnectedSocketDescriptor)
void RequestHandlerTcp::UpdateIfNewConnection() {
if (Connected())
return;
receivers_list_ = discovery_service__->RotatedUriList(thread_id_);
last_receivers_uri_update_ = high_resolution_clock::now();
UpdateReceiversList();
(*ncurrent_connections_)++;
}
bool RequestHandlerTcp::CheckForRebalance() {
if (sd_ == kDisconnectedSocketDescriptor)
bool RequestHandlerTcp::UpdateReceiversList() {
auto thread_receivers_new = discovery_service__->RotatedUriList(thread_id_);
last_receivers_uri_update_ = high_resolution_clock::now();
if (thread_receivers_new != receivers_list_) {
receivers_list_ = thread_receivers_new;
return true;
}
return false;
}
bool RequestHandlerTcp::TimeToUpdateReceiverList() {
uint64_t elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>( high_resolution_clock::now() -
last_receivers_uri_update_).count();
return elapsed_ms > discovery_service__->UpdateFrequency();
}
bool RequestHandlerTcp::Disconnected() {
return !Connected();
}
bool RequestHandlerTcp::NeedRebalance() {
if (Disconnected())
return false;
auto now = high_resolution_clock::now();
uint64_t elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>( now -
last_receivers_uri_update_).count();
bool rebalance = false;
if (elapsed_ms > discovery_service__->UpdateFrequency()) {
auto thread_receivers_new = discovery_service__->RotatedUriList(thread_id_);
last_receivers_uri_update_ = now;
if (thread_receivers_new != receivers_list_) {
receivers_list_ = thread_receivers_new;
rebalance = true;
}
if (TimeToUpdateReceiverList()) {
return UpdateReceiversList();
}
return rebalance;
return false;
}
void RequestHandlerTcp::CloseConnectionToPeformRebalance() {
io__->CloseSocket(sd_, nullptr);
log__->Info("rebalancing");
sd_ = kDisconnectedSocketDescriptor;
}
void RequestHandlerTcp::Disconnect() {
io__->CloseSocket(sd_, nullptr);
sd_ = kDisconnectedSocketDescriptor;
log__->Debug("disconnected from " + connected_receiver_uri_);
connected_receiver_uri_.clear();
}
bool RequestHandlerTcp::ServerError(const Error& err) {
return err != nullptr && err != ProducerErrorTemplates::kFileIdAlreadyInUse;
}
Error RequestHandlerTcp::ProcessRequestUnlocked(const Request* request) {
bool rebalance = CheckForRebalance();
if (rebalance && sd_ != kDisconnectedSocketDescriptor) {
io__->CloseSocket(sd_, nullptr);
log__->Info("rebalancing");
sd_ = kDisconnectedSocketDescriptor;
if (NeedRebalance()) {
CloseConnectionToPeformRebalance();
}
for (auto receiver_uri : receivers_list_) {
if (sd_ == kDisconnectedSocketDescriptor) {
if (Disconnected()) {
auto err = ConnectToReceiver(receiver_uri);
if (err != nullptr ) continue;
}
auto err = TrySendToReceiver(request, receiver_uri);
if (err != nullptr && err != ProducerErrorTemplates::kFileIdAlreadyInUse) {
io__->CloseSocket(sd_, nullptr);
sd_ = kDisconnectedSocketDescriptor;
auto err = TrySendToReceiver(request);
if (ServerError(err)) {
Disconnect();
log__->Debug("cannot send data to " + receiver_uri + ": " + err->Explain());
log__->Debug("disconnected from " + receiver_uri);
continue;
}
......@@ -131,7 +156,7 @@ Error RequestHandlerTcp::ProcessRequestUnlocked(const Request* request) {
return ProducerErrorTemplates::kCannotSendDataToReceivers.Generate();
}
bool RequestHandlerTcp::IsConnected() {
bool RequestHandlerTcp::Connected() {
return sd_ != kDisconnectedSocketDescriptor;
}
......@@ -140,11 +165,11 @@ bool RequestHandlerTcp::CanCreateNewConnections() {
}
bool RequestHandlerTcp::ReadyProcessRequest() {
return IsConnected() || CanCreateNewConnections();
return Connected() || CanCreateNewConnections();
}
void RequestHandlerTcp::PrepareProcessingRequestLocked() {
UpdateReceiversUriIfNewConnection();
UpdateIfNewConnection();
}
void RequestHandlerTcp::TearDownProcessingRequestLocked(const Error& error_from_process) {
......
......@@ -30,18 +30,25 @@ class RequestHandlerTcp: public RequestHandler {
ReceiverDiscoveryService* discovery_service__;
private:
Error ConnectToReceiver(const std::string& receiver_address);
Error SendHeaderAndData(const Request*, const std::string& receiver_address);
Error ReceiveResponse(const std::string& receiver_address);
Error TrySendToReceiver(const Request* request, const std::string& receiver_address);
Error SendHeaderAndData(const Request*);
Error ReceiveResponse();
Error TrySendToReceiver(const Request* request);
SocketDescriptor sd_{kDisconnectedSocketDescriptor};
void UpdateReceiversUriIfNewConnection();
bool CheckForRebalance();
void UpdateIfNewConnection();
bool UpdateReceiversList();
bool TimeToUpdateReceiverList();
bool NeedRebalance();
void CloseConnectionToPeformRebalance();
bool Disconnected();
void Disconnect();
bool ServerError(const Error& err);
ReceiversList receivers_list_;
high_resolution_clock::time_point last_receivers_uri_update_;
bool IsConnected();
bool Connected();
bool CanCreateNewConnections();
uint64_t thread_id_;
uint64_t* ncurrent_connections_;
std::string connected_receiver_uri_;
};
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment