Commit 70a33dfe authored by Sergey Yakubov

finished mem buffer from server side

parent 59dec5e8
Showing changed files with 136 additions and 45 deletions
......@@ -15,6 +15,8 @@ class FileInfo {
std::chrono::system_clock::time_point modify_date;
uint64_t size{0};
uint64_t id{0};
std::string source;
uint64_t buf_id{0};
std::string Json() const;
bool SetFromJson(const std::string& json_string);
std::string FullName(const std::string& base_path);
......
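
The two new fields carry the server-side memory-buffer metadata that the rest of this commit wires through: buf_id is the receiver's data-cache slot id (filled from Request::GetSlotId() in PrepareFileInfo below) and source records the receiver endpoint as "host:port". A minimal sketch of a populated record, with values taken from the tests in this commit:

    asapo::FileInfo fi;
    fi.name = "relpath/1";
    fi.size = 100;
    fi.id = 1;
    fi.buf_id = 18446744073709551615ull;  // memory-cache slot id assigned by the receiver
    fi.source = "host:1234";              // "source_host:listen_port" of the receiver
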
......@@ -2,20 +2,25 @@
#include "json_parser/json_parser.h"
#include <iostream>
namespace asapo {
std::string FileInfo::Json() const {
auto nanoseconds_from_epoch = std::chrono::time_point_cast<std::chrono::nanoseconds>(modify_date).
time_since_epoch().count();
int64_t buf_id_int = static_cast<int64_t>(buf_id);
std::string s = "{\"_id\":" + std::to_string(id) + ","
"\"size\":" + std::to_string(size) + ","
"\"name\":\"" + name + "\","
"\"lastchange\":" + std::to_string(nanoseconds_from_epoch) + "}";
"\"lastchange\":" + std::to_string(nanoseconds_from_epoch) + ","
"\"source\":\"" + source + "\","
"\"buf_id\":" + std::to_string(buf_id_int)
+ "}";
return s;
}
bool TimeFromJson(const JsonStringParser& parser, const std::string name, std::chrono::system_clock::time_point* val) {
uint64_t nanoseconds_from_epoch;
if (parser.GetUInt64(name, &nanoseconds_from_epoch)) {
......@@ -39,6 +44,8 @@ bool FileInfo::SetFromJson(const std::string& json_string) {
if (parser.GetUInt64("_id", &id) ||
parser.GetUInt64("size", &size) ||
parser.GetString("name", &name) ||
parser.GetString("source", &source) ||
parser.GetUInt64("buf_id", &buf_id) ||
!TimeFromJson(parser, "lastchange", &modify_date)) {
*this = old;
return false;
......
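
Json() prints buf_id through a cast to a signed 64-bit integer, so the largest slot ids come out negative in the JSON; SetFromJson() reads the value back with GetUInt64(), which undoes the cast. A worked example matching the expectation of the CorrectConvertToJson unit test further down:

    asapo::FileInfo fi;
    fi.id = 1;
    fi.size = 100;
    fi.name = "name";
    fi.source = "host:1234";
    fi.buf_id = 18446744073709551615ull;  // max uint64_t
    fi.modify_date = std::chrono::time_point<std::chrono::system_clock>(std::chrono::milliseconds(1));
    // fi.Json() yields:
    // {"_id":1,"size":100,"name":"name","lastchange":1000000,"source":"host:1234","buf_id":-1}
    // static_cast<int64_t>(18446744073709551615ull) == -1; casting back restores the original uint64_t
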
......@@ -93,11 +93,12 @@ void MongoDBClient::CleanUp() {
}
bson_p PrepareBsonDocument(const FileInfo& file, Error* err) {
bson_error_t mongo_err;
auto s = file.Json();
auto json = reinterpret_cast<const uint8_t*>(s.c_str());
auto bson = bson_new_from_json(json, -1, nullptr);
auto bson = bson_new_from_json(json, -1, &mongo_err);
if (!bson) {
*err = TextError(DBError::kInsertError);
*err = TextError(std::string(DBError::kInsertError) + ": " + mongo_err.message);
return nullptr;
}
......
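
Passing a bson_error_t to bson_new_from_json() (libbson) lets the returned insert error carry the parser's own message instead of only the generic kInsertError text. Presumably the signed buf_id encoding above also keeps the document within BSON's int64 range, since BSON has no unsigned 64-bit integer type. A standalone sketch of the same error-reporting pattern:

    #include <bson/bson.h>   // <bson.h> on older libbson installations
    #include <cstdio>

    // sketch: convert a JSON string to BSON and print libbson's reason on failure
    bson_t* JsonToBson(const char* json) {
        bson_error_t err;
        auto bson = bson_new_from_json(reinterpret_cast<const uint8_t*>(json), -1, &err);
        if (!bson) {
            std::fprintf(stderr, "bson_new_from_json failed: %s\n", err.message);
        }
        return bson;  // caller owns the document; release it with bson_destroy()
    }
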
......@@ -3,13 +3,15 @@
using namespace rapidjson;
#include <iostream>
namespace asapo {
RapidJson::RapidJson(const std::string& json, const std::unique_ptr<IO>* io): io__{io}, json_{json} {
RapidJson::RapidJson(const std::string& json, const std::unique_ptr<IO>* io) : io__{io}, json_{json} {
}
Error RapidJson::LazyInitialize()const noexcept {
Error RapidJson::LazyInitialize() const noexcept {
if (embedded_error_) {
return TextError(embedded_error_->Explain());
}
......@@ -26,7 +28,7 @@ Error RapidJson::LazyInitialize()const noexcept {
}
}
if ( doc_.Parse(str.c_str()).HasParseError()) {
if (doc_.Parse(str.c_str()).HasParseError()) {
return TextError("Cannot parse document");
}
......@@ -46,6 +48,9 @@ asapo::Error RapidJson::CheckValueType(const std::string& name, ValueType type,
res = val->IsString();
break;
case ValueType::kUint64:
res = val->IsUint64();
break;
case ValueType::kInt64:
res = val->IsInt64();
break;
case ValueType::kBool:
......@@ -62,27 +67,46 @@ asapo::Error RapidJson::CheckValueType(const std::string& name, ValueType type,
return nullptr;
}
asapo::Error RapidJson::GetValuePointer(const std::string& name, ValueType type, Value** val)const noexcept {
asapo::Error RapidJson::GetValuePointer(const std::string& name, ValueType type, Value** val) const noexcept {
if (Error err = LazyInitialize()) {
return err;
}
auto iterator = object_p_->FindMember(name.c_str());
if (iterator == object_p_->MemberEnd()) {
return TextError("cannot find: " + name);
return TextError("cannot find: " + name);
}
*val = &iterator->value;
*val = &iterator->value;
return CheckValueType(name, type, *val);
}
Error RapidJson::GetInt64(const std::string& name, int64_t* val) const noexcept {
Value* json_val;
if (Error err = GetValuePointer(name, ValueType::kInt64, &json_val)) {
return err;
}
*val = json_val->GetInt64();
return nullptr;
}
Error RapidJson::GetUInt64(const std::string& name, uint64_t* val) const noexcept {
int64_t val_int64;
Error err = GetInt64(name, &val_int64);
if (!initialized_) {
return err;
}
if (err == nullptr) {
*val = static_cast<uint64_t>(val_int64);
return nullptr;
}
Value* json_val;
if (Error err = GetValuePointer(name, ValueType::kUint64, &json_val)) {
return err;
}
*val = json_val->GetInt64();
*val = json_val->GetUint64();
return nullptr;
}
......@@ -104,7 +128,6 @@ Error RapidJson::GetString(const std::string& name, std::string* val) const noex
return nullptr;
}
Error RapidJson::GetArrayUInt64(const std::string& name, std::vector<uint64_t>* val) const noexcept {
Value* json_val;
if (Error err = GetValuePointer(name, ValueType::kArray, &json_val)) {
......@@ -113,10 +136,10 @@ Error RapidJson::GetArrayUInt64(const std::string& name, std::vector<uint64_t>*
val->clear();
for (auto& v : json_val->GetArray()) {
if (!v.IsInt64()) {
if (!v.IsUint64()) {
return TextError("wrong type of array element: " + name);
}
val->push_back(v.GetInt());
val->push_back(v.GetUint64());
}
return nullptr;
......@@ -148,5 +171,4 @@ RapidJson::RapidJson(const RapidJson& parent, const std::string& subname) {
initialized_ = true;
}
}
\ No newline at end of file
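
GetUInt64() now tries the value as a signed 64-bit integer first and casts the result, falling back to a genuine unsigned read only if that fails. This is what lets buf_id, written as a signed number by FileInfo::Json() and stored as a signed int64 by MongoDB, come back as the original uint64_t. A hedged usage sketch, assuming JsonStringParser forwards to this implementation and is constructible from a raw JSON string as in FileInfo::SetFromJson() above:

    asapo::JsonStringParser signed_doc{R"({"buf_id":-1})"};                      // encoding produced by FileInfo::Json()
    asapo::JsonStringParser unsigned_doc{R"({"buf_id":18446744073709551615})"};  // plain unsigned encoding

    uint64_t v1 = 0, v2 = 0;
    signed_doc.GetUInt64("buf_id", &v1);    // Int64 branch: -1 is cast back to 18446744073709551615
    unsigned_doc.GetUInt64("buf_id", &v2);  // Int64 check fails, value is read via the Uint64 branch
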
......@@ -10,6 +10,7 @@ namespace asapo {
enum class ValueType {
kUint64,
kInt64,
kString,
kObject,
kArray,
......@@ -26,6 +27,7 @@ class RapidJson {
Error GetArrayUInt64(const std::string& name, std::vector<uint64_t>* val) const noexcept;
Error GetArrayString(const std::string& name, std::vector<std::string>* val) const noexcept;
private:
Error GetInt64(const std::string& name, int64_t* val) const noexcept;
const std::unique_ptr<IO>* io__;
mutable rapidjson::Document doc_;
mutable rapidjson::Value object_;
......
......@@ -20,20 +20,34 @@ using ::testing::SetArgPointee;
namespace {
uint64_t big_uint = 18446744073709551615ull;
int64_t big_int = -9223372036854775807ll - 1;
FileInfo PrepareFileInfo() {
FileInfo finfo;
finfo.size = 100;
finfo.id = 1;
finfo.name = "name";
finfo.source = "host:1234";
finfo.buf_id = big_uint;
finfo.modify_date = std::chrono::time_point<std::chrono::system_clock>(std::chrono::milliseconds(1));
return finfo;
}
TEST(FileInFo, Defaults) {
FileInfo finfo;
ASSERT_THAT(finfo.buf_id, Eq(0));
ASSERT_THAT(finfo.id, Eq(0));
}
TEST(FileInFo, CorrectConvertToJson) {
auto finfo = PrepareFileInfo();
std::string json = finfo.Json();
ASSERT_THAT(json, Eq(
R"({"_id":1,"size":100,"name":"name","lastchange":1000000})"));
R"({"_id":1,"size":100,"name":"name","lastchange":1000000,"source":"host:1234","buf_id":-1})"));
}
TEST(FileInFo, CorrectConvertFromJsonReturnsError) {
......@@ -57,11 +71,15 @@ TEST(FileInFo, CorrectConvertFromJson) {
FileInfo result;
auto ok = result.SetFromJson(json);
ASSERT_THAT(ok, Eq(true));
ASSERT_THAT(result.id, Eq(finfo.id));
ASSERT_THAT(result.name, Eq(finfo.name));
ASSERT_THAT(result.size, Eq(finfo.size));
ASSERT_THAT(result.modify_date, Eq(finfo.modify_date));
ASSERT_THAT(ok, Eq(true));
ASSERT_THAT(result.buf_id, Eq(finfo.buf_id));
ASSERT_THAT(result.source, Eq(finfo.source));
}
......
......@@ -118,7 +118,7 @@ void WaitThreadsFinished(const Args& args) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
elapsed_ms += 100;
if (elapsed_ms > args.timeout_sec * 1000) {
std::cerr << "Exit on timeout " << std::endl;
std::cerr << "Producer exit on timeout " << std::endl;
exit(EXIT_FAILURE);
}
}
......
......@@ -21,7 +21,7 @@ nomad run broker.nmd
for i in `seq 1 3`;
do
echo 'db.data.insert({"_id":'$i',"size":100,"name":"'$i'","lastchange":1})' | mongo ${database_name}
echo 'db.data.insert({"_id":'$i',"size":100,"name":"'$i'","lastchange":1,"source":"none","buf_id":0})' | mongo ${database_name}
done
sleep 1
......
......@@ -8,7 +8,7 @@ c:\opt\consul\nomad run nginx.nmd
ping 1.0.0.0 -n 10 -w 100 > nul
for /l %%x in (1, 1, 3) do echo db.data.insert({"_id":%%x,"size":100,"name":"%%x","lastchange":1}) | %mongo_exe% %database_name% || goto :error
for /l %%x in (1, 1, 3) do echo db.data.insert({"_id":%%x,"size":100,"name":"%%x","lastchange":1,"source":"none","buf_id":0}) | %mongo_exe% %database_name% || goto :error
"%1" 127.0.0.1:8400 %database_name% 1 %token_test_run% 1000 | findstr /c:"Processed 3 file" || goto :error
......
......@@ -118,7 +118,7 @@ void Request::SetBeamline(std::string beamline) {
const std::string& Request::GetBeamline() const {
return beamline_;
}
uint64_t Request::GetSlotId() {
uint64_t Request::GetSlotId() const {
return slot_id_;
}
......
......@@ -41,7 +41,7 @@ class Request {
VIRTUAL const std::string& GetBeamline() const;
std::unique_ptr<IO> io__;
DataCache* cache__ = nullptr;
VIRTUAL uint64_t GetSlotId();
VIRTUAL uint64_t GetSlotId() const;
private:
Error PrepareDataBuffer();
Error ReceiveData();
......
......@@ -7,6 +7,15 @@
namespace asapo {
template<typename ... Args>
std::string string_format( const std::string& format, Args ... args ) {
size_t size = snprintf( nullptr, 0, format.c_str(), args ... ) + 1;
std::unique_ptr<char[]> buf( new char[ size ] );
snprintf( buf.get(), size, format.c_str(), args ... );
return std::string( buf.get(), buf.get() + size - 1 );
}
Error RequestHandlerDbWrite::ProcessRequest(Request* request) const {
if (db_name_.empty()) {
db_name_ = request->GetBeamtimeId();
......@@ -16,18 +25,8 @@ Error RequestHandlerDbWrite::ProcessRequest(Request* request) const {
return err;
}
FileInfo file_info;
file_info.name = request->GetFileName();
file_info.size = request->GetDataSize();
file_info.id = request->GetDataID();
Error err;
err = db_client__->Insert(file_info, true);
if (!err) {
log__->Debug(std::string{"insert record id "} + std::to_string(file_info.id) + " to " + kDBCollectionName + " in " +
db_name_ +
" at " + GetReceiverConfig()->broker_db_uri);
}
return err;
return InsertRecordToDb(request);
}
RequestHandlerDbWrite::RequestHandlerDbWrite(): log__{GetDefaultReceiverLogger()} {
......@@ -52,4 +51,25 @@ Error RequestHandlerDbWrite::ConnectToDbIfNeeded() const {
return nullptr;
}
Error RequestHandlerDbWrite::InsertRecordToDb(const Request* request) const {
auto file_info = PrepareFileInfo(request);
auto err = db_client__->Insert(file_info, true);
if (!err) {
log__->Debug(std::string{"insert record id "} + std::to_string(file_info.id) + " to " + kDBCollectionName + " in " +
db_name_ +
" at " + GetReceiverConfig()->broker_db_uri);
}
return err;
}
FileInfo RequestHandlerDbWrite::PrepareFileInfo(const Request* request) const {
FileInfo file_info;
file_info.name = request->GetFileName();
file_info.size = request->GetDataSize();
file_info.id = request->GetDataID();
file_info.buf_id = request->GetSlotId();
file_info.source = GetReceiverConfig()->source_host + ":" + string_format("%ld", GetReceiverConfig()->listen_port);
return file_info;
}
}
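
ProcessRequest() is now split into ConnectToDbIfNeeded(), InsertRecordToDb() and PrepareFileInfo(), and the record's source field is composed from the receiver configuration with the small string_format() helper added at the top of the file. A sketch of that composition with illustrative values (the real ones come from GetReceiverConfig()):

    // host name and port are illustrative; the commit reads them from the receiver config
    std::string source_host = "receiver-host";   // GetReceiverConfig()->source_host
    uint64_t listen_port = 8400;                 // GetReceiverConfig()->listen_port
    auto source = source_host + ":" + asapo::string_format("%ld", static_cast<long>(listen_port));
    // source == "receiver-host:8400"; DbWriterHandlerTests builds the same string with std::to_string
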
......@@ -16,7 +16,9 @@ class RequestHandlerDbWrite final: public RequestHandler {
Error ProcessRequest(Request* request) const override;
std::unique_ptr<Database> db_client__;
const AbstractLogger* log__;
private:
private:
FileInfo PrepareFileInfo(const Request* request) const;
Error InsertRecordToDb(const Request* request) const;
Error ConnectToDbIfNeeded() const;
mutable bool connected_to_db = false;
mutable std::string db_name_;
......
......@@ -68,16 +68,16 @@ Error SetReceiverConfig (const ReceiverConfig& config, std::string error_field)
if (error_field == "SourceHost") {
EXPECT_CALL(mock_io, GetHostName_t(_)).
WillOnce(
WillOnce(
DoAll(SetArgPointee<0>(asapo::IOErrorTemplates::kUnknownIOError.Generate().release()),
Return("")
));
} else if (error_field == "none"){
));
} else if (error_field == "none") {
EXPECT_CALL(mock_io, GetHostName_t(_)).
WillOnce(
WillOnce(
DoAll(SetArgPointee<0>(nullptr),
Return(config.source_host)
));
));
}
auto err = config_factory.SetConfig("fname");
......
......@@ -52,6 +52,7 @@ class MockRequest: public Request {
MOCK_CONST_METHOD0(GetFileName, std::string());
MOCK_CONST_METHOD0(GetDataSize, uint64_t());
MOCK_CONST_METHOD0(GetDataID, uint64_t());
MOCK_CONST_METHOD0(GetSlotId, uint64_t());
MOCK_CONST_METHOD0(GetData, void* ());
MOCK_CONST_METHOD0(GetBeamtimeId, const std::string & ());
MOCK_CONST_METHOD0(GetBeamline, const std::string & ());
......
......@@ -61,6 +61,8 @@ class DbWriterHandlerTests : public Test {
ReceiverConfig config;
std::string expected_beamtime_id = "beamtime_id";
std::string expected_hostname = "host";
uint64_t expected_port = 1234;
uint64_t expected_buf_id = 18446744073709551615ull;
void SetUp() override {
GenericRequestHeader request_header;
request_header.data_id = 2;
......@@ -127,6 +129,8 @@ TEST_F(DbWriterHandlerTests, ProcessRequestDoesNotCallConnectSecondTime) {
MATCHER_P(CompareFileInfo, file, "") {
if (arg.size != file.size) return false;
if (arg.source != file.source) return false;
if (arg.buf_id != file.buf_id) return false;
if (arg.name != file.name) return false;
if (arg.id != file.id) return false;
......@@ -136,12 +140,20 @@ MATCHER_P(CompareFileInfo, file, "") {
TEST_F(DbWriterHandlerTests, CallsInsert) {
config.broker_db_uri = "127.0.0.1:27017";
config.source_host = expected_hostname;
config.listen_port = expected_port;
SetReceiverConfig(config, "none");
EXPECT_CALL(*mock_request, GetBeamtimeId())
.WillOnce(ReturnRef(expected_beamtime_id))
;
EXPECT_CALL(*mock_request, GetSlotId())
.WillOnce(Return(expected_buf_id))
;
EXPECT_CALL(mock_db, Connect_t(config.broker_db_uri, expected_beamtime_id, asapo::kDBCollectionName)).
WillOnce(testing::Return(nullptr));
......@@ -164,6 +176,8 @@ TEST_F(DbWriterHandlerTests, CallsInsert) {
file_info.size = expected_file_size;
file_info.name = expected_file_name;
file_info.id = expected_id;
file_info.buf_id = expected_buf_id;
file_info.source = expected_hostname + ":" + std::to_string(expected_port);
EXPECT_CALL(mock_db, Insert_t(CompareFileInfo(file_info), _)).
......
......@@ -53,5 +53,5 @@ $1 localhost:8400 ${beamtime_id2} 100 900 4 0 100 &
#producerid=`echo $!`
$2 ${proxy_address} ${beamtime_id1} 2 $token1 1000 | tee /dev/stderr | grep "Processed 1000 file(s)"
$2 ${proxy_address} ${beamtime_id2} 2 $token2 1000 | tee /dev/stderr | grep "Processed 900 file(s)"
$2 ${proxy_address} ${beamtime_id1} 2 $token1 5000 | tee /dev/stderr | grep "Processed 1000 file(s)"
$2 ${proxy_address} ${beamtime_id2} 2 $token2 5000 | tee /dev/stderr | grep "Processed 900 file(s)"
......@@ -42,6 +42,8 @@ int main(int argc, char* argv[]) {
fi.name = "relpath/1";
fi.id = args.file_id;
fi.modify_date = std::chrono::system_clock::now();
fi.buf_id = 18446744073709551615ull;
fi.source = "host:1234";
if (args.keyword != "Notconnected") {
db.Connect("127.0.0.1", "data", "test");
......
......@@ -24,7 +24,7 @@ sleep 1
for i in `seq 1 10`;
do
echo 'db.data.insert({"_id":'$i',"size":100,"name":"'$i'","lastchange":1})' | mongo ${database_name}
echo 'db.data.insert({"_id":'$i',"size":100,"name":"'$i'","lastchange":1,"source":"none","buf_id":0})' | mongo ${database_name}
done
$@ 127.0.0.1:8400 $database_name 4 10 $token_test_run
......
......@@ -10,7 +10,7 @@ c:\opt\consul\nomad run nginx.nmd
ping 1.0.0.0 -n 10 -w 100 > nul
for /l %%x in (1, 1, 10) do echo db.data.insert({"_id":%%x,"size":100,"name":"%%x","lastchange":1}) | %mongo_exe% %database_name% || goto :error
for /l %%x in (1, 1, 10) do echo db.data.insert({"_id":%%x,"size":100,"name":"%%x","lastchange":1,"source":"none","buf_id":0}) | %mongo_exe% %database_name% || goto :error
%1 127.0.0.1:8400 %database_name% 4 10 %token_test_run% || goto :error
......