From f685e9da4e3ec91ada57d5adbf457e2704897794 Mon Sep 17 00:00:00 2001 From: karnem <mikhail.karnevskiy@desy.de> Date: Wed, 1 Nov 2023 11:56:09 +0100 Subject: [PATCH] Add api to return list of datasources for given beamtime. --- broker/src/asapo_broker/database/mongodb.go | 20 +++++++++++++++++++ .../src/asapo_broker/database/mongodb_test.go | 16 +++++++++++++++ .../asapo_broker/server/get_commands_test.go | 1 + .../asapo_broker/server/get_datasources.go | 9 +++++++++ broker/src/asapo_broker/server/listroutes.go | 6 ++++++ .../cpp/include/asapo/common/data_structs.h | 2 ++ .../api/cpp/include/asapo/consumer/consumer.h | 6 ++++++ consumer/api/cpp/src/consumer_impl.cpp | 16 +++++++++++++++ consumer/api/cpp/src/consumer_impl.h | 2 ++ .../api/cpp/unittests/test_consumer_impl.cpp | 20 +++++++++++++++++++ consumer/api/python/asapo_consumer.pxd | 4 ++++ consumer/api/python/asapo_consumer.pyx.in | 14 +++++++++++++ 12 files changed, 116 insertions(+) create mode 100644 broker/src/asapo_broker/server/get_datasources.go diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go index fe5046e5e..e05266555 100644 --- a/broker/src/asapo_broker/database/mongodb.go +++ b/broker/src/asapo_broker/database/mongodb.go @@ -53,6 +53,10 @@ type Nacks struct { Unacknowledged []int `json:"unacknowledged"` } +type Sources struct { + Datasources []string `json:"sources"` +} + type LastAck struct { ID int `bson:"_id" json:"lastAckId"` } @@ -1212,6 +1216,20 @@ func (db *Mongodb) getStreams(request Request) ([]byte, error) { return json.Marshal(&rec) } +func (db *Mongodb) getDatasources(request Request) ([]byte, error) { + + filter := bson.M{"name": primitive.Regex{Pattern: "^" + request.Beamtime + "_"}} + rec, err := db.client.ListDatabaseNames(context.TODO(), filter) + if err != nil { + return db.processQueryError("get streams", request.DbName(), err, request.Logger()) + } + for i, source := range rec { + rec[i] = source[len(request.Beamtime)+1:] + } + res := Sources{rec} + return json.Marshal(&res) +} + func (db *Mongodb) ProcessRequest(request Request) (answer []byte, err error) { dbClientLock.RLock() defer dbClientLock.RUnlock() @@ -1243,6 +1261,8 @@ func (db *Mongodb) ProcessRequest(request Request) (answer []byte, err error) { return db.queryMessages(request) case "streams": return db.getStreams(request) + case "datasources": + return db.getDatasources(request) case "ackmessage": return db.ackRecord(request) case "negackmessage": diff --git a/broker/src/asapo_broker/database/mongodb_test.go b/broker/src/asapo_broker/database/mongodb_test.go index e9c2fca30..e498509a5 100644 --- a/broker/src/asapo_broker/database/mongodb_test.go +++ b/broker/src/asapo_broker/database/mongodb_test.go @@ -617,6 +617,22 @@ func TestMongoDBGetMetaBtOK(t *testing.T) { assert.Equal(t, string(rec_expect), string(res)) } +func TestMongoDBGetDatasets(t *testing.T) { + db.Connect(dbaddress) + defer cleanup() + + db.insertRecord(beamtime+"_test01", collection, &rec_dataset1) + db.insertRecord(beamtime+"_test02", collection, &rec_dataset1) + db.insertRecord(beamtime+"_test03", collection, &rec_dataset1) + db.insertRecord("_test04", collection, &rec_dataset1) + + res, err := db.ProcessRequest(Request{Beamtime: beamtime, DataSource: datasource, Stream: "whatever", Op: "datasources", ExtraParam: "0"}) + + assert.Nil(t, err) + assert.Equal(t, "{\"sources\":[\"test01\",\"test02\",\"test03\"]}", string(res)) + +} + func TestMongoDBGetMetaStOK(t *testing.T) { db.Connect(dbaddress) defer cleanup() diff --git a/broker/src/asapo_broker/server/get_commands_test.go b/broker/src/asapo_broker/server/get_commands_test.go index 9ccc2ea5b..3508bcbd0 100644 --- a/broker/src/asapo_broker/server/get_commands_test.go +++ b/broker/src/asapo_broker/server/get_commands_test.go @@ -55,6 +55,7 @@ var testsGetCommand = []struct { {"size", expectedSource, expectedStream, "", expectedStream + "/size", "&incomplete=true", "true"}, {"streams", expectedSource, "0", "", "0/streams", "", "{\"from\":\"\",\"filter\":\"\",\"detailed\":\"\"}"}, {"lastack", expectedSource, expectedStream, expectedGroupID, expectedStream + "/" + expectedGroupID + "/lastack", "", ""}, + {"datasources", expectedSource, "0", "", "0/sources", "", ""}, } func (suite *GetCommandsTestSuite) TestGetCommandsCallsCorrectRoutine() { diff --git a/broker/src/asapo_broker/server/get_datasources.go b/broker/src/asapo_broker/server/get_datasources.go new file mode 100644 index 000000000..a8062583e --- /dev/null +++ b/broker/src/asapo_broker/server/get_datasources.go @@ -0,0 +1,9 @@ +package server + +import ( + "net/http" +) + +func routeGetDatasources(w http.ResponseWriter, r *http.Request) { + processRequest(w, r, "datasources", "", false) +} diff --git a/broker/src/asapo_broker/server/listroutes.go b/broker/src/asapo_broker/server/listroutes.go index 885bb1800..f66631806 100644 --- a/broker/src/asapo_broker/server/listroutes.go +++ b/broker/src/asapo_broker/server/listroutes.go @@ -23,6 +23,12 @@ var listRoutes = utils.Routes{ "/{apiver}/beamtime/{beamtime}/{datasource}/{stream}/streams", routeGetStreams, }, + utils.Route{ + "GetDatasources", + "Get", + "/{apiver}/beamtime/{beamtime}/{datasource}/{stream}/sources", + routeGetDatasources, + }, utils.Route{ "DeleteStream", "Post", diff --git a/common/cpp/include/asapo/common/data_structs.h b/common/cpp/include/asapo/common/data_structs.h index be8b8497c..a54b7d30c 100644 --- a/common/cpp/include/asapo/common/data_structs.h +++ b/common/cpp/include/asapo/common/data_structs.h @@ -75,6 +75,8 @@ using MessageMetas = std::vector<MessageMeta>; using IdList = std::vector<uint64_t>; +using SourceList = std::vector<std::string>; + struct DataSet { uint64_t id; uint64_t expected_size; diff --git a/consumer/api/cpp/include/asapo/consumer/consumer.h b/consumer/api/cpp/include/asapo/consumer/consumer.h index fbc6bebec..8f923a854 100644 --- a/consumer/api/cpp/include/asapo/consumer/consumer.h +++ b/consumer/api/cpp/include/asapo/consumer/consumer.h @@ -94,6 +94,12 @@ class Consumer { virtual StreamInfos GetStreamList(std::string from, StreamFilter filter, Error* err) = 0; virtual StreamInfos GetStreamList(std::string from, StreamFilter filter, bool detailed, Error* err) = 0; + //! Get list of datasources for beamtime given in the consumer constructor + /*! + \param err - return nullptr if operation succeed, error otherwise. + */ + virtual SourceList GetSourceList(Error* err) = 0; + //! Delete stream /*! \param stream - stream to send messages to diff --git a/consumer/api/cpp/src/consumer_impl.cpp b/consumer/api/cpp/src/consumer_impl.cpp index 1f04b8036..0bc560da2 100644 --- a/consumer/api/cpp/src/consumer_impl.cpp +++ b/consumer/api/cpp/src/consumer_impl.cpp @@ -856,6 +856,22 @@ StreamInfos ConsumerImpl::GetStreamList(std::string from, StreamFilter filter, b return ParseStreamsFromResponse(std::move(response), err); } +SourceList ConsumerImpl::GetSourceList(Error* err) { + RequestInfo ri = CreateBrokerApiRequest("0", "", "sources"); + ri.post = false; + + auto json_string = BrokerRequestWithTimeout(ri, err); + if (*err) { + return SourceList{}; + } + SourceList list; + JsonStringParser parser(json_string); + if ((*err = parser.GetArrayString("sources", &list))) { + return SourceList{}; + } + return list; +} + RequestInfo ConsumerImpl::GetStreamListRequest(const std::string& from, const StreamFilter& filter, bool detailed) const { RequestInfo ri = CreateBrokerApiRequest("0", "", "streams"); ri.post = false; diff --git a/consumer/api/cpp/src/consumer_impl.h b/consumer/api/cpp/src/consumer_impl.h index f8556e58c..0ed900bd2 100644 --- a/consumer/api/cpp/src/consumer_impl.h +++ b/consumer/api/cpp/src/consumer_impl.h @@ -111,6 +111,8 @@ class ConsumerImpl final : public asapo::Consumer { StreamInfos GetStreamList(std::string from, StreamFilter filter, bool detailed, Error* err) override; StreamInfos GetStreamList(std::string from, StreamFilter filter, Error* err); + SourceList GetSourceList(Error* err); + void SetResendNacs(bool resend, uint64_t delay_ms, uint64_t resend_attempts) override; virtual void InterruptCurrentOperation() override; diff --git a/consumer/api/cpp/unittests/test_consumer_impl.cpp b/consumer/api/cpp/unittests/test_consumer_impl.cpp index 81972bdd9..e0ac76256 100644 --- a/consumer/api/cpp/unittests/test_consumer_impl.cpp +++ b/consumer/api/cpp/unittests/test_consumer_impl.cpp @@ -1274,6 +1274,26 @@ TEST_F(ConsumerImplTests, DeleteStreamUsesCorrectUri) { } +TEST_F(ConsumerImplTests, GetSourceListUsesCorrectUri) { + MockGetBrokerUri(); + std::string return_sources = std::string("{\"sources\":[\"test_source\"]}"); + EXPECT_CALL(mock_http_client, + Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/0/sources" + + "?token=" + expected_token + + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded, _, + _)).WillOnce(DoAll( + SetArgPointee<1>(HttpCode::OK), + SetArgPointee<2>(nullptr), + Return(return_sources))); + + asapo::Error err; + auto sources = consumer->GetSourceList(&err); + ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(sources.size(), Eq(1)); + ASSERT_THAT(sources.size(), 1); + +} + TEST_F(ConsumerImplTests, GetStreamListUsesCorrectUri) { MockGetBrokerUri(); std::string return_streams = diff --git a/consumer/api/python/asapo_consumer.pxd b/consumer/api/python/asapo_consumer.pxd index 9558fc785..29761ad2a 100644 --- a/consumer/api/python/asapo_consumer.pxd +++ b/consumer/api/python/asapo_consumer.pxd @@ -34,6 +34,9 @@ cdef extern from "asapo/asapo_consumer.h" namespace "asapo": cppclass IdList: vector[uint64_t].iterator begin() vector[uint64_t].iterator end() + cppclass SourceList: + vector[string].iterator begin() + vector[string].iterator end() cppclass MessageMetas: vector[MessageMeta].iterator begin() vector[MessageMeta].iterator end() @@ -94,6 +97,7 @@ cdef extern from "asapo/asapo_consumer.h" namespace "asapo" nogil: DataSet GetDatasetById(uint64_t id, uint64_t min_size, string stream, Error* err) Error RetrieveData(MessageMeta* info, MessageData* data) vector[StreamInfo] GetStreamList(string from_stream, StreamFilter filter, bool detailed, Error* err) + SourceList GetSourceList(Error* err) void SetResendNacs(bool resend, uint64_t delay_ms, uint64_t resend_attempts) void InterruptCurrentOperation() Error GetVersionInfo(string* client_info,string* server_info, bool* supported) diff --git a/consumer/api/python/asapo_consumer.pyx.in b/consumer/api/python/asapo_consumer.pyx.in index 5c8b8b4a0..836243d9f 100644 --- a/consumer/api/python/asapo_consumer.pyx.in +++ b/consumer/api/python/asapo_consumer.pyx.in @@ -284,6 +284,20 @@ cdef class PyConsumer: for stream in streams: list.append(json.loads(_str(stream.Json()))) return list + def get_source_list(self): + """ + Return list of datasources for beamtime given in consumer constructor + """ + cdef Error err + with nogil: + sources = self.c_consumer.get().GetSourceList(&err) + if err: + throw_exception(err) + list = [] + for source in sources: + list.append(source.decode('utf-8')) + return list + def acknowledge(self, group_id, uint64_t id, stream = "default"): cdef string b_group_id = _bytes(group_id) cdef string b_stream = _bytes(stream) -- GitLab