diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go index fe5046e5e49c7926561036973106fa528b80aaf9..e052665556e0bb703077a21d35b3c47214840b59 100644 --- a/broker/src/asapo_broker/database/mongodb.go +++ b/broker/src/asapo_broker/database/mongodb.go @@ -53,6 +53,10 @@ type Nacks struct { Unacknowledged []int `json:"unacknowledged"` } +type Sources struct { + Datasources []string `json:"sources"` +} + type LastAck struct { ID int `bson:"_id" json:"lastAckId"` } @@ -1212,6 +1216,20 @@ func (db *Mongodb) getStreams(request Request) ([]byte, error) { return json.Marshal(&rec) } +func (db *Mongodb) getDatasources(request Request) ([]byte, error) { + + filter := bson.M{"name": primitive.Regex{Pattern: "^" + request.Beamtime + "_"}} + rec, err := db.client.ListDatabaseNames(context.TODO(), filter) + if err != nil { + return db.processQueryError("get streams", request.DbName(), err, request.Logger()) + } + for i, source := range rec { + rec[i] = source[len(request.Beamtime)+1:] + } + res := Sources{rec} + return json.Marshal(&res) +} + func (db *Mongodb) ProcessRequest(request Request) (answer []byte, err error) { dbClientLock.RLock() defer dbClientLock.RUnlock() @@ -1243,6 +1261,8 @@ func (db *Mongodb) ProcessRequest(request Request) (answer []byte, err error) { return db.queryMessages(request) case "streams": return db.getStreams(request) + case "datasources": + return db.getDatasources(request) case "ackmessage": return db.ackRecord(request) case "negackmessage": diff --git a/broker/src/asapo_broker/database/mongodb_test.go b/broker/src/asapo_broker/database/mongodb_test.go index e9c2fca30140b26fd6af4e81f269a22914e28264..e498509a53638d92d65ee669ab6da740892366ab 100644 --- a/broker/src/asapo_broker/database/mongodb_test.go +++ b/broker/src/asapo_broker/database/mongodb_test.go @@ -617,6 +617,22 @@ func TestMongoDBGetMetaBtOK(t *testing.T) { assert.Equal(t, string(rec_expect), string(res)) } +func TestMongoDBGetDatasets(t *testing.T) { + db.Connect(dbaddress) + defer cleanup() + + db.insertRecord(beamtime+"_test01", collection, &rec_dataset1) + db.insertRecord(beamtime+"_test02", collection, &rec_dataset1) + db.insertRecord(beamtime+"_test03", collection, &rec_dataset1) + db.insertRecord("_test04", collection, &rec_dataset1) + + res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: "whatever", Op: "datasources", ExtraParam: "0"}) + + assert.Nil(t, err) + assert.Equal(t, "{\"sources\":[\"test01\",\"test02\",\"test03\"]}", string(res)) + +} + func TestMongoDBGetMetaStOK(t *testing.T) { db.Connect(dbaddress) defer cleanup() diff --git a/broker/src/asapo_broker/server/get_commands_test.go b/broker/src/asapo_broker/server/get_commands_test.go index 9ccc2ea5b24f74a2e9945b9021d2c568a57aef4b..3508bcbd050e81ac72404250a9e7bac0837aaeeb 100644 --- a/broker/src/asapo_broker/server/get_commands_test.go +++ b/broker/src/asapo_broker/server/get_commands_test.go @@ -55,6 +55,7 @@ var testsGetCommand = []struct { {"size", expectedSource, expectedStream, "", expectedStream + "/size", "&incomplete=true", "true"}, {"streams", expectedSource, "0", "", "0/streams", "", "{\"from\":\"\",\"filter\":\"\",\"detailed\":\"\"}"}, {"lastack", expectedSource, expectedStream, expectedGroupID, expectedStream + "/" + expectedGroupID + "/lastack", "", ""}, + {"datasources", expectedSource, "0", "", "0/sources", "", ""}, } func (suite *GetCommandsTestSuite) TestGetCommandsCallsCorrectRoutine() { diff --git a/broker/src/asapo_broker/server/get_datasources.go b/broker/src/asapo_broker/server/get_datasources.go new file mode 100644 index 0000000000000000000000000000000000000000..a8062583ecbaaa808472d2d74dc02c701a7e035c --- /dev/null +++ b/broker/src/asapo_broker/server/get_datasources.go @@ -0,0 +1,9 @@ +package server + +import ( + "net/http" +) + +func routeGetDatasources(w http.ResponseWriter, r *http.Request) { + processRequest(w, r, "datasources", "", false) +} diff --git a/broker/src/asapo_broker/server/listroutes.go b/broker/src/asapo_broker/server/listroutes.go index 885bb180077d127f4238adc0f026ba2a24d62b9a..f66631806382af3ba7497e35b13db407e4d2c67e 100644 --- a/broker/src/asapo_broker/server/listroutes.go +++ b/broker/src/asapo_broker/server/listroutes.go @@ -23,6 +23,12 @@ var listRoutes = utils.Routes{ "/{apiver}/beamtime/{beamtime}/{datasource}/{stream}/streams", routeGetStreams, }, + utils.Route{ + "GetDatasources", + "Get", + "/{apiver}/beamtime/{beamtime}/{datasource}/{stream}/sources", + routeGetDatasources, + }, utils.Route{ "DeleteStream", "Post", diff --git a/common/cpp/include/asapo/common/data_structs.h b/common/cpp/include/asapo/common/data_structs.h index be8b8497c24a2e4c75e691cad8f3caa0c0909d62..a54b7d30cc16ab7c25069f231fd364e79af506c1 100644 --- a/common/cpp/include/asapo/common/data_structs.h +++ b/common/cpp/include/asapo/common/data_structs.h @@ -75,6 +75,8 @@ using MessageMetas = std::vector<MessageMeta>; using IdList = std::vector<uint64_t>; +using SourceList = std::vector<std::string>; + struct DataSet { uint64_t id; uint64_t expected_size; diff --git a/consumer/api/cpp/include/asapo/consumer/consumer.h b/consumer/api/cpp/include/asapo/consumer/consumer.h index fbc6bebec3fc76fab9eb608be238f15f73578afa..8f923a8549edad0acd13ab9e2a0a7da3a7c5810f 100644 --- a/consumer/api/cpp/include/asapo/consumer/consumer.h +++ b/consumer/api/cpp/include/asapo/consumer/consumer.h @@ -94,6 +94,12 @@ class Consumer { virtual StreamInfos GetStreamList(std::string from, StreamFilter filter, Error* err) = 0; virtual StreamInfos GetStreamList(std::string from, StreamFilter filter, bool detailed, Error* err) = 0; + //! Get list of datasources for beamtime given in the consumer constructor + /*! + \param err - return nullptr if operation succeed, error otherwise. + */ + virtual SourceList GetSourceList(Error* err) = 0; + //! Delete stream /*! \param stream - stream to send messages to diff --git a/consumer/api/cpp/src/consumer_impl.cpp b/consumer/api/cpp/src/consumer_impl.cpp index 1f04b8036f9b735ad788fa7e13fa92789be62064..0bc560da2a5531f084588d36c8a5c9adc2e14adb 100644 --- a/consumer/api/cpp/src/consumer_impl.cpp +++ b/consumer/api/cpp/src/consumer_impl.cpp @@ -856,6 +856,22 @@ StreamInfos ConsumerImpl::GetStreamList(std::string from, StreamFilter filter, b return ParseStreamsFromResponse(std::move(response), err); } +SourceList ConsumerImpl::GetSourceList(Error* err) { + RequestInfo ri = CreateBrokerApiRequest("0", "", "sources"); + ri.post = false; + + auto json_string = BrokerRequestWithTimeout(ri, err); + if (*err) { + return SourceList{}; + } + SourceList list; + JsonStringParser parser(json_string); + if ((*err = parser.GetArrayString("sources", &list))) { + return SourceList{}; + } + return list; +} + RequestInfo ConsumerImpl::GetStreamListRequest(const std::string& from, const StreamFilter& filter, bool detailed) const { RequestInfo ri = CreateBrokerApiRequest("0", "", "streams"); ri.post = false; diff --git a/consumer/api/cpp/src/consumer_impl.h b/consumer/api/cpp/src/consumer_impl.h index f8556e58c3799f8af9fcde5a48feb937d7d423c2..0ed900bd27ba64d6db67b294b0743a804ed7b569 100644 --- a/consumer/api/cpp/src/consumer_impl.h +++ b/consumer/api/cpp/src/consumer_impl.h @@ -111,6 +111,8 @@ class ConsumerImpl final : public asapo::Consumer { StreamInfos GetStreamList(std::string from, StreamFilter filter, bool detailed, Error* err) override; StreamInfos GetStreamList(std::string from, StreamFilter filter, Error* err); + SourceList GetSourceList(Error* err); + void SetResendNacs(bool resend, uint64_t delay_ms, uint64_t resend_attempts) override; virtual void InterruptCurrentOperation() override; diff --git a/consumer/api/cpp/unittests/test_consumer_impl.cpp b/consumer/api/cpp/unittests/test_consumer_impl.cpp index 81972bdd9e717ec912068d59debdfa0cd159fb08..e0ac762566983a245bedb87ee7410ccbcb59f3fa 100644 --- a/consumer/api/cpp/unittests/test_consumer_impl.cpp +++ b/consumer/api/cpp/unittests/test_consumer_impl.cpp @@ -1274,6 +1274,26 @@ TEST_F(ConsumerImplTests, DeleteStreamUsesCorrectUri) { } +TEST_F(ConsumerImplTests, GetSourceListUsesCorrectUri) { + MockGetBrokerUri(); + std::string return_sources = std::string("{\"sources\":[\"test_source\"]}"); + EXPECT_CALL(mock_http_client, + Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/0/sources" + + "?token=" + expected_token + + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded, _, + _)).WillOnce(DoAll( + SetArgPointee<1>(HttpCode::OK), + SetArgPointee<2>(nullptr), + Return(return_sources))); + + asapo::Error err; + auto sources = consumer->GetSourceList(&err); + ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(sources.size(), Eq(1)); + ASSERT_THAT(sources.size(), 1); + +} + TEST_F(ConsumerImplTests, GetStreamListUsesCorrectUri) { MockGetBrokerUri(); std::string return_streams = diff --git a/consumer/api/python/asapo_consumer.pxd b/consumer/api/python/asapo_consumer.pxd index 9558fc785475727981f19ef96d151771a8fa6ece..29761ad2ab0390cf015374779d350b6d7762dd8b 100644 --- a/consumer/api/python/asapo_consumer.pxd +++ b/consumer/api/python/asapo_consumer.pxd @@ -34,6 +34,9 @@ cdef extern from "asapo/asapo_consumer.h" namespace "asapo": cppclass IdList: vector[uint64_t].iterator begin() vector[uint64_t].iterator end() + cppclass SourceList: + vector[string].iterator begin() + vector[string].iterator end() cppclass MessageMetas: vector[MessageMeta].iterator begin() vector[MessageMeta].iterator end() @@ -94,6 +97,7 @@ cdef extern from "asapo/asapo_consumer.h" namespace "asapo" nogil: DataSet GetDatasetById(uint64_t id, uint64_t min_size, string stream, Error* err) Error RetrieveData(MessageMeta* info, MessageData* data) vector[StreamInfo] GetStreamList(string from_stream, StreamFilter filter, bool detailed, Error* err) + SourceList GetSourceList(Error* err) void SetResendNacs(bool resend, uint64_t delay_ms, uint64_t resend_attempts) void InterruptCurrentOperation() Error GetVersionInfo(string* client_info,string* server_info, bool* supported) diff --git a/consumer/api/python/asapo_consumer.pyx.in b/consumer/api/python/asapo_consumer.pyx.in index 5c8b8b4a0bdc426936d035630d0945f3c7aa50d1..836243d9f10f8e16533c8977a7e80991c5af95ff 100644 --- a/consumer/api/python/asapo_consumer.pyx.in +++ b/consumer/api/python/asapo_consumer.pyx.in @@ -284,6 +284,20 @@ cdef class PyConsumer: for stream in streams: list.append(json.loads(_str(stream.Json()))) return list + def get_source_list(self): + """ + Return list of datasources for beamtime given in consumer constructor + """ + cdef Error err + with nogil: + sources = self.c_consumer.get().GetSourceList(&err) + if err: + throw_exception(err) + list = [] + for source in sources: + list.append(source.decode('utf-8')) + return list + def acknowledge(self, group_id, uint64_t id, stream = "default"): cdef string b_group_id = _bytes(group_id) cdef string b_stream = _bytes(stream)