Skip to content
Snippets Groups Projects
Commit ff3e8ec8 authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

update StreamInfo returned by producer API

parent 0e22a0d0
Branches
Tags
No related merge requests found
## 21.03.0 (in progress)
IMPROVEMENTS
* Producer API - queue limits in Python, for C++ return original data in error custom data
* Producer API - queue limits in Python, for C++ return original data in error custom data
* Consumer API - add GetCurrentDatasetCount/get_current_dataset_count function with option to include or exclude incomplete datasets
* Consumer API - GetStreamList/get_stream_list - can filter finished/unfinished streams now
* Producer/Consumer API - StreamInfo structure/Python dictionary include more information (is stream finished or not, ...)
BREAKING CHANGES
* Consumer API (C++ only) - GetStreamList now has an extra argument StreamFilter
......
......@@ -11,6 +11,10 @@
namespace asapo {
const std::string kFinishStreamKeyword = "asapo_finish_stream";
const std::string kNoNextStreamKeyword = "asapo_no_next";
class JsonStringParser;
uint64_t NanosecsEpochFromTimePoint(std::chrono::system_clock::time_point);
......@@ -19,7 +23,6 @@ std::chrono::system_clock::time_point TimePointfromNanosec(uint64_t nanoseconds_
std::string IsoDateFromEpochNanosecs(uint64_t time_from_epoch_nanosec);
uint64_t NanosecsEpochFromISODate(std::string date_time);
bool TimeFromJson(const JsonStringParser& parser, const std::string& name, std::chrono::system_clock::time_point* val);
class MessageMeta {
......
......@@ -357,39 +357,70 @@ Error MongoDBClient::GetDataSetById(const std::string &collection, uint64_t id_i
}
Error StreamInfoFromDbResponse(const std::string &last_record_str,
const std::string &earliest_record_str,
Error UpdateStreamInfoFromEarliestRecord(const std::string &earliest_record_str,
StreamInfo* info) {
uint64_t id;
std::chrono::system_clock::time_point timestamp_created,timestamp_last;
auto parser1 = JsonStringParser(last_record_str);
Error parse_err = parser1.GetUInt64("_id", &id);
if (parse_err) {
return DBErrorTemplates::kJsonParseError.Generate(
"StreamInfoFromDbResponse: cannot parse mongodb response: " + last_record_str + ": "
+ parse_err->Explain());
}
auto ok = TimeFromJson(parser1, "timestamp", &timestamp_last);
std::chrono::system_clock::time_point timestamp_created;
auto parser = JsonStringParser(earliest_record_str);
auto ok = TimeFromJson(parser, "timestamp", &timestamp_created);
if (!ok) {
return DBErrorTemplates::kJsonParseError.Generate(
"StreamInfoFromDbResponse: cannot parse timestamp in response: " + last_record_str);
"UpdateStreamInfoFromEarliestRecord: cannot parse timestamp in response: " + earliest_record_str);
}
info->timestamp_created = timestamp_created;
return nullptr;
}
// Marks a stream as finished and records the name of the stream that follows it.
//
// metadata: JSON text expected to contain a string field "next_stream".
// info:     stream info to update. `finished` is set to true before parsing, so
//           it stays set even if parsing fails. `next_stream` is written only
//           when the parsed value differs from kNoNextStreamKeyword.
//
// Returns nullptr on success, or a kJsonParseError when "next_stream" cannot
// be read from the metadata.
Error UpdateFinishedStreamInfo(const std::string &metadata,
                               StreamInfo* info) {
    info->finished = true;
    auto parser = JsonStringParser(metadata);
    std::string next_stream;
    auto err = parser.GetString("next_stream", &next_stream);
    if (err) {
        return DBErrorTemplates::kJsonParseError.Generate(
                   "UpdateFinishedStreamInfo: cannot parse finished stream meta response: " + metadata);
    }
    // The "no next stream" keyword is a placeholder, not a real stream name.
    if (next_stream != kNoNextStreamKeyword) {
        info->next_stream = next_stream;
    }
    return nullptr;
}
// Fills the "last entry" part of a StreamInfo from the last record of a stream.
//
// last_record_str: JSON text of the last record, parsed as a MessageMeta.
// info:            stream info to update; last_id and timestamp_lastentry are
//                  always written on successful parse. When the record's name
//                  equals kFinishStreamKeyword the finished-stream fields are
//                  updated from the record's metadata as well.
//
// Returns nullptr on success, a kJsonParseError when the record cannot be
// parsed, or the error reported by UpdateFinishedStreamInfo.
Error UpdateStreamInfoFromLastRecord(const std::string &last_record_str,
                                     StreamInfo* info) {
    MessageMeta last_message;
    auto ok = last_message.SetFromJson(last_record_str);
    if (!ok) {
        return DBErrorTemplates::kJsonParseError.Generate(
                   "UpdateStreamInfoFromLastRecord: cannot parse mongodb response: " + last_record_str);
    }
    info->last_id = last_message.id;
    info->timestamp_lastentry = last_message.timestamp;

    // A record named with the finish keyword marks the end of the stream.
    if (last_message.name == kFinishStreamKeyword) {
        auto err = UpdateFinishedStreamInfo(last_message.metadata, info);
        if (err) {
            return err;
        }
    }
    return nullptr;
}
// Builds a StreamInfo from the last and the earliest record of a stream.
//
// last_record_str:     JSON text of the last record; supplies last_id,
//                      timestamp_lastentry and the finished-stream fields.
// earliest_record_str: JSON text of the earliest record; supplies
//                      timestamp_created.
// info:                stream info to update.
//
// Returns nullptr on success, or the first error reported by either helper.
Error StreamInfoFromDbResponse(const std::string &last_record_str,
                               const std::string &earliest_record_str,
                               StreamInfo* info) {
    auto err = UpdateStreamInfoFromLastRecord(last_record_str, info);
    if (err) {
        return err;
    }
    // The creation timestamp must come from the earliest record, not the last one.
    return UpdateStreamInfoFromEarliestRecord(earliest_record_str, info);
}
Error MongoDBClient::GetStreamInfo(const std::string &collection, StreamInfo* info) const {
std::string last_record_str, earliest_record_str;
auto err = GetRecordFromDb(collection, 0, GetRecordMode::kLast, &last_record_str);
......@@ -407,20 +438,33 @@ Error MongoDBClient::GetStreamInfo(const std::string &collection, StreamInfo* in
return StreamInfoFromDbResponse(last_record_str, earliest_record_str, info);
}
Error MongoDBClient::UpdateStreamInfo(const char* str, StreamInfo* info) const {
std::string stream_name{str};
bool MongoCollectionIsDataStream(const std::string &stream_name) {
std::string prefix = std::string(kDBDataCollectionNamePrefix) + "_";
if (stream_name.rfind(prefix, 0) == 0) {
std::string record_str;
StreamInfo next_info;
auto err = GetStreamInfo(stream_name, &next_info);
return stream_name.rfind(prefix, 0) == 0;
}
// Loads the stream info of `collection_name` and, if that stream was created
// later than the one currently held in `info`, replaces `info` with it.
// The stored stream name is the collection name with the data-collection
// prefix (kDBDataCollectionNamePrefix + '_') stripped.
//
// Returns the error from GetStreamInfo, or nullptr otherwise.
Error MongoDBClient::UpdateCurrentLastStreamInfo(const std::string& collection_name, StreamInfo* info) const {
    StreamInfo candidate;
    auto err = GetStreamInfo(collection_name, &candidate);
    if (err) {
        return err;
    }

    // Keep the current info unless the candidate is strictly newer.
    const bool candidate_is_newer = candidate.timestamp_created > info->timestamp_created;
    if (!candidate_is_newer) {
        return nullptr;
    }

    const std::string prefix = std::string(kDBDataCollectionNamePrefix) + "_";
    candidate.name = collection_name.substr(prefix.size());
    *info = candidate;
    return nullptr;
}
// Considers one collection name as a candidate for the most recently created
// stream. Collections that are not data streams are ignored.
//
// str:  NUL-terminated collection name.
// info: best stream info found so far; replaced when this collection holds a
//       more recently created stream.
//
// Returns nullptr on success or when the collection is skipped, otherwise the
// error from UpdateCurrentLastStreamInfo.
Error MongoDBClient::UpdateLastStreamInfo(const char* str, StreamInfo* info) const {
    std::string collection_name{str};
    if (MongoCollectionIsDataStream(collection_name)) {
        auto err = UpdateCurrentLastStreamInfo(collection_name, info);
        if (err) {
            return err;
        }
    }
    return nullptr;
}
......@@ -444,7 +488,7 @@ Error MongoDBClient::GetLastStream(StreamInfo* info) const {
if ((strv = mongoc_database_get_collection_names_with_opts(
database, opts, &error))) {
for (auto i = 0; strv[i]; i++) {
err = UpdateStreamInfo(strv[i], info);
err = UpdateLastStreamInfo(strv[i], info);
if (err) {
break;
}
......
......@@ -69,7 +69,8 @@ class MongoDBClient final : public Database {
Error UpdateBsonDocument(uint64_t id, const bson_p& document, bool upsert) const;
Error AddBsonDocumentToArray(bson_t* query, bson_t* update, bool ignore_duplicates) const;
Error GetRecordFromDb(const std::string& collection, uint64_t id, GetRecordMode mode, std::string* res) const;
Error UpdateStreamInfo(const char *str,StreamInfo* info) const;
Error UpdateLastStreamInfo(const char *str, StreamInfo* info) const;
Error UpdateCurrentLastStreamInfo(const std::string& collection_name, StreamInfo* info) const;
};
......
......@@ -15,9 +15,6 @@
namespace asapo {
const size_t ProducerImpl::kDiscoveryServiceUpdateFrequencyMs = 10000; // 10s
const std::string ProducerImpl::kFinishStreamKeyword = "asapo_finish_stream";
const std::string ProducerImpl::kNoNextStreamKeyword = "asapo_no_next";
ProducerImpl::ProducerImpl(std::string endpoint, uint8_t n_processing_threads, uint64_t timeout_ms,
asapo::RequestHandlerType type):
......
......@@ -24,8 +24,6 @@ class ProducerImpl : public Producer {
std::unique_ptr<RequestHandlerFactory> request_handler_factory_;
public:
static const size_t kDiscoveryServiceUpdateFrequencyMs;
static const std::string kFinishStreamKeyword;
static const std::string kNoNextStreamKeyword;
explicit ProducerImpl(std::string endpoint, uint8_t n_processing_threads, uint64_t timeout_ms,
asapo::RequestHandlerType type);
......
......@@ -210,7 +210,7 @@ TEST_F(ProducerImplTests, OKSendingStreamFinish) {
next_stream_meta.c_str(),
expected_id + 1,
0,
asapo::ProducerImpl::kFinishStreamKeyword.c_str(),
asapo::kFinishStreamKeyword.c_str(),
expected_stream,
asapo::IngestModeFlags::kTransferMetaDataOnly,
0,
......@@ -234,7 +234,7 @@ TEST_F(ProducerImplTests, OKSendingStreamFinishWithNoNextStream) {
producer.SetCredentials(expected_credentials);
std::string
next_stream_meta = std::string("{\"next_stream\":") + "\"" + asapo::ProducerImpl::kNoNextStreamKeyword
next_stream_meta = std::string("{\"next_stream\":") + "\"" + asapo::kNoNextStreamKeyword
+ "\"}";
EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendRequest(asapo::kOpcodeTransferData,
......@@ -242,7 +242,7 @@ TEST_F(ProducerImplTests, OKSendingStreamFinishWithNoNextStream) {
next_stream_meta.c_str(),
expected_id + 1,
0,
asapo::ProducerImpl::kFinishStreamKeyword.c_str(),
asapo::kFinishStreamKeyword.c_str(),
expected_stream,
asapo::IngestModeFlags::kTransferMetaDataOnly,
0,
......
......@@ -4,6 +4,7 @@
#include "../../../common/cpp/src/database/mongodb_client.h"
#include "testing.h"
#include "asapo/common/data_structs.h"
using asapo::Error;
......@@ -43,6 +44,7 @@ int main(int argc, char* argv[]) {
fi.buf_id = 18446744073709551615ull;
fi.source = "host:1234";
if (args.keyword != "Notconnected") {
db.Connect("127.0.0.1", "data");
}
......@@ -60,6 +62,8 @@ int main(int argc, char* argv[]) {
fi2.id = 123;
fi1.timestamp = std::chrono::system_clock::now();
fi2.timestamp = std::chrono::system_clock::now()+std::chrono::minutes(1);
fi2.name = asapo::kFinishStreamKeyword;
fi2.metadata=R"({"next_stream":"ns"})";
db.Insert("data_test1", fi1, false);
db.Insert("data_test1", fi2, false);
......@@ -83,7 +87,9 @@ int main(int argc, char* argv[]) {
err = db.GetLastStream(&info);
M_AssertEq(nullptr, err);
M_AssertEq(fi2.id, info.last_id);
M_AssertEq("test1",info.name);
M_AssertEq("test1", info.name);
M_AssertEq(true, info.finished);
M_AssertEq("ns",info.next_stream);
}
return 0;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment