diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go index 4527693cfe7a9949dba17ddfe4ab2d20c7a3782d..5491086a02aebf133e75639a9498cecc956e9c31 100644 --- a/broker/src/asapo_broker/database/mongodb.go +++ b/broker/src/asapo_broker/database/mongodb.go @@ -8,6 +8,7 @@ import ( "context" "encoding/json" "errors" + "fmt" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" @@ -217,6 +218,16 @@ func (db *Mongodb) setCounter(request Request, ind int) (err error) { return } +func (db *Mongodb) errorWhenCannotIncrementField(request Request, max_ind int) (err error) { + if res, err := db.getRecordFromDb(request, max_ind, max_ind);err == nil { + if err := checkStreamFinished(request, max_ind, max_ind, res); err != nil { + return err + } + } + return &DBError{utils.StatusNoData, encodeAnswer(max_ind, max_ind, "")} +} + + func (db *Mongodb) incrementField(request Request, max_ind int, res interface{}) (err error) { update := bson.M{"$inc": bson.M{pointer_field_name: 1}} opts := options.FindOneAndUpdate().SetUpsert(true).SetReturnDocument(options.After) @@ -233,7 +244,7 @@ func (db *Mongodb) incrementField(request Request, max_ind int, res interface{}) if err2 := c.FindOneAndUpdate(context.TODO(), q, update, opts).Decode(res); err2 == nil { return nil } - return &DBError{utils.StatusNoData, encodeAnswer(max_ind, max_ind, "")} + return db.errorWhenCannotIncrementField(request,max_ind) } return &DBError{utils.StatusTransactionInterrupted, err.Error()} } @@ -584,6 +595,7 @@ func checkStreamFinished(request Request, id, id_max int, data map[string]interf return nil } r, ok := ExtractMessageRecord(data) + fmt.Println(r,ok) if !ok || !r.FinishedStream { return nil } diff --git a/broker/src/asapo_broker/database/mongodb_test.go b/broker/src/asapo_broker/database/mongodb_test.go index e1caa8553eca0693e4a60dbdde99a242983a7bff..72471969075187ea4e5226bec89a167a63f26d07 100644 --- a/broker/src/asapo_broker/database/mongodb_test.go +++ b/broker/src/asapo_broker/database/mongodb_test.go @@ -156,6 +156,22 @@ func TestMongoDBGetNextErrorOnFinishedStream(t *testing.T) { assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"next1\"}", err.(*DBError).Message) } +func TestMongoDBGetNextErrorOnFinishedStreamAlways(t *testing.T) { + db.Connect(dbaddress) + defer cleanup() + db.insertRecord(dbname, collection, &rec1) + db.insertRecord(dbname, collection, &rec_finished) + + db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"}) + db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"}) + _, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"}) + + assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) + assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"next1\"}", err.(*DBError).Message) +} + + + func TestMongoDBGetByIdErrorOnFinishedStream(t *testing.T) { db.Connect(dbaddress) defer cleanup() diff --git a/examples/consumer/getnext/getnext.cpp b/examples/consumer/getnext/getnext.cpp index d30e87e578210fb3776e34c18c1fd4d332ad321b..3429a4861ea29b23eee3fbfbda5d9a3c05c8bd78 100644 --- a/examples/consumer/getnext/getnext.cpp +++ b/examples/consumer/getnext/getnext.cpp @@ -42,6 +42,7 @@ struct Args { int nthreads; bool read_data; bool datasets; + bool need_beamtime_meta = false; }; class LatchedTimer { @@ -88,7 +89,7 @@ void WaitThreads(std::vector<std::thread>* threads) { int ProcessError(const Error& err) { if (err == nullptr) return 0; std::cout << err->Explain() << std::endl; - return err == asapo::ConsumerErrorTemplates::kEndOfStream ? 0 : 1; + return (err == asapo::ConsumerErrorTemplates::kEndOfStream ||err == asapo::ConsumerErrorTemplates::kStreamFinished) ? 0 : 1; } std::vector<std::thread> @@ -121,13 +122,12 @@ StartThreads(const Args& params, std::vector<int>* nfiles, std::vector<int>* err (*errors)[i] += ProcessError(err); lock.unlock(); exit(EXIT_FAILURE); - return; } } lock.unlock(); - if (i == 0) { + if (i == 0 && params.need_beamtime_meta) { auto meta = consumer->GetBeamtimeMeta(&err); if (err == nullptr) { std::cout << meta << std::endl; @@ -267,9 +267,9 @@ void TryGetStream(Args* args) { int main(int argc, char* argv[]) { Args params; params.datasets = false; - if (argc != 8 && argc != 9) { + if (argc != 8 && argc != 9 && argc != 10) { std::cout << "Usage: " + std::string{argv[0]} - + " <server> <files_path> <run_name> <nthreads> <token> <timeout ms> <metaonly> [use datasets]" + + " <server> <files_path> <run_name> <nthreads> <token> <timeout ms> <metaonly> [use datasets] [send metadata]" << std::endl; exit(EXIT_FAILURE); @@ -285,6 +285,9 @@ int main(int argc, char* argv[]) { if (argc == 9) { params.datasets = atoi(argv[8]) == 1; } + if (argc == 10) { + params.need_beamtime_meta = atoi(argv[9]) == 1; + } if (params.read_data) { std::cout << "Will read metadata+payload" << std::endl; diff --git a/examples/producer/dummy-data-producer/dummy_data_producer.cpp b/examples/producer/dummy-data-producer/dummy_data_producer.cpp index 560cd20ba6b9f89aa52d40deae4cab08319f2666..1a31044f8b3f99a7b601457671294fd337a191ed 100644 --- a/examples/producer/dummy-data-producer/dummy_data_producer.cpp +++ b/examples/producer/dummy-data-producer/dummy_data_producer.cpp @@ -189,7 +189,7 @@ bool SendDummyData(asapo::Producer* producer, size_t number_of_byte, uint64_t it } } } - return true; + return producer->SendStreamFinishedFlag("default",iterations,"",nullptr) == nullptr; } std::unique_ptr<asapo::Producer> CreateProducer(const Args& args) { diff --git a/tests/automatic/full_chain/simple_chain_metadata/check_linux.sh b/tests/automatic/full_chain/simple_chain_metadata/check_linux.sh index 3d9103f83b59123012d41cc5648b4ecb51f95d37..c0707599e13490f1e27acca4b5a2d241570729ff 100644 --- a/tests/automatic/full_chain/simple_chain_metadata/check_linux.sh +++ b/tests/automatic/full_chain/simple_chain_metadata/check_linux.sh @@ -36,5 +36,5 @@ mkdir -p ${receiver_folder} $producer_bin localhost:8400 ${beamtime_id} 100 0 1 0 1000 echo "Start consumer in metadata only mode" -$consumer_bin ${proxy_address} ${receiver_folder} ${beamtime_id} 2 $token 1000 1 | tee out +$consumer_bin ${proxy_address} ${receiver_folder} ${beamtime_id} 2 $token 1000 1 0 1 | tee out grep "dummy_meta" out diff --git a/tests/automatic/full_chain/simple_chain_metadata/check_windows.bat b/tests/automatic/full_chain/simple_chain_metadata/check_windows.bat index fc5283fe047d782f5676a2aa32aa44612420b184..2b80a5bf9842f62e61fb7fd17115a848011bb972 100644 --- a/tests/automatic/full_chain/simple_chain_metadata/check_windows.bat +++ b/tests/automatic/full_chain/simple_chain_metadata/check_windows.bat @@ -14,7 +14,7 @@ mkdir %receiver_folder% "%1" %proxy_address% %beamtime_id% 100 0 1 0 1000 REM consumer -"%2" %proxy_address% %receiver_folder% %beamtime_id% 2 %token% 5000 1 > out.txt +"%2" %proxy_address% %receiver_folder% %beamtime_id% 2 %token% 5000 1 0 1 > out.txt type out.txt findstr /i /l /c:"dummy_meta" out.txt || goto :error