diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go
index 95ff954acdb07ec65e783db11760e76e971f1ab9..7987dd87267f983dd96b676eb6c26bc3bb688b3e 100644
--- a/broker/src/asapo_broker/database/mongodb.go
+++ b/broker/src/asapo_broker/database/mongodb.go
@@ -21,6 +21,12 @@ type ID struct {
 	ID int `bson:"_id"`
 }
 
+type ServiceRecord struct {
+	ID   int                    `json:"_id"`
+	Name string                 `json:"name"`
+	Meta map[string]interface{} `json:"meta"`
+}
+
 type LocationPointer struct {
 	GroupID string `bson:"_id"`
 	Value   int    `bson:"current_pointer"`
@@ -34,6 +40,9 @@ const no_session_msg = "database client not created"
 const wrong_id_type = "wrong id type"
 const already_connected_msg = "already connected"
 
+const finish_substream_keyword = "asapo_finish_substream"
+const no_next_substream_keyword = "asapo_no_next"
+
 var dbListLock sync.RWMutex
 var dbPointersLock sync.RWMutex
 var dbSessionLock sync.RWMutex
@@ -201,7 +210,7 @@ func (db *Mongodb) incrementField(dbname string, collection_name string, group_i
 	err = c.FindOneAndUpdate(context.TODO(), q, update, opts).Decode(res)
 	if err != nil {
 		if err == mongo.ErrNoDocuments {
-			return &DBError{utils.StatusNoData, encodeAnswer(max_ind, max_ind)}
+			return &DBError{utils.StatusNoData, encodeAnswer(max_ind, max_ind, "")}
 		}
 		return &DBError{utils.StatusTransactionInterrupted, err.Error()}
 	}
@@ -209,12 +218,13 @@ func (db *Mongodb) incrementField(dbname string, collection_name string, group_i
 	return nil
 }
 
-func encodeAnswer(id, id_max int) string {
+func encodeAnswer(id, id_max int, next_substream string) string {
 	var r = struct {
-		Op     string `json:"op""`
-		Id     int    `json:"id""`
-		Id_max int    `json:"id_max""`
-	}{"get_record_by_id", id, id_max}
+		Op             string `json:"op"`
+		Id             int    `json:"id"`
+		Id_max         int    `json:"id_max"`
+		Next_substream string `json:"next_substream"`
+	}{"get_record_by_id", id, id_max, next_substream}
 	answer, _ := json.Marshal(&r)
 	return string(answer)
 }
@@ -231,7 +241,7 @@ func (db *Mongodb) getRecordByIDRow(dbname string, collection_name string, id, i
 	c := db.client.Database(dbname).Collection(data_collection_name_prefix + collection_name)
 	err := c.FindOne(context.TODO(), q, options.FindOne()).Decode(&res)
 	if err != nil {
-		answer := encodeAnswer(id, id_max)
+		answer := encodeAnswer(id, id_max, "")
 		log_str := "error getting record id " + strconv.Itoa(id) + " for " + dbname + " : " + err.Error()
 		logger.Debug(log_str)
 		return nil, &DBError{utils.StatusNoData, answer}
@@ -317,6 +327,23 @@ func (db *Mongodb) getCurrentPointer(db_name string, collection_name string, gro
 	return curPointer, max_ind, nil
 }
 
+func processLastRecord(data []byte, collection_name string, err error) ([]byte, error) {
+	var r ServiceRecord
+	err = json.Unmarshal(data, &r)
+	if err != nil || r.Name != finish_substream_keyword {
+		return data, err
+	}
+	next_substream, ok := r.Meta["next_substream"].(string)
+	if !ok {
+		next_substream = no_next_substream_keyword
+	}
+
+	answer := encodeAnswer(r.ID, r.ID, next_substream)
+	log_str := "reached end of substream " + collection_name + ", next_substream: " + next_substream
+	logger.Debug(log_str)
+	return nil, &DBError{utils.StatusNoData, answer}
+}
+
 func (db *Mongodb) getNextRecord(db_name string, collection_name string, group_id string, dataset bool) ([]byte, error) {
 	curPointer, max_ind, err := db.getCurrentPointer(db_name, collection_name, group_id, dataset)
 	if err != nil {
@@ -326,7 +353,11 @@ func (db *Mongodb) getNextRecord(db_name string, collection_name string, group_i
 	}
 	log_str := "got next pointer " + strconv.Itoa(curPointer.Value) + " for " + db_name + ", groupid: " + group_id
 	logger.Debug(log_str)
-	return db.getRecordByIDRow(db_name, collection_name, curPointer.Value, max_ind, dataset)
+	data, err := db.getRecordByIDRow(db_name, collection_name, curPointer.Value, max_ind, dataset)
+	if curPointer.Value != max_ind {
+		return data, err
+	}
+	return processLastRecord(data, collection_name, err)
 }
 
 func (db *Mongodb) getLastRecord(db_name string, collection_name string, group_id string, dataset bool) ([]byte, error) {
diff --git a/broker/src/asapo_broker/database/mongodb_test.go b/broker/src/asapo_broker/database/mongodb_test.go
index 16bda6a5af9f419b8cd40047f9c10d9c06196710..aa150d478c09e7e5e967111b3f3a9b8438947ca1 100644
--- a/broker/src/asapo_broker/database/mongodb_test.go
+++ b/broker/src/asapo_broker/database/mongodb_test.go
@@ -14,8 +14,9 @@ import (
 )
 
 type TestRecord struct {
-	ID    int    `bson:"_id" json:"_id"`
-	FName string `bson:"fname" json:"fname"`
+	ID   int               `bson:"_id" json:"_id"`
+	Meta map[string]string `bson:"meta" json:"meta"`
+	Name string            `bson:"name" json:"name"`
 }
 
 type TestDataset struct {
@@ -34,9 +35,12 @@ const groupId = "bid2a5auidddp1vl71d0"
 const metaID = 0
 const metaID_str = "0"
 
-var rec1 = TestRecord{1, "aaa"}
-var rec2 = TestRecord{2, "bbb"}
-var rec3 = TestRecord{3, "ccc"}
+var empty_next = map[string]string{"next_substream": ""}
+
+var rec1 = TestRecord{1, empty_next, "aaa"}
+var rec_finished = TestRecord{2, map[string]string{"next_substream": "next1"}, finish_substream_keyword}
+var rec2 = TestRecord{2, empty_next, "bbb"}
+var rec3 = TestRecord{3, empty_next, "ccc"}
 
 var rec1_expect, _ = json.Marshal(rec1)
 var rec2_expect, _ = json.Marshal(rec2)
@@ -114,7 +118,7 @@ func TestMongoDBGetNextErrorWhenRecordNotThereYet(t *testing.T) {
 	db.insertRecord(dbname, collection, &rec2)
 	_, err := db.ProcessRequest(dbname, collection, groupId, "next", "")
 	assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
-	assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":2}", err.Error())
+	assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":2,\"next_substream\":\"\"}", err.Error())
 }
 
 func TestMongoDBGetNextOK(t *testing.T) {
@@ -126,6 +130,19 @@ func TestMongoDBGetNextOK(t *testing.T) {
 	assert.Equal(t, string(rec1_expect), string(res))
 }
 
+func TestMongoDBGetNextErrorOnFinishedStream(t *testing.T) {
+	db.Connect(dbaddress)
+	defer cleanup()
+	db.insertRecord(dbname, collection, &rec1)
+	db.insertRecord(dbname, collection, &rec_finished)
+
+	db.ProcessRequest(dbname, collection, groupId, "next", "")
+	_, err := db.ProcessRequest(dbname, collection, groupId, "next", "")
+
+	assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
+	assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":2,\"id_max\":2,\"next_substream\":\"next1\"}", err.(*DBError).Message)
+}
+
 func TestMongoDBGetNextErrorOnNoMoreData(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
@@ -134,7 +151,7 @@ func TestMongoDBGetNextErrorOnNoMoreData(t *testing.T) {
 	_, err := db.ProcessRequest(dbname, collection, groupId, "next", "")
 
 	assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
-	assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1}", err.(*DBError).Message)
+	assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_substream\":\"\"}", err.(*DBError).Message)
 }
 
 func TestMongoDBGetNextCorrectOrder(t *testing.T) {
@@ -162,7 +179,7 @@ func insertRecords(n int) {
 	records := make([]TestRecord, n)
 	for ind, record := range records {
 		record.ID = ind
-		record.FName = string(ind)
+		record.Name = string(ind)
 		db.insertRecord(dbname, collection, &record)
 	}
 }
@@ -212,7 +229,7 @@ func TestMongoDBgetRecordByIDFails(t *testing.T) {
 	db.insertRecord(dbname, collection, &rec1)
 	_, err := db.ProcessRequest(dbname, collection, groupId, "id", "2")
 	assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
-	assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":2,\"id_max\":1}", err.Error())
+	assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":2,\"id_max\":1,\"next_substream\":\"\"}", err.Error())
 }
 
 func TestMongoDBGetRecordNext(t *testing.T) {
@@ -493,7 +510,7 @@ func TestMongoDBNoDataOnNotCompletedFirstDataset(t *testing.T) {
 	res_string, err := db.ProcessRequest(dbname, collection, groupId, "next_dataset", "0")
 
 	assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
-	assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":0,\"id_max\":0}", err.(*DBError).Message)
+	assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":0,\"id_max\":0,\"next_substream\":\"\"}", err.(*DBError).Message)
 	assert.Equal(t, "", string(res_string))
 }
diff --git a/consumer/api/cpp/include/consumer/consumer_error.h b/consumer/api/cpp/include/consumer/consumer_error.h
index 618986c32e06d9b35d8797c8fcb07ab38b20c23c..e994a98025aa44e84a3f750f454b1cafd273ef8a 100644
--- a/consumer/api/cpp/include/consumer/consumer_error.h
+++ b/consumer/api/cpp/include/consumer/consumer_error.h
@@ -9,6 +9,7 @@ namespace asapo {
 enum class ConsumerErrorType {
     kNoData,
     kEndOfStream,
+    kStreamFinished,
     kUnavailableService,
     kInterruptedTransaction,
     kLocalIOError,
@@ -22,6 +23,7 @@ class ConsumerErrorData : public CustomErrorData {
   public:
     uint64_t id;
     uint64_t id_max;
+    std::string next_substream;
 };
 
 
@@ -31,6 +33,10 @@ auto const kLocalIOError = ConsumerErrorTemplate{
     "local i/o error", ConsumerErrorType::kLocalIOError
};
 
+auto const kStreamFinished = ConsumerErrorTemplate{
+    "stream finished", ConsumerErrorType::kStreamFinished
+};
+
 auto const kEndOfStream = ConsumerErrorTemplate{
     "no data - end of stream", ConsumerErrorType::kEndOfStream
 };
diff --git a/consumer/api/cpp/src/server_data_broker.cpp b/consumer/api/cpp/src/server_data_broker.cpp
index 133c807850b1af3007237e24da7eca6dc3c7b00a..7891a5ad1efcc9612516c814e40fbdc106713ef9 100644
--- a/consumer/api/cpp/src/server_data_broker.cpp
+++ b/consumer/api/cpp/src/server_data_broker.cpp
@@ -13,10 +13,11 @@ using std::chrono::system_clock;
 
 namespace asapo {
 
-Error GetIDsFromJson(const std::string& json_string, uint64_t* id, uint64_t* id_max) {
+Error GetNoDataResponseFromJson(const std::string& json_string, ConsumerErrorData* data) {
     JsonStringParser parser(json_string);
     Error err;
-    if ((err = parser.GetUInt64("id", id)) || (err = parser.GetUInt64("id_max", id_max))) {
+    if ((err = parser.GetUInt64("id", &data->id)) || (err = parser.GetUInt64("id_max", &data->id_max))
+            || (err = parser.GetString("next_substream", &data->next_substream))) {
         return err;
     }
     return nullptr;
@@ -24,20 +25,19 @@ Error GetIDsFromJson(const std::string& json_string, uint64_t* id, uint64_t* id_
 
 Error ErrorFromNoDataResponse(const std::string& response) {
     if (response.find("get_record_by_id") != std::string::npos) {
-        uint64_t id, id_max;
-        auto parse_error = GetIDsFromJson(response, &id, &id_max);
+        ConsumerErrorData data;
+        auto parse_error = GetNoDataResponseFromJson(response, &data);
         if (parse_error) {
             return ConsumerErrorTemplates::kInterruptedTransaction.Generate("malformed response - " + response);
         }
         Error err;
-        if (id >= id_max ) {
-            err = ConsumerErrorTemplates::kEndOfStream.Generate();
+        if (data.id >= data.id_max) {
+            err = data.next_substream.empty() ? ConsumerErrorTemplates::kEndOfStream.Generate() :
+                  ConsumerErrorTemplates::kStreamFinished.Generate();
         } else {
             err = ConsumerErrorTemplates::kNoData.Generate();
         }
-        ConsumerErrorData* error_data = new ConsumerErrorData;
-        error_data->id = id;
-        error_data->id_max = id_max;
+        ConsumerErrorData* error_data = new ConsumerErrorData{data};
         err->SetCustomData(std::unique_ptr<CustomErrorData> {error_data});
         return err;
     }
@@ -169,6 +170,10 @@ Error ServerDataBroker::GetRecordFromServer(std::string* response, std::string g
         }
     }
 
+    if (err == ConsumerErrorTemplates::kStreamFinished) {
+        return err;
+    }
+
     if (request_suffix == "next") {
         auto save_error = SwitchToGetByIdIfNoData(&err, *response, &request_suffix);
         if (err == ConsumerErrorTemplates::kInterruptedTransaction) {
Return("{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":2,\"next_substream\":\"""\"}"))); + auto err = data_broker->GetNext(&info, expected_group_id, nullptr); auto err_data = static_cast<const asapo::ConsumerErrorData*>(err->GetCustomData()); ASSERT_THAT(err_data->id, Eq(1)); ASSERT_THAT(err_data->id_max, Eq(2)); + ASSERT_THAT(err_data->next_substream, Eq("")); + ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kNoData)); } @@ -316,7 +339,7 @@ TEST_F(ServerDataBrokerTests, GetImageReturnsEofStreamFromHttpClientUntilTimeout EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).Times(AtLeast(2)).WillRepeatedly(DoAll( SetArgPointee<1>(HttpCode::Conflict), SetArgPointee<2>(nullptr), - Return("{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1}"))); + Return("{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_substream\":\"""\"}"))); data_broker->SetTimeout(300); auto err = data_broker->GetNext(&info, expected_group_id, nullptr); @@ -331,7 +354,8 @@ TEST_F(ServerDataBrokerTests, GetImageReturnsNoDataAfterTimeoutEvenIfOtherErrorO EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::Conflict), SetArgPointee<2>(nullptr), - Return("{\"op\":\"get_record_by_id\",\"id\":" + std::to_string(expected_dataset_id) + ",\"id_max\":2}"))); + Return("{\"op\":\"get_record_by_id\",\"id\":" + std::to_string(expected_dataset_id) + + ",\"id_max\":2,\"next_substream\":\"""\"}"))); EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/" + expected_group_id + "/" + std::to_string(expected_dataset_id) + "?token=" @@ -387,6 +411,20 @@ TEST_F(ServerDataBrokerTests, GetNextRetriesIfConnectionHttpClientErrorUntilTime ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kUnavailableService)); } +TEST_F(ServerDataBrokerTests, GetNextImageReturnsImmediatelyOnFinshedSubstream) { + MockGetBrokerUri(); + + EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).WillOnce(DoAll( + SetArgPointee<1>(HttpCode::Conflict), + SetArgPointee<2>(nullptr), + Return("{\"op\":\"get_record_by_id\",\"id\":2,\"id_max\":2,\"next_substream\":\"next\"}"))); + + data_broker->SetTimeout(300); + auto err = data_broker->GetNext(&info, expected_group_id, nullptr); + + ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kStreamFinished)); +} + TEST_F(ServerDataBrokerTests, GetImageReturnsFileInfo) { MockGetBrokerUri(); @@ -663,7 +701,7 @@ TEST_F(ServerDataBrokerTests, GetByIdReturnsEndOfStream) { + expected_token, _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::Conflict), SetArgPointee<2>(nullptr), - Return("{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1}"))); + Return("{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_substream\":\"""\"}"))); auto err = data_broker->GetById(expected_dataset_id, &info, expected_group_id, nullptr); @@ -680,7 +718,7 @@ TEST_F(ServerDataBrokerTests, GetByIdReturnsEndOfStreamWhenIdTooLarge) { + expected_token, _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::Conflict), SetArgPointee<2>(nullptr), - Return("{\"op\":\"get_record_by_id\",\"id\":100,\"id_max\":1}"))); + Return("{\"op\":\"get_record_by_id\",\"id\":100,\"id_max\":1,\"next_substream\":\"""\"}"))); auto err = data_broker->GetById(expected_dataset_id, &info, expected_group_id, nullptr); diff --git a/producer/api/cpp/include/producer/producer.h b/producer/api/cpp/include/producer/producer.h index 78fc4ac91ba6d8cb23d9aefd587e6e8e1d1b85f6..7add05090c1f0858f0044990b0d99c4de33318e2 100644 --- 
diff --git a/producer/api/cpp/include/producer/producer.h b/producer/api/cpp/include/producer/producer.h
index 78fc4ac91ba6d8cb23d9aefd587e6e8e1d1b85f6..7add05090c1f0858f0044990b0d99c4de33318e2 100644
--- a/producer/api/cpp/include/producer/producer.h
+++ b/producer/api/cpp/include/producer/producer.h
@@ -77,6 +77,16 @@ class Producer {
                           uint64_t ingest_mode, RequestCallback callback) = 0;
 
+    //! Marks substream finished
+    /*!
+      \param substream - Name of the substream to mark as finished
+      \param last_id - ID of the last image in substream
+      \param next_substream - Name of the next substream (empty if not set)
+      \return Error - Will be nullptr on success
+    */
+    virtual Error SendSubstreamFinishedFlag(std::string substream, uint64_t last_id, std::string next_substream,
+                                            RequestCallback callback) = 0;
+
     //! Sends metadata for the current beamtime to the receiver
     /*!
diff --git a/producer/api/cpp/src/producer_impl.cpp b/producer/api/cpp/src/producer_impl.cpp
index cf4e60f9014ce90ca39cfab67a6a93a0ad397f1c..12c3b03351abef6692466f734270c405fc2eecbd 100644
--- a/producer/api/cpp/src/producer_impl.cpp
+++ b/producer/api/cpp/src/producer_impl.cpp
@@ -14,6 +14,8 @@ namespace asapo {
 
 const size_t ProducerImpl::kDiscoveryServiceUpdateFrequencyMs = 10000; // 10s
 
+const std::string ProducerImpl::kFinishSubStreamKeyword = "asapo_finish_substream";
+const std::string ProducerImpl::kNoNextSubStreamKeyword = "asapo_no_next";
 
 ProducerImpl::ProducerImpl(std::string endpoint, uint8_t n_processing_threads, asapo::RequestHandlerType type):
@@ -127,6 +129,18 @@ Error ProducerImpl::SendData(const EventHeader& event_header,
 
 }
 
+Error ProducerImpl::SendSubstreamFinishedFlag(std::string substream, uint64_t last_id, std::string next_substream,
+                                              RequestCallback callback) {
+    EventHeader event_header;
+    event_header.file_name = kFinishSubStreamKeyword;
+    event_header.file_size = 0;
+    event_header.file_id = last_id + 1;
+    if (next_substream.empty()) {
+        next_substream = kNoNextSubStreamKeyword;
+    }
+    event_header.user_metadata = std::string("{\"next_substream\":") + "\"" + next_substream + "\"}";
+    return Send(event_header, std::move(substream), nullptr, "", IngestModeFlags::kTransferMetaDataOnly, callback, true);
+}
 
 Error ProducerImpl::SendFile(const EventHeader& event_header, std::string full_path, uint64_t ingest_mode,
                              RequestCallback callback) {
@@ -226,5 +240,4 @@ Error ProducerImpl::SendFile(const EventHeader& event_header,
 }
 
 
-
 }
\ No newline at end of file
diff --git a/producer/api/cpp/src/producer_impl.h b/producer/api/cpp/src/producer_impl.h
index 57e3f890428849ca5b1535c95b403956d569f362..c54644bdb7cabdfeebbe86861719201da7940429 100644
--- a/producer/api/cpp/src/producer_impl.h
+++ b/producer/api/cpp/src/producer_impl.h
@@ -19,6 +19,8 @@ class ProducerImpl : public Producer {
     std::unique_ptr<RequestHandlerFactory> request_handler_factory_;
   public:
     static const size_t kDiscoveryServiceUpdateFrequencyMs;
+    static const std::string kFinishSubStreamKeyword;
+    static const std::string kNoNextSubStreamKeyword;
 
     explicit ProducerImpl(std::string endpoint, uint8_t n_processing_threads, asapo::RequestHandlerType type);
     ProducerImpl(const ProducerImpl&) = delete;
@@ -40,6 +42,10 @@ class ProducerImpl : public Producer {
     Error SendFile(const EventHeader& event_header, std::string substream, std::string full_path,
                    uint64_t ingest_mode, RequestCallback callback) override;
 
+    Error SendSubstreamFinishedFlag(std::string substream, uint64_t last_id, std::string next_substream,
+                                    RequestCallback callback) override;
+
+
     AbstractLogger* log__;
 
     std::unique_ptr<RequestPool> request_pool__;
diff --git a/producer/api/cpp/unittests/test_producer_impl.cpp b/producer/api/cpp/unittests/test_producer_impl.cpp
index bac3a64cc87cc806354bd87178d8077d3709cca8..16caa956b7bc2ac27faf33ba608d286e671e1e81 100644
--- a/producer/api/cpp/unittests/test_producer_impl.cpp
+++ b/producer/api/cpp/unittests/test_producer_impl.cpp
@@ -70,6 +70,7 @@ class ProducerImplTests : public testing::Test {
 
     char expected_name[asapo::kMaxMessageSize] = "test_name";
     char expected_substream[asapo::kMaxMessageSize] = "test_substream";
+    std::string expected_next_substream = "next_substream";
 
     asapo::SourceCredentials expected_credentials{
         "beamtime_id", "subname", "token"
@@ -223,6 +224,56 @@ TEST_F(ProducerImplTests, OKSendingSendDataRequestWithSubstream) {
     ASSERT_THAT(err, Eq(nullptr));
 }
 
+TEST_F(ProducerImplTests, OKSendingSubstreamFinish) {
+    producer.SetCredentials(expected_credentials);
+
+    std::string next_stream_meta = std::string("{\"next_substream\":") + "\"" + expected_next_substream + "\"}";
+
+    EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(asapo::kOpcodeTransferData,
+                                        expected_credentials_str,
+                                        next_stream_meta.c_str(),
+                                        expected_id + 1,
+                                        0,
+                                        asapo::ProducerImpl::kFinishSubStreamKeyword.c_str(),
+                                        expected_substream,
+                                        asapo::IngestModeFlags::kTransferMetaDataOnly,
+                                        0,
+                                        0
+                                                              ))).WillOnce(Return(
+                                                                      nullptr));
+
+    auto err = producer.SendSubstreamFinishedFlag(expected_substream, expected_id, expected_next_substream, nullptr);
+
+    ASSERT_THAT(err, Eq(nullptr));
+}
+
+TEST_F(ProducerImplTests, OKSendingSubstreamFinishWithNoNextStream) {
+    producer.SetCredentials(expected_credentials);
+
+    std::string next_stream_meta = std::string("{\"next_substream\":") + "\"" + asapo::ProducerImpl::kNoNextSubStreamKeyword +
+                                   "\"}";
+
+    EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(asapo::kOpcodeTransferData,
+                                        expected_credentials_str,
+                                        next_stream_meta.c_str(),
+                                        expected_id + 1,
+                                        0,
+                                        asapo::ProducerImpl::kFinishSubStreamKeyword.c_str(),
+                                        expected_substream,
+                                        asapo::IngestModeFlags::kTransferMetaDataOnly,
+                                        0,
+                                        0
+                                                              ))).WillOnce(Return(
+                                                                      nullptr));
+
+    auto err = producer.SendSubstreamFinishedFlag(expected_substream, expected_id, "", nullptr);
+
+    ASSERT_THAT(err, Eq(nullptr));
+}
+
+
 TEST_F(ProducerImplTests, OKSendingSendSubsetDataRequest) {
     producer.SetCredentials(expected_credentials);
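On the producer side the call order matches the two tests above: send the data, then flag the substream as finished. A minimal sketch (again not part of the patch, assuming a `producer` created via `asapo::Producer::Create` and `n` images already sent to `"substream1"`):

```cpp
// Sketch only: mark "substream1" complete and point readers at "substream2".
// Passing an empty next_substream would make ProducerImpl substitute
// kNoNextSubStreamKeyword ("asapo_no_next") instead.
auto err = producer->SendSubstreamFinishedFlag("substream1", n /* id of last image */,
                                               "substream2", nullptr);
if (err) {
    std::cerr << "could not mark substream finished: " << err << std::endl;
}
```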
diff --git a/tests/automatic/full_chain/CMakeLists.txt b/tests/automatic/full_chain/CMakeLists.txt
index 6e201e52edddea14673da0f2a13ce61c4c50153b..c177a9f4a17ef05167f054ace379ada74f268945 100644
--- a/tests/automatic/full_chain/CMakeLists.txt
+++ b/tests/automatic/full_chain/CMakeLists.txt
@@ -10,4 +10,5 @@ add_subdirectory(simple_chain_filegen_batches)
 add_subdirectory(simple_chain_filegen_multisource)
 add_subdirectory(simple_chain_filegen_readdata_cache)
 add_subdirectory(simple_chain_filegen_readdata_file)
-add_subdirectory(simple_chain_dataset)
\ No newline at end of file
+add_subdirectory(simple_chain_dataset)
+add_subdirectory(send_recv_substreams)
\ No newline at end of file
diff --git a/tests/automatic/full_chain/send_recv_substreams/CMakeLists.txt b/tests/automatic/full_chain/send_recv_substreams/CMakeLists.txt
new file mode 100644
index 0000000000000000000000000000000000000000..2eff5008291dff153400eb9194ff409f014dadf3
--- /dev/null
+++ b/tests/automatic/full_chain/send_recv_substreams/CMakeLists.txt
@@ -0,0 +1,15 @@
+set(TARGET_NAME send_recv_substreams)
+set(SOURCE_FILES send_recv_substreams.cpp)
+
+add_executable(${TARGET_NAME} ${SOURCE_FILES})
+target_link_libraries(${TARGET_NAME} asapo-consumer asapo-producer)
+
+#use generator expression to get rid of VS adding Debug/Release folders
+set_target_properties(${TARGET_NAME} PROPERTIES RUNTIME_OUTPUT_DIRECTORY
+        ${CMAKE_CURRENT_BINARY_DIR}$<$<CONFIG:Debug>:>
+        )
+
+prepare_asapo()
+
+add_script_test("${TARGET_NAME}" "${CMAKE_CURRENT_BINARY_DIR}/${TARGET_NAME}" nomem)
+
diff --git a/tests/automatic/full_chain/send_recv_substreams/check_linux.sh b/tests/automatic/full_chain/send_recv_substreams/check_linux.sh
new file mode 100644
index 0000000000000000000000000000000000000000..4defc53a4b89f197faaab7e5fe37a850d6360a1c
--- /dev/null
+++ b/tests/automatic/full_chain/send_recv_substreams/check_linux.sh
@@ -0,0 +1,35 @@
+#!/usr/bin/env bash
+
+source_path=.
+beamtime_id=asapo_test
+stream_in=detector
+
+indatabase_name=${beamtime_id}_${stream_in}
+token=IEfwsWa0GXky2S3MkxJSUHJT1sI8DD5teRdjBUXVRxk=
+
+beamline=test
+
+set -e
+
+trap Cleanup EXIT
+
+Cleanup() {
+    set +e
+    nomad stop nginx
+    nomad run nginx_kill.nmd && nomad stop -yes -purge nginx_kill
+    nomad stop discovery
+    nomad stop broker
+    nomad stop receiver
+    nomad stop authorizer
+    echo "db.dropDatabase()" | mongo ${indatabase_name}
+}
+
+nomad run nginx.nmd
+nomad run discovery.nmd
+nomad run broker.nmd
+nomad run receiver.nmd
+nomad run authorizer.nmd
+
+
+$1 127.0.0.1:8400 $beamtime_id $token > out
+cat out
\ No newline at end of file
diff --git a/tests/automatic/full_chain/send_recv_substreams/check_windows.bat b/tests/automatic/full_chain/send_recv_substreams/check_windows.bat
new file mode 100644
index 0000000000000000000000000000000000000000..c4c583cef01d322b6675931661795d3b21cfd12b
--- /dev/null
+++ b/tests/automatic/full_chain/send_recv_substreams/check_windows.bat
@@ -0,0 +1,36 @@
+SET source_path=.
+SET beamtime_id=asapo_test
+SET stream_in=detector
+
+SET indatabase_name=%beamtime_id%_%stream_in%
+
+SET token=IEfwsWa0GXky2S3MkxJSUHJT1sI8DD5teRdjBUXVRxk=
+
+SET beamline=test
+
+SET mongo_exe="c:\Program Files\MongoDB\Server\3.6\bin\mongo.exe"
+
+
+c:\opt\consul\nomad run discovery.nmd
+c:\opt\consul\nomad run broker.nmd
+c:\opt\consul\nomad run nginx.nmd
+c:\opt\consul\nomad run receiver.nmd
+c:\opt\consul\nomad run authorizer.nmd
+
+"%1" 127.0.0.1:8400 %beamtime_id% %token% || goto :error
+
+goto :clean
+
+:error
+call :clean
+exit /b 1
+
+:clean
+c:\opt\consul\nomad stop discovery
+c:\opt\consul\nomad stop broker
+c:\opt\consul\nomad stop nginx
+c:\opt\consul\nomad run nginx_kill.nmd && c:\opt\consul\nomad stop -yes -purge nginx_kill
+c:\opt\consul\nomad stop receiver
+c:\opt\consul\nomad stop authorizer
+
+echo db.dropDatabase() | %mongo_exe% %indatabase_name%
std::cerr << "Data was not successfully send: " << err << std::endl; + return; + } + files_sent++; +} + +BrokerPtr CreateBrokerAndGroup(const Args& args, Error* err) { + auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.server, ".", + asapo::SourceCredentials{args.beamtime_id, "", args.token}, err); + if (*err) { + return nullptr; + } + + broker->SetTimeout(10000); + + if (group_id.empty()) { + group_id = broker->GenerateNewGroupId(err); + if (*err) { + return nullptr; + } + } + return broker; +} + +ProducerPtr CreateProducer(const Args& args) { + asapo::Error err; + auto producer = asapo::Producer::Create(args.server, 1, + asapo::RequestHandlerType::kTcp, + asapo::SourceCredentials{args.beamtime_id, "", args.token }, &err); + if(err) { + std::cerr << "Cannot start producer. ProducerError: " << err << std::endl; + exit(EXIT_FAILURE); + } + + producer->EnableLocalLog(true); + producer->SetLogLevel(asapo::LogLevel::Info); + return producer; +} + +int main(int argc, char* argv[]) { + asapo::ExitAfterPrintVersionIfNeeded("GetNext Broker Example", argc, argv); + Args args; + if (argc != 4) { + std::cout << "Usage: " + std::string{argv[0]} + + " <server> <beamtime_id> <token>" + << + std::endl; + exit(EXIT_FAILURE); + } + args.server = std::string{argv[1]}; + args.beamtime_id = std::string{argv[2]}; + args.token = std::string{argv[3]}; + auto producer = CreateProducer(args); + + auto n = 1; + + for (uint64_t i = 0; i < n; i++) { + asapo::EventHeader event_header{i + 1, 0, std::to_string(i + 1)}; + producer->SendData(event_header, "substream1", nullptr, asapo::kTransferMetaDataOnly, ProcessAfterSend); + } + producer->SendSubstreamFinishedFlag("substream1", n, "substream2", ProcessAfterSend); + producer->WaitRequestsFinished(10000); + + Error err; + auto consumer = CreateBrokerAndGroup(args, &err); + + asapo::FileInfo fi; + for (uint64_t i = 0; i < n; i++) { + consumer->GetNext(&fi, group_id, "substream1", nullptr); + } + + err = consumer->GetNext(&fi, group_id, "substream1", nullptr); + if (err != asapo::ConsumerErrorTemplates::kStreamFinished) { + return 1; + } + auto err_data = static_cast<const asapo::ConsumerErrorData*>(err->GetCustomData()); + + return (err_data->next_substream == "substream2") && (files_sent == n + 1) ? 0 : 1; +}