From 977f64d835f1bc6bd65b9497a9bead8d72bf5bbc Mon Sep 17 00:00:00 2001 From: Sergey Yakubov <sergey.yakubov@desy.de> Date: Wed, 24 Nov 2021 13:01:06 +0100 Subject: [PATCH] refactor request struct --- broker/src/asapo_broker/database/database.go | 7 +- broker/src/asapo_broker/database/encoding.go | 5 +- .../asapo_broker/database/encoding_test.go | 10 +- broker/src/asapo_broker/database/mongodb.go | 70 ++--- .../asapo_broker/database/mongodb_streams.go | 4 +- .../src/asapo_broker/database/mongodb_test.go | 259 +++++++++--------- .../src/asapo_broker/database/streams_test.go | 58 ++-- .../asapo_broker/server/get_commands_test.go | 5 +- .../src/asapo_broker/server/get_meta_test.go | 2 +- .../asapo_broker/server/post_op_image_test.go | 2 +- .../server/post_query_images_test.go | 2 +- .../server/post_reset_counter_test.go | 2 +- .../asapo_broker/server/process_request.go | 13 +- .../server/process_request_test.go | 12 +- common/go/src/asapo_common/logger/logger.go | 4 + 15 files changed, 236 insertions(+), 219 deletions(-) diff --git a/broker/src/asapo_broker/database/database.go b/broker/src/asapo_broker/database/database.go index 7e48a6e52..ff49cef31 100644 --- a/broker/src/asapo_broker/database/database.go +++ b/broker/src/asapo_broker/database/database.go @@ -3,7 +3,8 @@ package database import "asapo_common/utils" type Request struct { - DbName string + Beamtime string + DataSource string Stream string GroupId string Op string @@ -12,6 +13,10 @@ type Request struct { ExtraParam string } +func (request * Request ) DbName() string { + return request.Beamtime+"_"+request.DataSource +} + type Agent interface { ProcessRequest(request Request) ([]byte, error) Ping() error diff --git a/broker/src/asapo_broker/database/encoding.go b/broker/src/asapo_broker/database/encoding.go index 6e61d95f0..86b477bd2 100644 --- a/broker/src/asapo_broker/database/encoding.go +++ b/broker/src/asapo_broker/database/encoding.go @@ -80,8 +80,9 @@ func encodeStringForColName(original string) (result string) { } func encodeRequest(request *Request) error { - request.DbName = encodeStringForDbName(request.DbName) - if len(request.DbName)> max_encoded_source_size { + request.DataSource = encodeStringForDbName(request.DataSource) + request.Beamtime = encodeStringForDbName(request.Beamtime) + if len(request.DbName())> max_encoded_source_size { return &DBError{utils.StatusWrongInput, "source name is too long"} } diff --git a/broker/src/asapo_broker/database/encoding_test.go b/broker/src/asapo_broker/database/encoding_test.go index 1b018289e..82447de77 100644 --- a/broker/src/asapo_broker/database/encoding_test.go +++ b/broker/src/asapo_broker/database/encoding_test.go @@ -18,7 +18,8 @@ func TestEncoding(t *testing.T) { assert.Equal(t, sourceDecoded, source) r := Request{ - DbName: source, + Beamtime: "bt", + DataSource: source, Stream: stream, GroupId: stream, Op: "", @@ -29,7 +30,7 @@ func TestEncoding(t *testing.T) { err := encodeRequest(&r) assert.Equal(t, r.Stream, streamEncoded) assert.Equal(t, r.GroupId, streamEncoded) - assert.Equal(t, r.DbName, sourceEncoded) + assert.Equal(t, r.DataSource, sourceEncoded) assert.Nil(t, err) } @@ -61,9 +62,10 @@ func TestEncodingTooLong(t *testing.T) { for _, test := range encodeTests { stream := RandomString(test.streamSize) group := RandomString(test.groupSize) - source := RandomString(test.sourceSize) + source := RandomString(test.sourceSize-3) r := Request{ - DbName: source, + Beamtime: "bt", + DataSource: source, Stream: stream, GroupId: group, Op: "", diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go index 0291da19f..812491c78 100644 --- a/broker/src/asapo_broker/database/mongodb.go +++ b/broker/src/asapo_broker/database/mongodb.go @@ -199,7 +199,7 @@ func maxIndexQuery(request Request, returnIncompete bool) bson.M { } func (db *Mongodb) getMaxIndex(request Request, returnIncompete bool) (max_id int, err error) { - c := db.client.Database(request.DbName).Collection(data_collection_name_prefix + request.Stream) + c := db.client.Database(request.DbName()).Collection(data_collection_name_prefix + request.Stream) q := maxIndexQuery(request, returnIncompete) opts := options.FindOne().SetSort(bson.M{"_id": -1}).SetReturnKey(true) @@ -227,7 +227,7 @@ func duplicateError(err error) bool { func (db *Mongodb) setCounter(request Request, ind int) (err error) { update := bson.M{"$set": bson.M{pointer_field_name: ind}} opts := options.Update().SetUpsert(true) - c := db.client.Database(request.DbName).Collection(pointer_collection_name) + c := db.client.Database(request.DbName()).Collection(pointer_collection_name) q := bson.M{"_id": request.GroupId + "_" + request.Stream} _, err = c.UpdateOne(context.TODO(), q, update, opts) return @@ -252,7 +252,7 @@ func (db *Mongodb) changeField(request Request, change fieldChangeRequest, res i opts := options.FindOneAndUpdate().SetUpsert(true).SetReturnDocument(options.After) q := bson.M{"_id": request.GroupId + "_" + request.Stream, change.fieldName: bson.M{"$lt": change.max_ind}} - c := db.client.Database(request.DbName).Collection(change.collectionName) + c := db.client.Database(request.DbName()).Collection(change.collectionName) err = c.FindOneAndUpdate(context.TODO(), q, update, opts).Decode(res) if err != nil { @@ -306,11 +306,11 @@ func recordContainsPartialData(request Request, rec map[string]interface{}) bool func (db *Mongodb) getRecordFromDb(request Request, id, id_max int) (res map[string]interface{}, err error) { q := bson.M{"_id": id} - c := db.client.Database(request.DbName).Collection(data_collection_name_prefix + request.Stream) + c := db.client.Database(request.DbName()).Collection(data_collection_name_prefix + request.Stream) err = c.FindOne(context.TODO(), q, options.FindOne()).Decode(&res) if err != nil { answer := encodeAnswer(id, id_max, "") - log_str := "error getting record id " + strconv.Itoa(id) + " for " + request.DbName + " : " + err.Error() + log_str := "error getting record id " + strconv.Itoa(id) + " for " + request.DbName() + " : " + err.Error() logger.Debug(log_str) return res, &DBError{utils.StatusNoData, answer} } @@ -327,7 +327,7 @@ func (db *Mongodb) getRecordByIDRaw(request Request, id, id_max int) ([]byte, er return nil, err } - log_str := "got record id " + strconv.Itoa(id) + " for " + request.DbName + log_str := "got record id " + strconv.Itoa(id) + " for " + request.DbName() logger.Debug(log_str) record, err := utils.MapToJson(&res) @@ -392,7 +392,7 @@ func (db *Mongodb) negAckRecord(request Request) ([]byte, error) { return nil, &DBError{utils.StatusWrongInput, err.Error()} } - err = db.InsertRecordToInprocess(request.DbName, inprocess_collection_name_prefix+request.Stream+"_"+request.GroupId, input.Id, input.Params.DelayMs, 1, true) + err = db.InsertRecordToInprocess(request.DbName(), inprocess_collection_name_prefix+request.Stream+"_"+request.GroupId, input.Id, input.Params.DelayMs, 1, true) return []byte(""), err } @@ -402,7 +402,7 @@ func (db *Mongodb) ackRecord(request Request) ([]byte, error) { if err != nil { return nil, &DBError{utils.StatusWrongInput, err.Error()} } - c := db.client.Database(request.DbName).Collection(acks_collection_name_prefix + request.Stream + "_" + request.GroupId) + c := db.client.Database(request.DbName()).Collection(acks_collection_name_prefix + request.Stream + "_" + request.GroupId) _, err = c.InsertOne(context.Background(), &record) if err != nil { if duplicateError(err) { @@ -411,7 +411,7 @@ func (db *Mongodb) ackRecord(request Request) ([]byte, error) { return nil, err } - c = db.client.Database(request.DbName).Collection(inprocess_collection_name_prefix + request.Stream + "_" + request.GroupId) + c = db.client.Database(request.DbName()).Collection(inprocess_collection_name_prefix + request.Stream + "_" + request.GroupId) _, err_del := c.DeleteOne(context.Background(), bson.M{"_id": record.ID}) if err_del != nil { return nil, &DBError{utils.StatusWrongInput, err.Error()} @@ -425,7 +425,7 @@ func (db *Mongodb) checkDatabaseOperationPrerequisites(request Request) error { return &DBError{utils.StatusServiceUnavailable, no_session_msg} } - if len(request.DbName) == 0 || len(request.Stream) == 0 { + if len(request.DbName()) <= 1 || len(request.Stream) == 0 { return &DBError{utils.StatusWrongInput, "beamtime_id ans stream must be set"} } @@ -527,9 +527,9 @@ func (db *Mongodb) getNextAndMaxIndexesFromInprocessed(request Request, ignoreTi t := db.lastReadFromInprocess[request.Stream+"_"+request.GroupId] dbSessionLock.Unlock() if (t <= tNow-int64(db.settings.ReadFromInprocessPeriod)) || ignoreTimeout { - record_ind, err = db.getUnProcessedId(request.DbName, inprocess_collection_name_prefix+request.Stream+"_"+request.GroupId, delayMs, nResendAttempts) + record_ind, err = db.getUnProcessedId(request.DbName(), inprocess_collection_name_prefix+request.Stream+"_"+request.GroupId, delayMs, nResendAttempts) if err != nil { - log_str := "error getting unprocessed id " + request.DbName + ", groupid: " + request.GroupId + ":" + err.Error() + log_str := "error getting unprocessed id " + request.DbName() + ", groupid: " + request.GroupId + ":" + err.Error() logger.Debug(log_str) return 0, 0, err } @@ -552,11 +552,11 @@ func (db *Mongodb) getNextAndMaxIndexesFromInprocessed(request Request, ignoreTi func (db *Mongodb) getNextAndMaxIndexesFromCurPointer(request Request) (int, int, error) { curPointer, max_ind, err := db.getCurrentPointer(request) if err != nil { - log_str := "error getting next pointer for " + request.DbName + ", groupid: " + request.GroupId + ":" + err.Error() + log_str := "error getting next pointer for " + request.DbName() + ", groupid: " + request.GroupId + ":" + err.Error() logger.Debug(log_str) return 0, 0, err } - log_str := "got next pointer " + strconv.Itoa(curPointer.Value) + " for " + request.DbName + ", groupid: " + request.GroupId + log_str := "got next pointer " + strconv.Itoa(curPointer.Value) + " for " + request.DbName() + ", groupid: " + request.GroupId logger.Debug(log_str) return curPointer.Value, max_ind, nil } @@ -641,7 +641,7 @@ func (db *Mongodb) getNextRecord(request Request) ([]byte, error) { } if err == nil { - err_update := db.InsertToInprocessIfNeeded(request.DbName, inprocess_collection_name_prefix+request.Stream+"_"+request.GroupId, nextInd, request.ExtraParam) + err_update := db.InsertToInprocessIfNeeded(request.DbName(), inprocess_collection_name_prefix+request.Stream+"_"+request.GroupId, nextInd, request.ExtraParam) if err_update != nil { return nil, err_update } @@ -689,7 +689,7 @@ func getSizeFilter(request Request) bson.M { } func (db *Mongodb) getSize(request Request) ([]byte, error) { - c := db.client.Database(request.DbName).Collection(data_collection_name_prefix + request.Stream) + c := db.client.Database(request.DbName()).Collection(data_collection_name_prefix + request.Stream) filter := getSizeFilter(request) size, err := c.CountDocuments(context.TODO(), filter, options.Count()) @@ -716,7 +716,7 @@ func (db *Mongodb) resetCounter(request Request) ([]byte, error) { return []byte(""), err } - c := db.client.Database(request.DbName).Collection(inprocess_collection_name_prefix + request.Stream + "_" + request.GroupId) + c := db.client.Database(request.DbName()).Collection(inprocess_collection_name_prefix + request.Stream + "_" + request.GroupId) _, err_del := c.DeleteMany(context.Background(), bson.M{"_id": bson.M{"$gte": id}}) if err_del != nil { return nil, &DBError{utils.StatusWrongInput, err_del.Error()} @@ -743,20 +743,20 @@ func (db *Mongodb) getMeta(request Request) ([]byte, error) { } q := bson.M{"_id": id} var res map[string]interface{} - c := db.client.Database(request.DbName).Collection(meta_collection_name) + c := db.client.Database(request.DbName()).Collection(meta_collection_name) err = c.FindOne(context.TODO(), q, options.FindOne()).Decode(&res) if err != nil { - log_str := "error getting meta for " + id + " in " + request.DbName + " : " + err.Error() + log_str := "error getting meta for " + id + " in " + request.DbName() + " : " + err.Error() logger.Debug(log_str) return nil, &DBError{utils.StatusNoData, err.Error()} } userMeta, ok := res["meta"] if !ok { - log_str := "error getting meta for " + id + " in " + request.DbName + " : cannot parse database response" + log_str := "error getting meta for " + id + " in " + request.DbName() + " : cannot parse database response" logger.Error(log_str) return nil, errors.New(log_str) } - log_str := "got metadata for " + id + " in " + request.DbName + log_str := "got metadata for " + id + " in " + request.DbName() logger.Debug(log_str) return utils.MapToJson(&userMeta) } @@ -769,14 +769,14 @@ func (db *Mongodb) processQueryError(query, dbname string, err error) ([]byte, e func (db *Mongodb) queryMessages(request Request) ([]byte, error) { var res []map[string]interface{} - q, sort, err := db.BSONFromSQL(request.DbName, request.ExtraParam) + q, sort, err := db.BSONFromSQL(request.DbName(), request.ExtraParam) if err != nil { - log_str := "error parsing query: " + request.ExtraParam + " for " + request.DbName + " : " + err.Error() + log_str := "error parsing query: " + request.ExtraParam + " for " + request.DbName() + " : " + err.Error() logger.Debug(log_str) return nil, &DBError{utils.StatusWrongInput, err.Error()} } - c := db.client.Database(request.DbName).Collection(data_collection_name_prefix + request.Stream) + c := db.client.Database(request.DbName()).Collection(data_collection_name_prefix + request.Stream) opts := options.Find() if len(sort) > 0 { @@ -786,14 +786,14 @@ func (db *Mongodb) queryMessages(request Request) ([]byte, error) { cursor, err := c.Find(context.TODO(), q, opts) if err != nil { - return db.processQueryError(request.ExtraParam, request.DbName, err) + return db.processQueryError(request.ExtraParam, request.DbName(), err) } err = cursor.All(context.TODO(), &res) if err != nil { - return db.processQueryError(request.ExtraParam, request.DbName, err) + return db.processQueryError(request.ExtraParam, request.DbName(), err) } - log_str := "processed query " + request.ExtraParam + " for " + request.DbName + " ,found" + strconv.Itoa(len(res)) + " records" + log_str := "processed query " + request.ExtraParam + " for " + request.DbName() + " ,found" + strconv.Itoa(len(res)) + " records" logger.Debug(log_str) if res != nil { return utils.MapToJson(&res) @@ -880,11 +880,11 @@ func (db *Mongodb) nacks(request Request) ([]byte, error) { } func (db *Mongodb) deleteCollection(request Request, name string) error { - return db.client.Database(request.DbName).Collection(name).Drop(context.Background()) + return db.client.Database(request.DbName()).Collection(name).Drop(context.Background()) } func (db *Mongodb) collectionExist(request Request, name string) (bool, error) { - result, err := db.client.Database(request.DbName).ListCollectionNames(context.TODO(), bson.M{"name": name}) + result, err := db.client.Database(request.DbName()).ListCollectionNames(context.TODO(), bson.M{"name": name}) if err != nil { return false, err } @@ -910,7 +910,7 @@ func (db *Mongodb) deleteDataCollection(errorOnNotexist bool, request Request) e func (db *Mongodb) deleteDocumentsInCollection(request Request, collection string, field string, pattern string) error { filter := bson.M{field: bson.D{{"$regex", primitive.Regex{Pattern: pattern, Options: "i"}}}} - _, err := db.client.Database(request.DbName).Collection(collection).DeleteMany(context.TODO(), filter) + _, err := db.client.Database(request.DbName()).Collection(collection).DeleteMany(context.TODO(), filter) return err } @@ -923,7 +923,7 @@ func escapeQuery(query string) (res string) { } func (db *Mongodb) deleteCollectionsWithPrefix(request Request, prefix string) error { - cols, err := db.client.Database(request.DbName).ListCollectionNames(context.TODO(), bson.M{"name": bson.D{ + cols, err := db.client.Database(request.DbName()).ListCollectionNames(context.TODO(), bson.M{"name": bson.D{ {"$regex", primitive.Regex{Pattern: "^" + escapeQuery(prefix), Options: "i"}}}}) if err != nil { return err @@ -966,7 +966,7 @@ func (db *Mongodb) deleteStream(request Request) ([]byte, error) { return nil, &DBError{utils.StatusWrongInput, "wrong params: " + request.ExtraParam} } if !*params.DeleteMeta { - logger.Debug("skipping delete stream meta for " + request.Stream + " in " + request.DbName) + logger.Debug("skipping delete stream meta for " + request.Stream + " in " + request.DbName()) return nil, nil } @@ -980,7 +980,7 @@ func (db *Mongodb) deleteStream(request Request) ([]byte, error) { } func (db *Mongodb) lastAck(request Request) ([]byte, error) { - c := db.client.Database(request.DbName).Collection(acks_collection_name_prefix + request.Stream + "_" + request.GroupId) + c := db.client.Database(request.DbName()).Collection(acks_collection_name_prefix + request.Stream + "_" + request.GroupId) opts := options.FindOne().SetSort(bson.M{"_id": -1}).SetReturnKey(true) result := LastAck{0} var q bson.M = nil @@ -1047,7 +1047,7 @@ func extractNacsFromCursor(err error, cursor *mongo.Cursor) ([]int, error) { } func (db *Mongodb) getNacks(request Request, min_index, max_index int) ([]int, error) { - c := db.client.Database(request.DbName).Collection(acks_collection_name_prefix + request.Stream + "_" + request.GroupId) + c := db.client.Database(request.DbName()).Collection(acks_collection_name_prefix + request.Stream + "_" + request.GroupId) if res, err, ok := db.canAvoidDbRequest(min_index, max_index, c); ok { return res, err @@ -1062,7 +1062,7 @@ func (db *Mongodb) getNacks(request Request, min_index, max_index int) ([]int, e func (db *Mongodb) getStreams(request Request) ([]byte, error) { rec, err := streams.getStreams(db, request) if err != nil { - return db.processQueryError("get streams", request.DbName, err) + return db.processQueryError("get streams", request.DbName(), err) } return json.Marshal(&rec) } diff --git a/broker/src/asapo_broker/database/mongodb_streams.go b/broker/src/asapo_broker/database/mongodb_streams.go index a182f5080..d3974e9c7 100644 --- a/broker/src/asapo_broker/database/mongodb_streams.go +++ b/broker/src/asapo_broker/database/mongodb_streams.go @@ -265,9 +265,9 @@ func (ss *Streams) getStreams(db *Mongodb, request Request) (StreamsRecord, erro } streamsLock.Lock() - rec, err := ss.tryGetFromCache(request.DbName, db.settings.UpdateStreamCachePeriodMs) + rec, err := ss.tryGetFromCache(request.DbName(), db.settings.UpdateStreamCachePeriodMs) if err != nil { - rec, err = ss.updateFromDb(db, request.DbName) + rec, err = ss.updateFromDb(db, request.DbName()) } streamsLock.Unlock() if err != nil { diff --git a/broker/src/asapo_broker/database/mongodb_test.go b/broker/src/asapo_broker/database/mongodb_test.go index 09bf8ab1c..d7f38ad47 100644 --- a/broker/src/asapo_broker/database/mongodb_test.go +++ b/broker/src/asapo_broker/database/mongodb_test.go @@ -36,7 +36,9 @@ type TestDataset struct { var db Mongodb -const dbname = "12345" +const beamtime = "bt" +const datasource = "12345" +const dbname = "bt_12345" const collection = "stream" const collection2 = "stream2" const dbaddress = "127.0.0.1:27017" @@ -100,17 +102,17 @@ func TestMongoDBConnectOK(t *testing.T) { } func TestMongoDBGetNextErrorWhenNotConnected(t *testing.T) { - _, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"}) + _, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) assert.Equal(t, utils.StatusServiceUnavailable, err.(*DBError).Code) } func TestMongoDBGetMetaErrorWhenNotConnected(t *testing.T) { - _, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, Op: "meta", ExtraParam: "0"}) + _, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, Op: "meta", ExtraParam: "0"}) assert.Equal(t, utils.StatusServiceUnavailable, err.(*DBError).Code) } func TestMongoDBQueryMessagesErrorWhenNotConnected(t *testing.T) { - _, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, Op: "querymessages", ExtraParam: "0"}) + _, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, Op: "querymessages", ExtraParam: "0"}) assert.Equal(t, utils.StatusServiceUnavailable, err.(*DBError).Code) } @@ -124,7 +126,7 @@ func TestMongoDBGetNextErrorWhenWrongDatabasename(t *testing.T) { func TestMongoDBGetNextErrorWhenNonExistingDatacollectionname(t *testing.T) { db.Connect(dbaddress) defer cleanup() - _, err := db.ProcessRequest(Request{DbName: dbname, Stream: "bla", GroupId: groupId, Op: "next"}) + _, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: "bla", GroupId: groupId, Op: "next"}) assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":0,\"id_max\":0,\"next_stream\":\"\"}", err.Error()) } @@ -132,7 +134,7 @@ func TestMongoDBGetNextErrorWhenNonExistingDatacollectionname(t *testing.T) { func TestMongoDBGetLastErrorWhenNonExistingDatacollectionname(t *testing.T) { db.Connect(dbaddress) defer cleanup() - _, err := db.ProcessRequest(Request{DbName: dbname, Stream: "bla", GroupId: groupId, Op: "last"}) + _, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: "bla", GroupId: groupId, Op: "last"}) assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":0,\"id_max\":0,\"next_stream\":\"\"}", err.Error()) } @@ -140,7 +142,7 @@ func TestMongoDBGetLastErrorWhenNonExistingDatacollectionname(t *testing.T) { func TestMongoDBGetByIdErrorWhenNoData(t *testing.T) { db.Connect(dbaddress) defer cleanup() - _, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "id", ExtraParam: "2"}) + _, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "id", ExtraParam: "2"}) assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":2,\"id_max\":0,\"next_stream\":\"\"}", err.Error()) @@ -150,7 +152,7 @@ func TestMongoDBGetNextErrorWhenRecordNotThereYet(t *testing.T) { db.Connect(dbaddress) defer cleanup() db.insertRecord(dbname, collection, &rec2) - _, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"}) + _, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":2,\"next_stream\":\"\"}", err.Error()) } @@ -159,7 +161,7 @@ func TestMongoDBGetNextOK(t *testing.T) { db.Connect(dbaddress) defer cleanup() db.insertRecord(dbname, collection, &rec1) - res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"}) + res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) assert.Nil(t, err) assert.Equal(t, string(rec1_expect), string(res)) } @@ -170,8 +172,8 @@ func TestMongoDBGetNextErrorOnFinishedStream(t *testing.T) { db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec_finished) - db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"}) - _, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"}) + db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) + _, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"next1\"}", err.(*DBError).Message) @@ -183,9 +185,9 @@ func TestMongoDBGetNextErrorOnFinishedStreamAlways(t *testing.T) { db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec_finished) - db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"}) - db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"}) - _, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"}) + db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) + db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) + _, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"next1\"}", err.(*DBError).Message) @@ -199,7 +201,7 @@ func TestMongoDBGetByIdErrorOnFinishedStream(t *testing.T) { db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec_finished) - _, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "id", ExtraParam: "2"}) + _, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "id", ExtraParam: "2"}) assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"next1\"}", err.(*DBError).Message) @@ -211,7 +213,7 @@ func TestMongoDBGetLastErrorOnFinishedStream(t *testing.T) { db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec_finished) - res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "last"}) + res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "last"}) fmt.Println(string(res)) assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"next1\"}", err.(*DBError).Message) @@ -221,8 +223,8 @@ func TestMongoDBGetNextErrorOnNoMoreData(t *testing.T) { db.Connect(dbaddress) defer cleanup() db.insertRecord(dbname, collection, &rec1) - db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"}) - _, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"}) + db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) + _, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"\"}", err.(*DBError).Message) @@ -233,8 +235,8 @@ func TestMongoDBGetNextCorrectOrder(t *testing.T) { defer cleanup() db.insertRecord(dbname, collection, &rec2) db.insertRecord(dbname, collection, &rec1) - res1, _ := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"}) - res2, _ := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"}) + res1, _ := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) + res2, _ := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) assert.Equal(t, string(rec1_expect), string(res1)) assert.Equal(t, string(rec2_expect), string(res2)) } @@ -271,7 +273,7 @@ func getRecords(n int, resend bool) []int { for i := 0; i < n; i++ { go func() { defer wg.Done() - res_bin, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: extra_param}) + res_bin, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: extra_param}) if err != nil { fmt.Println("error at read ", i) } @@ -316,13 +318,13 @@ func TestMongoDBGetLastAfterErasingDatabase(t *testing.T) { db.Connect(dbaddress) defer cleanup() insertRecords(10) - db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"}) + db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) db.dropDatabase(dbname) db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec2) - res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "last", ExtraParam: "0"}) + res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "last", ExtraParam: "0"}) assert.Nil(t, err) assert.Equal(t, string(rec2_expect), string(res)) } @@ -331,7 +333,7 @@ func TestMongoDBGetNextAfterErasingDatabase(t *testing.T) { db.Connect(dbaddress) defer cleanup() insertRecords(200) - db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"}) + db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) db.dropDatabase(dbname) n := 100 @@ -344,10 +346,10 @@ func TestMongoDBGetNextEmptyAfterErasingDatabase(t *testing.T) { db.Connect(dbaddress) defer cleanup() insertRecords(10) - db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"}) + db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) db.dropDatabase(dbname) - _, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"}) + _, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":0,\"id_max\":0,\"next_stream\":\"\"}", err.Error()) } @@ -357,7 +359,7 @@ func TestMongoDBgetRecordByID(t *testing.T) { defer cleanup() db.insertRecord(dbname, collection, &rec1) - res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "id", ExtraParam: "1"}) + res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "id", ExtraParam: "1"}) assert.Nil(t, err) assert.Equal(t, string(rec1_expect), string(res)) } @@ -366,7 +368,7 @@ func TestMongoDBgetRecordByIDFails(t *testing.T) { db.Connect(dbaddress) defer cleanup() db.insertRecord(dbname, collection, &rec1) - _, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "id", ExtraParam: "2"}) + _, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "id", ExtraParam: "2"}) assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":2,\"id_max\":1,\"next_stream\":\"\"}", err.Error()) } @@ -375,7 +377,7 @@ func TestMongoDBGetRecordNext(t *testing.T) { db.Connect(dbaddress) defer cleanup() db.insertRecord(dbname, collection, &rec1) - res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"}) + res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) assert.Nil(t, err) assert.Equal(t, string(rec1_expect), string(res)) } @@ -386,8 +388,8 @@ func TestMongoDBGetRecordNextMultipleCollections(t *testing.T) { db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection2, &rec_dataset1) - res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"}) - res_string, err2 := db.ProcessRequest(Request{DbName: dbname, Stream: collection2, GroupId: groupId, Op: "next", DatasetOp: true}) + res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) + res_string, err2 := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection2, GroupId: groupId, Op: "next", DatasetOp: true}) var res_ds TestDataset json.Unmarshal(res_string, &res_ds) @@ -403,7 +405,7 @@ func TestMongoDBGetRecordID(t *testing.T) { db.Connect(dbaddress) defer cleanup() db.insertRecord(dbname, collection, &rec1) - res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "id", ExtraParam: "1"}) + res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "id", ExtraParam: "1"}) assert.Nil(t, err) assert.Equal(t, string(rec1_expect), string(res)) } @@ -412,7 +414,7 @@ func TestMongoDBWrongOp(t *testing.T) { db.Connect(dbaddress) defer cleanup() db.insertRecord(dbname, collection, &rec1) - _, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "bla"}) + _, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "bla"}) assert.NotNil(t, err) } @@ -422,7 +424,7 @@ func TestMongoDBGetRecordLast(t *testing.T) { db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec2) - res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "last", ExtraParam: "0"}) + res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "last", ExtraParam: "0"}) assert.Nil(t, err) assert.Equal(t, string(rec2_expect), string(res)) } @@ -433,13 +435,13 @@ func TestMongoDBGetNextAfterGetLastCorrect(t *testing.T) { db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec2) - res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "last", ExtraParam: "0"}) + res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "last", ExtraParam: "0"}) assert.Nil(t, err) assert.Equal(t, string(rec2_expect), string(res)) db.insertRecord(dbname, collection, &rec3) - res, err = db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"}) + res, err = db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) assert.Nil(t, err) assert.Equal(t, string(rec1_expect), string(res)) @@ -449,14 +451,14 @@ func TestMongoDBGetGetLastInGroupCorrect(t *testing.T) { db.Connect(dbaddress) defer cleanup() db.insertRecord(dbname, collection, &rec1) - db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"}) // to check it does not influence groupedlast + db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) // to check it does not influence groupedlast - res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "groupedlast", ExtraParam: ""}) + res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "groupedlast", ExtraParam: ""}) assert.Nil(t, err) assert.Equal(t, string(rec1_expect), string(res)) // first record - ok, then error - res, err = db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "groupedlast", ExtraParam: ""}) + res, err = db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "groupedlast", ExtraParam: ""}) assert.NotNil(t, err) if err != nil { assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) @@ -464,15 +466,15 @@ func TestMongoDBGetGetLastInGroupCorrect(t *testing.T) { } // second record - ok, then error db.insertRecord(dbname, collection, &rec2) - res, err = db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "groupedlast", ExtraParam: ""}) + res, err = db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "groupedlast", ExtraParam: ""}) assert.Nil(t, err) assert.Equal(t, string(rec2_expect), string(res)) - res, err = db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "groupedlast", ExtraParam: ""}) + res, err = db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "groupedlast", ExtraParam: ""}) assert.NotNil(t, err) // stream finished - immediately error db.insertRecord(dbname, collection, &rec_finished3) - res, err = db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "groupedlast", ExtraParam: ""}) + res, err = db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "groupedlast", ExtraParam: ""}) assert.NotNil(t, err) if err != nil { assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) @@ -487,9 +489,9 @@ func TestMongoDBGetGetLastInGroupImmediateErrorOnFinishStream(t *testing.T) { db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec2) db.insertRecord(dbname, collection, &rec_finished3) - _, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "groupedlast", ExtraParam: ""}) + _, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "groupedlast", ExtraParam: ""}) assert.NotNil(t, err) - _, err = db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "groupedlast", ExtraParam: ""}) + _, err = db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "groupedlast", ExtraParam: ""}) assert.NotNil(t, err) if err != nil { assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) @@ -506,7 +508,7 @@ func TestMongoDBGetSize(t *testing.T) { db.insertRecord(dbname, collection, &rec2) db.insertRecord(dbname, collection, &rec3) - res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, Op: "size"}) + res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, Op: "size"}) assert.Nil(t, err) assert.Equal(t, string(recs1_expect), string(res)) } @@ -517,7 +519,7 @@ func TestMongoDBGetSizeWithFinishedStream(t *testing.T) { db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec_finished) - res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, Op: "size"}) + res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, Op: "size"}) assert.Nil(t, err) var rec_expect, _ = json.Marshal(&SizeRecord{1}) assert.Equal(t, string(rec_expect), string(res)) @@ -528,10 +530,10 @@ func TestMongoDBGetSizeForDatasets(t *testing.T) { defer cleanup() db.insertRecord(dbname, collection, &rec1) - _, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, Op: "size", ExtraParam: "false"}) + _, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, Op: "size", ExtraParam: "false"}) assert.Equal(t, utils.StatusWrongInput, err.(*DBError).Code) - _, err1 := db.ProcessRequest(Request{DbName: dbname, Stream: collection, Op: "size", ExtraParam: "true"}) + _, err1 := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, Op: "size", ExtraParam: "true"}) assert.Equal(t, utils.StatusWrongInput, err1.(*DBError).Code) } @@ -541,7 +543,7 @@ func TestMongoDBGetSizeForDatasetsWithFinishedStream(t *testing.T) { db.insertRecord(dbname, collection, &rec_dataset1_incomplete) db.insertRecord(dbname, collection, &rec_finished) - res, _ := db.ProcessRequest(Request{DbName: dbname, Stream: collection, Op: "size", ExtraParam: "true"}) + res, _ := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, Op: "size", ExtraParam: "true"}) var rec_expect, _ = json.Marshal(&SizeRecord{1}) assert.Equal(t, string(rec_expect), string(res)) @@ -556,7 +558,7 @@ func TestMongoDBGetSizeDataset(t *testing.T) { size2_expect, _ := json.Marshal(SizeRecord{2}) - res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, Op: "size", ExtraParam: "true"}) + res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, Op: "size", ExtraParam: "true"}) assert.Nil(t, err) assert.Equal(t, string(size2_expect), string(res)) } @@ -565,7 +567,7 @@ func TestMongoDBGetSizeNoRecords(t *testing.T) { db.Connect(dbaddress) defer cleanup() - res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, Op: "size"}) + res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, Op: "size"}) assert.Nil(t, err) assert.Equal(t, string(recs2_expect), string(res)) } @@ -583,7 +585,7 @@ func TestMongoPingNotConected(t *testing.T) { } func TestMongoDBgetRecordByIDNotConnected(t *testing.T) { - _, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "id", ExtraParam: "1"}) + _, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "id", ExtraParam: "1"}) assert.Equal(t, utils.StatusServiceUnavailable, err.(*DBError).Code) } @@ -593,15 +595,15 @@ func TestMongoDBResetCounter(t *testing.T) { db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec2) - res1, err1 := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"}) + res1, err1 := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) assert.Nil(t, err1) assert.Equal(t, string(rec1_expect), string(res1)) - _, err_reset := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "resetcounter", ExtraParam: "1"}) + _, err_reset := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "resetcounter", ExtraParam: "1"}) assert.Nil(t, err_reset) - res2, err2 := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"}) + res2, err2 := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) assert.Nil(t, err2) assert.Equal(t, string(rec2_expect), string(res2)) @@ -613,7 +615,7 @@ func TestMongoDBGetMetaBtOK(t *testing.T) { rec_expect, _ := json.Marshal(recbt.Meta) db.insertMeta(dbname, &recbt) - res, err := db.ProcessRequest(Request{DbName: dbname, Stream: "whatever", Op: "meta", ExtraParam: "0"}) + res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: "whatever", Op: "meta", ExtraParam: "0"}) assert.Nil(t, err) assert.Equal(t, string(rec_expect), string(res)) @@ -625,7 +627,7 @@ func TestMongoDBGetMetaStOK(t *testing.T) { rec_expect, _ := json.Marshal(recst.Meta) db.insertMeta(dbname, &recst) - res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, Op: "meta", ExtraParam: "1"}) + res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, Op: "meta", ExtraParam: "1"}) assert.Nil(t, err) assert.Equal(t, string(rec_expect), string(res)) @@ -635,7 +637,7 @@ func TestMongoDBGetMetaErr(t *testing.T) { db.Connect(dbaddress) defer cleanup() - _, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, Op: "meta", ExtraParam: "1"}) + _, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, Op: "meta", ExtraParam: "1"}) assert.NotNil(t, err) assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) } @@ -711,7 +713,7 @@ func TestMongoDBQueryMessagesOK(t *testing.T) { // continue // } - res_string, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, Op: "querymessages", ExtraParam: test.query}) + res_string, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, Op: "querymessages", ExtraParam: test.query}) var res []TestRecordMeta json.Unmarshal(res_string, &res) // fmt.Println(string(res_string)) @@ -730,7 +732,7 @@ func TestMongoDBQueryMessagesOnEmptyDatabase(t *testing.T) { db.Connect(dbaddress) defer cleanup() for _, test := range tests { - res_string, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, Op: "querymessages", ExtraParam: test.query}) + res_string, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, Op: "querymessages", ExtraParam: test.query}) var res []TestRecordMeta json.Unmarshal(res_string, &res) assert.Equal(t, 0, len(res)) @@ -756,7 +758,7 @@ func TestMongoDBGetDataset(t *testing.T) { db.insertRecord(dbname, collection, &rec_dataset1) - res_string, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", DatasetOp: true}) + res_string, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", DatasetOp: true}) assert.Nil(t, err) @@ -772,7 +774,7 @@ func TestMongoDBNoDataOnNotCompletedFirstDataset(t *testing.T) { db.insertRecord(dbname, collection, &rec_dataset1_incomplete) - _, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", DatasetOp: true}) + _, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", DatasetOp: true}) assert.Equal(t, utils.StatusPartialData, err.(*DBError).Code) var res TestDataset @@ -787,8 +789,8 @@ func TestMongoDBNoDataOnNotCompletedNextDataset(t *testing.T) { db.insertRecord(dbname, collection, &rec_dataset1_incomplete) db.insertRecord(dbname, collection, &rec_dataset2_incomplete) - _, err1 := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", DatasetOp: true}) - _, err2 := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", DatasetOp: true}) + _, err1 := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", DatasetOp: true}) + _, err2 := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", DatasetOp: true}) assert.Equal(t, utils.StatusPartialData, err1.(*DBError).Code) assert.Equal(t, utils.StatusPartialData, err2.(*DBError).Code) @@ -804,7 +806,7 @@ func TestMongoDBGetRecordLastDataSetSkipsIncompleteSets(t *testing.T) { db.insertRecord(dbname, collection, &rec_dataset1) db.insertRecord(dbname, collection, &rec_dataset2) - res_string, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "last", DatasetOp: true, ExtraParam: "0"}) + res_string, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "last", DatasetOp: true, ExtraParam: "0"}) assert.Nil(t, err) @@ -821,7 +823,7 @@ func TestMongoDBGetRecordLastDataSetReturnsIncompleteSets(t *testing.T) { db.insertRecord(dbname, collection, &rec_dataset1) db.insertRecord(dbname, collection, &rec_dataset2) - res_string, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "last", + res_string, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "last", DatasetOp: true, MinDatasetSize: 3, ExtraParam: "0"}) assert.Nil(t, err) @@ -839,7 +841,7 @@ func TestMongoDBGetRecordLastDataSetSkipsIncompleteSetsWithMinSize(t *testing.T) db.insertRecord(dbname, collection, &rec_dataset1) db.insertRecord(dbname, collection, &rec_dataset2_incomplete3) - res_string, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "last", + res_string, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "last", DatasetOp: true, MinDatasetSize: 3, ExtraParam: "0"}) assert.Nil(t, err) @@ -856,7 +858,7 @@ func TestMongoDBGetRecordLastDataSetWithFinishedStream(t *testing.T) { db.insertRecord(dbname, collection, &rec_dataset1) db.insertRecord(dbname, collection, &rec_finished) - _, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "last", + _, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "last", DatasetOp: true, ExtraParam: "0"}) assert.NotNil(t, err) @@ -873,7 +875,7 @@ func TestMongoDBGetRecordLastDataSetWithIncompleteDatasetsAndFinishedStreamRetur db.insertRecord(dbname, collection, &rec_dataset1_incomplete) db.insertRecord(dbname, collection, &rec_finished) - _, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "last", + _, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "last", DatasetOp: true, MinDatasetSize: 2, ExtraParam: "0"}) assert.NotNil(t, err) @@ -890,7 +892,7 @@ func TestMongoDBGetRecordLastDataSetOK(t *testing.T) { db.insertRecord(dbname, collection, &rec_dataset1) db.insertRecord(dbname, collection, &rec_dataset3) - res_string, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "last", DatasetOp: true, ExtraParam: "0"}) + res_string, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "last", DatasetOp: true, ExtraParam: "0"}) assert.Nil(t, err) @@ -905,7 +907,7 @@ func TestMongoDBGetDatasetID(t *testing.T) { defer cleanup() db.insertRecord(dbname, collection, &rec_dataset1) - res_string, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "id", DatasetOp: true, ExtraParam: "1"}) + res_string, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "id", DatasetOp: true, ExtraParam: "1"}) assert.Nil(t, err) @@ -921,7 +923,7 @@ func TestMongoDBErrorOnIncompleteDatasetID(t *testing.T) { defer cleanup() db.insertRecord(dbname, collection, &rec_dataset1_incomplete) - _, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "id", DatasetOp: true, ExtraParam: "1"}) + _, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "id", DatasetOp: true, ExtraParam: "1"}) assert.Equal(t, utils.StatusPartialData, err.(*DBError).Code) @@ -937,7 +939,7 @@ func TestMongoDBOkOnIncompleteDatasetID(t *testing.T) { defer cleanup() db.insertRecord(dbname, collection, &rec_dataset1_incomplete) - res_string, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "id", DatasetOp: true, MinDatasetSize: 3, ExtraParam: "1"}) + res_string, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "id", DatasetOp: true, MinDatasetSize: 3, ExtraParam: "1"}) assert.Nil(t, err) @@ -984,7 +986,7 @@ func TestMongoDBListStreams(t *testing.T) { } var rec_streams_expect, _ = json.Marshal(test.expectedStreams) - res, err := db.ProcessRequest(Request{DbName: dbname, Stream: "0", Op: "streams", ExtraParam: utils.EncodeTwoStrings(test.from,"")}) + res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: "0", Op: "streams", ExtraParam: utils.EncodeTwoStrings(test.from,"")}) if test.ok { assert.Nil(t, err, test.test) assert.Equal(t, string(rec_streams_expect), string(res), test.test) @@ -1004,7 +1006,7 @@ func TestMongoDBAckMessage(t *testing.T) { query_str := "{\"Id\":1,\"Op\":\"ackmessage\"}" - request := Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "ackmessage", ExtraParam: query_str} + request := Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "ackmessage", ExtraParam: query_str} res, err := db.ProcessRequest(request) nacks, _ := db.getNacks(request, 0, 0) assert.Nil(t, err) @@ -1041,12 +1043,12 @@ func TestMongoDBNacks(t *testing.T) { db.insertRecord(dbname, collection, &rec_finished11) } if test.ackRecords { - db.ackRecord(Request{DbName: dbname, Stream: collection, GroupId: groupId, ExtraParam: "{\"Id\":2,\"Op\":\"ackmessage\"}"}) - db.ackRecord(Request{DbName: dbname, Stream: collection, GroupId: groupId, ExtraParam: "{\"Id\":3,\"Op\":\"ackmessage\"}"}) - db.ackRecord(Request{DbName: dbname, Stream: collection, GroupId: groupId, ExtraParam: "{\"Id\":4,\"Op\":\"ackmessage\"}"}) + db.ackRecord(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, ExtraParam: "{\"Id\":2,\"Op\":\"ackmessage\"}"}) + db.ackRecord(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, ExtraParam: "{\"Id\":3,\"Op\":\"ackmessage\"}"}) + db.ackRecord(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, ExtraParam: "{\"Id\":4,\"Op\":\"ackmessage\"}"}) } - res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "nacks", ExtraParam: test.rangeString}) + res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "nacks", ExtraParam: test.rangeString}) if test.ok { assert.Nil(t, err, test.test) assert.Equal(t, test.resString, string(res), test.test) @@ -1076,12 +1078,12 @@ func TestMongoDBLastAcks(t *testing.T) { db.insertRecord(dbname, collection, &rec_finished11) } if test.ackRecords { - db.ackRecord(Request{DbName: dbname, Stream: collection, GroupId: groupId, ExtraParam: "{\"Id\":2,\"Op\":\"ackmessage\"}"}) - db.ackRecord(Request{DbName: dbname, Stream: collection, GroupId: groupId, ExtraParam: "{\"Id\":3,\"Op\":\"ackmessage\"}"}) - db.ackRecord(Request{DbName: dbname, Stream: collection, GroupId: groupId, ExtraParam: "{\"Id\":4,\"Op\":\"ackmessage\"}"}) + db.ackRecord(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, ExtraParam: "{\"Id\":2,\"Op\":\"ackmessage\"}"}) + db.ackRecord(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, ExtraParam: "{\"Id\":3,\"Op\":\"ackmessage\"}"}) + db.ackRecord(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, ExtraParam: "{\"Id\":4,\"Op\":\"ackmessage\"}"}) } - res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "lastack"}) + res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "lastack"}) assert.Nil(t, err, test.test) assert.Equal(t, test.resString, string(res), test.test) cleanup() @@ -1095,8 +1097,8 @@ func TestMongoDBGetNextUsesInprocessedImmedeatly(t *testing.T) { err := db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec2) - res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"}) - res1, err1 := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"}) + res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"}) + res1, err1 := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"}) assert.Nil(t, err) assert.Nil(t, err1) @@ -1109,9 +1111,9 @@ func TestMongoDBGetNextUsesInprocessedNumRetry(t *testing.T) { db.Connect(dbaddress) defer cleanup() err := db.insertRecord(dbname, collection, &rec1) - res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"}) - res1, err1 := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"}) - _, err2 := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"}) + res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"}) + res1, err1 := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"}) + _, err2 := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"}) assert.Nil(t, err) assert.Nil(t, err1) @@ -1129,10 +1131,10 @@ func TestMongoDBGetNextUsesInprocessedAfterTimeout(t *testing.T) { defer cleanup() err := db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec2) - res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"}) - res1, err1 := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"}) + res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"}) + res1, err1 := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"}) time.Sleep(time.Second) - res2, err2 := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"}) + res2, err2 := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"}) assert.Nil(t, err) assert.Nil(t, err1) assert.Nil(t, err2) @@ -1148,10 +1150,10 @@ func TestMongoDBGetNextReturnsToNormalAfterUsesInprocessed(t *testing.T) { err := db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec2) db.insertRecord(dbname, collection, &rec_finished3) - res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"}) + res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"}) time.Sleep(time.Second) - res1, err1 := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"}) - res2, err2 := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"}) + res1, err1 := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"}) + res2, err2 := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "1000_3"}) assert.Nil(t, err) assert.Nil(t, err1) assert.Nil(t, err2) @@ -1166,8 +1168,8 @@ func TestMongoDBGetNextUsesInprocessedImmedeatlyIfFinishedStream(t *testing.T) { defer cleanup() err := db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec_finished) - res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"}) - res1, err1 := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"}) + res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"}) + res1, err1 := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"}) assert.Nil(t, err) assert.Nil(t, err1) assert.Equal(t, string(rec1_expect), string(res)) @@ -1180,9 +1182,9 @@ func TestMongoDBGetNextUsesInprocessedImmedeatlyIfEndofStream(t *testing.T) { defer cleanup() err := db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec2) - res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"}) - res1, err1 := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"}) - res2, err2 := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"}) + res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"}) + res1, err1 := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"}) + res2, err2 := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"}) assert.Nil(t, err) assert.Nil(t, err1) assert.Nil(t, err2) @@ -1196,11 +1198,11 @@ func TestMongoDBAckDeletesInprocessed(t *testing.T) { db.Connect(dbaddress) defer cleanup() db.insertRecord(dbname, collection, &rec1) - db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"}) + db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"}) query_str := "{\"Id\":1,\"Op\":\"ackmessage\"}" - db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "ackmessage", ExtraParam: query_str}) - _, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"}) + db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "ackmessage", ExtraParam: query_str}) + _, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"}) assert.NotNil(t, err) if err != nil { assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) @@ -1214,8 +1216,8 @@ func TestMongoDBAckTwiceErrors(t *testing.T) { defer cleanup() db.insertRecord(dbname, collection, &rec1) query_str := "{\"Id\":1,\"Op\":\"ackmessage\"}" - db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "ackmessage", ExtraParam: query_str}) - _,err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "ackmessage", ExtraParam: query_str}) + db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "ackmessage", ExtraParam: query_str}) + _,err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "ackmessage", ExtraParam: query_str}) assert.Equal(t, utils.StatusWrongInput, err.(*DBError).Code) } @@ -1234,14 +1236,14 @@ func TestMongoDBNegAck(t *testing.T) { inputParams.Params.DelayMs = 0 db.insertRecord(dbname, collection, &rec1) - db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"}) + db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) bparam, _ := json.Marshal(&inputParams) - db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "negackmessage", ExtraParam: string(bparam)}) - res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"}) // first time message from negack - _, err1 := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"}) // second time nothing - db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "negackmessage", ExtraParam: string(bparam)}) - _, err2 := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"}) // second time nothing + db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "negackmessage", ExtraParam: string(bparam)}) + res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) // first time message from negack + _, err1 := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) // second time nothing + db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "negackmessage", ExtraParam: string(bparam)}) + _, err2 := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next"}) // second time nothing assert.Nil(t, err) assert.Equal(t, string(rec1_expect), string(res)) @@ -1260,12 +1262,12 @@ func TestMongoDBGetNextClearsInprocessAfterReset(t *testing.T) { defer cleanup() err := db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec2) - res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"}) - res1, err1 := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"}) + res, err := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"}) + res1, err1 := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"}) - db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "resetcounter", ExtraParam: "0"}) - res2, err2 := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"}) - res3, err3 := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"}) + db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "resetcounter", ExtraParam: "0"}) + res2, err2 := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"}) + res3, err3 := db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: collection, GroupId: groupId, Op: "next", ExtraParam: "0_1"}) assert.Nil(t, err) assert.Nil(t, err1) @@ -1295,16 +1297,16 @@ func TestDeleteStreams(t *testing.T) { for _, test := range testsDeleteStream { db.Connect(dbaddress) db.insertRecord(dbname, encodeStringForColName(test.stream), &rec1) - db.ProcessRequest(Request{DbName: dbname, Stream: test.stream, GroupId: "123", Op: "next"}) + db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: test.stream, GroupId: "123", Op: "next"}) query_str := "{\"Id\":1,\"Op\":\"ackmessage\"}" - request := Request{DbName: dbname, Stream: test.stream, GroupId: groupId, Op: "ackmessage", ExtraParam: query_str} + request := Request{Beamtime:beamtime, DataSource:datasource, Stream: test.stream, GroupId: groupId, Op: "ackmessage", ExtraParam: query_str} _, err := db.ProcessRequest(request) assert.Nil(t, err, test.message) - _, err = db.ProcessRequest(Request{DbName: dbname, Stream: test.stream, GroupId: "", Op: "delete_stream", ExtraParam: test.params}) + _, err = db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: test.stream, GroupId: "", Op: "delete_stream", ExtraParam: test.params}) if test.ok { - rec, err := streams.getStreams(&db, Request{DbName: dbname, ExtraParam: ""}) - acks_exist,_:= db.collectionExist(Request{DbName: dbname, ExtraParam: ""},acks_collection_name_prefix+test.stream) - inprocess_exist,_:= db.collectionExist(Request{DbName: dbname, ExtraParam: ""},inprocess_collection_name_prefix+test.stream) + rec, err := streams.getStreams(&db, Request{Beamtime:beamtime, DataSource:datasource, ExtraParam: ""}) + acks_exist,_:= db.collectionExist(Request{Beamtime:beamtime, DataSource:datasource, ExtraParam: ""},acks_collection_name_prefix+test.stream) + inprocess_exist,_:= db.collectionExist(Request{Beamtime:beamtime, DataSource:datasource, ExtraParam: ""},inprocess_collection_name_prefix+test.stream) assert.Equal(t,0,len(rec.Streams),test.message) assert.Equal(t,false,acks_exist,test.message) assert.Equal(t,false,inprocess_exist,test.message) @@ -1312,7 +1314,7 @@ func TestDeleteStreams(t *testing.T) { } else { assert.NotNil(t, err, test.message) } - _, err = db.ProcessRequest(Request{DbName: dbname, Stream: test.stream, GroupId: "", Op: "delete_stream", ExtraParam: test.params}) + _, err = db.ProcessRequest(Request{Beamtime:beamtime, DataSource:datasource, Stream: test.stream, GroupId: "", Op: "delete_stream", ExtraParam: test.params}) if test.ok2 { assert.Nil(t, err, test.message+" 2") } else { @@ -1323,7 +1325,8 @@ func TestDeleteStreams(t *testing.T) { var testsEncodings = []struct { - dbname string + beamtime string + datasource string collection string group string dbname_indb string @@ -1332,10 +1335,10 @@ var testsEncodings = []struct { message string ok bool }{ - {"dbname", "col", "group", "dbname","col","group", "no encoding",true}, - {"dbname"+badSymbolsDb, "col", "group", "dbname"+badSymbolsDbEncoded,"col","group", "symbols in db",true}, - {"dbname", "col"+badSymbolsCol, "group"+badSymbolsCol, "dbname","col"+badSymbolsColEncoded,"group"+badSymbolsColEncoded, "symbols in col",true}, - {"dbname"+badSymbolsDb, "col"+badSymbolsCol, "group"+badSymbolsCol, "dbname"+badSymbolsDbEncoded,"col"+badSymbolsColEncoded,"group"+badSymbolsColEncoded, "symbols in col and db",true}, + {"bt","dbname", "col", "group", "bt_dbname","col","group", "no encoding",true}, + {"bt","dbname"+badSymbolsDb, "col", "group", "bt_dbname"+badSymbolsDbEncoded,"col","group", "symbols in db",true}, + {"bt","dbname", "col"+badSymbolsCol, "group"+badSymbolsCol, "bt_dbname","col"+badSymbolsColEncoded,"group"+badSymbolsColEncoded, "symbols in col",true}, + {"bt","dbname"+badSymbolsDb, "col"+badSymbolsCol, "group"+badSymbolsCol, "bt_dbname"+badSymbolsDbEncoded,"col"+badSymbolsColEncoded,"group"+badSymbolsColEncoded, "symbols in col and db",true}, } @@ -1343,7 +1346,7 @@ func TestMongoDBEncodingOK(t *testing.T) { for _, test := range testsEncodings { db.Connect(dbaddress) db.insertRecord(test.dbname_indb, test.collection_indb, &rec1) - res, err := db.ProcessRequest(Request{DbName: test.dbname, Stream: test.collection, GroupId: test.group, Op: "next"}) + res, err := db.ProcessRequest(Request{Beamtime:test.beamtime,DataSource: test.datasource, Stream: test.collection, GroupId: test.group, Op: "next"}) if test.ok { assert.Nil(t, err, test.message) assert.Equal(t, string(rec1_expect), string(res), test.message) diff --git a/broker/src/asapo_broker/database/streams_test.go b/broker/src/asapo_broker/database/streams_test.go index 4ba11e0b3..2bb15c0b5 100644 --- a/broker/src/asapo_broker/database/streams_test.go +++ b/broker/src/asapo_broker/database/streams_test.go @@ -28,16 +28,16 @@ func TestStreamsTestSuite(t *testing.T) { } func (suite *StreamsTestSuite) TestStreamsEmpty() { - rec, err := streams.getStreams(&db, Request{DbName: "test", ExtraParam: ""}) + rec, err := streams.getStreams(&db, Request{Beamtime:"test",DataSource:datasource, ExtraParam: ""}) suite.Nil(err) suite.Empty(rec.Streams, 0) } func (suite *StreamsTestSuite) TestStreamsNotUsesCacheWhenEmpty() { db.settings.UpdateStreamCachePeriodMs = 1000 - streams.getStreams(&db, Request{DbName: dbname, ExtraParam: ""}) + streams.getStreams(&db, Request{Beamtime:beamtime, DataSource:datasource, ExtraParam: ""}) db.insertRecord(dbname, collection, &rec1) - rec, err := streams.getStreams(&db, Request{DbName: dbname, ExtraParam: ""}) + rec, err := streams.getStreams(&db, Request{Beamtime:beamtime, DataSource:datasource, ExtraParam: ""}) suite.Nil(err) suite.Equal(1, len(rec.Streams)) } @@ -45,9 +45,9 @@ func (suite *StreamsTestSuite) TestStreamsNotUsesCacheWhenEmpty() { func (suite *StreamsTestSuite) TestStreamsUsesCache() { db.settings.UpdateStreamCachePeriodMs = 1000 db.insertRecord(dbname, collection, &rec2) - streams.getStreams(&db, Request{DbName: dbname, ExtraParam: ""}) + streams.getStreams(&db, Request{Beamtime:beamtime, DataSource:datasource, ExtraParam: ""}) db.insertRecord(dbname, collection, &rec1) - rec, err := streams.getStreams(&db, Request{DbName: dbname, ExtraParam: ""}) + rec, err := streams.getStreams(&db, Request{Beamtime:beamtime, DataSource:datasource, ExtraParam: ""}) suite.Nil(err) suite.Equal(int64(1), rec.Streams[0].Timestamp) suite.Equal(false, rec.Streams[0].Finished) @@ -60,15 +60,15 @@ func (suite *StreamsTestSuite) TestStreamsCacheexpires() { var res1 StreamsRecord go func() { db.insertRecord(dbname, collection, &rec1) - streams.getStreams(&db, Request{DbName: dbname, ExtraParam: ""}) + streams.getStreams(&db, Request{Beamtime:beamtime, DataSource:datasource, ExtraParam: ""}) db.insertRecord(dbname, collection, &rec_finished) - res1,_ = streams.getStreams(&db, Request{DbName: dbname, ExtraParam: ""}) + res1,_ = streams.getStreams(&db, Request{Beamtime:beamtime, DataSource:datasource, ExtraParam: ""}) }() db.insertRecord(dbname, collection+"1", &rec1_later) - res2,_ := streams.getStreams(&db, Request{DbName: dbname, ExtraParam: ""}) + res2,_ := streams.getStreams(&db, Request{Beamtime:beamtime, DataSource:datasource, ExtraParam: ""}) db.insertRecord(dbname, collection+"1", &rec_finished) time.Sleep(time.Second) - res3, err := streams.getStreams(&db, Request{DbName: dbname, ExtraParam: ""}) + res3, err := streams.getStreams(&db, Request{Beamtime:beamtime, DataSource:datasource, ExtraParam: ""}) suite.Nil(err) suite.Equal(true, res3.Streams[0].Finished) fmt.Println(res1,res2) @@ -80,7 +80,7 @@ func (suite *StreamsTestSuite) TestStreamsGetFinishedInfo() { db.settings.UpdateStreamCachePeriodMs = 1000 db.insertRecord(dbname, collection, &rec1) db.insertRecord(dbname, collection, &rec_finished) - rec, err := streams.getStreams(&db, Request{DbName: dbname, ExtraParam: ""}) + rec, err := streams.getStreams(&db, Request{Beamtime:beamtime, DataSource:datasource, ExtraParam: ""}) suite.Nil(err) suite.Equal(int64(0), rec.Streams[0].Timestamp) suite.Equal(true, rec.Streams[0].Finished) @@ -92,7 +92,7 @@ func (suite *StreamsTestSuite) TestStreamsDataSetsGetFinishedInfo() { db.settings.UpdateStreamCachePeriodMs = 1000 db.insertRecord(dbname, collection, &rec_dataset1_incomplete) db.insertRecord(dbname, collection, &rec_finished) - rec, err := streams.getStreams(&db, Request{DbName: dbname, ExtraParam: ""}) + rec, err := streams.getStreams(&db, Request{Beamtime:beamtime, DataSource:datasource, ExtraParam: ""}) suite.Nil(err) suite.Equal(int64(1), rec.Streams[0].Timestamp) suite.Equal(int64(2), rec.Streams[0].TimestampLast) @@ -106,8 +106,8 @@ func (suite *StreamsTestSuite) TestStreamsMultipleRequests() { db.insertRecord(dbname, collection, &rec_dataset1_incomplete) db.insertRecord(dbname, collection, &rec_finished) db.insertRecord(dbname, collection2, &rec_dataset1_incomplete) - rec, err := streams.getStreams(&db, Request{DbName: dbname, ExtraParam: "0/unfinished"}) - rec2, err2 := streams.getStreams(&db, Request{DbName: dbname, ExtraParam: "0/finished"}) + rec, err := streams.getStreams(&db, Request{Beamtime:beamtime, DataSource:datasource, ExtraParam: "0/unfinished"}) + rec2, err2 := streams.getStreams(&db, Request{Beamtime:beamtime, DataSource:datasource, ExtraParam: "0/finished"}) suite.Nil(err) suite.Equal(collection2, rec.Streams[0].Name) suite.Equal(1, len(rec.Streams)) @@ -119,10 +119,10 @@ func (suite *StreamsTestSuite) TestStreamsMultipleRequests() { func (suite *StreamsTestSuite) TestStreamsNotUsesCacheWhenExpired() { db.settings.UpdateStreamCachePeriodMs = 10 db.insertRecord(dbname, collection, &rec2) - streams.getStreams(&db, Request{DbName: dbname, ExtraParam: ""}) + streams.getStreams(&db, Request{Beamtime:beamtime,DataSource:datasource, ExtraParam: ""}) db.insertRecord(dbname, collection, &rec1) time.Sleep(time.Millisecond * 100) - rec, err := streams.getStreams(&db, Request{DbName: dbname, ExtraParam: ""}) + rec, err := streams.getStreams(&db, Request{Beamtime:beamtime,DataSource:datasource, ExtraParam: ""}) suite.Nil(err) suite.Equal(int64(1), rec.Streams[0].Timestamp) } @@ -130,9 +130,9 @@ func (suite *StreamsTestSuite) TestStreamsNotUsesCacheWhenExpired() { func (suite *StreamsTestSuite) TestStreamRemovesDatabase() { db.settings.UpdateStreamCachePeriodMs = 0 db.insertRecord(dbname, collection, &rec1) - streams.getStreams(&db, Request{DbName: dbname, ExtraParam: ""}) + streams.getStreams(&db, Request{Beamtime:beamtime,DataSource:datasource, ExtraParam: ""}) db.dropDatabase(dbname) - rec, err := streams.getStreams(&db, Request{DbName: dbname, ExtraParam: ""}) + rec, err := streams.getStreams(&db, Request{Beamtime:beamtime,DataSource:datasource, ExtraParam: ""}) suite.Nil(err) suite.Empty(rec.Streams, 0) } @@ -143,18 +143,18 @@ var streamFilterTests=[]struct{ streams []string message string }{ - {request: Request{DbName:dbname, ExtraParam:""},error: false,streams: []string{collection,collection2},message: "default all streams"}, - {request: Request{DbName:dbname, ExtraParam:"0/"},error: false,streams: []string{collection,collection2},message: "default 0/ all streams"}, - {request: Request{DbName:dbname, ExtraParam:utils.EncodeTwoStrings(collection,"")},error: false,streams: []string{collection,collection2},message: "first parameter only - all streams"}, - {request: Request{DbName:dbname, ExtraParam:"0/all"},error: false,streams: []string{collection,collection2},message: "second parameter only - all streams"}, - {request: Request{DbName:dbname, ExtraParam:"0/finished"},error: false,streams: []string{collection2},message: "second parameter only - finished streams"}, - {request: Request{DbName:dbname, ExtraParam:"0/unfinished"},error: false,streams: []string{collection},message: "second parameter only - unfinished streams"}, - {request: Request{DbName:dbname, ExtraParam:utils.EncodeTwoStrings(collection2,"all")},error: false,streams: []string{collection2},message: "from stream2"}, - {request: Request{DbName:dbname, ExtraParam:utils.EncodeTwoStrings(collection2,"unfinished")},error: false,streams: []string{},message: "from stream2 and filter"}, - {request: Request{DbName:dbname, ExtraParam:utils.EncodeTwoStrings(collection2,"bla")},error: true,streams: []string{},message: "wrong filter"}, - {request: Request{DbName:dbname, ExtraParam:utils.EncodeTwoStrings(collection2,"all_aaa")},error: true,streams: []string{},message: "wrong filter2"}, - {request: Request{DbName:dbname, ExtraParam:utils.EncodeTwoStrings("blabla","")},error: false,streams: []string{},message: "from unknown stream returns nothing"}, - {request: Request{DbName:dbname, ExtraParam:utils.EncodeTwoStrings(collection2,"")},error: false,streams: []string{collection2},message: "from stream2, first parameter only"}, + {request: Request{Beamtime:beamtime,DataSource:datasource,ExtraParam:""},error: false,streams: []string{collection,collection2},message: "default all streams"}, + {request: Request{Beamtime:beamtime,DataSource:datasource, ExtraParam:"0/"},error: false,streams: []string{collection,collection2},message: "default 0/ all streams"}, + {request: Request{Beamtime:beamtime,DataSource:datasource, ExtraParam:utils.EncodeTwoStrings(collection,"")},error: false,streams: []string{collection,collection2},message: "first parameter only - all streams"}, + {request: Request{Beamtime:beamtime,DataSource:datasource, ExtraParam:"0/all"},error: false,streams: []string{collection,collection2},message: "second parameter only - all streams"}, + {request: Request{Beamtime:beamtime,DataSource:datasource, ExtraParam:"0/finished"},error: false,streams: []string{collection2},message: "second parameter only - finished streams"}, + {request: Request{Beamtime:beamtime,DataSource:datasource, ExtraParam:"0/unfinished"},error: false,streams: []string{collection},message: "second parameter only - unfinished streams"}, + {request: Request{Beamtime:beamtime,DataSource:datasource, ExtraParam:utils.EncodeTwoStrings(collection2,"all")},error: false,streams: []string{collection2},message: "from stream2"}, + {request: Request{Beamtime:beamtime,DataSource:datasource, ExtraParam:utils.EncodeTwoStrings(collection2,"unfinished")},error: false,streams: []string{},message: "from stream2 and filter"}, + {request: Request{Beamtime:beamtime,DataSource:datasource, ExtraParam:utils.EncodeTwoStrings(collection2,"bla")},error: true,streams: []string{},message: "wrong filter"}, + {request: Request{Beamtime:beamtime,DataSource:datasource, ExtraParam:utils.EncodeTwoStrings(collection2,"all_aaa")},error: true,streams: []string{},message: "wrong filter2"}, + {request: Request{Beamtime:beamtime,DataSource:datasource, ExtraParam:utils.EncodeTwoStrings("blabla","")},error: false,streams: []string{},message: "from unknown stream returns nothing"}, + {request: Request{Beamtime:beamtime,DataSource:datasource, ExtraParam:utils.EncodeTwoStrings(collection2,"")},error: false,streams: []string{collection2},message: "from stream2, first parameter only"}, } func (suite *StreamsTestSuite) TestStreamFilters() { diff --git a/broker/src/asapo_broker/server/get_commands_test.go b/broker/src/asapo_broker/server/get_commands_test.go index 17f3aea5a..d9e5fffc4 100644 --- a/broker/src/asapo_broker/server/get_commands_test.go +++ b/broker/src/asapo_broker/server/get_commands_test.go @@ -60,7 +60,7 @@ var testsGetCommand = []struct { func (suite *GetCommandsTestSuite) TestGetCommandsCallsCorrectRoutine() { for _, test := range testsGetCommand { - suite.mock_db.On("ProcessRequest", database.Request{DbName: expectedDBName, Stream: test.stream, GroupId: test.groupid, Op: test.command, ExtraParam: test.externalParam}).Return([]byte("Hello"), nil) + suite.mock_db.On("ProcessRequest", database.Request{Beamtime: expectedBeamtimeId, DataSource: test.source, Stream: test.stream, GroupId: test.groupid, Op: test.command, ExtraParam: test.externalParam}).Return([]byte("Hello"), nil) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request "+test.command))) w := doRequest("/beamtime/" + expectedBeamtimeId + "/" + test.source + "/" + test.reqString+correctTokenSuffix+test.queryParams) suite.Equal(http.StatusOK, w.Code, test.command+ " OK") @@ -83,8 +83,7 @@ func (suite *GetCommandsTestSuite) TestGetCommandsCorrectlyProcessedEncoding() { test.reqString = strings.Replace(test.reqString,test.groupid,encodedGroup,1) test.reqString = strings.Replace(test.reqString,test.source,encodedSource,1) test.reqString = strings.Replace(test.reqString,test.stream,encodedStream,1) - dbname := expectedBeamtimeId + "_" + newsource - suite.mock_db.On("ProcessRequest", database.Request{DbName: dbname, Stream: newstream, GroupId: newgroup, Op: test.command, ExtraParam: test.externalParam}).Return([]byte("Hello"), nil) + suite.mock_db.On("ProcessRequest", database.Request{Beamtime: expectedBeamtimeId,DataSource: newsource, Stream: newstream, GroupId: newgroup, Op: test.command, ExtraParam: test.externalParam}).Return([]byte("Hello"), nil) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request "+test.command))) w := doRequest("/beamtime/" + expectedBeamtimeId + "/" + encodedSource + "/" + test.reqString+correctTokenSuffix+test.queryParams) suite.Equal(http.StatusOK, w.Code, test.command+ " OK") diff --git a/broker/src/asapo_broker/server/get_meta_test.go b/broker/src/asapo_broker/server/get_meta_test.go index b54a72865..43ac602b2 100644 --- a/broker/src/asapo_broker/server/get_meta_test.go +++ b/broker/src/asapo_broker/server/get_meta_test.go @@ -33,7 +33,7 @@ func TestGetMetaTestSuite(t *testing.T) { } func (suite *GetMetaTestSuite) TestGetMetaOK() { - suite.mock_db.On("ProcessRequest", database.Request{DbName: expectedDBName, Stream: expectedStream, Op: "meta", ExtraParam: "0"}).Return([]byte(""), nil) + suite.mock_db.On("ProcessRequest", database.Request{Beamtime: expectedBeamtimeId,DataSource: expectedSource, Stream: expectedStream, Op: "meta", ExtraParam: "0"}).Return([]byte(""), nil) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request meta"))) w := doRequest("/beamtime/" + expectedBeamtimeId + "/" + expectedSource + "/" + expectedStream + "/0/meta" + "/0" + correctTokenSuffix,"GET") suite.Equal(http.StatusOK, w.Code, "meta OK") diff --git a/broker/src/asapo_broker/server/post_op_image_test.go b/broker/src/asapo_broker/server/post_op_image_test.go index 2cc3159ee..b11944376 100644 --- a/broker/src/asapo_broker/server/post_op_image_test.go +++ b/broker/src/asapo_broker/server/post_op_image_test.go @@ -34,7 +34,7 @@ func TestMessageOpTestSuite(t *testing.T) { func (suite *MessageOpTestSuite) TestAckMessageOpOK() { query_str := "{\"Id\":1,\"Op\":\"ackmessage\"}" - suite.mock_db.On("ProcessRequest", database.Request{DbName: expectedDBName, Stream: expectedStream, GroupId: expectedGroupID, Op: "ackmessage", ExtraParam: query_str}).Return([]byte(""), nil) + suite.mock_db.On("ProcessRequest", database.Request{Beamtime: expectedBeamtimeId,DataSource: expectedSource, Stream: expectedStream, GroupId: expectedGroupID, Op: "ackmessage", ExtraParam: query_str}).Return([]byte(""), nil) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request ackmessage"))) w := doRequest("/beamtime/" + expectedBeamtimeId + "/" + expectedSource + "/" + expectedStream + "/" + expectedGroupID + "/1" + correctTokenSuffix,"POST",query_str) suite.Equal(http.StatusOK, w.Code, "ackmessage OK") diff --git a/broker/src/asapo_broker/server/post_query_images_test.go b/broker/src/asapo_broker/server/post_query_images_test.go index d51d2490a..13071ed74 100644 --- a/broker/src/asapo_broker/server/post_query_images_test.go +++ b/broker/src/asapo_broker/server/post_query_images_test.go @@ -35,7 +35,7 @@ func TestQueryTestSuite(t *testing.T) { func (suite *QueryTestSuite) TestQueryOK() { query_str := "aaaa" - suite.mock_db.On("ProcessRequest", database.Request{DbName: expectedDBName, Stream: expectedStream,Op: "querymessages", ExtraParam: query_str}).Return([]byte("{}"), nil) + suite.mock_db.On("ProcessRequest", database.Request{Beamtime: expectedBeamtimeId,DataSource: expectedSource, Stream: expectedStream,Op: "querymessages", ExtraParam: query_str}).Return([]byte("{}"), nil) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request querymessages"))) w := doRequest("/beamtime/"+expectedBeamtimeId+"/"+expectedSource+"/"+expectedStream+"/0/querymessages"+correctTokenSuffix, "POST", query_str) diff --git a/broker/src/asapo_broker/server/post_reset_counter_test.go b/broker/src/asapo_broker/server/post_reset_counter_test.go index 64291bee2..25a9128b2 100644 --- a/broker/src/asapo_broker/server/post_reset_counter_test.go +++ b/broker/src/asapo_broker/server/post_reset_counter_test.go @@ -33,7 +33,7 @@ func TestResetCounterTestSuite(t *testing.T) { } func (suite *ResetCounterTestSuite) TestResetCounterOK() { - expectedRequest := database.Request{DbName: expectedDBName, Stream: expectedStream, GroupId:expectedGroupID, Op: "resetcounter", ExtraParam: "10"} + expectedRequest := database.Request{Beamtime: expectedBeamtimeId,DataSource: expectedSource, Stream: expectedStream, GroupId:expectedGroupID, Op: "resetcounter", ExtraParam: "10"} suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte(""), nil) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request resetcounter"))) diff --git a/broker/src/asapo_broker/server/process_request.go b/broker/src/asapo_broker/server/process_request.go index 41b6564b7..bbb84f922 100644 --- a/broker/src/asapo_broker/server/process_request.go +++ b/broker/src/asapo_broker/server/process_request.go @@ -63,19 +63,20 @@ func processRequest(w http.ResponseWriter, r *http.Request, op string, extra_par w.Header().Set("Access-Control-Allow-Origin", "*") - db_name, datasource, stream, group_id, ok := extractRequestParameters(r, needGroupID) + beamtime, datasource, stream, group_id, ok := extractRequestParameters(r, needGroupID) if !ok { w.WriteHeader(http.StatusBadRequest) return } - if err := authorize(r, db_name, needWriteAccess(op)); err != nil { - writeAuthAnswer(w, "get "+op, db_name, err) + if err := authorize(r, beamtime, needWriteAccess(op)); err != nil { + writeAuthAnswer(w, "get "+op, beamtime, err) return } request := database.Request{} - request.DbName = db_name+"_"+datasource + request.Beamtime = beamtime + request.DataSource = datasource request.Op = op request.ExtraParam = extra_param request.Stream = stream @@ -113,10 +114,12 @@ func reconnectIfNeeded(db_error error) { } } +//func dbRequestLogger(request database.Request) + func processRequestInDb(request database.Request) (answer []byte, code int) { statistics.IncreaseCounter() answer, err := db.ProcessRequest(request) - log_str := "processing request " + request.Op + " in " + request.DbName + " at " + settings.GetDatabaseServer() + log_str := "processing request " + request.Op + " in " + request.DbName() + " at " + settings.GetDatabaseServer() if err != nil { go reconnectIfNeeded(err) return returnError(err, log_str) diff --git a/broker/src/asapo_broker/server/process_request_test.go b/broker/src/asapo_broker/server/process_request_test.go index cf0d41626..1edaf7780 100644 --- a/broker/src/asapo_broker/server/process_request_test.go +++ b/broker/src/asapo_broker/server/process_request_test.go @@ -151,7 +151,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithNoToken() { func (suite *ProcessRequestTestSuite) TestProcessRequestWithWrongDatabaseName() { - expectedRequest := database.Request{DbName: expectedDBName, Stream: expectedStream, GroupId: expectedGroupID, Op: "next"} + expectedRequest := database.Request{Beamtime: expectedBeamtimeId,DataSource: expectedSource, Stream: expectedStream, GroupId: expectedGroupID, Op: "next"} suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte(""), &database.DBError{utils.StatusNoData, ""}) @@ -165,7 +165,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithWrongDatabaseName() func (suite *ProcessRequestTestSuite) TestProcessRequestWithConnectionError() { - expectedRequest := database.Request{DbName: expectedDBName, Stream: expectedStream, GroupId: expectedGroupID, Op: "next"} + expectedRequest := database.Request{Beamtime: expectedBeamtimeId,DataSource: expectedSource, Stream: expectedStream, GroupId: expectedGroupID, Op: "next"} suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte(""), &database.DBError{utils.StatusServiceUnavailable, ""}) @@ -181,7 +181,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithConnectionError() { func (suite *ProcessRequestTestSuite) TestProcessRequestWithInternalDBError() { - expectedRequest := database.Request{DbName: expectedDBName, Stream: expectedStream, GroupId: expectedGroupID, Op: "next"} + expectedRequest := database.Request{Beamtime: expectedBeamtimeId,DataSource: expectedSource, Stream: expectedStream, GroupId: expectedGroupID, Op: "next"} suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte(""), errors.New("")) logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("processing request next"))) @@ -196,7 +196,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWithInternalDBError() { func (suite *ProcessRequestTestSuite) TestProcessRequestAddsCounter() { - expectedRequest := database.Request{DbName: expectedDBName, Stream: expectedStream, GroupId: expectedGroupID, Op: "next"} + expectedRequest := database.Request{Beamtime: expectedBeamtimeId,DataSource: expectedSource, Stream: expectedStream, GroupId: expectedGroupID, Op: "next"} suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte("Hello"), nil) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request next in "+expectedDBName))) @@ -207,7 +207,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestAddsCounter() { func (suite *ProcessRequestTestSuite) TestProcessRequestAddsDataset() { - expectedRequest := database.Request{DbName: expectedDBName, Stream: expectedStream, GroupId: expectedGroupID, DatasetOp: true, Op: "next"} + expectedRequest := database.Request{Beamtime: expectedBeamtimeId,DataSource: expectedSource, Stream: expectedStream, GroupId: expectedGroupID, DatasetOp: true, Op: "next"} suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte("Hello"), nil) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request next in "+expectedDBName))) @@ -231,7 +231,7 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestDeleteStreamReadToken() func (suite *ProcessRequestTestSuite) TestProcessRequestDeleteStreamWriteToken() { query_str := "query_string" - expectedRequest := database.Request{DbName: expectedDBName, Stream: expectedStream, GroupId: "", Op: "delete_stream", ExtraParam: query_str} + expectedRequest := database.Request{Beamtime: expectedBeamtimeId,DataSource: expectedSource, Stream: expectedStream, GroupId: "", Op: "delete_stream", ExtraParam: query_str} suite.mock_db.On("ProcessRequest", expectedRequest).Return([]byte("Hello"), nil) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request delete_stream in "+expectedDBName))) diff --git a/common/go/src/asapo_common/logger/logger.go b/common/go/src/asapo_common/logger/logger.go index ffa2c4ce8..6a87f810c 100644 --- a/common/go/src/asapo_common/logger/logger.go +++ b/common/go/src/asapo_common/logger/logger.go @@ -29,6 +29,10 @@ type Logger interface { var my_logger Logger = &logRusLogger{} +func WithFields(args map[string]interface{}) Logger { + return my_logger.WithFields(args) +} + func Info(args ...interface{}) { my_logger.Info(args...) } -- GitLab