Skip to content
Snippets Groups Projects
Commit b2d393d1 authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

sync stream_list with the database every 5 seconds, to handle the case where a stream was deleted

parent 90beb67d
Branches
Tags
No related merge requests found
......@@ -76,7 +76,6 @@ const stream_filter_all = "all"
const stream_filter_finished = "finished"
const stream_filter_unfinished = "unfinished"
var dbSessionLock sync.Mutex
type SizeRecord struct {
......@@ -358,7 +357,7 @@ func (db *Mongodb) negAckRecord(request Request) ([]byte, error) {
return nil, &DBError{utils.StatusWrongInput, err.Error()}
}
err = db.InsertRecordToInprocess(request.DbName, inprocess_collection_name_prefix+request.GroupId, input.Id, input.Params.DelayMs, 1)
err = db.InsertRecordToInprocess(request.DbName, inprocess_collection_name_prefix+request.DbCollectionName+"_"+request.GroupId, input.Id, input.Params.DelayMs, 1)
return []byte(""), err
}
......@@ -372,7 +371,7 @@ func (db *Mongodb) ackRecord(request Request) ([]byte, error) {
_, err = c.InsertOne(context.Background(), &record)
if err == nil {
c = db.client.Database(request.DbName).Collection(inprocess_collection_name_prefix + request.GroupId)
c = db.client.Database(request.DbName).Collection(inprocess_collection_name_prefix + request.DbCollectionName + "_" + request.GroupId)
_, err_del := c.DeleteOne(context.Background(), bson.M{"_id": record.ID})
if err_del != nil {
return nil, &DBError{utils.StatusWrongInput, err.Error()}
......@@ -478,7 +477,7 @@ func (db *Mongodb) getNextAndMaxIndexesFromInprocessed(request Request, ignoreTi
}
tNow := time.Now().Unix()
if (atomic.LoadInt64(&db.lastReadFromInprocess) <= tNow-int64(db.settings.ReadFromInprocessPeriod)) || ignoreTimeout {
record_ind, err = db.getUnProcessedId(request.DbName, inprocess_collection_name_prefix+request.GroupId, delayMs, nResendAttempts)
record_ind, err = db.getUnProcessedId(request.DbName, inprocess_collection_name_prefix+request.DbCollectionName+"_"+request.GroupId, delayMs, nResendAttempts)
if err != nil {
log_str := "error getting unprocessed id " + request.DbName + ", groupid: " + request.GroupId + ":" + err.Error()
logger.Debug(log_str)
......@@ -590,7 +589,7 @@ func (db *Mongodb) getNextRecord(request Request) ([]byte, error) {
}
if err == nil {
err_update := db.InsertToInprocessIfNeeded(request.DbName, inprocess_collection_name_prefix+request.GroupId, nextInd, request.ExtraParam)
err_update := db.InsertToInprocessIfNeeded(request.DbName, inprocess_collection_name_prefix+request.DbCollectionName+"_"+request.GroupId, nextInd, request.ExtraParam)
if err_update != nil {
return nil, err_update
}
......@@ -645,7 +644,7 @@ func (db *Mongodb) resetCounter(request Request) ([]byte, error) {
return []byte(""), err
}
c := db.client.Database(request.DbName).Collection(inprocess_collection_name_prefix + request.GroupId)
c := db.client.Database(request.DbName).Collection(inprocess_collection_name_prefix + request.DbCollectionName + "_" + request.GroupId)
_, err_del := c.DeleteMany(context.Background(), bson.M{"_id": bson.M{"$gte": id}})
if err_del != nil {
return nil, &DBError{utils.StatusWrongInput, err.Error()}
......
......@@ -28,14 +28,15 @@ type StreamsRecord struct {
type Streams struct {
records map[string]StreamsRecord
lastUpdated map[string]int64
lastUpdated map[string]time.Time
lastSynced map[string]time.Time
}
var streams = Streams{lastUpdated: make(map[string]int64, 0), records: make(map[string]StreamsRecord, 0)}
var streams = Streams{lastSynced: make(map[string]time.Time, 0),lastUpdated: make(map[string]time.Time, 0), records: make(map[string]StreamsRecord, 0)}
var streamsLock sync.Mutex
func (ss *Streams) tryGetFromCache(db_name string, updatePeriodMs int) (StreamsRecord, error) {
if ss.lastUpdated[db_name] < time.Now().UnixNano()-int64(updatePeriodMs*1000000) {
if time.Now().Sub(ss.lastUpdated[db_name]).Milliseconds() > int64(updatePeriodMs) {
return StreamsRecord{}, errors.New("cache expired")
}
rec, ok := ss.records[db_name]
......@@ -133,19 +134,19 @@ func updateStreamInfofromCurrent(currentStreams []StreamInfo, record StreamInfo,
return found, false
}
func updateStreamInfos(db *Mongodb, db_name string, rec *StreamsRecord) error {
func updateStreamInfos(db *Mongodb, db_name string, rec *StreamsRecord,forceSync bool) error {
currentStreams := getCurrentStreams(db_name)
for i, record := range rec.Streams {
found, mayContinue := updateStreamInfofromCurrent(currentStreams, record, &rec.Streams[i])
if mayContinue {
if mayContinue && !forceSync {
continue
}
if !found { // set timestamp
if !found || forceSync { // set timestamp
if err := fillInfoFromEarliestRecord(db, db_name, rec, record, i); err != nil {
return err
}
}
if err := fillInfoFromLastRecord(db, db_name, rec, record, i); err != nil { // update firstStream last record (timestamp, stream finished flag)
if err := fillInfoFromLastRecord(db, db_name, rec, record, i); err != nil { // update last record (timestamp, stream finished flag)
return err
}
}
......@@ -163,17 +164,26 @@ func (ss *Streams) updateFromDb(db *Mongodb, db_name string) (StreamsRecord, err
if err != nil {
return StreamsRecord{}, err
}
err = updateStreamInfos(db, db_name, &rec)
forceSync:= false
if time.Now().Sub(ss.lastSynced[db_name]).Seconds() > 5 {
forceSync = true
}
err = updateStreamInfos(db, db_name, &rec,forceSync)
if err != nil {
return StreamsRecord{}, err
}
if forceSync {
ss.lastSynced[db_name] = time.Now()
}
sortRecords(&rec)
if len(rec.Streams) > 0 {
res :=StreamsRecord{}
utils.DeepCopy(rec,&res)
ss.records[db_name] = res
ss.lastUpdated[db_name] = time.Now().UnixNano()
ss.lastUpdated[db_name] = time.Now()
}
return rec, nil
}
......
......@@ -502,11 +502,38 @@ Error MongoDBClient::GetLastStream(StreamInfo* info) const {
return err;
}
// Drops every collection in the current database whose name starts with `prefix`.
//
// The collection list is obtained with a server-side, case-insensitive regex
// filter ("^<prefix>", flag "i") and nameOnly:true to avoid fetching full
// collection metadata.
//
// @param prefix  name prefix of the collections to drop (anchored at the start).
// @return nullptr on success, otherwise a kDBError describing why the
//         collection list could not be retrieved.
//
// NOTE(review): failures of the individual DeleteCollection() calls are
// intentionally ignored (best-effort cleanup); only a failure to list the
// collections is reported.
Error MongoDBClient::DeleteCollections(const std::string &prefix) const {
    bson_error_t error;
    std::string querystr = "^" + prefix;
    bson_t* query = BCON_NEW ("name", BCON_REGEX(querystr.c_str(), "i"));
    bson_t* opts = BCON_NEW ("nameOnly", BCON_BOOL(true), "filter", BCON_DOCUMENT(query));
    mongoc_database_t* database = mongoc_client_get_database(client_, database_name_.c_str());
    Error err;
    char** strv;
    if ((strv = mongoc_database_get_collection_names_with_opts(
             database, opts, &error))) {
        for (auto i = 0; strv[i]; i++) {
            DeleteCollection(strv[i]); // best effort, result ignored
        }
        bson_strfreev(strv);
    } else {
        err = DBErrorTemplates::kDBError.Generate(error.message);
    }
    bson_destroy(opts);
    bson_destroy(query);
    mongoc_database_destroy(database);
    // BUGFIX: previously returned nullptr unconditionally, silently discarding
    // the error set in the else-branch above.
    return err;
}
Error MongoDBClient::DeleteCollection(const std::string &name) const {
bson_error_t error;
auto collection = mongoc_client_get_collection(client_, database_name_.c_str(), name.c_str());
mongoc_collection_set_write_concern(collection, write_concern_);
auto r = mongoc_collection_drop_with_opts(collection, NULL /* opts */, &error);
mongoc_collection_destroy(collection);
if (!r) {
if (error.code == 26) {
return DBErrorTemplates::kNoRecord.Generate("collection "+name+" not found in "+database_name_);
......@@ -514,8 +541,6 @@ Error MongoDBClient::DeleteCollection(const std::string &name) const {
return DBErrorTemplates::kDBError.Generate(std::string(error.message)+": "+std::to_string(error.code));
}
}
mongoc_collection_destroy(collection);
return nullptr;
}
......@@ -539,10 +564,10 @@ Error MongoDBClient::DeleteStream(const std::string &stream) const {
current_collection_name_ = "";
auto err = DeleteCollection(data_col);
if (err == nullptr) {
DeleteCollections(inprocess_col);
DeleteCollections(acks_col);
std::string querystr = ".*_" + stream+"$";
DeleteCollection(inprocess_col);
DeleteCollection(acks_col);
DeleteDocumentsInCollection("pointer_collection_name",querystr);
DeleteDocumentsInCollection("current_location",querystr);
}
return err;
}
......
......@@ -73,6 +73,7 @@ class MongoDBClient final : public Database {
Error UpdateLastStreamInfo(const char *str, StreamInfo* info) const;
Error UpdateCurrentLastStreamInfo(const std::string& collection_name, StreamInfo* info) const;
Error DeleteCollection(const std::string& name) const;
Error DeleteCollections(const std::string &prefix) const;
Error DeleteDocumentsInCollection(const std::string &collection_name,const std::string &querystr) const;
};
......
......@@ -82,6 +82,7 @@ int main(int argc, char* argv[]) {
asapo::StreamInfo info;
err = db.GetStreamInfo("data_test", &info);
M_AssertEq(nullptr, err);
M_AssertEq(fi.id, info.last_id);
......@@ -93,9 +94,22 @@ int main(int argc, char* argv[]) {
M_AssertEq(true, info.finished);
M_AssertEq("ns",info.next_stream);
// delete stream
db.Insert("inprocess_test_blabla", fi, false);
db.Insert("inprocess_test_blabla1", fi, false);
db.Insert("acks_test_blabla", fi, false);
db.Insert("acks_test_blabla1", fi, false);
db.DeleteStream("test");
err = db.GetStreamInfo("data_test", &info);
M_AssertTrue(err!=nullptr);
err = db.GetStreamInfo("inprocess_test_blabla", &info);
M_AssertTrue(err!=nullptr);
err = db.GetStreamInfo("inprocess_test_blabla1", &info);
M_AssertTrue(err!=nullptr);
err = db.GetStreamInfo("acks_test_blabla", &info);
M_AssertTrue(err!=nullptr);
err = db.GetStreamInfo("acks_test_blabla1", &info);
M_AssertTrue(err!=nullptr);
err = db.DeleteStream("test1");
M_AssertTrue(err==nullptr);
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment