diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go index 5e63509e5a687a2bc95bd77df8ae6a6e13bba017..ad70079dbc7b041275d328747139728433bb424d 100644 --- a/broker/src/asapo_broker/database/mongodb.go +++ b/broker/src/asapo_broker/database/mongodb.go @@ -76,7 +76,6 @@ const stream_filter_all = "all" const stream_filter_finished = "finished" const stream_filter_unfinished = "unfinished" - var dbSessionLock sync.Mutex type SizeRecord struct { @@ -358,7 +357,7 @@ func (db *Mongodb) negAckRecord(request Request) ([]byte, error) { return nil, &DBError{utils.StatusWrongInput, err.Error()} } - err = db.InsertRecordToInprocess(request.DbName, inprocess_collection_name_prefix+request.GroupId, input.Id, input.Params.DelayMs, 1) + err = db.InsertRecordToInprocess(request.DbName, inprocess_collection_name_prefix+request.DbCollectionName+"_"+request.GroupId, input.Id, input.Params.DelayMs, 1) return []byte(""), err } @@ -372,7 +371,7 @@ func (db *Mongodb) ackRecord(request Request) ([]byte, error) { _, err = c.InsertOne(context.Background(), &record) if err == nil { - c = db.client.Database(request.DbName).Collection(inprocess_collection_name_prefix + request.GroupId) + c = db.client.Database(request.DbName).Collection(inprocess_collection_name_prefix + request.DbCollectionName + "_" + request.GroupId) _, err_del := c.DeleteOne(context.Background(), bson.M{"_id": record.ID}) if err_del != nil { return nil, &DBError{utils.StatusWrongInput, err.Error()} @@ -478,7 +477,7 @@ func (db *Mongodb) getNextAndMaxIndexesFromInprocessed(request Request, ignoreTi } tNow := time.Now().Unix() if (atomic.LoadInt64(&db.lastReadFromInprocess) <= tNow-int64(db.settings.ReadFromInprocessPeriod)) || ignoreTimeout { - record_ind, err = db.getUnProcessedId(request.DbName, inprocess_collection_name_prefix+request.GroupId, delayMs, nResendAttempts) + record_ind, err = db.getUnProcessedId(request.DbName, inprocess_collection_name_prefix+request.DbCollectionName+"_"+request.GroupId, delayMs, nResendAttempts) if err != nil { log_str := "error getting unprocessed id " + request.DbName + ", groupid: " + request.GroupId + ":" + err.Error() logger.Debug(log_str) @@ -590,7 +589,7 @@ func (db *Mongodb) getNextRecord(request Request) ([]byte, error) { } if err == nil { - err_update := db.InsertToInprocessIfNeeded(request.DbName, inprocess_collection_name_prefix+request.GroupId, nextInd, request.ExtraParam) + err_update := db.InsertToInprocessIfNeeded(request.DbName, inprocess_collection_name_prefix+request.DbCollectionName+"_"+request.GroupId, nextInd, request.ExtraParam) if err_update != nil { return nil, err_update } @@ -645,7 +644,7 @@ func (db *Mongodb) resetCounter(request Request) ([]byte, error) { return []byte(""), err } - c := db.client.Database(request.DbName).Collection(inprocess_collection_name_prefix + request.GroupId) + c := db.client.Database(request.DbName).Collection(inprocess_collection_name_prefix + request.DbCollectionName + "_" + request.GroupId) _, err_del := c.DeleteMany(context.Background(), bson.M{"_id": bson.M{"$gte": id}}) if err_del != nil { return nil, &DBError{utils.StatusWrongInput, err.Error()} diff --git a/broker/src/asapo_broker/database/mongodb_streams.go b/broker/src/asapo_broker/database/mongodb_streams.go index 4d32341c6044570d9c3028f5770e7f08e8c72c63..278ef3c57062196067b1d78c7814b0ecfcfba70e 100644 --- a/broker/src/asapo_broker/database/mongodb_streams.go +++ b/broker/src/asapo_broker/database/mongodb_streams.go @@ -28,14 +28,15 @@ type StreamsRecord struct { type Streams struct { records map[string]StreamsRecord - lastUpdated map[string]int64 + lastUpdated map[string]time.Time + lastSynced map[string]time.Time } -var streams = Streams{lastUpdated: make(map[string]int64, 0), records: make(map[string]StreamsRecord, 0)} +var streams = Streams{lastSynced: make(map[string]time.Time, 0),lastUpdated: make(map[string]time.Time, 0), records: make(map[string]StreamsRecord, 0)} var streamsLock sync.Mutex func (ss *Streams) tryGetFromCache(db_name string, updatePeriodMs int) (StreamsRecord, error) { - if ss.lastUpdated[db_name] < time.Now().UnixNano()-int64(updatePeriodMs*1000000) { + if time.Now().Sub(ss.lastUpdated[db_name]).Milliseconds() > int64(updatePeriodMs) { return StreamsRecord{}, errors.New("cache expired") } rec, ok := ss.records[db_name] @@ -133,19 +134,19 @@ func updateStreamInfofromCurrent(currentStreams []StreamInfo, record StreamInfo, return found, false } -func updateStreamInfos(db *Mongodb, db_name string, rec *StreamsRecord) error { +func updateStreamInfos(db *Mongodb, db_name string, rec *StreamsRecord,forceSync bool) error { currentStreams := getCurrentStreams(db_name) for i, record := range rec.Streams { found, mayContinue := updateStreamInfofromCurrent(currentStreams, record, &rec.Streams[i]) - if mayContinue { + if mayContinue && !forceSync { continue } - if !found { // set timestamp + if !found || forceSync { // set timestamp if err := fillInfoFromEarliestRecord(db, db_name, rec, record, i); err != nil { return err } } - if err := fillInfoFromLastRecord(db, db_name, rec, record, i); err != nil { // update firstStream last record (timestamp, stream finished flag) + if err := fillInfoFromLastRecord(db, db_name, rec, record, i); err != nil { // update last record (timestamp, stream finished flag) return err } } @@ -163,17 +164,26 @@ func (ss *Streams) updateFromDb(db *Mongodb, db_name string) (StreamsRecord, err if err != nil { return StreamsRecord{}, err } - err = updateStreamInfos(db, db_name, &rec) + + forceSync:= false + if time.Now().Sub(ss.lastSynced[db_name]).Seconds() > 5 { + forceSync = true + } + err = updateStreamInfos(db, db_name, &rec,forceSync) if err != nil { return StreamsRecord{}, err } + if forceSync { + ss.lastSynced[db_name] = time.Now() + } + sortRecords(&rec) if len(rec.Streams) > 0 { res :=StreamsRecord{} utils.DeepCopy(rec,&res) ss.records[db_name] = res - ss.lastUpdated[db_name] = time.Now().UnixNano() + ss.lastUpdated[db_name] = time.Now() } return rec, nil } diff --git a/common/cpp/src/database/mongodb_client.cpp b/common/cpp/src/database/mongodb_client.cpp index a1bbddacdb48951c419c56905b64c537eecddb4d..43a97531831d3dc097c7256fe7a45fca003297b0 100644 --- a/common/cpp/src/database/mongodb_client.cpp +++ b/common/cpp/src/database/mongodb_client.cpp @@ -502,11 +502,38 @@ Error MongoDBClient::GetLastStream(StreamInfo* info) const { return err; } + +Error MongoDBClient::DeleteCollections(const std::string &prefix) const { + mongoc_database_t* database; + char** strv; + bson_error_t error; + std::string querystr = "^" + prefix; + bson_t* query = BCON_NEW ("name", BCON_REGEX(querystr.c_str(), "i")); + bson_t* opts = BCON_NEW ("nameOnly", BCON_BOOL(true),"filter",BCON_DOCUMENT(query)); + database = mongoc_client_get_database(client_, database_name_.c_str()); + Error err; + if ((strv = mongoc_database_get_collection_names_with_opts( + database, opts, &error))) { + for (auto i = 0; strv[i]; i++) { + DeleteCollection(strv[i]); + } + bson_strfreev(strv); + } else { + err = DBErrorTemplates::kDBError.Generate(error.message); + } + + bson_destroy(opts); + bson_destroy(query); + mongoc_database_destroy(database); + return nullptr; +} + Error MongoDBClient::DeleteCollection(const std::string &name) const { bson_error_t error; auto collection = mongoc_client_get_collection(client_, database_name_.c_str(), name.c_str()); mongoc_collection_set_write_concern(collection, write_concern_); auto r = mongoc_collection_drop_with_opts(collection, NULL /* opts */, &error); + mongoc_collection_destroy(collection); if (!r) { if (error.code == 26) { return DBErrorTemplates::kNoRecord.Generate("collection "+name+" not found in "+database_name_); @@ -514,8 +541,6 @@ Error MongoDBClient::DeleteCollection(const std::string &name) const { return DBErrorTemplates::kDBError.Generate(std::string(error.message)+": "+std::to_string(error.code)); } } - - mongoc_collection_destroy(collection); return nullptr; } @@ -539,10 +564,10 @@ Error MongoDBClient::DeleteStream(const std::string &stream) const { current_collection_name_ = ""; auto err = DeleteCollection(data_col); if (err == nullptr) { + DeleteCollections(inprocess_col); + DeleteCollections(acks_col); std::string querystr = ".*_" + stream+"$"; - DeleteCollection(inprocess_col); - DeleteCollection(acks_col); - DeleteDocumentsInCollection("pointer_collection_name",querystr); + DeleteDocumentsInCollection("current_location",querystr); } return err; } diff --git a/common/cpp/src/database/mongodb_client.h b/common/cpp/src/database/mongodb_client.h index 6df48b0928aa8c79b5919b1f96f8580b74ff3922..226c134b4d0e17d3ddab76b9fe4b30c31734d7db 100644 --- a/common/cpp/src/database/mongodb_client.h +++ b/common/cpp/src/database/mongodb_client.h @@ -73,6 +73,7 @@ class MongoDBClient final : public Database { Error UpdateLastStreamInfo(const char *str, StreamInfo* info) const; Error UpdateCurrentLastStreamInfo(const std::string& collection_name, StreamInfo* info) const; Error DeleteCollection(const std::string& name) const; + Error DeleteCollections(const std::string &prefix) const; Error DeleteDocumentsInCollection(const std::string &collection_name,const std::string &querystr) const; }; diff --git a/tests/automatic/mongo_db/insert_retrieve/insert_retrieve_mongodb.cpp b/tests/automatic/mongo_db/insert_retrieve/insert_retrieve_mongodb.cpp index a1eee5fb73e25efa638eb81acb2ea472b64816c8..e7724e3c871f8a9cc2672c35b57d1aab9da134da 100644 --- a/tests/automatic/mongo_db/insert_retrieve/insert_retrieve_mongodb.cpp +++ b/tests/automatic/mongo_db/insert_retrieve/insert_retrieve_mongodb.cpp @@ -82,6 +82,7 @@ int main(int argc, char* argv[]) { asapo::StreamInfo info; + err = db.GetStreamInfo("data_test", &info); M_AssertEq(nullptr, err); M_AssertEq(fi.id, info.last_id); @@ -93,9 +94,22 @@ int main(int argc, char* argv[]) { M_AssertEq(true, info.finished); M_AssertEq("ns",info.next_stream); +// delete stream + db.Insert("inprocess_test_blabla", fi, false); + db.Insert("inprocess_test_blabla1", fi, false); + db.Insert("acks_test_blabla", fi, false); + db.Insert("acks_test_blabla1", fi, false); db.DeleteStream("test"); err = db.GetStreamInfo("data_test", &info); M_AssertTrue(err!=nullptr); + err = db.GetStreamInfo("inprocess_test_blabla", &info); + M_AssertTrue(err!=nullptr); + err = db.GetStreamInfo("inprocess_test_blabla1", &info); + M_AssertTrue(err!=nullptr); + err = db.GetStreamInfo("acks_test_blabla", &info); + M_AssertTrue(err!=nullptr); + err = db.GetStreamInfo("acks_test_blabla1", &info); + M_AssertTrue(err!=nullptr); err = db.DeleteStream("test1"); M_AssertTrue(err==nullptr); }