diff --git a/CHANGELOG.md b/CHANGELOG.md
index b4715b78c46c306b51cb88dbca5f92610f8682a1..e4329302b47839af5cdd8285cce1e853148d53ef 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,9 @@
+## 21.06.0
+
+
+BUG FIXES
+* Consumer API: multiple consumers from the same group now all receive the stream finished error
+
 ## 21.03.3
 
 BUG FIXES
diff --git a/CMakeLists.txt b/CMakeLists.txt
index ae41d4402516534d750abb3647fc69d183e25370..392ff92b91f385cb84e1c21eb7c7efffea856008 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -2,12 +2,12 @@ cmake_minimum_required(VERSION 3.7)
 project(ASAPO)
 
 #protocol version changes if one of the microservice API's change
-set (ASAPO_CONSUMER_PROTOCOL "v0.3")
+set (ASAPO_CONSUMER_PROTOCOL "v0.4")
 set (ASAPO_PRODUCER_PROTOCOL "v0.2")
 
 set (ASAPO_DISCOVERY_API_VER "v0.1")
 set (ASAPO_AUTHORIZER_API_VER "v0.1")
-set (ASAPO_BROKER_API_VER "v0.3")
+set (ASAPO_BROKER_API_VER "v0.4")
 set (ASAPO_FILE_TRANSFER_SERVICE_API_VER "v0.1")
 set (ASAPO_RECEIVER_API_VER "v0.2")
 set (ASAPO_RDS_API_VER "v0.1")
diff --git a/CMakeModules/CodeCoverage.cmake b/CMakeModules/CodeCoverage.cmake
index 63f4a9815646a623b4af2942a5ee6c8146b592cf..ec41aa803cb9cb4caa8889a7abb0cd402c3f7984 100644
--- a/CMakeModules/CodeCoverage.cmake
+++ b/CMakeModules/CodeCoverage.cmake
@@ -155,7 +155,7 @@ function(SETUP_TARGET_FOR_COVERAGE)
 
         # Create baseline to make sure untouched files show up in the report
-        COMMAND ${LCOV_PATH} -b ${CMAKE_CURRENT_SOURCE_DIR} -no-external -c -i -d ${PROJECT_BINARY_DIR} -o ${Coverage_NAME}.base
+        COMMAND ${LCOV_PATH} -b ${CMAKE_CURRENT_SOURCE_DIR} -no-external -c -i -d ${CMAKE_CURRENT_BINARY_DIR} -o ${Coverage_NAME}.base
 
         # Run tests
         COMMAND ${Coverage_EXECUTABLE}
diff --git a/PROTOCOL-VERSIONS.md b/PROTOCOL-VERSIONS.md
index 78fad52a20a55c0e3b3d103831e38b2cefc26441..6481b4fd64fe9836574cddc855b2f13c3b27a1d2 100644
--- a/PROTOCOL-VERSIONS.md
+++ b/PROTOCOL-VERSIONS.md
@@ -8,6 +8,7 @@
 ### Consumer Protocol
 | Release | used by client | Supported by server | Status |
 | ------------ | ------------------- | -------------------- | ---------------- |
-| v0.3 | 21.03.3 - 21.03.3 | 21.03.3 - 21.03.3 | Current version |
-| v0.2 | 21.03.2 - 21.03.2 | 21.03.2 - 21.03.3 | Deprecates from 01.06.2022 |
-| v0.1 | 21.03.0 - 21.03.1 | 21.03.0 - 21.03.3 | Deprecates from 01.06.2022 |
+| v0.4 | 21.06.0 - 21.06.0 | 21.06.0 - 21.06.0 | Current version |
+| v0.3 | 21.03.3 - 21.03.3 | 21.03.3 - 21.06.0 | Deprecates from 01.06.2022 |
+| v0.2 | 21.03.2 - 21.03.2 | 21.03.2 - 21.06.0 | Deprecates from 01.06.2022 |
+| v0.1 | 21.03.0 - 21.03.1 | 21.03.0 - 21.06.0 | Deprecates from 01.06.2022 |
diff --git a/VERSIONS.md b/VERSIONS.md
index 99b18ed5f364df0aa9646011fe844cc38761c3bd..37c226d10fb37341d486979d134de44a61b5ff69 100644
--- a/VERSIONS.md
+++ b/VERSIONS.md
@@ -1,19 +1,26 @@
 ### Producer API
 
-| Release | API changed | Breaking changes | Protocol | Supported by server from/to | Status |Comment|
-| ------------ | ----------- |----------------- | -------- | ------------------------- | --------------------- | ------- |
-| 21.03.3 | No | No | v0.2 | 21.03.2/21.03.3 | current version |bugfix in server|
-| 21.03.2 | Yes | No | v0.2 | 21.03.2/21.03.3 | current version |bugfixes, add delete_stream|
-| 21.03.1 | No | No | v0.1 | 21.03.0/21.03.3 | deprecates 01.06.2022 |bugfix in server|
-| 21.03.0 | Yes | Yes | v0.1 | 21.03.0/21.03.3 | | |
+| Release | API changed\*\* | Protocol | Supported by server from/to | Status |Comment|
+| ------------ | ----------- | -------- | ------------------------- | --------------------- | ------- |
+| 21.03.3 | No | v0.2 | 21.03.2/21.03.3 | current version |bugfix in server|
+| 21.03.2 | Yes | v0.2 | 21.03.2/21.03.3 | current version |bugfixes, add delete_stream|
+| 21.03.1 | No | v0.1 | 21.03.0/21.03.3 | deprecates 01.06.2022 |bugfix in server|
+| 21.03.0 | Yes | v0.1 | 21.03.0/21.03.3 | | |
 
 ### Consumer API
 
-| Release | API changed | Breaking changes | Protocol | Supported by server from/to | Status |Comment|
-| ------------ | ----------- |----------------- | -------- | ------------------------- | ---------------- | ------- |
-| 21.03.3 | Yes | No* | v0.3 | 21.03.3/21.03.3 | current version |bugfix in server, error type for dublicated ack|
-| 21.03.2 | Yes | No | v0.2 | 21.03.2/21.03.3 | deprecates 01.06.2022 |bugfixes, add delete_stream|
-| 21.03.1 | No | No | v0.1 | 21.03.0/21.03.3 | deprecates 01.06.2022 |bugfix in server|
-| 21.03.0 | Yes | Yes | v0.1 | 21.03.0/21.03.3 | | |
+| Release | API changed\*\* | Protocol | Supported by server from/to | Status |Comment|
+| ------------ | ----------- | --------- | ------------------------- | ---------------- | ------- |
+| 21.06.0 | No* | v0.4 | 21.06.0/21.06.0 | current version |bugfixes|
+| 21.03.3 | Yes | v0.3 | 21.03.3/21.06.0 | deprecates 01.06.2022 |bugfix in server, error type for duplicated ack|
+| 21.03.2 | Yes | v0.2 | 21.03.2/21.06.0 | deprecates 01.06.2022 |bugfixes, add delete_stream|
+| 21.03.1 | No | v0.1 | 21.03.0/21.06.0 | deprecates 01.06.2022 |bugfix in server|
+| 21.03.0 | Yes | v0.1 | 21.03.0/21.06.0 | | |
 
-\* insignificant changes/bugfixes (e.g. in return type, etc), normally do not require client code changes, but formally might break the client
\ No newline at end of file
+\* insignificant changes/bugfixes (e.g. in return type, etc.); these normally do not require client code changes, but formally might break the client
+
+\*\* an API change is any change that may require updating or recompiling user code
+(e.g. a parameter rename, ...),
+or that adds new functionality (so a new client will not work with an old server),
+but also a change to internal structures or client behavior (e.g. adding a field to a (private or public) structure, changing an error type, ...).
+Check CHANGELOG.md for more details about the changes.
\ No newline at end of file
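The tables above encode a deprecation policy: a protocol stays usable until a cutoff date, and the discovery service answers with a status hint. Below is a minimal Go sketch of such a date-based check; the type and function names are illustrative, not the actual asapo_discovery definitions.

package main

import (
	"fmt"
	"time"
)

// protocolStatus is an illustrative stand-in for the discovery service's answer.
type protocolStatus struct {
	hint string // e.g. "current" or "deprecates ..."
	ok   bool
}

// validate reports a deprecated protocol as still usable until the cutoff date.
func validate(cutoff, now time.Time) protocolStatus {
	if now.Before(cutoff) {
		return protocolStatus{hint: "deprecates " + cutoff.Format("02.01.2006"), ok: true}
	}
	return protocolStatus{hint: "deprecated", ok: false}
}

func main() {
	cutoff := time.Date(2022, 6, 1, 0, 0, 0, 0, time.UTC)
	fmt.Println(validate(cutoff, time.Now()))
}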
diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go
index 4527693cfe7a9949dba17ddfe4ab2d20c7a3782d..5491086a02aebf133e75639a9498cecc956e9c31 100644
--- a/broker/src/asapo_broker/database/mongodb.go
+++ b/broker/src/asapo_broker/database/mongodb.go
@@ -217,6 +217,16 @@ func (db *Mongodb) setCounter(request Request, ind int) (err error) {
 	return
 }
 
+func (db *Mongodb) errorWhenCannotIncrementField(request Request, max_ind int) (err error) {
+	if res, err := db.getRecordFromDb(request, max_ind, max_ind); err == nil {
+		if err := checkStreamFinished(request, max_ind, max_ind, res); err != nil {
+			return err
+		}
+	}
+	return &DBError{utils.StatusNoData, encodeAnswer(max_ind, max_ind, "")}
+}
+
+
 func (db *Mongodb) incrementField(request Request, max_ind int, res interface{}) (err error) {
 	update := bson.M{"$inc": bson.M{pointer_field_name: 1}}
 	opts := options.FindOneAndUpdate().SetUpsert(true).SetReturnDocument(options.After)
@@ -233,7 +243,7 @@ func (db *Mongodb) incrementField(request Request, max_ind int, res interface{})
 		if err2 := c.FindOneAndUpdate(context.TODO(), q, update, opts).Decode(res); err2 == nil {
 			return nil
 		}
-		return &DBError{utils.StatusNoData, encodeAnswer(max_ind, max_ind, "")}
+		return db.errorWhenCannotIncrementField(request, max_ind)
 	}
 	return &DBError{utils.StatusTransactionInterrupted, err.Error()}
 }
diff --git a/broker/src/asapo_broker/database/mongodb_test.go b/broker/src/asapo_broker/database/mongodb_test.go
index e1caa8553eca0693e4a60dbdde99a242983a7bff..72471969075187ea4e5226bec89a167a63f26d07 100644
--- a/broker/src/asapo_broker/database/mongodb_test.go
+++ b/broker/src/asapo_broker/database/mongodb_test.go
@@ -156,6 +156,22 @@ func TestMongoDBGetNextErrorOnFinishedStream(t *testing.T) {
 	assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"next1\"}", err.(*DBError).Message)
 }
 
+func TestMongoDBGetNextErrorOnFinishedStreamAlways(t *testing.T) {
+	db.Connect(dbaddress)
+	defer cleanup()
+	db.insertRecord(dbname, collection, &rec1)
+	db.insertRecord(dbname, collection, &rec_finished)
+
+	db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})
+	db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})
+	_, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"})
+
+	assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
+	assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"next1\"}", err.(*DBError).Message)
+}
+
+
+
 func TestMongoDBGetByIdErrorOnFinishedStream(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
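The new test pins the exact error payload, so it is worth seeing how it is built: once a stream is finished, every further "next" call for the group now yields StatusNoData with an answer pointing at the finished record and the next stream. A sketch that reproduces the asserted message; the answer struct and helper below are modeled on the test expectation, not the broker's actual definitions.

package main

import (
	"encoding/json"
	"fmt"
)

// answer mirrors the JSON the broker test asserts on.
type answer struct {
	Op         string `json:"op"`
	Id         int    `json:"id"`
	IdMax      int    `json:"id_max"`
	NextStream string `json:"next_stream"`
}

// encodeAnswer is a stand-in for the broker's own helper of the same name.
func encodeAnswer(id, idMax int, nextStream string) string {
	b, _ := json.Marshal(answer{Op: "get_record_by_id", Id: id, IdMax: idMax, NextStream: nextStream})
	return string(b)
}

func main() {
	// Matches the expectation in TestMongoDBGetNextErrorOnFinishedStreamAlways:
	// {"op":"get_record_by_id","id":1,"id_max":1,"next_stream":"next1"}
	fmt.Println(encodeAnswer(1, 1, "next1"))
}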
diff --git a/common/cpp/src/data_structs/data_structs.cpp b/common/cpp/src/data_structs/data_structs.cpp
index 16b960a7c005c21a443aee52210526efb289dbbc..0786761ab1c817dd825f7cba05200fd6d6bf0f40 100644
--- a/common/cpp/src/data_structs/data_structs.cpp
+++ b/common/cpp/src/data_structs/data_structs.cpp
@@ -26,6 +26,7 @@ std::string GetStringFromSourceType(SourceType type) {
     case SourceType::kRaw:return "raw";
     case SourceType::kProcessed:return "processed";
     }
+    return "unknown";
 }
 
 Error GetSourceTypeFromString(std::string stype, SourceType* type) {
@@ -158,7 +159,6 @@ std::string StreamInfo::Json() const {
 bool StreamInfo::SetFromJson(const std::string &json_string) {
     auto old = *this;
     JsonStringParser parser(json_string);
-    uint64_t id;
     if (parser.GetUInt64("lastId", &last_id) ||
         parser.GetBool("finished", &finished) ||
         parser.GetString("nextStream", &next_stream) ||
@@ -192,7 +192,7 @@ std::string IsoDateFromEpochNanosecs(uint64_t time_from_epoch_nanosec) {
 
 uint64_t NanosecsEpochFromISODate(std::string date_time) {
     double frac = 0;
-    int pos = date_time.find_first_of('.');
+    auto pos = date_time.find_first_of('.');
     if (pos != std::string::npos) {
         std::string frac_str = date_time.substr(pos);
         if (sscanf(frac_str.c_str(), "%lf", &frac) != 1) {
diff --git a/consumer/api/cpp/src/consumer_impl.cpp b/consumer/api/cpp/src/consumer_impl.cpp
index c8358a58cccb45770cf0107b7a804ec8b2f1abad..f6e8d95ec11d865111f6de48e7b23534d13b777f 100644
--- a/consumer/api/cpp/src/consumer_impl.cpp
+++ b/consumer/api/cpp/src/consumer_impl.cpp
@@ -716,6 +716,7 @@ std::string filterToString(StreamFilter filter) {
     case StreamFilter::kFinishedStreams:return "finished";
     case StreamFilter::kUnfinishedStreams:return "unfinished";
     }
+    return "";
 }
 
 StreamInfos ConsumerImpl::GetStreamList(std::string from, StreamFilter filter, Error* err) {
diff --git a/consumer/api/cpp/unittests/test_consumer_impl.cpp b/consumer/api/cpp/unittests/test_consumer_impl.cpp
index b63f22dc6e7a1c1d5efec1c1f3c7b0ab4d8bb66e..93511dee7837f231368c258b2c0039b680cb5227 100644
--- a/consumer/api/cpp/unittests/test_consumer_impl.cpp
+++ b/consumer/api/cpp/unittests/test_consumer_impl.cpp
@@ -71,8 +71,9 @@ class ConsumerImplTests : public Test {
     MessageMeta info;
     std::string expected_server_uri = "test:8400";
     std::string expected_broker_uri = "asapo-broker:5005";
-    std::string expected_consumer_protocol = "v0.3";
-    std::string expected_broker_api = expected_broker_uri + "/" + expected_consumer_protocol;
+    std::string expected_consumer_protocol = asapo::kConsumerProtocol.GetVersion();
+    std::string expected_broker_protocol = asapo::kConsumerProtocol.GetBrokerVersion();
+    std::string expected_broker_api = expected_broker_uri + "/" + expected_broker_protocol;
     std::string expected_fts_uri = "asapo-file-transfer:5008";
     std::string expected_token = "token";
    std::string expected_path = "/tmp/beamline/beamtime";
diff --git a/discovery/src/asapo_discovery/protocols/hard_coded_consumer.go b/discovery/src/asapo_discovery/protocols/hard_coded_consumer.go
index 8f9b12709147fca5a1ab9e579ed0c76568afa7c0..685bbd4ed391022341ea262edab0a0ddfcd19abb 100644
--- a/discovery/src/asapo_discovery/protocols/hard_coded_consumer.go
+++ b/discovery/src/asapo_discovery/protocols/hard_coded_consumer.go
@@ -12,6 +12,14 @@ func getTimefromDate(date string) time.Time{
 
 func GetSupportedConsumerProtocols() []Protocol {
 	return []Protocol{
+		Protocol{"v0.4",
+			map[string]string{
+				"Discovery":          "v0.1",
+				"Authorizer":         "v0.1",
+				"Broker":             "v0.4",
+				"File Transfer":      "v0.1",
+				"Data cache service": "v0.1",
+			}, &protocolValidatorCurrent{}},
 		Protocol{"v0.3",
 			map[string]string{
 				"Discovery":          "v0.1",
@@ -19,7 +27,7 @@ func GetSupportedConsumerProtocols() []Protocol {
 				"Broker":             "v0.3",
 				"File Transfer":      "v0.1",
 				"Data cache service": "v0.1",
-			}, &protocolValidatorCurrent{}},
+			}, &protocolValidatorDeprecated{getTimefromDate("2022-06-01")}},
 		Protocol{"v0.2",
 			map[string]string{
 				"Discovery":          "v0.1",
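Each consumer protocol entry above pins the microservice API versions a client may talk to, and the client builds its broker endpoint from the mapped Broker version rather than from its own protocol version (which is what the C++ unit test change above reflects). A hedged Go sketch of that lookup, using an invented map instead of the real Protocol slice, and only the versions visible in this diff:

package main

import "fmt"

// brokerAPIForConsumerProtocol mirrors part of the table kept in
// GetSupportedConsumerProtocols (illustrative, not the real structure).
var brokerAPIForConsumerProtocol = map[string]string{
	"v0.4": "v0.4", // current version
	"v0.3": "v0.3", // deprecates from 01.06.2022
}

// brokerEndpoint builds the versioned broker URI for a consumer protocol.
func brokerEndpoint(brokerURI, consumerProtocol string) (string, error) {
	v, ok := brokerAPIForConsumerProtocol[consumerProtocol]
	if !ok {
		return "", fmt.Errorf("unknown protocol %s", consumerProtocol)
	}
	return brokerURI + "/" + v, nil
}

func main() {
	uri, _ := brokerEndpoint("asapo-broker:5005", "v0.4")
	fmt.Println(uri) // asapo-broker:5005/v0.4
}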
"Discovery": "v0.1", diff --git a/discovery/src/asapo_discovery/protocols/protocol_test.go b/discovery/src/asapo_discovery/protocols/protocol_test.go index 580cc2fda3bc453ea2e852df6dea46ca2be6bedb..4f0e45f869e0003634a5616d0276ff65725eb15e 100644 --- a/discovery/src/asapo_discovery/protocols/protocol_test.go +++ b/discovery/src/asapo_discovery/protocols/protocol_test.go @@ -15,7 +15,8 @@ type protocolTest struct { var protocolTests = []protocolTest{ // consumer - {"consumer", "v0.3", true, "current", "v0.3"}, + {"consumer", "v0.4", true, "current", "v0.4"}, + {"consumer", "v0.3", true, "deprecates", "v0.3"}, {"consumer", "v0.2", true, "deprecates", "v0.2"}, {"consumer", "v0.1", true, "deprecates", "v0.1"}, {"consumer", "v1000.2", false, "unknown", "unknown protocol"}, diff --git a/examples/consumer/getnext/getnext.cpp b/examples/consumer/getnext/getnext.cpp index d30e87e578210fb3776e34c18c1fd4d332ad321b..3429a4861ea29b23eee3fbfbda5d9a3c05c8bd78 100644 --- a/examples/consumer/getnext/getnext.cpp +++ b/examples/consumer/getnext/getnext.cpp @@ -42,6 +42,7 @@ struct Args { int nthreads; bool read_data; bool datasets; + bool need_beamtime_meta = false; }; class LatchedTimer { @@ -88,7 +89,7 @@ void WaitThreads(std::vector<std::thread>* threads) { int ProcessError(const Error& err) { if (err == nullptr) return 0; std::cout << err->Explain() << std::endl; - return err == asapo::ConsumerErrorTemplates::kEndOfStream ? 0 : 1; + return (err == asapo::ConsumerErrorTemplates::kEndOfStream ||err == asapo::ConsumerErrorTemplates::kStreamFinished) ? 0 : 1; } std::vector<std::thread> @@ -121,13 +122,12 @@ StartThreads(const Args& params, std::vector<int>* nfiles, std::vector<int>* err (*errors)[i] += ProcessError(err); lock.unlock(); exit(EXIT_FAILURE); - return; } } lock.unlock(); - if (i == 0) { + if (i == 0 && params.need_beamtime_meta) { auto meta = consumer->GetBeamtimeMeta(&err); if (err == nullptr) { std::cout << meta << std::endl; @@ -267,9 +267,9 @@ void TryGetStream(Args* args) { int main(int argc, char* argv[]) { Args params; params.datasets = false; - if (argc != 8 && argc != 9) { + if (argc != 8 && argc != 9 && argc != 10) { std::cout << "Usage: " + std::string{argv[0]} - + " <server> <files_path> <run_name> <nthreads> <token> <timeout ms> <metaonly> [use datasets]" + + " <server> <files_path> <run_name> <nthreads> <token> <timeout ms> <metaonly> [use datasets] [send metadata]" << std::endl; exit(EXIT_FAILURE); @@ -285,6 +285,9 @@ int main(int argc, char* argv[]) { if (argc == 9) { params.datasets = atoi(argv[8]) == 1; } + if (argc == 10) { + params.need_beamtime_meta = atoi(argv[9]) == 1; + } if (params.read_data) { std::cout << "Will read metadata+payload" << std::endl; diff --git a/examples/producer/dummy-data-producer/dummy_data_producer.cpp b/examples/producer/dummy-data-producer/dummy_data_producer.cpp index 560cd20ba6b9f89aa52d40deae4cab08319f2666..1a31044f8b3f99a7b601457671294fd337a191ed 100644 --- a/examples/producer/dummy-data-producer/dummy_data_producer.cpp +++ b/examples/producer/dummy-data-producer/dummy_data_producer.cpp @@ -189,7 +189,7 @@ bool SendDummyData(asapo::Producer* producer, size_t number_of_byte, uint64_t it } } } - return true; + return producer->SendStreamFinishedFlag("default",iterations,"",nullptr) == nullptr; } std::unique_ptr<asapo::Producer> CreateProducer(const Args& args) { diff --git a/producer/api/cpp/src/producer_impl.cpp b/producer/api/cpp/src/producer_impl.cpp index 09ed2e2f72a9f259308e1898f86360c14ae890d1..000600a6b0dd3c998b1b47b6c5efa904de1da747 100644 
diff --git a/producer/api/cpp/src/producer_impl.cpp b/producer/api/cpp/src/producer_impl.cpp
index 09ed2e2f72a9f259308e1898f86360c14ae890d1..000600a6b0dd3c998b1b47b6c5efa904de1da747 100644
--- a/producer/api/cpp/src/producer_impl.cpp
+++ b/producer/api/cpp/src/producer_impl.cpp
@@ -365,6 +365,7 @@ GenericRequestHeader CreateRequestHeaderFromOp(StreamRequestOp op,std::string st
     case StreamRequestOp::kLastStream:
         return GenericRequestHeader{kOpcodeLastStream, 0, 0, 0, "", ""};
     }
+    return GenericRequestHeader{};
 }
 
 StreamInfo ProducerImpl::StreamRequest(StreamRequestOp op,std::string stream, uint64_t timeout_ms, Error* err) const {
diff --git a/producer/api/cpp/unittests/test_producer_impl.cpp b/producer/api/cpp/unittests/test_producer_impl.cpp
index 13cafa963dfbde3e1fb2e9121e1ee19b938c034a..023912b1f5c8b880e867cfb427edc4d9f851a811 100644
--- a/producer/api/cpp/unittests/test_producer_impl.cpp
+++ b/producer/api/cpp/unittests/test_producer_impl.cpp
@@ -67,7 +67,7 @@ class ProducerImplTests : public testing::Test {
     asapo::ProducerRequestHandlerFactory factory{&service};
     testing::NiceMock<asapo::MockLogger> mock_logger;
     testing::NiceMock<MockRequestPull> mock_pull{&factory, &mock_logger};
-    std::string expected_server_uri = "localhost:9400";
+    std::string expected_server_uri = "127.0.0.1:9400";
     asapo::ProducerImpl producer{expected_server_uri, 1, 3600000, asapo::RequestHandlerType::kTcp};
     uint64_t expected_size = 100;
     uint64_t expected_id = 10;
diff --git a/receiver/unittests/test_datacache.cpp b/receiver/unittests/test_datacache.cpp
index ddadfdd5a33ad080a301b87f1d02b183e90b70a2..56c90b426ccbca119295524e367936c10f1b0621 100644
--- a/receiver/unittests/test_datacache.cpp
+++ b/receiver/unittests/test_datacache.cpp
@@ -125,7 +125,7 @@ TEST_F(DataCacheTests, GetFreeSlotRemovesOldMetadataRecords) {
     DataCache cache{expected_cache_size, 0};
     CacheMeta* meta3, *meta4, *meta5;
     CacheMeta* meta;
-    auto addr = cache.GetFreeSlotAndLock(10, &meta1);
+    cache.GetFreeSlotAndLock(10, &meta1);
     cache.GetFreeSlotAndLock(10, &meta2);
     cache.GetFreeSlotAndLock(expected_cache_size - 30, &meta3);
     auto id1 = meta1->id;
diff --git a/tests/automatic/asapo_fabric/CMakeLists.txt b/tests/automatic/asapo_fabric/CMakeLists.txt
index c9e75c82675d933e1357de941a40f44fda02a715..05111bc399bcb5dcb4f51ef1492ba7c6e92f1785 100644
--- a/tests/automatic/asapo_fabric/CMakeLists.txt
+++ b/tests/automatic/asapo_fabric/CMakeLists.txt
@@ -15,5 +15,11 @@ foreach(file ${files})
     set_target_properties(${TARGET_NAME} PROPERTIES LINKER_LANGUAGE CXX)
 
     # Add test
-    add_integration_test(${TARGET_NAME} ${TARGET_NAME} "")
+    if (file_we STREQUAL "timeout_test")
+        add_integration_test(${TARGET_NAME} ${TARGET_NAME} "" nomem)
+    else()
+        add_integration_test(${TARGET_NAME} ${TARGET_NAME} "")
+    endif()
+
+
 endforeach()
diff --git a/tests/automatic/full_chain/simple_chain_metadata/check_linux.sh b/tests/automatic/full_chain/simple_chain_metadata/check_linux.sh
index 3d9103f83b59123012d41cc5648b4ecb51f95d37..c0707599e13490f1e27acca4b5a2d241570729ff 100644
--- a/tests/automatic/full_chain/simple_chain_metadata/check_linux.sh
+++ b/tests/automatic/full_chain/simple_chain_metadata/check_linux.sh
@@ -36,5 +36,5 @@ mkdir -p ${receiver_folder}
 $producer_bin localhost:8400 ${beamtime_id} 100 0 1 0 1000
 
 echo "Start consumer in metadata only mode"
-$consumer_bin ${proxy_address} ${receiver_folder} ${beamtime_id} 2 $token 1000 1 | tee out
+$consumer_bin ${proxy_address} ${receiver_folder} ${beamtime_id} 2 $token 1000 1 0 1 | tee out
 grep "dummy_meta" out
diff --git a/tests/automatic/full_chain/simple_chain_metadata/check_windows.bat b/tests/automatic/full_chain/simple_chain_metadata/check_windows.bat
index fc5283fe047d782f5676a2aa32aa44612420b184..2b80a5bf9842f62e61fb7fd17115a848011bb972 100644
--- a/tests/automatic/full_chain/simple_chain_metadata/check_windows.bat
+++ b/tests/automatic/full_chain/simple_chain_metadata/check_windows.bat
@@ -14,7 +14,7 @@ mkdir %receiver_folder%
 "%1" %proxy_address% %beamtime_id% 100 0 1 0 1000
 
 REM consumer
-"%2" %proxy_address% %receiver_folder% %beamtime_id% 2 %token% 5000 1 > out.txt
+"%2" %proxy_address% %receiver_folder% %beamtime_id% 2 %token% 5000 1 0 1 > out.txt
 type out.txt
 findstr /i /l /c:"dummy_meta" out.txt || goto :error
 
diff --git a/tests/automatic/high_avail/receiver_mongo_restart/check_linux.sh b/tests/automatic/high_avail/receiver_mongo_restart/check_linux.sh
index 6ce5943adaf3bf7d6c76c80e958a74f2f0862a50..4d7a281e16dc7281a6dda26d4850ee64719d6fbf 100644
--- a/tests/automatic/high_avail/receiver_mongo_restart/check_linux.sh
+++ b/tests/automatic/high_avail/receiver_mongo_restart/check_linux.sh
@@ -58,6 +58,7 @@ nomad run discovery.nmd
 mkdir -p ${receiver_folder}
 
 nfiles=1000
+nrecords=1001 # nfiles + stream finished flag
 
 $1 localhost:8400 ${beamtime_id} 100 $nfiles 1 0 200 &
 
@@ -75,6 +76,6 @@
 echo processed files:
 echo "db.data_default.count()" | mongo --port 27016 ${beamtime_id}_detector
 
-echo "db.data_default.count()" | mongo --port 27016 ${beamtime_id}_detector | grep $nfiles
+echo "db.data_default.count()" | mongo --port 27016 ${beamtime_id}_detector | grep $nrecords
 
 
diff --git a/tests/automatic/producer/python_api/producer_api.py b/tests/automatic/producer/python_api/producer_api.py
index dc82b2a2c767a97c31c96034451bd23f81d43dcb..b4061c3cf08e8127721437b49a2772c886f5c8a5 100644
--- a/tests/automatic/producer/python_api/producer_api.py
+++ b/tests/automatic/producer/python_api/producer_api.py
@@ -114,6 +114,9 @@ else:
     print("should be error sending id 0 ")
     sys.exit(1)
 
+# wait before sending to another stream, so we are sure that this stream appears later
+producer.wait_requests_finished(50000)
+
 # send to another stream
 producer.send(1, "processed/" + data_source + "/" + "file9", None,
               ingest_mode=asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, stream="stream", callback=callback)
@@ -143,9 +146,8 @@ producer.send_file(1, local_path="./file1", exposed_path="processed/" + data_sou
 producer.wait_requests_finished(50000)
 n = producer.get_requests_queue_size()
 assert_eq(n, 0, "requests in queue")
-assert_eq(n, 0, "requests in queue")
 
-# send to another data to stream stream
+# send another data to stream stream
 producer.send(2, "processed/" + data_source + "/" + "file10", None,
               ingest_mode=asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, stream="stream", callback=callback)
@@ -188,6 +190,7 @@
 assert_eq(info['lastId'], 3, "last id from different stream")
 assert_eq(info['finished'], True, "stream finished")
 info = producer.stream_info('dataset_stream')
+print(info)
 assert_eq(info['lastId'], 2, "last id from stream with datasets")
 
 info = producer.stream_info('not_exist')
@@ -195,6 +198,7 @@
 assert_eq(info['lastId'], 0, "last id from non existing stream")
 
 info_last = producer.last_stream()
+print(info_last)
 assert_eq(info_last['name'], "stream", "last stream")
 assert_eq(info_last['timestampCreated'] <= info_last['timestampLast'], True, "last is later than first")
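A closing note on the added wait in producer_api.py: last_stream() is only well-defined when stream creation timestamps are ordered, so the test now waits for the first stream's requests to finish before writing to the second. A Go sketch of the selection rule this protects, with illustrative types that are not the actual ASAPO structures:

package main

import (
	"fmt"
	"time"
)

// streamInfo is an illustrative subset of the stream metadata the test inspects.
type streamInfo struct {
	Name             string
	TimestampCreated time.Time
}

// lastStream picks the stream with the newest creation timestamp.
func lastStream(streams []streamInfo) streamInfo {
	if len(streams) == 0 {
		return streamInfo{}
	}
	last := streams[0]
	for _, s := range streams[1:] {
		if s.TimestampCreated.After(last.TimestampCreated) {
			last = s
		}
	}
	return last
}

func main() {
	now := time.Now()
	streams := []streamInfo{
		{"default", now.Add(-time.Second)},
		{"stream", now},
	}
	// "stream" is last only if "default" was fully written first,
	// which is what the added wait_requests_finished call guarantees.
	fmt.Println(lastStream(streams).Name)
}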