From 6720657eae0d310cf6a22ebcd4f28019154502dd Mon Sep 17 00:00:00 2001
From: Sergey Yakubov <sergey.yakubov@desy.de>
Date: Fri, 28 May 2021 12:52:06 +0200
Subject: [PATCH] fix bug with finished stream, add finished stream to dummy producer

---
 broker/src/asapo_broker/database/mongodb.go      | 14 +++++++++++++-
 broker/src/asapo_broker/database/mongodb_test.go | 16 ++++++++++++++++
 examples/consumer/getnext/getnext.cpp            | 13 ++++++++-----
 .../dummy-data-producer/dummy_data_producer.cpp  |  2 +-
 .../simple_chain_metadata/check_linux.sh         |  2 +-
 .../simple_chain_metadata/check_windows.bat      |  2 +-
 6 files changed, 40 insertions(+), 9 deletions(-)

diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go
index 4527693cf..5491086a0 100644
--- a/broker/src/asapo_broker/database/mongodb.go
+++ b/broker/src/asapo_broker/database/mongodb.go
@@ -8,6 +8,7 @@ import (
 	"context"
 	"encoding/json"
 	"errors"
+	"fmt"
 	"go.mongodb.org/mongo-driver/bson"
 	"go.mongodb.org/mongo-driver/bson/primitive"
 	"go.mongodb.org/mongo-driver/mongo"
@@ -217,6 +218,16 @@ func (db *Mongodb) setCounter(request Request, ind int) (err error) {
 	return
 }

+func (db *Mongodb) errorWhenCannotIncrementField(request Request, max_ind int) (err error) {
+	if res, err := db.getRecordFromDb(request, max_ind, max_ind); err == nil {
+		if err := checkStreamFinished(request, max_ind, max_ind, res); err != nil {
+			return err
+		}
+	}
+	return &DBError{utils.StatusNoData, encodeAnswer(max_ind, max_ind, "")}
+}
+
+
 func (db *Mongodb) incrementField(request Request, max_ind int, res interface{}) (err error) {
 	update := bson.M{"$inc": bson.M{pointer_field_name: 1}}
 	opts := options.FindOneAndUpdate().SetUpsert(true).SetReturnDocument(options.After)
@@ -233,7 +244,7 @@ func (db *Mongodb) incrementField(request Request, max_ind int, res interface{})
 		if err2 := c.FindOneAndUpdate(context.TODO(), q, update, opts).Decode(res); err2 == nil {
 			return nil
 		}
-		return &DBError{utils.StatusNoData, encodeAnswer(max_ind, max_ind, "")}
+		return db.errorWhenCannotIncrementField(request, max_ind)
 	}
 	return &DBError{utils.StatusTransactionInterrupted, err.Error()}
 }
@@ -584,6 +595,7 @@ func checkStreamFinished(request Request, id, id_max int, data map[string]interf
 		return nil
 	}
 	r, ok := ExtractMessageRecord(data)
+	fmt.Println(r, ok)
 	if !ok || !r.FinishedStream {
 		return nil
 	}
diff --git a/broker/src/asapo_broker/database/mongodb_test.go b/broker/src/asapo_broker/database/mongodb_test.go
index e1caa8553..724719690 100644
--- a/broker/src/asapo_broker/database/mongodb_test.go
+++ b/broker/src/asapo_broker/database/mongodb_test.go
@@ -156,6 +156,22 @@ func TestMongoDBGetNextErrorOnFinishedStream(t *testing.T) {
 	assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"next1\"}", err.(*DBError).Message)
 }

+func TestMongoDBGetNextErrorOnFinishedStreamAlways(t *testing.T) {
+	db.Connect(dbaddress)
+	defer cleanup()
+	db.insertRecord(dbname, collection, &rec1)
+	db.insertRecord(dbname, collection, &rec_finished)
+
+	db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})
+	db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})
+	_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})
+
+	assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
+	assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"next1\"}", err.(*DBError).Message)
+}
+
+
+
 func TestMongoDBGetByIdErrorOnFinishedStream(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
diff --git a/examples/consumer/getnext/getnext.cpp b/examples/consumer/getnext/getnext.cpp
index d30e87e57..3429a4861 100644
--- a/examples/consumer/getnext/getnext.cpp
+++ b/examples/consumer/getnext/getnext.cpp
@@ -42,6 +42,7 @@ struct Args {
     int nthreads;
     bool read_data;
    bool datasets;
+    bool need_beamtime_meta = false;
 };

 class LatchedTimer {
@@ -88,7 +89,7 @@ void WaitThreads(std::vector<std::thread>* threads) {
 int ProcessError(const Error& err) {
     if (err == nullptr) return 0;
     std::cout << err->Explain() << std::endl;
-    return err == asapo::ConsumerErrorTemplates::kEndOfStream ? 0 : 1;
+    return (err == asapo::ConsumerErrorTemplates::kEndOfStream || err == asapo::ConsumerErrorTemplates::kStreamFinished) ? 0 : 1;
 }

 std::vector<std::thread>
@@ -121,13 +122,12 @@ StartThreads(const Args& params, std::vector<int>* nfiles, std::vector<int>* err
                 (*errors)[i] += ProcessError(err);
                 lock.unlock();
                 exit(EXIT_FAILURE);
-                return;
             }
         }

         lock.unlock();

-        if (i == 0) {
+        if (i == 0 && params.need_beamtime_meta) {
             auto meta = consumer->GetBeamtimeMeta(&err);
             if (err == nullptr) {
                 std::cout << meta << std::endl;
@@ -267,9 +267,9 @@ void TryGetStream(Args* args) {
 int main(int argc, char* argv[]) {
     Args params;
     params.datasets = false;
-    if (argc != 8 && argc != 9) {
+    if (argc != 8 && argc != 9 && argc != 10) {
         std::cout << "Usage: " + std::string{argv[0]}
-                  + " <server> <files_path> <run_name> <nthreads> <token> <timeout ms> <metaonly> [use datasets]"
+                  + " <server> <files_path> <run_name> <nthreads> <token> <timeout ms> <metaonly> [use datasets] [send metadata]"
                   << std::endl;

         exit(EXIT_FAILURE);
@@ -285,6 +285,9 @@ int main(int argc, char* argv[]) {
     if (argc == 9) {
         params.datasets = atoi(argv[8]) == 1;
     }
+    if (argc == 10) {
+        params.need_beamtime_meta = atoi(argv[9]) == 1;
+    }

     if (params.read_data) {
         std::cout << "Will read metadata+payload" << std::endl;
diff --git a/examples/producer/dummy-data-producer/dummy_data_producer.cpp b/examples/producer/dummy-data-producer/dummy_data_producer.cpp
index 560cd20ba..1a31044f8 100644
--- a/examples/producer/dummy-data-producer/dummy_data_producer.cpp
+++ b/examples/producer/dummy-data-producer/dummy_data_producer.cpp
@@ -189,7 +189,7 @@ bool SendDummyData(asapo::Producer* producer, size_t number_of_byte, uint64_t it
             }
         }
     }
-    return true;
+    return producer->SendStreamFinishedFlag("default", iterations, "", nullptr) == nullptr;
 }

 std::unique_ptr<asapo::Producer> CreateProducer(const Args& args) {
diff --git a/tests/automatic/full_chain/simple_chain_metadata/check_linux.sh b/tests/automatic/full_chain/simple_chain_metadata/check_linux.sh
index 3d9103f83..c0707599e 100644
--- a/tests/automatic/full_chain/simple_chain_metadata/check_linux.sh
+++ b/tests/automatic/full_chain/simple_chain_metadata/check_linux.sh
@@ -36,5 +36,5 @@ mkdir -p ${receiver_folder}
 $producer_bin localhost:8400 ${beamtime_id} 100 0 1 0 1000

 echo "Start consumer in metadata only mode"
-$consumer_bin ${proxy_address} ${receiver_folder} ${beamtime_id} 2 $token 1000 1 | tee out
+$consumer_bin ${proxy_address} ${receiver_folder} ${beamtime_id} 2 $token 1000 1 0 1 | tee out
 grep "dummy_meta" out
diff --git a/tests/automatic/full_chain/simple_chain_metadata/check_windows.bat b/tests/automatic/full_chain/simple_chain_metadata/check_windows.bat
index fc5283fe0..2b80a5bf9 100644
--- a/tests/automatic/full_chain/simple_chain_metadata/check_windows.bat
+++ b/tests/automatic/full_chain/simple_chain_metadata/check_windows.bat
@@ -14,7 +14,7 @@ mkdir %receiver_folder%
 "%1" %proxy_address% %beamtime_id% 100 0 1 0 1000

 REM consumer
-"%2" %proxy_address% %receiver_folder% %beamtime_id% 2 %token% 5000 1 > out.txt
+"%2" %proxy_address% %receiver_folder% %beamtime_id% 2 %token% 5000 1 0 1 > out.txt
 type out.txt
 findstr /i /l /c:"dummy_meta" out.txt || goto :error

-- 
GitLab
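
The broker-side change above covers the case where a consumer group keeps issuing "next" requests after its pointer has already reached the last message of a finished stream: instead of a bare no-data answer, errorWhenCannotIncrementField re-reads the last record and passes it through checkStreamFinished, so the answer naming the follow-up stream is returned on every subsequent request, as TestMongoDBGetNextErrorOnFinishedStreamAlways verifies. The Go snippet below is a minimal sketch, not part of the ASAPO code base, of how a client could interpret that encoded answer; the brokerAnswer struct and the decision logic are assumptions made for illustration, while the JSON field names op, id, id_max and next_stream are taken directly from the test assertions in the patch.

package main

import (
	"encoding/json"
	"fmt"
)

// brokerAnswer mirrors the JSON that the broker encodes into the DBError
// message (see the assertions in TestMongoDBGetNextErrorOnFinishedStreamAlways).
// The struct itself is illustrative; only the field names come from the patch.
type brokerAnswer struct {
	Op         string `json:"op"`
	Id         int    `json:"id"`
	IdMax      int    `json:"id_max"`
	NextStream string `json:"next_stream"`
}

func main() {
	// Error message returned for a "next" request on a finished stream,
	// copied from the test expectations in mongodb_test.go.
	msg := `{"op":"get_record_by_id","id":1,"id_max":1,"next_stream":"next1"}`

	var ans brokerAnswer
	if err := json.Unmarshal([]byte(msg), &ans); err != nil {
		fmt.Println("cannot parse broker answer:", err)
		return
	}

	if ans.NextStream != "" {
		// The stream was explicitly finished by the producer; a client can
		// switch to the advertised follow-up stream instead of retrying.
		fmt.Println("stream finished, continue with stream:", ans.NextStream)
	} else {
		// Plain "no data yet": the next message simply has not arrived.
		fmt.Println("no new data at id", ans.Id, "- retry later")
	}
}

On the C++ side the same condition reaches the consumer as asapo::ConsumerErrorTemplates::kStreamFinished, which the updated getnext example now treats like kEndOfStream, i.e. as a normal end of processing rather than an error.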