diff --git a/common/cpp/src/http_client/curl_http_client.cpp b/common/cpp/src/http_client/curl_http_client.cpp index ef9b379c1885b77c25e185ab62dd2c41028c1e64..2c0693146a6511e431c893f92b06053e29789b99 100644 --- a/common/cpp/src/http_client/curl_http_client.cpp +++ b/common/cpp/src/http_client/curl_http_client.cpp @@ -109,7 +109,7 @@ Error CurlHttpClient::Command(bool post, CurlDataContainer* data_container, cons FileData AllocateMemory(uint64_t size, Error* err) { FileData data; try { - data = FileData{new uint8_t[(size_t)size+1]}; + data = FileData{new uint8_t[(size_t)size + 1]}; data[size] = 0; // for reinterpret cast to string worked } catch (...) { *err = ErrorTemplates::kMemoryAllocationError.Generate(); diff --git a/consumer/api/cpp/include/consumer/data_broker.h b/consumer/api/cpp/include/consumer/data_broker.h index 539b688187dcb0bef2f9700d7a59d4fac1defa6e..f142bf497d4206b2b5697b1e579c87fa4497ad62 100644 --- a/consumer/api/cpp/include/consumer/data_broker.h +++ b/consumer/api/cpp/include/consumer/data_broker.h @@ -136,6 +136,7 @@ class DataBroker { class DataBrokerFactory { public: static std::unique_ptr<DataBroker> CreateServerBroker(std::string server_name, std::string source_path, + bool has_filesystem, SourceCredentials source, Error* error) noexcept; diff --git a/consumer/api/cpp/src/data_broker.cpp b/consumer/api/cpp/src/data_broker.cpp index 48abcb63e5dc0ed2d694b33ab5291f162fae5aed..1f3b1db63f68a21992a76d3283271c5c96e13007 100644 --- a/consumer/api/cpp/src/data_broker.cpp +++ b/consumer/api/cpp/src/data_broker.cpp @@ -26,9 +26,10 @@ std::unique_ptr<DataBroker> Create(const std::string& source_name, } std::unique_ptr<DataBroker> DataBrokerFactory::CreateServerBroker(std::string server_name, std::string source_path, - SourceCredentials source, + bool has_filesystem, SourceCredentials source, Error* error) noexcept { - return Create<ServerDataBroker>(std::move(server_name), error, std::move(source_path), std::move(source)); + return Create<ServerDataBroker>(std::move(server_name), error, std::move(source_path), has_filesystem, + std::move(source)); } diff --git a/consumer/api/cpp/src/server_data_broker.cpp b/consumer/api/cpp/src/server_data_broker.cpp index bf0c3056624d71cb1e104a750d318425d5af4502..2c468df4acd43d40e60b88b49b14325cc8c265e8 100644 --- a/consumer/api/cpp/src/server_data_broker.cpp +++ b/consumer/api/cpp/src/server_data_broker.cpp @@ -68,10 +68,12 @@ Error ErrorFromServerResponce(const RequestOutput* response, const HttpCode& cod ServerDataBroker::ServerDataBroker(std::string server_uri, std::string source_path, + bool has_filesystem, SourceCredentials source) : io__{GenerateDefaultIO()}, httpclient__{DefaultHttpClient()}, net_client__{new TcpClient()}, -endpoint_{std::move(server_uri)}, source_path_{std::move(source_path)}, source_credentials_(std::move(source)) { + endpoint_{std::move(server_uri)}, source_path_{std::move(source_path)}, has_filesystem_{has_filesystem}, +source_credentials_(std::move(source)) { if (source_credentials_.stream.empty()) { source_credentials_.stream = SourceCredentials::kDefaultStream; @@ -270,6 +272,16 @@ Error ServerDataBroker::GetImageFromServer(GetImageServerOperation op, uint64_t return GetDataIfNeeded(info, data); } +Error ServerDataBroker::GetDataFromFile(FileInfo* info, FileData* data) { + Error error; + *data = io__->GetDataFromFile(info->FullName(source_path_), &info->size, &error); + if (error) { + return ConsumerErrorTemplates::kLocalIOError.Generate(error->Explain()); + } + return nullptr; +} + + Error ServerDataBroker::RetrieveData(FileInfo* info, FileData* data) { if (data == nullptr || info == nullptr) { return ConsumerErrorTemplates::kWrongInput.Generate("pointers are empty"); @@ -283,13 +295,11 @@ Error ServerDataBroker::RetrieveData(FileInfo* info, FileData* data) { } } - Error error; - *data = io__->GetDataFromFile(info->FullName(source_path_), &info->size, &error); - if (error) { - return ConsumerErrorTemplates::kLocalIOError.Generate(error->Explain()); + if (has_filesystem_) { + return GetDataFromFile(info, data); } - return nullptr; + return GetDataFromFileTransferService(info, data); } Error ServerDataBroker::GetDataIfNeeded(FileInfo* info, FileData* data) { @@ -527,4 +537,22 @@ std::vector<std::string> ServerDataBroker::GetSubstreamList(Error* err) { return ParseSubstreamsFromResponse(std::move(response), err); } + +Error ServerDataBroker::GetDataFromFileTransferService(FileInfo* info, FileData* data) { + RequestInfo ri; + ri.host = endpoint_; + ri.api = "/authorizer/folder"; + ri.post = true; + ri.body = "{\"Folder\":\"" + source_path_ + "\",\"Beamtime:\"" + source_credentials_.beamtime_id + ",\"Token\":\"" + + source_credentials_.user_token + "\"}"; + Error err; + RequestOutput output; + err = ProcessRequest(&output, ri, nullptr); + if (err) { + return err; + } + auto folder_token_ = std::move(output.string_output); + return nullptr; +} + } diff --git a/consumer/api/cpp/src/server_data_broker.h b/consumer/api/cpp/src/server_data_broker.h index 2835e8325ffe269cde304d9e0d40e1bf1b3dfcd4..66c8393af68a2b2492f4359084ab49687395702e 100644 --- a/consumer/api/cpp/src/server_data_broker.h +++ b/consumer/api/cpp/src/server_data_broker.h @@ -50,7 +50,8 @@ Error ErrorFromNoDataResponse(const std::string& response); class ServerDataBroker final : public asapo::DataBroker { public: - explicit ServerDataBroker(std::string server_uri, std::string source_path, SourceCredentials source); + explicit ServerDataBroker(std::string server_uri, std::string source_path, bool has_filesystem, + SourceCredentials source); Error ResetLastReadMarker(std::string group_id) override; Error ResetLastReadMarker(std::string group_id, std::string substream) override; @@ -94,6 +95,8 @@ class ServerDataBroker final : public asapo::DataBroker { std::unique_ptr<HttpClient> httpclient__; std::unique_ptr<NetClient> net_client__; private: + Error GetDataFromFileTransferService(FileInfo* info, FileData* data); + Error GetDataFromFile(FileInfo* info, FileData* data); static const std::string kBrokerDerviceName; static const std::string kFileTransferService_name; std::string RequestWithToken(std::string uri); @@ -121,6 +124,7 @@ class ServerDataBroker final : public asapo::DataBroker { std::string endpoint_; std::string current_broker_uri_; std::string source_path_; + bool has_filesystem_; SourceCredentials source_credentials_; uint64_t timeout_ms_ = 0; }; diff --git a/consumer/api/cpp/unittests/test_consumer_api.cpp b/consumer/api/cpp/unittests/test_consumer_api.cpp index d683a4233d2d5eb9b0260ca48ec8d9a5da4da060..90e5b0b0182c91f63a0a1b03d056a809b76c0d63 100644 --- a/consumer/api/cpp/unittests/test_consumer_api.cpp +++ b/consumer/api/cpp/unittests/test_consumer_api.cpp @@ -27,7 +27,7 @@ class DataBrokerFactoryTests : public Test { TEST_F(DataBrokerFactoryTests, CreateServerDataSource) { - auto data_broker = DataBrokerFactory::CreateServerBroker("server", "path", asapo::SourceCredentials{"beamtime_id", "", "", "token"}, &error); + auto data_broker = DataBrokerFactory::CreateServerBroker("server", "path", false, asapo::SourceCredentials{"beamtime_id", "", "", "token"}, &error); ASSERT_THAT(error, Eq(nullptr)); ASSERT_THAT(dynamic_cast<ServerDataBroker*>(data_broker.get()), Ne(nullptr)); diff --git a/consumer/api/cpp/unittests/test_server_broker.cpp b/consumer/api/cpp/unittests/test_server_broker.cpp index 26b9e61ebd8db83c175b4d3eac1ffd8952c12bb6..3c0d58fa9c870cf914aea432e102c900f40c502b 100644 --- a/consumer/api/cpp/unittests/test_server_broker.cpp +++ b/consumer/api/cpp/unittests/test_server_broker.cpp @@ -1,4 +1,5 @@ #include <gmock/gmock.h> +#include <gmock/gmock.h> #include "gtest/gtest.h" #include "consumer/data_broker.h" @@ -44,7 +45,9 @@ namespace { TEST(FolderDataBroker, Constructor) { auto data_broker = - std::unique_ptr<ServerDataBroker> {new ServerDataBroker("test", "path", asapo::SourceCredentials{"beamtime_id", "", "", "token"})}; + std::unique_ptr<ServerDataBroker> {new ServerDataBroker("test", "path", false, + asapo::SourceCredentials{"beamtime_id", "", "", "token"}) + }; ASSERT_THAT(dynamic_cast<asapo::SystemIO*>(data_broker->io__.get()), Ne(nullptr)); ASSERT_THAT(dynamic_cast<asapo::CurlHttpClient*>(data_broker->httpclient__.get()), Ne(nullptr)); ASSERT_THAT(dynamic_cast<asapo::TcpClient*>(data_broker->net_client__.get()), Ne(nullptr)); @@ -59,6 +62,7 @@ class ServerDataBrokerTests : public Test { FileInfo info; std::string expected_server_uri = "test:8400"; std::string expected_broker_uri = "broker:5005"; + std::string expected_fts_uri = "fts:5008"; std::string expected_token = "token"; std::string expected_path = "/tmp/beamline/beamtime"; std::string expected_filename = "filename"; @@ -68,13 +72,15 @@ class ServerDataBrokerTests : public Test { std::string expected_substream = "substream"; std::string expected_metadata = "{\"meta\":1}"; std::string expected_query_string = "bla"; + std::string expected_folder_token = "folder_token"; + std::string expected_beamtime_id = "beamtime_id"; uint64_t expected_dataset_id = 1; static const uint64_t expected_buf_id = 123; std::string expected_next_substream = "nextsubstream"; void SetUp() override { data_broker = std::unique_ptr<ServerDataBroker> { - new ServerDataBroker(expected_server_uri, expected_path, asapo::SourceCredentials{"beamtime_id", "", expected_stream, expected_token}) + new ServerDataBroker(expected_server_uri, expected_path, true, asapo::SourceCredentials{expected_beamtime_id, "", expected_stream, expected_token}) }; data_broker->io__ = std::unique_ptr<IO> {&mock_io}; data_broker->httpclient__ = std::unique_ptr<asapo::HttpClient> {&mock_http_client}; @@ -100,11 +106,23 @@ class ServerDataBrokerTests : public Test { Return("") )); } - void MockGetBrokerUri() { - EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_server_uri + "/discovery/broker"), _, _)).WillOnce(DoAll( + void MockGetServiceUri(std::string service, std::string result) { + EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_server_uri + "/discovery/" + service), _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::OK), SetArgPointee<2>(nullptr), - Return(expected_broker_uri))); + Return(result))); + } + + void MockBeforeFTS(FileData* data); + + void MockGetFTSUri() { + MockGetServiceUri("fts", expected_fts_uri); + } + + void ExpectFolderToken(); + + void MockGetBrokerUri() { + MockGetServiceUri("broker", expected_broker_uri); } void MockReadDataFromFile(int times = 1) { if (times == 0) { @@ -137,7 +155,7 @@ TEST_F(ServerDataBrokerTests, DefaultStreamIsDetector) { data_broker->httpclient__.release(); data_broker->net_client__.release(); data_broker = std::unique_ptr<ServerDataBroker> { - new ServerDataBroker(expected_server_uri, expected_path, asapo::SourceCredentials{"beamtime_id", "", "", expected_token}) + new ServerDataBroker(expected_server_uri, expected_path, false, asapo::SourceCredentials{"beamtime_id", "", "", expected_token}) }; data_broker->io__ = std::unique_ptr<IO> {&mock_io}; data_broker->httpclient__ = std::unique_ptr<asapo::HttpClient> {&mock_http_client}; @@ -1002,7 +1020,50 @@ TEST_F(ServerDataBrokerTests, GetSubstreamListUsesCorrectUri) { } +void ServerDataBrokerTests::MockBeforeFTS(FileData* data) { + data_broker->io__.release(); + data_broker->httpclient__.release(); + data_broker->net_client__.release(); + data_broker = std::unique_ptr<ServerDataBroker> { + new ServerDataBroker(expected_server_uri, expected_path, false, asapo::SourceCredentials{"beamtime_id", "", "", expected_token}) + }; + data_broker->io__ = std::unique_ptr<IO> {&mock_io}; + data_broker->httpclient__ = std::unique_ptr<asapo::HttpClient> {&mock_http_client}; + data_broker->net_client__ = std::unique_ptr<asapo::NetClient> {&mock_netclient}; + + MockGetBrokerUri(); + auto to_send = CreateFI(); + auto json = to_send.Json(); + MockGet(json); + + + EXPECT_CALL(mock_netclient, GetData_t(&info, + data)).WillOnce(Return(asapo::IOErrorTemplates::kUnknownIOError.Generate().release())); +} + +void ServerDataBrokerTests::ExpectFolderToken() { + std::string expected_folder_query_string = "{\"Folder\":\"" + expected_path + "\",\"Beamtime:\"" + expected_beamtime_id + + ",\"Token\":\"" + expected_token + "\"}"; + + EXPECT_CALL(mock_http_client, Post_t(HasSubstr(expected_server_uri + "/authorizer/folder"), + expected_folder_query_string, _, _)).WillOnce(DoAll( + SetArgPointee<2>(HttpCode::OK), + SetArgPointee<3>(nullptr), + Return(expected_folder_token) + )); + +} + +TEST_F(ServerDataBrokerTests, GetImageUsesFileTransferServiceIfCannotReadFromCache) { + FileData data; + + MockBeforeFTS(&data); + ExpectFolderToken(); + + data_broker->GetNext(&info, expected_group_id, &data); + ASSERT_THAT(info.buf_id, Eq(0)); +} } diff --git a/consumer/api/python/asapo_consumer.pxd b/consumer/api/python/asapo_consumer.pxd index 1475ad22f0e0ba37df4ba88ee97aa8dc7e3f6fa4..e55b036a581cc5c8f1ec013be8b9869d6902f0fc 100644 --- a/consumer/api/python/asapo_consumer.pxd +++ b/consumer/api/python/asapo_consumer.pxd @@ -66,7 +66,7 @@ cdef extern from "asapo_consumer.h" namespace "asapo" nogil: cdef extern from "asapo_consumer.h" namespace "asapo" nogil: cdef cppclass DataBrokerFactory: DataBrokerFactory() except + - unique_ptr[DataBroker] CreateServerBroker(string server_name,string source_path,SourceCredentials source,Error* error) + unique_ptr[DataBroker] CreateServerBroker(string server_name,string source_path,bool has_filesystem,SourceCredentials source,Error* error) cdef extern from "asapo_consumer.h" namespace "asapo": diff --git a/consumer/api/python/asapo_consumer.pyx.in b/consumer/api/python/asapo_consumer.pyx.in index ad31fd10332652e785bc61d6ea31274262efe2d9..153bd0f769f38c9316e5c556119c56840d8f3376 100644 --- a/consumer/api/python/asapo_consumer.pyx.in +++ b/consumer/api/python/asapo_consumer.pyx.in @@ -256,9 +256,10 @@ cdef class __PyDataBrokerFactory: def __cinit__(self): with nogil: self.c_factory = DataBrokerFactory() - def create_server_broker(self,server_name,source_path,beamtime_id,stream,token,timeout): + def create_server_broker(self,server_name,source_path,has_filesystem,beamtime_id,stream,token,timeout): cdef string b_server_name = _bytes(server_name) cdef string b_source_path = _bytes(source_path) + cdef bool b_has_filesystem = has_filesystem cdef SourceCredentials source source.beamtime_id = _bytes(beamtime_id) source.user_token = _bytes(token) @@ -266,7 +267,7 @@ cdef class __PyDataBrokerFactory: cdef Error err cdef unique_ptr[DataBroker] c_broker with nogil: - c_broker = self.c_factory.CreateServerBroker(b_server_name,b_source_path,source,&err) + c_broker = self.c_factory.CreateServerBroker(b_server_name,b_source_path,b_has_filesystem,source,&err) broker = PyDataBroker() broker.c_broker = c_broker.release() broker.c_broker.SetTimeout(timeout) @@ -274,17 +275,19 @@ cdef class __PyDataBrokerFactory: throw_exception(err) return broker -def create_server_broker(server_name,source_path,beamtime_id,stream,token,timeout_ms): +def create_server_broker(server_name,source_path,has_filesystem,beamtime_id,stream,token,timeout_ms): """ :param server_name: Server endpoint (hostname:port) :type server_name: string :param source_path: Path to the folder to read data from :type source_path: string + :param has_filesystem: Path to the folder to read data from + :type has_filesystem: bool :return: Broker object and error. (None,err) if case of error, (broker, None) if success :rtype: Tuple with broker object and error. """ factory = __PyDataBrokerFactory() - return factory.create_server_broker(_bytes(server_name),_bytes(source_path),_bytes(beamtime_id),_bytes(stream),_bytes(token),timeout_ms) + return factory.create_server_broker(_bytes(server_name),_bytes(source_path),has_filesystem, _bytes(beamtime_id),_bytes(stream),_bytes(token),timeout_ms) __version__ = "@ASAPO_VERSION_PYTHON@" diff --git a/examples/consumer/getnext_broker/getnext_broker.cpp b/examples/consumer/getnext_broker/getnext_broker.cpp index 242c82a898d51e1c2e6cbdebdc4a0a04688502bf..a792e1f2199cab85cfa32f01e1ae6cfca3e43b14 100644 --- a/examples/consumer/getnext_broker/getnext_broker.cpp +++ b/examples/consumer/getnext_broker/getnext_broker.cpp @@ -54,7 +54,7 @@ std::vector<std::thread> StartThreads(const Args& params, auto exec_next = [¶ms, nfiles, errors, nbuf, nfiles_total](int i) { asapo::FileInfo fi; Error err; - auto broker = asapo::DataBrokerFactory::CreateServerBroker(params.server, params.file_path, + auto broker = asapo::DataBrokerFactory::CreateServerBroker(params.server, params.file_path, true, asapo::SourceCredentials{params.beamtime_id, "", params.stream, params.token}, &err); broker->SetTimeout((uint64_t) params.timeout_ms); diff --git a/examples/consumer/getnext_broker_python/getnext.py b/examples/consumer/getnext_broker_python/getnext.py index 070030af19c698902f4798178f047a198ced1c5d..10d6517cdedc885ff9ddf5ec79828d5908fb8d3c 100644 --- a/examples/consumer/getnext_broker_python/getnext.py +++ b/examples/consumer/getnext_broker_python/getnext.py @@ -6,7 +6,7 @@ import sys source, path, beamtime, token, group_id = sys.argv[1:] -broker = asapo_consumer.create_server_broker(source,path, beamtime,"",token,60000) +broker = asapo_consumer.create_server_broker(source,path,True, beamtime,"",token,60000) if group_id == "new": diff --git a/examples/pipeline/in_to_out/in_to_out.cpp b/examples/pipeline/in_to_out/in_to_out.cpp index d9bd76f51c1e6f5775b0012b3a9f480908123f98..b5813df8032e5d14159db08ea0eea294d30c19c2 100644 --- a/examples/pipeline/in_to_out/in_to_out.cpp +++ b/examples/pipeline/in_to_out/in_to_out.cpp @@ -64,7 +64,7 @@ int ProcessError(const Error& err) { } BrokerPtr CreateBrokerAndGroup(const Args& args, Error* err) { - auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.server, args.file_path, + auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.server, args.file_path, true, asapo::SourceCredentials{args.beamtime_id, "", args.stream_in, args.token}, err); if (*err) { return nullptr; diff --git a/examples/pipeline/in_to_out_python/in_to_out.py b/examples/pipeline/in_to_out_python/in_to_out.py index 9bc02800e7944e485a7ed1e32b8d9486ca24a695..e3444b92a91b28ab1dcd5aa83ab6cfa9fdc8318e 100644 --- a/examples/pipeline/in_to_out_python/in_to_out.py +++ b/examples/pipeline/in_to_out_python/in_to_out.py @@ -28,7 +28,7 @@ timeout_s_producer=int(timeout_s_producer) nthreads=int(nthreads) transfer_data=int(transfer_data)>0 -broker = asapo_consumer.create_server_broker(source,path, beamtime,stream_in,token,timeout_s*1000) +broker = asapo_consumer.create_server_broker(source,path, True,beamtime,stream_in,token,timeout_s*1000) producer = asapo_producer.create_producer(source,beamtime,'auto', stream_out, token, nthreads, 600) diff --git a/tests/automatic/bug_fixes/consumer_python_memleak/memleak.py b/tests/automatic/bug_fixes/consumer_python_memleak/memleak.py index 0ff83df0d938ce8dec651b8a979dd0a78c2643af..2f5eaac9bc5eeda3289c08e2de21e7828b169f98 100644 --- a/tests/automatic/bug_fixes/consumer_python_memleak/memleak.py +++ b/tests/automatic/bug_fixes/consumer_python_memleak/memleak.py @@ -5,7 +5,7 @@ import sys source, path, beamtime, token = sys.argv[1:] broker = asapo_consumer.create_server_broker( - source, path, beamtime, "stream", token, 1000) + source, path,True, beamtime, "stream", token, 1000) group_id = broker.generate_group_id() print('generated group id: ', group_id) diff --git a/tests/automatic/consumer/consumer_api/consumer_api.cpp b/tests/automatic/consumer/consumer_api/consumer_api.cpp index 2a85a1d351632ccba6ebf6ae131c2b589c172fb6..73ebf67fe962db006eb8520a4a6a0a6600eae6d9 100644 --- a/tests/automatic/consumer/consumer_api/consumer_api.cpp +++ b/tests/automatic/consumer/consumer_api/consumer_api.cpp @@ -186,7 +186,7 @@ void TestDataset(const std::unique_ptr<asapo::DataBroker>& broker, const std::st void TestAll(const Args& args) { asapo::Error err; - auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.server, ".", + auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.server, ".", true, asapo::SourceCredentials{args.run_name, "", "", args.token}, &err); broker->SetTimeout(100); auto group_id = broker->GenerateNewGroupId(&err); diff --git a/tests/automatic/consumer/consumer_api_python/consumer_api.py b/tests/automatic/consumer/consumer_api_python/consumer_api.py index 10349aec51fd140f36f2fe1f807cd22a19489ee2..6fd1f7ba86bee06d8e1a7ccf8ba31c39bc27fe71 100644 --- a/tests/automatic/consumer/consumer_api_python/consumer_api.py +++ b/tests/automatic/consumer/consumer_api_python/consumer_api.py @@ -141,7 +141,7 @@ def check_single(broker,group_id_new): else: exit_on_noerr("wrong query") - broker = asapo_consumer.create_server_broker("bla",path, beamtime,"",token,1000) + broker = asapo_consumer.create_server_broker("bla",path, True, beamtime,"",token,1000) try: broker.get_last(group_id_new, meta_only=True) except asapo_consumer.AsapoUnavailableServiceError as err: @@ -192,7 +192,7 @@ def check_dataset(broker,group_id_new): source, path, beamtime, token, mode = sys.argv[1:] -broker = asapo_consumer.create_server_broker(source,path, beamtime,"",token,60000) +broker = asapo_consumer.create_server_broker(source,path,True, beamtime,"",token,60000) group_id_new = broker.generate_group_id() diff --git a/tests/automatic/consumer/next_multithread_broker/next_multithread_broker.cpp b/tests/automatic/consumer/next_multithread_broker/next_multithread_broker.cpp index 3f06dda46b641e892bc57064e1d506ee7fde8a59..deea0768f29ff3844d6f3c7d1ac50cf1487b2535 100644 --- a/tests/automatic/consumer/next_multithread_broker/next_multithread_broker.cpp +++ b/tests/automatic/consumer/next_multithread_broker/next_multithread_broker.cpp @@ -49,7 +49,7 @@ Args GetArgs(int argc, char* argv[]) { void TestAll(const Args& args) { asapo::Error err; - auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.server, "dummy", asapo::SourceCredentials{args.run_name, "", "", args.token}, &err); + auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.server, "dummy", true, asapo::SourceCredentials{args.run_name, "", "", args.token}, &err); auto group_id = broker->GenerateNewGroupId(&err); broker->SetTimeout(10000); std::vector<asapo::FileInfos>file_infos(args.nthreads); diff --git a/tests/automatic/curl_http_client/curl_http_client_command/curl_httpclient_command.cpp b/tests/automatic/curl_http_client/curl_http_client_command/curl_httpclient_command.cpp index 97f047e3a942b01864806c3185a7d941d8ef772e..264d88c6befd767c99e69ae1b6dcbc52d35c3fd8 100644 --- a/tests/automatic/curl_http_client/curl_http_client_command/curl_httpclient_command.cpp +++ b/tests/automatic/curl_http_client/curl_http_client_command/curl_httpclient_command.cpp @@ -36,7 +36,7 @@ int main(int argc, char* argv[]) { std::string authorize_request = "{\"Folder\":\"" + args.folder + "\",\"BeamtimeId\":\"aaa\",\"Token\":\"" + token + "\"}"; asapo::Error err; - auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.uri_authorizer, "", asapo::SourceCredentials{"", "", "", ""}, &err); + auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.uri_authorizer, "", true, asapo::SourceCredentials{"", "", "", ""}, &err); auto server_broker = static_cast<asapo::ServerDataBroker*>(broker.get()); asapo::HttpCode code; diff --git a/tests/automatic/full_chain/send_recv_substreams/send_recv_substreams.cpp b/tests/automatic/full_chain/send_recv_substreams/send_recv_substreams.cpp index 34a1623ad0c80f60e60b985f9aacfd4f53aa3895..333d139fde158448629714a472e9c1ea631c94b1 100644 --- a/tests/automatic/full_chain/send_recv_substreams/send_recv_substreams.cpp +++ b/tests/automatic/full_chain/send_recv_substreams/send_recv_substreams.cpp @@ -35,7 +35,7 @@ void ProcessAfterSend(asapo::GenericRequestHeader header, asapo::Error err) { } BrokerPtr CreateBrokerAndGroup(const Args& args, Error* err) { - auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.server, ".", + auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.server, ".", true, asapo::SourceCredentials{args.beamtime_id, "", "", args.token}, err); if (*err) { return nullptr; diff --git a/tests/automatic/full_chain/send_recv_substreams_python/send_recv_substreams.py b/tests/automatic/full_chain/send_recv_substreams_python/send_recv_substreams.py index f52b8f4460d60254bbdbbccec174f9d0ccc84c1c..60ad65264b71f728a5f16c2a8babfb3d03d9c2f4 100644 --- a/tests/automatic/full_chain/send_recv_substreams_python/send_recv_substreams.py +++ b/tests/automatic/full_chain/send_recv_substreams_python/send_recv_substreams.py @@ -27,7 +27,7 @@ def callback(header,err): source, beamtime, token = sys.argv[1:] -broker = asapo_consumer.create_server_broker(source,".", beamtime,"",token,timeout) +broker = asapo_consumer.create_server_broker(source,".",True, beamtime,"",token,timeout) producer = asapo_producer.create_producer(source,beamtime,'auto', "", token, 1, 600) producer.set_log_level("debug") diff --git a/tests/automatic/full_chain/simple_chain_usermeta_python/get_user_meta.py b/tests/automatic/full_chain/simple_chain_usermeta_python/get_user_meta.py index a3ac8309400f77fd7d0b9181719173e530015023..5fb497e041b462bf7290286f805bf1fe646c0737 100644 --- a/tests/automatic/full_chain/simple_chain_usermeta_python/get_user_meta.py +++ b/tests/automatic/full_chain/simple_chain_usermeta_python/get_user_meta.py @@ -6,7 +6,7 @@ import sys source, path, beamtime, token, group_id = sys.argv[1:] -broker = asapo_consumer.create_server_broker(source,path, beamtime,"",token,60000) +broker = asapo_consumer.create_server_broker(source,path,True, beamtime,"",token,60000) images = broker.query_images("meta.user_meta regexp 'test*' order by _id") diff --git a/tests/manual/performance_broker_receiver/getlast_broker.cpp b/tests/manual/performance_broker_receiver/getlast_broker.cpp index 4272ae17c239cd8a7139d03835619af80761046a..1adcda25b2a1edee2db8379dfabc6229fa565987 100644 --- a/tests/manual/performance_broker_receiver/getlast_broker.cpp +++ b/tests/manual/performance_broker_receiver/getlast_broker.cpp @@ -47,7 +47,7 @@ std::vector<std::thread> StartThreads(const Args& params, auto exec_next = [¶ms, nfiles, errors, nbuf, nfiles_total](int i) { asapo::FileInfo fi; Error err; - auto broker = asapo::DataBrokerFactory::CreateServerBroker(params.server, params.file_path, + auto broker = asapo::DataBrokerFactory::CreateServerBroker(params.server, params.file_path, true, asapo::SourceCredentials{params.beamtime_id, "", "", params.token}, &err); broker->SetTimeout((uint64_t) params.timeout_ms); asapo::FileData data; diff --git a/tests/manual/python_tests/plot_images_online.py b/tests/manual/python_tests/plot_images_online.py index 829200981948f473083b41ce4bca327306cefa30..31b0e88e01bf99908bfb377d785331ba77541d34 100644 --- a/tests/manual/python_tests/plot_images_online.py +++ b/tests/manual/python_tests/plot_images_online.py @@ -9,7 +9,7 @@ import matplotlib.pyplot as plt #dset = f.create_dataset("mydataset", data = d1) #f.close() -broker, err = asapo_consumer.create_server_broker("psana002:8400", "/tmp", "asapo_test2","","yzgAcLmijSLWIm8dBiGNCbc0i42u5HSm-zR6FRqo__Y=", 1000000) +broker, err = asapo_consumer.create_server_broker("psana002:8400", "/tmp", True, "asapo_test2","","yzgAcLmijSLWIm8dBiGNCbc0i42u5HSm-zR6FRqo__Y=", 1000000) last_id = 0 while True: diff --git a/tests/manual/python_tests/test_p.py b/tests/manual/python_tests/test_p.py index 337cac212e868892737fcd60258076b9e890361b..764a2f377f2c06a94d46ba85d698935200d825c3 100644 --- a/tests/manual/python_tests/test_p.py +++ b/tests/manual/python_tests/test_p.py @@ -11,7 +11,7 @@ beamtime = "asapo_test" token = "KmUDdacgBzaOD3NIJvN1NmKGqWKtx0DK-NyPjdpeWkc=" broker, err = asapo_consumer.create_server_broker( - source, path, beamtime, token, 1000) + source, path, True, beamtime, token, 1000) group_id, err = broker.generate_group_id() if err is not None: