From 93c7e301e81157bdfcfd9ef691a8b6bdb14dd172 Mon Sep 17 00:00:00 2001 From: Sergey Yakubov <sergey.yakubov@desy.de> Date: Wed, 18 Nov 2020 13:35:55 +0100 Subject: [PATCH] more refactoring --- broker/src/asapo_broker/database/mongodb.go | 200 +++++++++--------- .../src/asapo_broker/database/mongodb_test.go | 17 +- 2 files changed, 109 insertions(+), 108 deletions(-) diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go index 5a73166b8..041f6b0b3 100644 --- a/broker/src/asapo_broker/database/mongodb.go +++ b/broker/src/asapo_broker/database/mongodb.go @@ -159,10 +159,10 @@ func (db *Mongodb) insertMeta(dbname string, s interface{}) error { return err } -func (db *Mongodb) getMaxIndex(dbname string, collection_name string, dataset bool) (max_id int, err error) { - c := db.client.Database(dbname).Collection(data_collection_name_prefix + collection_name) +func (db *Mongodb) getMaxIndex(request Request) (max_id int, err error) { + c := db.client.Database(request.DbName).Collection(data_collection_name_prefix + request.DbCollectionName) var q bson.M - if dataset { + if request.DatasetOp { q = bson.M{"$expr": bson.M{"$eq": []interface{}{"$size", bson.M{"$size": "$images"}}}} } else { q = nil @@ -190,20 +190,20 @@ func duplicateError(err error) bool { return command_error.Name == "DuplicateKey" } -func (db *Mongodb) setCounter(dbname string, collection_name string, group_id string, ind int) (err error) { +func (db *Mongodb) setCounter(request Request, ind int) (err error) { update := bson.M{"$set": bson.M{pointer_field_name: ind}} opts := options.Update().SetUpsert(true) - c := db.client.Database(dbname).Collection(pointer_collection_name) - q := bson.M{"_id": group_id + "_" + collection_name} + c := db.client.Database(request.DbName).Collection(pointer_collection_name) + q := bson.M{"_id": request.GroupId + "_" + request.DbCollectionName} _, err = c.UpdateOne(context.TODO(), q, update, opts) return } -func (db *Mongodb) incrementField(dbname string, collection_name string, group_id string, max_ind int, res interface{}) (err error) { +func (db *Mongodb) incrementField(request Request, max_ind int, res interface{}) (err error) { update := bson.M{"$inc": bson.M{pointer_field_name: 1}} opts := options.FindOneAndUpdate().SetUpsert(true).SetReturnDocument(options.After) - q := bson.M{"_id": group_id + "_" + collection_name, pointer_field_name: bson.M{"$lt": max_ind}} - c := db.client.Database(dbname).Collection(pointer_collection_name) + q := bson.M{"_id": request.GroupId + "_" + request.DbCollectionName, pointer_field_name: bson.M{"$lt": max_ind}} + c := db.client.Database(request.DbName).Collection(pointer_collection_name) err = c.FindOneAndUpdate(context.TODO(), q, update, opts).Decode(res) if err != nil { @@ -234,24 +234,24 @@ func encodeAnswer(id, id_max int, next_substream string) string { return string(answer) } -func (db *Mongodb) getRecordByIDRow(dbname string, collection_name string, id, id_max int, dataset bool) ([]byte, error) { +func (db *Mongodb) getRecordByIDRow(request Request, id, id_max int) ([]byte, error) { var res map[string]interface{} var q bson.M - if dataset { + if request.DatasetOp { q = bson.M{"$and": []bson.M{bson.M{"_id": id}, bson.M{"$expr": bson.M{"$eq": []interface{}{"$size", bson.M{"$size": "$images"}}}}}} } else { q = bson.M{"_id": id} } - c := db.client.Database(dbname).Collection(data_collection_name_prefix + collection_name) + c := db.client.Database(request.DbName).Collection(data_collection_name_prefix + request.DbCollectionName) err := c.FindOne(context.TODO(), q, options.FindOne()).Decode(&res) if err != nil { answer := encodeAnswer(id, id_max, "") - log_str := "error getting record id " + strconv.Itoa(id) + " for " + dbname + " : " + err.Error() + log_str := "error getting record id " + strconv.Itoa(id) + " for " + request.DbName + " : " + err.Error() logger.Debug(log_str) return nil, &DBError{utils.StatusNoData, answer} } - log_str := "got record id " + strconv.Itoa(id) + " for " + dbname + log_str := "got record id " + strconv.Itoa(id) + " for " + request.DbName logger.Debug(log_str) return utils.MapToJson(&res) } @@ -272,22 +272,22 @@ func (db *Mongodb) getEarliestRecord(dbname string, collection_name string) (map return res,nil } -func (db *Mongodb) getRecordByID(dbname string, collection_name string, group_id string, id_str string, dataset bool) ([]byte, error) { - id, err := strconv.Atoi(id_str) +func (db *Mongodb) getRecordByID(request Request) ([]byte, error) { + id, err := strconv.Atoi(request.ExtraParam) if err != nil { return nil, &DBError{utils.StatusWrongInput, err.Error()} } - max_ind, err := db.getMaxIndex(dbname, collection_name, dataset) + max_ind, err := db.getMaxIndex(request) if err != nil { return nil, err } - return db.getRecordByIDRow(dbname, collection_name, id, max_ind, dataset) + return db.getRecordByIDRow(request, id, max_ind) } -func (db *Mongodb) negAckRecord(dbname string, group_id string, input_str string) ([]byte, error) { +func (db *Mongodb) negAckRecord(request Request) ([]byte, error) { input := struct { Id int Params struct { @@ -295,27 +295,27 @@ func (db *Mongodb) negAckRecord(dbname string, group_id string, input_str string } }{} - err := json.Unmarshal([]byte(input_str), &input) + err := json.Unmarshal([]byte(request.ExtraParam), &input) if err != nil { return nil, &DBError{utils.StatusWrongInput, err.Error()} } - err = db.InsertRecordToInprocess(dbname,inprocess_collection_name_prefix+group_id,input.Id,input.Params.DelaySec, 1) + err = db.InsertRecordToInprocess(request.DbName,inprocess_collection_name_prefix+request.GroupId,input.Id,input.Params.DelaySec, 1) return []byte(""), err } -func (db *Mongodb) ackRecord(dbname string, collection_name string, group_id string, id_str string) ([]byte, error) { +func (db *Mongodb) ackRecord(request Request) ([]byte, error) { var record ID - err := json.Unmarshal([]byte(id_str),&record) + err := json.Unmarshal([]byte(request.ExtraParam),&record) if err != nil { return nil, &DBError{utils.StatusWrongInput, err.Error()} } - c := db.client.Database(dbname).Collection(acks_collection_name_prefix + collection_name + "_" + group_id) + c := db.client.Database(request.DbName).Collection(acks_collection_name_prefix + request.DbCollectionName + "_" + request.GroupId) _, err = c.InsertOne(context.Background(), &record) if err == nil { - c = db.client.Database(dbname).Collection(inprocess_collection_name_prefix + group_id) + c = db.client.Database(request.DbName).Collection(inprocess_collection_name_prefix + request.GroupId) _, err_del := c.DeleteOne(context.Background(), bson.M{"_id": record.ID}) if err_del != nil { return nil, &DBError{utils.StatusWrongInput, err.Error()} @@ -325,20 +325,20 @@ func (db *Mongodb) ackRecord(dbname string, collection_name string, group_id str return []byte(""), err } -func (db *Mongodb) checkDatabaseOperationPrerequisites(db_name string, collection_name string, group_id string) error { +func (db *Mongodb) checkDatabaseOperationPrerequisites(request Request) error { if db.client == nil { return &DBError{utils.StatusServiceUnavailable, no_session_msg} } - if len(db_name) == 0 || len(collection_name) == 0 { + if len(request.DbName) == 0 || len(request.DbCollectionName) == 0 { return &DBError{utils.StatusWrongInput, "beamtime_id ans substream must be set"} } return nil } -func (db *Mongodb) getCurrentPointer(db_name string, collection_name string, group_id string, dataset bool) (LocationPointer, int, error) { - max_ind, err := db.getMaxIndex(db_name, collection_name, dataset) +func (db *Mongodb) getCurrentPointer(request Request) (LocationPointer, int, error) { + max_ind, err := db.getMaxIndex(request) if err != nil { return LocationPointer{}, 0, err } @@ -348,7 +348,7 @@ func (db *Mongodb) getCurrentPointer(db_name string, collection_name string, gro } var curPointer LocationPointer - err = db.incrementField(db_name, collection_name, group_id, max_ind, &curPointer) + err = db.incrementField(request, max_ind, &curPointer) if err != nil { return LocationPointer{}, 0, err } @@ -408,11 +408,11 @@ func (db *Mongodb) InsertToInprocessIfNeeded(db_name string, collection_name str } -func (db *Mongodb) getNextAndMaxIndexesFromInprocessed(db_name string, collection_name string, group_id string, dataset bool, extra_param string, ignoreTimeout bool) (int, int, error) { +func (db *Mongodb) getNextAndMaxIndexesFromInprocessed(request Request, ignoreTimeout bool) (int, int, error) { var record_ind, max_ind, delaySec, nResendAttempts int var err error - if len(extra_param) != 0 { - delaySec, nResendAttempts, err = extractsTwoIntsFromString(extra_param) + if len(request.ExtraParam) != 0 { + delaySec, nResendAttempts, err = extractsTwoIntsFromString(request.ExtraParam) if err != nil { return 0, 0, err } @@ -421,15 +421,15 @@ func (db *Mongodb) getNextAndMaxIndexesFromInprocessed(db_name string, collectio } tNow := time.Now().Unix() if (atomic.LoadInt64(&db.lastReadFromInprocess) <= tNow-int64(db.settings.ReadFromInprocessPeriod)) || ignoreTimeout { - record_ind, err = db.getUnProcessedId(db_name, inprocess_collection_name_prefix+group_id, delaySec,nResendAttempts) + record_ind, err = db.getUnProcessedId(request.DbName, inprocess_collection_name_prefix+request.GroupId, delaySec,nResendAttempts) if err != nil { - log_str := "error getting unprocessed id " + db_name + ", groupid: " + group_id + ":" + err.Error() + log_str := "error getting unprocessed id " + request.DbName + ", groupid: " + request.GroupId + ":" + err.Error() logger.Debug(log_str) return 0, 0, err } } if record_ind != 0 { - max_ind, err = db.getMaxIndex(db_name, collection_name, dataset) + max_ind, err = db.getMaxIndex(request) if err != nil { return 0, 0, err } @@ -441,29 +441,29 @@ func (db *Mongodb) getNextAndMaxIndexesFromInprocessed(db_name string, collectio } -func (db *Mongodb) getNextAndMaxIndexesFromCurPointer(db_name string, collection_name string, group_id string, dataset bool, extra_param string) (int, int, error) { - curPointer, max_ind, err := db.getCurrentPointer(db_name, collection_name, group_id, dataset) +func (db *Mongodb) getNextAndMaxIndexesFromCurPointer(request Request) (int, int, error) { + curPointer, max_ind, err := db.getCurrentPointer(request) if err != nil { - log_str := "error getting next pointer for " + db_name + ", groupid: " + group_id + ":" + err.Error() + log_str := "error getting next pointer for " + request.DbName + ", groupid: " + request.GroupId + ":" + err.Error() logger.Debug(log_str) return 0, 0, err } - log_str := "got next pointer " + strconv.Itoa(curPointer.Value) + " for " + db_name + ", groupid: " + group_id + log_str := "got next pointer " + strconv.Itoa(curPointer.Value) + " for " + request.DbName + ", groupid: " + request.GroupId logger.Debug(log_str) return curPointer.Value, max_ind, nil } -func (db *Mongodb) getNextAndMaxIndexes(db_name string, collection_name string, group_id string, dataset bool, extra_param string) (int, int, error) { - nextInd, maxInd, err := db.getNextAndMaxIndexesFromInprocessed(db_name, collection_name, group_id, dataset, extra_param, false) +func (db *Mongodb) getNextAndMaxIndexes(request Request) (int, int, error) { + nextInd, maxInd, err := db.getNextAndMaxIndexesFromInprocessed(request, false) if err != nil { return 0, 0, err } if nextInd == 0 { - nextInd, maxInd, err = db.getNextAndMaxIndexesFromCurPointer(db_name, collection_name, group_id, dataset, extra_param) + nextInd, maxInd, err = db.getNextAndMaxIndexesFromCurPointer(request) if err_db, ok := err.(*DBError); ok && err_db.Code == utils.StatusNoData { var err_inproc error - nextInd, maxInd, err_inproc = db.getNextAndMaxIndexesFromInprocessed(db_name, collection_name, group_id, dataset, extra_param, true) + nextInd, maxInd, err_inproc = db.getNextAndMaxIndexesFromInprocessed(request, true) if err_inproc != nil { return 0, 0, err_inproc } @@ -475,8 +475,7 @@ func (db *Mongodb) getNextAndMaxIndexes(db_name string, collection_name string, return nextInd, maxInd, nil } -func (db *Mongodb) processLastRecord(data []byte, err error, db_name string, collection_name string, - group_id string, dataset bool, extra_param string) ([]byte, error) { +func (db *Mongodb) processLastRecord(request Request, data []byte, err error) ([]byte, error) { var r ServiceRecord err = json.Unmarshal(data, &r) if err != nil || r.Name != finish_substream_keyword { @@ -489,35 +488,36 @@ func (db *Mongodb) processLastRecord(data []byte, err error, db_name string, col } answer := encodeAnswer(r.ID, r.ID, next_substream) - log_str := "reached end of substream " + collection_name + " , next_substream: " + next_substream + log_str := "reached end of substream " + request.DbCollectionName + " , next_substream: " + next_substream logger.Debug(log_str) var err_inproc error - nextInd, maxInd, err_inproc := db.getNextAndMaxIndexesFromInprocessed(db_name, collection_name, group_id, dataset, extra_param, true) + nextInd, maxInd, err_inproc := db.getNextAndMaxIndexesFromInprocessed(request, true) if err_inproc != nil { return nil, err_inproc } if nextInd != 0 { - return db.getRecordByIDRow(db_name, collection_name, nextInd, maxInd, dataset) + return db.getRecordByIDRow(request, nextInd, maxInd) } return nil, &DBError{utils.StatusNoData, answer} } -func (db *Mongodb) getNextRecord(db_name string, collection_name string, group_id string, dataset bool, extra_param string) ([]byte, error) { - nextInd, maxInd, err := db.getNextAndMaxIndexes(db_name, collection_name, group_id, dataset, extra_param) +func (db *Mongodb) getNextRecord(request Request) ([]byte, error) { + + nextInd, maxInd, err := db.getNextAndMaxIndexes(request) if err != nil { return nil, err } - data, err := db.getRecordByIDRow(db_name, collection_name, nextInd, maxInd, dataset) + data, err := db.getRecordByIDRow(request, nextInd, maxInd) if nextInd == maxInd { - data, err = db.processLastRecord(data, err,db_name,collection_name,group_id,dataset,extra_param) + data, err = db.processLastRecord(request,data, err) } if err == nil { - err_update := db.InsertToInprocessIfNeeded(db_name, inprocess_collection_name_prefix+group_id, nextInd, extra_param) + err_update := db.InsertToInprocessIfNeeded(request.DbName, inprocess_collection_name_prefix+request.GroupId, nextInd, request.ExtraParam) if err_update != nil { return nil, err_update } @@ -525,20 +525,20 @@ func (db *Mongodb) getNextRecord(db_name string, collection_name string, group_i return data, err } -func (db *Mongodb) getLastRecord(db_name string, collection_name string, group_id string, dataset bool) ([]byte, error) { - max_ind, err := db.getMaxIndex(db_name, collection_name, dataset) +func (db *Mongodb) getLastRecord(request Request) ([]byte, error) { + max_ind, err := db.getMaxIndex(request) if err != nil { return nil, err } - res, err := db.getRecordByIDRow(db_name, collection_name, max_ind, max_ind, dataset) + res, err := db.getRecordByIDRow(request, max_ind, max_ind) - db.setCounter(db_name, collection_name, group_id, max_ind) + db.setCounter(request, max_ind) return res, err } -func (db *Mongodb) getSize(db_name string, collection_name string) ([]byte, error) { - c := db.client.Database(db_name).Collection(data_collection_name_prefix + collection_name) +func (db *Mongodb) getSize(request Request) ([]byte, error) { + c := db.client.Database(request.DbName).Collection(data_collection_name_prefix + request.DbCollectionName) var rec SizeRecord var err error @@ -550,18 +550,18 @@ func (db *Mongodb) getSize(db_name string, collection_name string) ([]byte, erro return json.Marshal(&rec) } -func (db *Mongodb) resetCounter(db_name string, collection_name string, group_id string, id_str string) ([]byte, error) { - id, err := strconv.Atoi(id_str) +func (db *Mongodb) resetCounter(request Request) ([]byte, error) { + id, err := strconv.Atoi(request.ExtraParam) if err != nil { return nil, err } - err = db.setCounter(db_name, collection_name, group_id, id) + err = db.setCounter(request, id) if err!= nil { return []byte(""), err } - c := db.client.Database(db_name).Collection(inprocess_collection_name_prefix + group_id) + c := db.client.Database(request.DbName).Collection(inprocess_collection_name_prefix + request.GroupId) _, err_del := c.DeleteMany(context.Background(), bson.M{"_id": bson.M{"$gte": id}}) if err_del != nil { return nil, &DBError{utils.StatusWrongInput, err.Error()} @@ -570,22 +570,22 @@ func (db *Mongodb) resetCounter(db_name string, collection_name string, group_id return []byte(""), nil } -func (db *Mongodb) getMeta(dbname string, id_str string) ([]byte, error) { - id, err := strconv.Atoi(id_str) +func (db *Mongodb) getMeta(request Request) ([]byte, error) { + id, err := strconv.Atoi(request.ExtraParam) if err != nil { return nil, err } var res map[string]interface{} q := bson.M{"_id": id} - c := db.client.Database(dbname).Collection(meta_collection_name) + c := db.client.Database(request.DbName).Collection(meta_collection_name) err = c.FindOne(context.TODO(), q, options.FindOne()).Decode(&res) if err != nil { - log_str := "error getting meta with id " + strconv.Itoa(id) + " for " + dbname + " : " + err.Error() + log_str := "error getting meta with id " + strconv.Itoa(id) + " for " + request.DbName + " : " + err.Error() logger.Debug(log_str) return nil, &DBError{utils.StatusNoData, err.Error()} } - log_str := "got record id " + strconv.Itoa(id) + " for " + dbname + log_str := "got record id " + strconv.Itoa(id) + " for " + request.DbName logger.Debug(log_str) return utils.MapToJson(&res) } @@ -596,16 +596,16 @@ func (db *Mongodb) processQueryError(query, dbname string, err error) ([]byte, e return nil, &DBError{utils.StatusNoData, err.Error()} } -func (db *Mongodb) queryImages(dbname string, collection_name string, query string) ([]byte, error) { +func (db *Mongodb) queryImages(request Request) ([]byte, error) { var res []map[string]interface{} - q, sort, err := db.BSONFromSQL(dbname, query) + q, sort, err := db.BSONFromSQL(request.DbName, request.ExtraParam) if err != nil { - log_str := "error parsing query: " + query + " for " + dbname + " : " + err.Error() + log_str := "error parsing query: " + request.ExtraParam + " for " + request.DbName + " : " + err.Error() logger.Debug(log_str) return nil, &DBError{utils.StatusWrongInput, err.Error()} } - c := db.client.Database(dbname).Collection(data_collection_name_prefix + collection_name) + c := db.client.Database(request.DbName).Collection(data_collection_name_prefix + request.DbCollectionName) opts := options.Find() if len(sort) > 0 { @@ -615,14 +615,14 @@ func (db *Mongodb) queryImages(dbname string, collection_name string, query stri cursor, err := c.Find(context.TODO(), q, opts) if err != nil { - return db.processQueryError(query, dbname, err) + return db.processQueryError(request.ExtraParam, request.DbName, err) } err = cursor.All(context.TODO(), &res) if err != nil { - return db.processQueryError(query, dbname, err) + return db.processQueryError(request.ExtraParam, request.DbName, err) } - log_str := "processed query " + query + " for " + dbname + " ,found" + strconv.Itoa(len(res)) + " records" + log_str := "processed query " + request.ExtraParam + " for " + request.DbName + " ,found" + strconv.Itoa(len(res)) + " records" logger.Debug(log_str) if res != nil { return utils.MapToJson(&res) @@ -658,8 +658,8 @@ func extractsTwoIntsFromString(from_to string) (int, int, error) { } -func (db *Mongodb) nacks(db_name string, collection_name string, group_id string, from_to string) ([]byte, error) { - from, to, err := extractsTwoIntsFromString(from_to) +func (db *Mongodb) nacks(request Request) ([]byte, error) { + from, to, err := extractsTwoIntsFromString(request.ExtraParam) if err != nil { return nil, err } @@ -669,7 +669,7 @@ func (db *Mongodb) nacks(db_name string, collection_name string, group_id string } if to == 0 { - to, err = db.getMaxIndex(db_name, collection_name, false) + to, err = db.getMaxIndex(request) if err != nil { return nil, err } @@ -680,7 +680,7 @@ func (db *Mongodb) nacks(db_name string, collection_name string, group_id string return utils.MapToJson(&res) } - res.Unacknowledged, err = db.getNacks(db_name, collection_name, group_id, from, to) + res.Unacknowledged, err = db.getNacks(request, from, to) if err != nil { return nil, err } @@ -688,8 +688,8 @@ func (db *Mongodb) nacks(db_name string, collection_name string, group_id string return utils.MapToJson(&res) } -func (db *Mongodb) lastAck(db_name string, collection_name string, group_id string) ([]byte, error) { - c := db.client.Database(db_name).Collection(acks_collection_name_prefix + collection_name + "_" + group_id) +func (db *Mongodb) lastAck(request Request) ([]byte, error) { + c := db.client.Database(request.DbName).Collection(acks_collection_name_prefix + request.DbCollectionName + "_" + request.GroupId) opts := options.FindOne().SetSort(bson.M{"_id": -1}).SetReturnKey(true) result := LastAck{0} var q bson.M = nil @@ -704,9 +704,9 @@ func (db *Mongodb) lastAck(db_name string, collection_name string, group_id stri return utils.MapToJson(&result) } -func (db *Mongodb) getNacks(db_name string, collection_name string, group_id string, min_index, max_index int) ([]int, error) { +func (db *Mongodb) getNacks(request Request, min_index, max_index int) ([]int, error) { - c := db.client.Database(db_name).Collection(acks_collection_name_prefix + collection_name + "_" + group_id) + c := db.client.Database(request.DbName).Collection(acks_collection_name_prefix + request.DbCollectionName + "_" + request.GroupId) if min_index > max_index { return []int{}, errors.New("from index is greater than to index") @@ -755,45 +755,45 @@ func (db *Mongodb) getNacks(db_name string, collection_name string, group_id str return resp[0].Numbers, nil } -func (db *Mongodb) getSubstreams(db_name string, from string) ([]byte, error) { - rec, err := substreams.getSubstreams(db,db_name,from) +func (db *Mongodb) getSubstreams(request Request) ([]byte, error) { + rec, err := substreams.getSubstreams(db,request.DbName,request.ExtraParam) if err != nil { - return db.processQueryError("get substreams", db_name, err) + return db.processQueryError("get substreams", request.DbName, err) } return json.Marshal(&rec) } func (db *Mongodb) ProcessRequest(request Request) (answer []byte, err error) { - if err := db.checkDatabaseOperationPrerequisites(request.DbName, request.DbCollectionName, request.GroupId); err != nil { + if err := db.checkDatabaseOperationPrerequisites(request); err != nil { return nil, err } switch request.Op { case "next": - return db.getNextRecord(request.DbName, request.DbCollectionName, request.GroupId, request.DatasetOp, request.ExtraParam) + return db.getNextRecord(request) case "id": - return db.getRecordByID(request.DbName, request.DbCollectionName, request.GroupId, request.ExtraParam, request.DatasetOp) + return db.getRecordByID(request) case "last": - return db.getLastRecord(request.DbName, request.DbCollectionName, request.GroupId, request.DatasetOp) + return db.getLastRecord(request) case "resetcounter": - return db.resetCounter(request.DbName, request.DbCollectionName, request.GroupId, request.ExtraParam) + return db.resetCounter(request) case "size": - return db.getSize(request.DbName, request.DbCollectionName) + return db.getSize(request) case "meta": - return db.getMeta(request.DbName, request.ExtraParam) + return db.getMeta(request) case "queryimages": - return db.queryImages(request.DbName, request.DbCollectionName, request.ExtraParam) + return db.queryImages(request) case "substreams": - return db.getSubstreams(request.DbName,request.ExtraParam) + return db.getSubstreams(request) case "ackimage": - return db.ackRecord(request.DbName, request.DbCollectionName, request.GroupId, request.ExtraParam) + return db.ackRecord(request) case "negackimage": - return db.negAckRecord(request.DbName, request.GroupId, request.ExtraParam) + return db.negAckRecord(request) case "nacks": - return db.nacks(request.DbName, request.DbCollectionName, request.GroupId, request.ExtraParam) + return db.nacks(request) case "lastack": - return db.lastAck(request.DbName, request.DbCollectionName, request.GroupId) + return db.lastAck(request) } return nil, errors.New("Wrong db operation: " + request.Op) diff --git a/broker/src/asapo_broker/database/mongodb_test.go b/broker/src/asapo_broker/database/mongodb_test.go index 8a93623bb..22a1ffd8f 100644 --- a/broker/src/asapo_broker/database/mongodb_test.go +++ b/broker/src/asapo_broker/database/mongodb_test.go @@ -693,8 +693,9 @@ func TestMongoDBAckImage(t *testing.T) { db.insertRecord(dbname, collection, &rec1) query_str := "{\"Id\":1,\"Op\":\"ackimage\"}" - res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "ackimage", ExtraParam: query_str}) - nacks, _ := db.getNacks(dbname, collection, groupId, 1, 1) + request := Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "ackimage", ExtraParam: query_str} + res, err := db.ProcessRequest(request) + nacks, _ := db.getNacks(request, 1, 1) assert.Nil(t, err) assert.Equal(t, "", string(res)) assert.Equal(t, 0, len(nacks)) @@ -728,9 +729,9 @@ func TestMongoDBNacks(t *testing.T) { insertRecords(10) } if test.ackRecords { - db.ackRecord(dbname, collection, groupId, "{\"Id\":2,\"Op\":\"ackimage\"}") - db.ackRecord(dbname, collection, groupId, "{\"Id\":3,\"Op\":\"ackimage\"}") - db.ackRecord(dbname, collection, groupId, "{\"Id\":4,\"Op\":\"ackimage\"}") + db.ackRecord(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, ExtraParam: "{\"Id\":2,\"Op\":\"ackimage\"}"}) + db.ackRecord(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, ExtraParam: "{\"Id\":3,\"Op\":\"ackimage\"}"}) + db.ackRecord(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, ExtraParam: "{\"Id\":4,\"Op\":\"ackimage\"}"}) } res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "nacks", ExtraParam: test.rangeString}) @@ -762,9 +763,9 @@ func TestMongoDBLastAcks(t *testing.T) { insertRecords(10) } if test.ackRecords { - db.ackRecord(dbname, collection, groupId, "{\"Id\":2,\"Op\":\"ackimage\"}") - db.ackRecord(dbname, collection, groupId, "{\"Id\":3,\"Op\":\"ackimage\"}") - db.ackRecord(dbname, collection, groupId, "{\"Id\":4,\"Op\":\"ackimage\"}") + db.ackRecord(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, ExtraParam: "{\"Id\":2,\"Op\":\"ackimage\"}"}) + db.ackRecord(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, ExtraParam: "{\"Id\":3,\"Op\":\"ackimage\"}"}) + db.ackRecord(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, ExtraParam: "{\"Id\":4,\"Op\":\"ackimage\"}"}) } res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "lastack"}) -- GitLab