diff --git a/consumer/api/cpp/include/consumer/data_broker.h b/consumer/api/cpp/include/consumer/data_broker.h index f142bf497d4206b2b5697b1e579c87fa4497ad62..5930faf87bc2624445a305610cad2fffa7ad0d7b 100644 --- a/consumer/api/cpp/include/consumer/data_broker.h +++ b/consumer/api/cpp/include/consumer/data_broker.h @@ -7,6 +7,7 @@ #include "common/data_structs.h" #include "common/error.h" +#include "common/networking.h" namespace asapo { @@ -136,8 +137,7 @@ class DataBroker { class DataBrokerFactory { public: static std::unique_ptr<DataBroker> CreateServerBroker(std::string server_name, std::string source_path, - bool has_filesystem, - SourceCredentials source, + bool has_filesystem, SourceCredentials source, std::string networkType, Error* error) noexcept; }; diff --git a/consumer/api/cpp/src/data_broker.cpp b/consumer/api/cpp/src/data_broker.cpp index 451dd17fce9d379722906e9d84a5ffc18085550f..482cdcf400b3564a04c230eeadd6c16cff12b587 100644 --- a/consumer/api/cpp/src/data_broker.cpp +++ b/consumer/api/cpp/src/data_broker.cpp @@ -1,3 +1,4 @@ +#include <common/networking.h> #include "consumer/data_broker.h" #include "server_data_broker.h" @@ -25,12 +26,21 @@ std::unique_ptr<DataBroker> Create(const std::string& source_name, } -// TODO Add NetworkConnectionType std::unique_ptr<DataBroker> DataBrokerFactory::CreateServerBroker(std::string server_name, std::string source_path, - bool has_filesystem, SourceCredentials source, + bool has_filesystem, SourceCredentials source, std::string networkType, Error* error) noexcept { + NetworkConnectionType networkConnectionType; + if (networkType == "tcp") { + networkConnectionType = NetworkConnectionType::kAsapoTcp; + } else if (networkType == "fabric") { + networkConnectionType = NetworkConnectionType::kFabric; + } else { + *error = TextError("Unknown network type"); + return nullptr; + } + return Create<ServerDataBroker>(std::move(server_name), error, std::move(source_path), has_filesystem, - std::move(source)); + std::move(source), networkConnectionType); } diff --git a/consumer/api/cpp/src/server_data_broker.cpp b/consumer/api/cpp/src/server_data_broker.cpp index e7ea682be7bd0f4e213d156208b55f4259d9d949..b4288b37c56f15483f027278acf65d4d7dcac809 100644 --- a/consumer/api/cpp/src/server_data_broker.cpp +++ b/consumer/api/cpp/src/server_data_broker.cpp @@ -9,6 +9,7 @@ #include "tcp_client.h" #include "asapo_consumer.h" +#include "fabric_consumer_client.h" using std::chrono::system_clock; @@ -85,11 +86,19 @@ Error ProcessRequestResponce(const Error& server_err, const RequestOutput* respo ServerDataBroker::ServerDataBroker(std::string server_uri, std::string source_path, bool has_filesystem, - SourceCredentials source) : + SourceCredentials source, + NetworkConnectionType networkType) : io__{GenerateDefaultIO()}, httpclient__{DefaultHttpClient()}, - net_client__{new TcpClient()}, - endpoint_{std::move(server_uri)}, source_path_{std::move(source_path)}, has_filesystem_{has_filesystem}, -source_credentials_(std::move(source)) { + endpoint_{std::move(server_uri)}, source_path_{std::move(source_path)}, has_filesystem_{has_filesystem}, + source_credentials_(std::move(source)) { + switch (networkType) { + case NetworkConnectionType::kAsapoTcp: + net_client__.reset(new TcpClient()); + break; + case NetworkConnectionType::kFabric: + net_client__.reset(new FabricConsumerClient()); + break; + } if (source_credentials_.stream.empty()) { source_credentials_.stream = SourceCredentials::kDefaultStream; diff --git a/consumer/api/cpp/src/server_data_broker.h b/consumer/api/cpp/src/server_data_broker.h index 702632c65b2de5cc0c8c398aa42dd3ce9dd3ef05..eaa810623addd6d3e9743a8afa5773ce68c90033 100644 --- a/consumer/api/cpp/src/server_data_broker.h +++ b/consumer/api/cpp/src/server_data_broker.h @@ -1,6 +1,7 @@ #ifndef ASAPO_SERVER_DATA_BROKER_H #define ASAPO_SERVER_DATA_BROKER_H +#include <common/networking.h> #include "consumer/data_broker.h" #include "io/io.h" #include "http_client/http_client.h" @@ -52,7 +53,7 @@ Error ConsumerErrorFromNoDataResponse(const std::string& response); class ServerDataBroker final : public asapo::DataBroker { public: explicit ServerDataBroker(std::string server_uri, std::string source_path, bool has_filesystem, - SourceCredentials source); + SourceCredentials source, NetworkConnectionType networkType); Error ResetLastReadMarker(std::string group_id) override; Error ResetLastReadMarker(std::string group_id, std::string substream) override; diff --git a/consumer/api/cpp/unittests/test_consumer_api.cpp b/consumer/api/cpp/unittests/test_consumer_api.cpp index 90e5b0b0182c91f63a0a1b03d056a809b76c0d63..9645c58d0bd65e160f818f0862d883e8425d769e 100644 --- a/consumer/api/cpp/unittests/test_consumer_api.cpp +++ b/consumer/api/cpp/unittests/test_consumer_api.cpp @@ -25,13 +25,32 @@ class DataBrokerFactoryTests : public Test { }; -TEST_F(DataBrokerFactoryTests, CreateServerDataSource) { +TEST_F(DataBrokerFactoryTests, CreateServerDataSource_tcp) { - auto data_broker = DataBrokerFactory::CreateServerBroker("server", "path", false, asapo::SourceCredentials{"beamtime_id", "", "", "token"}, &error); + auto data_broker = DataBrokerFactory::CreateServerBroker("server", "path", false, asapo::SourceCredentials{"beamtime_id", "", "", "token"}, + "tcp", &error); ASSERT_THAT(error, Eq(nullptr)); ASSERT_THAT(dynamic_cast<ServerDataBroker*>(data_broker.get()), Ne(nullptr)); } +TEST_F(DataBrokerFactoryTests, CreateServerDataSource_fabric) { + + auto data_broker = DataBrokerFactory::CreateServerBroker("server", "path", false, asapo::SourceCredentials{"beamtime_id", "", "", "token"}, + "fabric", &error); + + ASSERT_THAT(error, Eq(nullptr)); + ASSERT_THAT(dynamic_cast<ServerDataBroker*>(data_broker.get()), Ne(nullptr)); +} + +TEST_F(DataBrokerFactoryTests, CreateServerDataSource_invalid) { + + auto data_broker = DataBrokerFactory::CreateServerBroker("server", "path", false, asapo::SourceCredentials{"beamtime_id", "", "", "token"}, + "test", &error); + + ASSERT_THAT(error, Ne(nullptr)); + ASSERT_THAT(dynamic_cast<ServerDataBroker*>(data_broker.get()), Eq(nullptr)); +} + } diff --git a/consumer/api/cpp/unittests/test_server_broker.cpp b/consumer/api/cpp/unittests/test_server_broker.cpp index 1b30dc30db7f433766401f74d32ac9df778beb88..f2d6e25c579738e9249a9f6e81650ac79fc95ba9 100644 --- a/consumer/api/cpp/unittests/test_server_broker.cpp +++ b/consumer/api/cpp/unittests/test_server_broker.cpp @@ -46,7 +46,7 @@ namespace { TEST(FolderDataBroker, Constructor) { auto data_broker = std::unique_ptr<ServerDataBroker> {new ServerDataBroker("test", "path", false, - asapo::SourceCredentials{"beamtime_id", "", "", "token"}) + asapo::SourceCredentials{"beamtime_id", "", "", "token"}, asapo::NetworkConnectionType::kAsapoTcp) }; ASSERT_THAT(dynamic_cast<asapo::SystemIO*>(data_broker->io__.get()), Ne(nullptr)); ASSERT_THAT(dynamic_cast<asapo::CurlHttpClient*>(data_broker->httpclient__.get()), Ne(nullptr)); @@ -87,10 +87,10 @@ class ServerDataBrokerTests : public Test { void AssertSingleFileTransfer(); void SetUp() override { data_broker = std::unique_ptr<ServerDataBroker> { - new ServerDataBroker(expected_server_uri, expected_path, true, asapo::SourceCredentials{expected_beamtime_id, "", expected_stream, expected_token}) + new ServerDataBroker(expected_server_uri, expected_path, true, asapo::SourceCredentials{expected_beamtime_id, "", expected_stream, expected_token}, asapo::NetworkConnectionType::kAsapoTcp) }; fts_data_broker = std::unique_ptr<ServerDataBroker> { - new ServerDataBroker(expected_server_uri, expected_path, false, asapo::SourceCredentials{expected_beamtime_id, "", expected_stream, expected_token}) + new ServerDataBroker(expected_server_uri, expected_path, false, asapo::SourceCredentials{expected_beamtime_id, "", expected_stream, expected_token}, asapo::NetworkConnectionType::kAsapoTcp) }; data_broker->io__ = std::unique_ptr<IO> {&mock_io}; data_broker->httpclient__ = std::unique_ptr<asapo::HttpClient> {&mock_http_client}; @@ -176,7 +176,7 @@ TEST_F(ServerDataBrokerTests, DefaultStreamIsDetector) { data_broker->httpclient__.release(); data_broker->net_client__.release(); data_broker = std::unique_ptr<ServerDataBroker> { - new ServerDataBroker(expected_server_uri, expected_path, false, asapo::SourceCredentials{"beamtime_id", "", "", expected_token}) + new ServerDataBroker(expected_server_uri, expected_path, false, asapo::SourceCredentials{"beamtime_id", "", "", expected_token}, asapo::NetworkConnectionType::kAsapoTcp) }; data_broker->io__ = std::unique_ptr<IO> {&mock_io}; data_broker->httpclient__ = std::unique_ptr<asapo::HttpClient> {&mock_http_client}; diff --git a/consumer/api/python/asapo_consumer.pxd b/consumer/api/python/asapo_consumer.pxd index e55b036a581cc5c8f1ec013be8b9869d6902f0fc..d3e987ffb8d35c57e717e51c3a80d8b060c18c80 100644 --- a/consumer/api/python/asapo_consumer.pxd +++ b/consumer/api/python/asapo_consumer.pxd @@ -66,7 +66,7 @@ cdef extern from "asapo_consumer.h" namespace "asapo" nogil: cdef extern from "asapo_consumer.h" namespace "asapo" nogil: cdef cppclass DataBrokerFactory: DataBrokerFactory() except + - unique_ptr[DataBroker] CreateServerBroker(string server_name,string source_path,bool has_filesystem,SourceCredentials source,Error* error) + unique_ptr[DataBroker] CreateServerBroker(string server_name,string source_path,bool has_filesystem,SourceCredentials source,string network_type,Error* error) cdef extern from "asapo_consumer.h" namespace "asapo": @@ -81,4 +81,3 @@ cdef extern from "asapo_consumer.h" namespace "asapo": uint64_t id uint64_t id_max string next_substream - diff --git a/consumer/api/python/asapo_consumer.pyx.in b/consumer/api/python/asapo_consumer.pyx.in index 679b699ccc1b597c14a1bf9dd91799f6ef9e240f..6628db898e46b20fc93491aaa9646f16a1e8193a 100644 --- a/consumer/api/python/asapo_consumer.pyx.in +++ b/consumer/api/python/asapo_consumer.pyx.in @@ -256,7 +256,7 @@ cdef class __PyDataBrokerFactory: def __cinit__(self): with nogil: self.c_factory = DataBrokerFactory() - def create_server_broker(self,server_name,source_path,has_filesystem,beamtime_id,stream,token,timeout): + def create_server_broker(self,server_name,source_path,has_filesystem,beamtime_id,stream,token,timeout,network_type): cdef string b_server_name = _bytes(server_name) cdef string b_source_path = _bytes(source_path) cdef bool b_has_filesystem = has_filesystem @@ -264,10 +264,11 @@ cdef class __PyDataBrokerFactory: source.beamtime_id = _bytes(beamtime_id) source.user_token = _bytes(token) source.stream = _bytes(stream) + cdef string b_network_type = _bytes(network_type) cdef Error err cdef unique_ptr[DataBroker] c_broker with nogil: - c_broker = self.c_factory.CreateServerBroker(b_server_name,b_source_path,b_has_filesystem,source,&err) + c_broker = self.c_factory.CreateServerBroker(b_server_name,b_source_path,b_has_filesystem,source,b_network_type,&err) broker = PyDataBroker() broker.c_broker = c_broker.release() broker.c_broker.SetTimeout(timeout) @@ -275,7 +276,7 @@ cdef class __PyDataBrokerFactory: throw_exception(err) return broker -def create_server_broker(server_name,source_path,has_filesystem,beamtime_id,stream,token,timeout_ms): +def create_server_broker(server_name,source_path,has_filesystem,beamtime_id,stream,token,timeout_ms,network_type): """ :param server_name: Server endpoint (hostname:port) :type server_name: string @@ -283,11 +284,13 @@ def create_server_broker(server_name,source_path,has_filesystem,beamtime_id,stre :type source_path: string :param has_filesystem: True if the source_path is accessible locally, otherwise will use file transfer service to get data :type has_filesystem: bool + :param network_type: The networking mode that should be used. Can be "tcp" or "fabric". + :type network_type: string :return: Broker object and error. (None,err) if case of error, (broker, None) if success :rtype: Tuple with broker object and error. """ factory = __PyDataBrokerFactory() - return factory.create_server_broker(_bytes(server_name),_bytes(source_path),has_filesystem, _bytes(beamtime_id),_bytes(stream),_bytes(token),timeout_ms) + return factory.create_server_broker(_bytes(server_name),_bytes(source_path),has_filesystem, _bytes(beamtime_id),_bytes(stream),_bytes(token),timeout_ms,_bytes(network_type)) __version__ = "@PYTHON_ASAPO_VERSION@@ASAPO_VERSION_COMMIT@" diff --git a/examples/consumer/getnext_broker/getnext_broker.cpp b/examples/consumer/getnext_broker/getnext_broker.cpp index a792e1f2199cab85cfa32f01e1ae6cfca3e43b14..9a1ec908da96b95eed1bd7144710ad626d1a74c7 100644 --- a/examples/consumer/getnext_broker/getnext_broker.cpp +++ b/examples/consumer/getnext_broker/getnext_broker.cpp @@ -24,6 +24,7 @@ uint64_t file_size = 0; struct Args { std::string server; + std::string network_type; std::string file_path; std::string beamtime_id; std::string stream; @@ -55,7 +56,11 @@ std::vector<std::thread> StartThreads(const Args& params, asapo::FileInfo fi; Error err; auto broker = asapo::DataBrokerFactory::CreateServerBroker(params.server, params.file_path, true, - asapo::SourceCredentials{params.beamtime_id, "", params.stream, params.token}, &err); + asapo::SourceCredentials{params.beamtime_id, "", params.stream, params.token}, params.network_type, &err); + if (err) { + std::cout << "Error CreateServerBroker: " << err << std::endl; + exit(EXIT_FAILURE); + } broker->SetTimeout((uint64_t) params.timeout_ms); asapo::FileData data; @@ -170,23 +175,24 @@ int main(int argc, char* argv[]) { asapo::ExitAfterPrintVersionIfNeeded("GetNext Broker Example", argc, argv); Args params; params.datasets = false; - if (argc != 8 && argc != 9) { + if (argc != 9 && argc != 10) { std::cout << "Usage: " + std::string{argv[0]} - + " <server> <files_path> <run_name> <nthreads> <token> <timeout ms> <metaonly> [use datasets]" + + " <server> <network_type> <files_path> <run_name> <nthreads> <token> <timeout ms> <metaonly> [use datasets]" << std::endl; exit(EXIT_FAILURE); } params.server = std::string{argv[1]}; - params.file_path = std::string{argv[2]}; - params.beamtime_id = std::string{argv[3]}; + params.network_type = std::string{argv[2]}; + params.file_path = std::string{argv[3]}; + params.beamtime_id = std::string{argv[4]}; TryGetStream(¶ms); - params.nthreads = atoi(argv[4]); - params.token = std::string{argv[5]}; - params.timeout_ms = atoi(argv[6]); - params.read_data = atoi(argv[7]) != 1; - if (argc == 9) { - params.datasets = atoi(argv[8]) == 1; + params.nthreads = atoi(argv[5]); + params.token = std::string{argv[6]}; + params.timeout_ms = atoi(argv[7]); + params.read_data = atoi(argv[8]) != 1; + if (argc == 10) { + params.datasets = atoi(argv[9]) == 1; } uint64_t duration_ms; int nerrors, nbuf, nfiles_total; diff --git a/examples/consumer/getnext_broker_python/getnext.py b/examples/consumer/getnext_broker_python/getnext.py index 10d6517cdedc885ff9ddf5ec79828d5908fb8d3c..3979bfee88a36cfdb31d18cbdf3f2c70a4f8c680 100644 --- a/examples/consumer/getnext_broker_python/getnext.py +++ b/examples/consumer/getnext_broker_python/getnext.py @@ -4,9 +4,9 @@ import asapo_consumer import json import sys -source, path, beamtime, token, group_id = sys.argv[1:] +source, network_type, path, beamtime, token, group_id = sys.argv[1:] -broker = asapo_consumer.create_server_broker(source,path,True, beamtime,"",token,60000) +broker = asapo_consumer.create_server_broker(source,path,True, beamtime,"",token,60000,network_type) if group_id == "new": diff --git a/examples/pipeline/in_to_out/in_to_out.cpp b/examples/pipeline/in_to_out/in_to_out.cpp index 3721cfd5d790a1afb08559bb43b3db80fa1e2a31..dd7ec134a788b7df293f7b6dcb4449cb01bf8c64 100644 --- a/examples/pipeline/in_to_out/in_to_out.cpp +++ b/examples/pipeline/in_to_out/in_to_out.cpp @@ -27,6 +27,7 @@ system_clock::time_point streamout_finish; struct Args { std::string server; + std::string network_type; std::string file_path; std::string beamtime_id; std::string stream_in; @@ -65,7 +66,7 @@ int ProcessError(const Error& err) { BrokerPtr CreateBrokerAndGroup(const Args& args, Error* err) { auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.server, args.file_path, true, - asapo::SourceCredentials{args.beamtime_id, "", args.stream_in, args.token}, err); + asapo::SourceCredentials{args.beamtime_id, "", args.stream_in, args.token}, args.network_type, err); if (*err) { return nullptr; } @@ -202,23 +203,24 @@ std::unique_ptr<asapo::Producer> CreateProducer(const Args& args) { int main(int argc, char* argv[]) { asapo::ExitAfterPrintVersionIfNeeded("GetNext Broker Example", argc, argv); Args args; - if (argc != 11) { + if (argc != 12) { std::cout << "Usage: " + std::string{argv[0]} - + " <server> <files_path> <beamtime_id> <stream_in> <stream_out> <nthreads> <token> <timeout ms> <timeout ms producer> <transfer data>" + + " <server> <network_type> <files_path> <beamtime_id> <stream_in> <stream_out> <nthreads> <token> <timeout ms> <timeout ms producer> <transfer data>" << std::endl; exit(EXIT_FAILURE); } args.server = std::string{argv[1]}; - args.file_path = std::string{argv[2]}; - args.beamtime_id = std::string{argv[3]}; - args.stream_in = std::string{argv[4]}; - args.stream_out = std::string{argv[5]}; - args.token = std::string{argv[6]}; - args.nthreads = atoi(argv[7]); - args.timeout_ms = atoi(argv[8]); - args.timeout_ms_producer = atoi(argv[9]); - args.transfer_data = atoi(argv[10]) == 1; + args.network_type = std::string{argv[2]}; + args.file_path = std::string{argv[3]}; + args.beamtime_id = std::string{argv[4]}; + args.stream_in = std::string{argv[5]}; + args.stream_out = std::string{argv[6]}; + args.token = std::string{argv[7]}; + args.nthreads = atoi(argv[8]); + args.timeout_ms = atoi(argv[9]); + args.timeout_ms_producer = atoi(argv[10]); + args.transfer_data = atoi(argv[11]) == 1; auto producer = CreateProducer(args); files_sent = 0; diff --git a/examples/pipeline/in_to_out_python/in_to_out.py b/examples/pipeline/in_to_out_python/in_to_out.py index e3444b92a91b28ab1dcd5aa83ab6cfa9fdc8318e..d42203994abb954ea7c8cc40920b6c05bfb1367c 100644 --- a/examples/pipeline/in_to_out_python/in_to_out.py +++ b/examples/pipeline/in_to_out_python/in_to_out.py @@ -2,11 +2,9 @@ from __future__ import print_function import asapo_consumer import asapo_producer -import json import sys -import time - import threading + lock = threading.Lock() @@ -22,13 +20,13 @@ def callback(header,err): n_send = n_send + 1 lock.release() -source, path, beamtime,stream_in, stream_out, token, timeout_s,timeout_s_producer,nthreads, transfer_data = sys.argv[1:] +source, network_type, path, beamtime,stream_in, stream_out, token, timeout_s,timeout_s_producer,nthreads, transfer_data = sys.argv[1:] timeout_s=int(timeout_s) timeout_s_producer=int(timeout_s_producer) nthreads=int(nthreads) transfer_data=int(transfer_data)>0 -broker = asapo_consumer.create_server_broker(source,path, True,beamtime,stream_in,token,timeout_s*1000) +broker = asapo_consumer.create_server_broker(source,path, True,beamtime,stream_in,token,timeout_s*1000,network_type) producer = asapo_producer.create_producer(source,beamtime,'auto', stream_out, token, nthreads, 600) diff --git a/tests/automatic/consumer/consumer_api/consumer_api.cpp b/tests/automatic/consumer/consumer_api/consumer_api.cpp index d9065c56e36752e0be9d22dec784694e12187fc1..98f95556c75ea328a820b1f81cf8e83352639c78 100644 --- a/tests/automatic/consumer/consumer_api/consumer_api.cpp +++ b/tests/automatic/consumer/consumer_api/consumer_api.cpp @@ -8,22 +8,24 @@ struct Args { std::string server; + std::string network_type; std::string run_name; std::string token; std::string datasets; }; Args GetArgs(int argc, char* argv[]) { - if (argc != 5) { + if (argc != 6) { std::cout << "Wrong number of arguments" << std::endl; exit(EXIT_FAILURE); } std::string server{argv[1]}; - std::string source_name{argv[2]}; - std::string token{argv[3]}; - std::string datasets{argv[4]}; + std::string network_type{argv[2]}; + std::string source_name{argv[3]}; + std::string token{argv[4]}; + std::string datasets{argv[5]}; - return Args{server, source_name, token, datasets}; + return Args{server, network_type, source_name, token, datasets}; } @@ -183,7 +185,12 @@ void TestDataset(const std::unique_ptr<asapo::DataBroker>& broker, const std::st void TestAll(const Args& args) { asapo::Error err; auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.server, ".", true, - asapo::SourceCredentials{args.run_name, "", "", args.token}, &err); + asapo::SourceCredentials{args.run_name, "", "", args.token}, args.network_type, &err); + if (err) { + std::cout << "Error CreateServerBroker: " << err << std::endl; + exit(EXIT_FAILURE); + } + broker->SetTimeout(100); auto group_id = broker->GenerateNewGroupId(&err); diff --git a/tests/automatic/consumer/next_multithread_broker/next_multithread_broker.cpp b/tests/automatic/consumer/next_multithread_broker/next_multithread_broker.cpp index 93917960d16747306a8189b0e4be975018d12e59..af8e72430ed467dd49058e517343abb0223f0ed7 100644 --- a/tests/automatic/consumer/next_multithread_broker/next_multithread_broker.cpp +++ b/tests/automatic/consumer/next_multithread_broker/next_multithread_broker.cpp @@ -31,6 +31,7 @@ void Assert(std::vector<asapo::FileInfos> file_infos, int nthreads, int nfiles) struct Args { std::string server; + std::string network_type; std::string run_name; std::string token; int nthreads; @@ -38,22 +39,29 @@ struct Args { }; Args GetArgs(int argc, char* argv[]) { - if (argc != 6) { + if (argc != 7) { std::cout << "Wrong number of arguments" << std::endl; exit(EXIT_FAILURE); } std::string server{argv[1]}; - std::string source_name{argv[2]}; - int nthreads = std::stoi(argv[3]); - int nfiles = std::stoi(argv[4]); - std::string token{argv[5]}; + std::string network_type{argv[2]}; + std::string source_name{argv[3]}; + int nthreads = std::stoi(argv[4]); + int nfiles = std::stoi(argv[5]); + std::string token{argv[6]}; - return Args{server, source_name, token, nthreads, nfiles}; + return Args{server, network_type, source_name, token, nthreads, nfiles}; } void TestAll(const Args& args) { asapo::Error err; - auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.server, "dummy", true, asapo::SourceCredentials{args.run_name, "", "", args.token}, &err); + auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.server, "dummy", true, asapo::SourceCredentials{args.run_name, "", "", args.token}, + args.network_type, &err); + if (err) { + std::cout << "Error CreateServerBroker: " << err << std::endl; + exit(EXIT_FAILURE); + } + auto group_id = broker->GenerateNewGroupId(&err); broker->SetTimeout(10000); std::vector<asapo::FileInfos>file_infos(args.nthreads); diff --git a/tests/automatic/curl_http_client/curl_http_client_command/curl_httpclient_command.cpp b/tests/automatic/curl_http_client/curl_http_client_command/curl_httpclient_command.cpp index 760c6d94034c70ca360c7a187f77902735ec292d..88fdfc0fc0d847bd48291fef3b878dd59e79c9df 100644 --- a/tests/automatic/curl_http_client/curl_http_client_command/curl_httpclient_command.cpp +++ b/tests/automatic/curl_http_client/curl_http_client_command/curl_httpclient_command.cpp @@ -9,6 +9,7 @@ struct Args { std::string uri_authorizer; + std::string network_type; std::string uri_fts; std::string folder; }; @@ -19,9 +20,10 @@ Args GetArgs(int argc, char* argv[]) { exit(EXIT_FAILURE); } std::string uri_authorizer{argv[1]}; - std::string uri_fts{argv[2]}; - std::string folder{argv[3]}; - return Args{uri_authorizer, uri_fts, folder}; + std::string network_type{argv[2]}; + std::string uri_fts{argv[3]}; + std::string folder{argv[4]}; + return Args{uri_authorizer, network_type, uri_fts, folder}; } @@ -32,8 +34,10 @@ int main(int argc, char* argv[]) { std::string authorize_request = "{\"Folder\":\"" + args.folder + "\",\"BeamtimeId\":\"aaa\",\"Token\":\"" + token + "\"}"; asapo::Error err; - auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.uri_authorizer, "", true, asapo::SourceCredentials{"", "", "", ""}, &err); + auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.uri_authorizer, "", true, asapo::SourceCredentials{"", "", "", ""}, + args.network_type, &err); auto server_broker = static_cast<asapo::ServerDataBroker*>(broker.get()); + M_AssertEq(nullptr, err); asapo::HttpCode code; std::string response; diff --git a/tests/automatic/full_chain/send_recv_substreams/send_recv_substreams.cpp b/tests/automatic/full_chain/send_recv_substreams/send_recv_substreams.cpp index 333d139fde158448629714a472e9c1ea631c94b1..3b901888d67249aa4f75e2c8f1454c81ee95fe49 100644 --- a/tests/automatic/full_chain/send_recv_substreams/send_recv_substreams.cpp +++ b/tests/automatic/full_chain/send_recv_substreams/send_recv_substreams.cpp @@ -22,6 +22,7 @@ int files_sent; struct Args { std::string server; + std::string network_type; std::string beamtime_id; std::string token; }; @@ -36,7 +37,7 @@ void ProcessAfterSend(asapo::GenericRequestHeader header, asapo::Error err) { BrokerPtr CreateBrokerAndGroup(const Args& args, Error* err) { auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.server, ".", true, - asapo::SourceCredentials{args.beamtime_id, "", "", args.token}, err); + asapo::SourceCredentials{args.beamtime_id, "", "", args.token}, args.network_type, err); if (*err) { return nullptr; } @@ -70,7 +71,7 @@ ProducerPtr CreateProducer(const Args& args) { int main(int argc, char* argv[]) { asapo::ExitAfterPrintVersionIfNeeded("GetNext Broker Example", argc, argv); Args args; - if (argc != 4) { + if (argc != 5) { std::cout << "Usage: " + std::string{argv[0]} + " <server> <beamtime_id> <token>" << @@ -78,8 +79,9 @@ int main(int argc, char* argv[]) { exit(EXIT_FAILURE); } args.server = std::string{argv[1]}; - args.beamtime_id = std::string{argv[2]}; - args.token = std::string{argv[3]}; + args.network_type = std::string{argv[2]}; + args.beamtime_id = std::string{argv[3]}; + args.token = std::string{argv[4]}; auto producer = CreateProducer(args); auto n = 1; @@ -93,6 +95,10 @@ int main(int argc, char* argv[]) { Error err; auto consumer = CreateBrokerAndGroup(args, &err); + if (err) { + std::cout << "Error CreateBrokerAndGroup: " << err << std::endl; + exit(EXIT_FAILURE); + } asapo::FileInfo fi; for (uint64_t i = 0; i < n; i++) { diff --git a/tests/automatic/high_avail/services_restart/CMakeLists.txt b/tests/automatic/high_avail/services_restart/CMakeLists.txt index 5f7029bf9725d9a2c23e59c6556c73f02c3d0a73..145bef8eea3c490a6542bdff10ab239113239433 100644 --- a/tests/automatic/high_avail/services_restart/CMakeLists.txt +++ b/tests/automatic/high_avail/services_restart/CMakeLists.txt @@ -6,4 +6,4 @@ set(TARGET_NAME service_restart) set(RECEIVER_WRITE_TO_DISK false) prepare_asapo() add_script_test("${TARGET_NAME}-all" "$<TARGET_FILE:dummy-data-producer> $<TARGET_FILE:getnext_broker> $<TARGET_PROPERTY:asapo,EXENAME> broker 1000 998" nomem) -add_script_test("${TARGET_NAME}-all-but-broker" "$<TARGET_FILE:dummy-data-producer> $<TARGET_FILE:getnext_broker> $<TARGET_PROPERTY:asapo,EXENAME> receiver 1000 1000" nomem) +add_script_test("${TARGET_NAME}-all-but-broker" "$<TARGET_FILE:dummy-data-producer> $<TARGET_FILE:getnext_broker> $<TARGET_PROPERTY:asapo,EXENAME> receiver_tcp 1000 1000" nomem) diff --git a/tests/manual/performance_broker_receiver/getlast_broker.cpp b/tests/manual/performance_broker_receiver/getlast_broker.cpp index 1adcda25b2a1edee2db8379dfabc6229fa565987..0582d55b697d3a56a09e06bd756eddd410929aa5 100644 --- a/tests/manual/performance_broker_receiver/getlast_broker.cpp +++ b/tests/manual/performance_broker_receiver/getlast_broker.cpp @@ -18,6 +18,7 @@ std::mutex lock; struct Args { std::string server; + std::string network_type; std::string file_path; std::string beamtime_id; std::string token; @@ -48,7 +49,7 @@ std::vector<std::thread> StartThreads(const Args& params, asapo::FileInfo fi; Error err; auto broker = asapo::DataBrokerFactory::CreateServerBroker(params.server, params.file_path, true, - asapo::SourceCredentials{params.beamtime_id, "", "", params.token}, &err); + asapo::SourceCredentials{params.beamtime_id, "", "", params.token}, params.network_type, &err); broker->SetTimeout((uint64_t) params.timeout_ms); asapo::FileData data; @@ -129,22 +130,23 @@ int main(int argc, char* argv[]) { asapo::ExitAfterPrintVersionIfNeeded("GetLast Broker Example", argc, argv); Args params; params.datasets = false; - if (argc != 8 && argc != 9) { + if (argc != 9 && argc != 10) { std::cout << "Usage: " + std::string{argv[0]} - + " <server> <files_path> <run_name> <nthreads> <token> <timeout ms> <metaonly> [use datasets]" + + " <server> <network_type> <files_path> <run_name> <nthreads> <token> <timeout ms> <metaonly> [use datasets]" << std::endl; exit(EXIT_FAILURE); } params.server = std::string{argv[1]}; - params.file_path = std::string{argv[2]}; - params.beamtime_id = std::string{argv[3]}; - params.nthreads = atoi(argv[4]); - params.token = std::string{argv[5]}; - params.timeout_ms = atoi(argv[6]); - params.read_data = atoi(argv[7]) != 1; - if (argc == 9) { - params.datasets = atoi(argv[8]) == 1; + params.network_type = std::string{argv[2]}; + params.file_path = std::string{argv[3]}; + params.beamtime_id = std::string{argv[4]}; + params.nthreads = atoi(argv[5]); + params.token = std::string{argv[6]}; + params.timeout_ms = atoi(argv[7]); + params.read_data = atoi(argv[8]) != 1; + if (argc == 10) { + params.datasets = atoi(argv[9]) == 1; } uint64_t duration_ms; int nerrors, nbuf, nfiles_total;