diff --git a/CHANGELOG.md b/CHANGELOG.md index 4ad0fbbd1894dee6468cb605e076da4c8df56fc3..0a0fcc607fecc90f7c134e56fcfb32def7cea47c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,14 +2,16 @@ FEATURES * introduced substreams for producer/consumer * introduced timeout for producer requests +* producer accepts "auto" for beamtime, will automatically select a current one for a given beamline IMPROVEMENTS * switch to MongoDB 4.2 -* receiver use ASAP3 directory structure to save files to -* API documentation is available at +* receiver use file paths provided during connection authorization structure +* API documentation is available for C++ and Python * switch to using cmake 3.7+ * error messages in Python as Python strings, not byte objects + BUG FIXES * consumer operation timout - take duration of the operation into account * giving warning/error on attempt to send data/metadata with same id \ No newline at end of file diff --git a/common/cpp/include/common/data_structs.h b/common/cpp/include/common/data_structs.h index 459726c01f2d4f49a822d96429e6cd4aae99e7bc..e46b6ec7ae0ff3ec09ba260841c58c45624210ad 100644 --- a/common/cpp/include/common/data_structs.h +++ b/common/cpp/include/common/data_structs.h @@ -49,14 +49,14 @@ using SubDirList = std::vector<std::string>; struct SourceCredentials { - SourceCredentials(std::string beamtime,std::string beamline,std::string stream,std::string token): - beamtime_id{std::move(beamtime)}, - beamline{std::move(beamline)}, - stream{std::move(stream)}, - user_token{std::move(token)} - {}; - SourceCredentials(){}; - static const std::string kDefaultStream; + SourceCredentials(std::string beamtime, std::string beamline, std::string stream, std::string token): + beamtime_id{std::move(beamtime)}, + beamline{std::move(beamline)}, + stream{std::move(stream)}, + user_token{std::move(token)} { + }; + SourceCredentials() {}; + static const std::string kDefaultStream; static const std::string kDefaultBeamline; static const std::string kDefaultBeamtimeId; std::string beamtime_id; @@ -64,7 +64,7 @@ struct SourceCredentials { std::string stream; std::string user_token; std::string GetString() { - return beamtime_id + "%" + beamline+"%"+ stream + "%" + user_token; + return beamtime_id + "%" + beamline + "%" + stream + "%" + user_token; }; }; diff --git a/common/cpp/unittests/request/mocking.h b/common/cpp/unittests/request/mocking.h index 242e9c609ef49f1e10672ab1d4e16519c722a850..fdf7a3c86f2f9326d95c06c230a7a489048593c6 100644 --- a/common/cpp/unittests/request/mocking.h +++ b/common/cpp/unittests/request/mocking.h @@ -15,13 +15,13 @@ class MockRequestHandler : public RequestHandler { MOCK_METHOD0(PrepareProcessingRequestLocked, void()); MOCK_METHOD0(ReadyProcessRequest, bool()); MOCK_METHOD1(TearDownProcessingRequestLocked, void(bool processing_succeeded)); - MOCK_METHOD2(ProcessRequestUnlocked_t, bool (const GenericRequest* request,bool* retry)); + MOCK_METHOD2(ProcessRequestUnlocked_t, bool (const GenericRequest* request, bool* retry)); MOCK_METHOD1(ProcessRequestTimeout, void(GenericRequest* request)); uint64_t retry_counter = 0; bool ProcessRequestUnlocked(GenericRequest* request, bool* retry) override { retry_counter = request->GetRetryCounter(); std::this_thread::sleep_for(std::chrono::milliseconds(50)); - return ProcessRequestUnlocked_t(request,retry); + return ProcessRequestUnlocked_t(request, retry); } diff --git a/common/cpp/unittests/request/test_request_pool.cpp b/common/cpp/unittests/request/test_request_pool.cpp index 0a37536608090a9b9a661537f59acb0cd835b6a5..80894f504c95d616b61181ddc07b44f387839cfd 100644 --- a/common/cpp/unittests/request/test_request_pool.cpp +++ b/common/cpp/unittests/request/test_request_pool.cpp @@ -104,7 +104,7 @@ TEST_F(RequestPoolTests, TimeOut) { EXPECT_CALL(*mock_request_handler, ReadyProcessRequest()).Times(1).WillRepeatedly(Return(true)); EXPECT_CALL(*mock_request_handler, PrepareProcessingRequestLocked()).Times(0); - EXPECT_CALL(*mock_request_handler, ProcessRequestUnlocked_t(_,_)).Times(0); + EXPECT_CALL(*mock_request_handler, ProcessRequestUnlocked_t(_, _)).Times(0); EXPECT_CALL(*mock_request_handler, ProcessRequestTimeout(_)).Times(1); auto err = pool.AddRequest(std::move(request)); @@ -116,21 +116,21 @@ TEST_F(RequestPoolTests, TimeOut) { void ExpectSend(MockRequestHandler* mock_handler, int ntimes = 1) { EXPECT_CALL(*mock_handler, ReadyProcessRequest()).Times(ntimes).WillRepeatedly(Return(true)); EXPECT_CALL(*mock_handler, PrepareProcessingRequestLocked()).Times(ntimes); - EXPECT_CALL(*mock_handler, ProcessRequestUnlocked_t(_,_)).Times(ntimes).WillRepeatedly( + EXPECT_CALL(*mock_handler, ProcessRequestUnlocked_t(_, _)).Times(ntimes).WillRepeatedly( DoAll( testing::SetArgPointee<1>(false), - Return(true) - )); + Return(true) + )); EXPECT_CALL(*mock_handler, TearDownProcessingRequestLocked(true)).Times(ntimes); } void ExpectFailProcessRequest(MockRequestHandler* mock_handler) { EXPECT_CALL(*mock_handler, ReadyProcessRequest()).Times(AtLeast(1)).WillRepeatedly(Return(true)); EXPECT_CALL(*mock_handler, PrepareProcessingRequestLocked()).Times(AtLeast(1)); - EXPECT_CALL(*mock_handler, ProcessRequestUnlocked_t(_,_)).Times(AtLeast(1)).WillRepeatedly( + EXPECT_CALL(*mock_handler, ProcessRequestUnlocked_t(_, _)).Times(AtLeast(1)).WillRepeatedly( DoAll( testing::SetArgPointee<1>(true), Return(false) - )); - EXPECT_CALL(*mock_handler, TearDownProcessingRequestLocked(false)).Times(AtLeast(1)); + )); + EXPECT_CALL(*mock_handler, TearDownProcessingRequestLocked(false)).Times(AtLeast(1)); } diff --git a/consumer/api/cpp/unittests/test_consumer_api.cpp b/consumer/api/cpp/unittests/test_consumer_api.cpp index 24b7c098c8a6f0e3f387750d055d6e398fc0168f..d683a4233d2d5eb9b0260ca48ec8d9a5da4da060 100644 --- a/consumer/api/cpp/unittests/test_consumer_api.cpp +++ b/consumer/api/cpp/unittests/test_consumer_api.cpp @@ -27,7 +27,7 @@ class DataBrokerFactoryTests : public Test { TEST_F(DataBrokerFactoryTests, CreateServerDataSource) { - auto data_broker = DataBrokerFactory::CreateServerBroker("server", "path", asapo::SourceCredentials{"beamtime_id","", "", "token"}, &error); + auto data_broker = DataBrokerFactory::CreateServerBroker("server", "path", asapo::SourceCredentials{"beamtime_id", "", "", "token"}, &error); ASSERT_THAT(error, Eq(nullptr)); ASSERT_THAT(dynamic_cast<ServerDataBroker*>(data_broker.get()), Ne(nullptr)); diff --git a/consumer/api/cpp/unittests/test_server_broker.cpp b/consumer/api/cpp/unittests/test_server_broker.cpp index 43b6559970b858a6f7a22e37a02f727ad7f01a3f..26b9e61ebd8db83c175b4d3eac1ffd8952c12bb6 100644 --- a/consumer/api/cpp/unittests/test_server_broker.cpp +++ b/consumer/api/cpp/unittests/test_server_broker.cpp @@ -74,7 +74,7 @@ class ServerDataBrokerTests : public Test { std::string expected_next_substream = "nextsubstream"; void SetUp() override { data_broker = std::unique_ptr<ServerDataBroker> { - new ServerDataBroker(expected_server_uri, expected_path, asapo::SourceCredentials{"beamtime_id","", expected_stream, expected_token}) + new ServerDataBroker(expected_server_uri, expected_path, asapo::SourceCredentials{"beamtime_id", "", expected_stream, expected_token}) }; data_broker->io__ = std::unique_ptr<IO> {&mock_io}; data_broker->httpclient__ = std::unique_ptr<asapo::HttpClient> {&mock_http_client}; @@ -137,7 +137,7 @@ TEST_F(ServerDataBrokerTests, DefaultStreamIsDetector) { data_broker->httpclient__.release(); data_broker->net_client__.release(); data_broker = std::unique_ptr<ServerDataBroker> { - new ServerDataBroker(expected_server_uri, expected_path, asapo::SourceCredentials{"beamtime_id","", "", expected_token}) + new ServerDataBroker(expected_server_uri, expected_path, asapo::SourceCredentials{"beamtime_id", "", "", expected_token}) }; data_broker->io__ = std::unique_ptr<IO> {&mock_io}; data_broker->httpclient__ = std::unique_ptr<asapo::HttpClient> {&mock_http_client}; diff --git a/examples/consumer/getnext_broker/getnext_broker.cpp b/examples/consumer/getnext_broker/getnext_broker.cpp index a7c8df8f5463afd2a78bf709a00e8a4e8e7d96bc..242c82a898d51e1c2e6cbdebdc4a0a04688502bf 100644 --- a/examples/consumer/getnext_broker/getnext_broker.cpp +++ b/examples/consumer/getnext_broker/getnext_broker.cpp @@ -55,7 +55,7 @@ std::vector<std::thread> StartThreads(const Args& params, asapo::FileInfo fi; Error err; auto broker = asapo::DataBrokerFactory::CreateServerBroker(params.server, params.file_path, - asapo::SourceCredentials{params.beamtime_id, "",params.stream, params.token}, &err); + asapo::SourceCredentials{params.beamtime_id, "", params.stream, params.token}, &err); broker->SetTimeout((uint64_t) params.timeout_ms); asapo::FileData data; diff --git a/examples/pipeline/in_to_out/in_to_out.cpp b/examples/pipeline/in_to_out/in_to_out.cpp index 5153dfeac5244fa4de53a9e4c0a115e22dad347f..d9bd76f51c1e6f5775b0012b3a9f480908123f98 100644 --- a/examples/pipeline/in_to_out/in_to_out.cpp +++ b/examples/pipeline/in_to_out/in_to_out.cpp @@ -65,7 +65,7 @@ int ProcessError(const Error& err) { BrokerPtr CreateBrokerAndGroup(const Args& args, Error* err) { auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.server, args.file_path, - asapo::SourceCredentials{args.beamtime_id, "",args.stream_in, args.token}, err); + asapo::SourceCredentials{args.beamtime_id, "", args.stream_in, args.token}, err); if (*err) { return nullptr; } diff --git a/producer/api/cpp/src/producer_impl.cpp b/producer/api/cpp/src/producer_impl.cpp index 3b7adba6b79a973b5cf89a49aa98a8e854cc2bb1..ff32dbb7f649410ccfc83ca2b709063c8838a3be 100644 --- a/producer/api/cpp/src/producer_impl.cpp +++ b/producer/api/cpp/src/producer_impl.cpp @@ -180,7 +180,8 @@ Error ProducerImpl::SetCredentials(SourceCredentials source_cred) { source_cred.beamtime_id = SourceCredentials::kDefaultBeamtimeId; } - if (source_cred.beamtime_id == SourceCredentials::kDefaultBeamtimeId && source_cred.beamline == SourceCredentials::kDefaultBeamline) { + if (source_cred.beamtime_id == SourceCredentials::kDefaultBeamtimeId + && source_cred.beamline == SourceCredentials::kDefaultBeamline) { log__->Error("beamtime or beamline should be set"); source_cred_string_ = ""; return ProducerErrorTemplates::kWrongInput.Generate("beamtime or beamline should be set"); diff --git a/producer/api/cpp/src/request_handler_tcp.cpp b/producer/api/cpp/src/request_handler_tcp.cpp index 7e3e7e5efc4f33cf41803c7edf36bfd954ef2c4d..5a898116c888ce78a912095021c3e396e4eba0ae 100644 --- a/producer/api/cpp/src/request_handler_tcp.cpp +++ b/producer/api/cpp/src/request_handler_tcp.cpp @@ -101,7 +101,7 @@ Error RequestHandlerTcp::ReceiveResponse(const GenericRequestHeader& request_hea } case kNetErrorReauthorize: { auto res_err = ProducerErrorTemplates::kReAuthorizationNeeded.Generate(); - return res_err; + return res_err; } case kNetErrorNoError : return nullptr; @@ -211,7 +211,7 @@ bool RequestHandlerTcp::ProcessErrorFromReceiver(const Error& error, } -void RequestHandlerTcp::ProcessRequestCallback(Error err, ProducerRequest* request,bool* retry) { +void RequestHandlerTcp::ProcessRequestCallback(Error err, ProducerRequest* request, bool* retry) { if (request->callback) { request->callback(request->header, std::move(err)); } @@ -224,7 +224,7 @@ bool RequestHandlerTcp::SendDataToOneOfTheReceivers(ProducerRequest* request, bo if (Disconnected()) { auto err = ConnectToReceiver(request->source_credentials, receiver_uri); if (err == ProducerErrorTemplates::kWrongInput) { - ProcessRequestCallback(std::move(err),request,retry); + ProcessRequestCallback(std::move(err), request, retry); return false; } else { if (err != nullptr ) continue; @@ -238,7 +238,7 @@ bool RequestHandlerTcp::SendDataToOneOfTheReceivers(ProducerRequest* request, bo } bool success = err && err != ProducerErrorTemplates::kServerWarning ? false : true; - ProcessRequestCallback(std::move(err),request,retry); + ProcessRequestCallback(std::move(err), request, retry); return success; } log__->Warning("put back to the queue, request opcode: " + std::to_string(request->header.op_code) + diff --git a/producer/api/cpp/unittests/test_producer.cpp b/producer/api/cpp/unittests/test_producer.cpp index 77308974b02cacaa86d7d7b130f41df9db625ae0..9e82be03ed1e4dbc8ee3ea52da6303bebf53e904 100644 --- a/producer/api/cpp/unittests/test_producer.cpp +++ b/producer/api/cpp/unittests/test_producer.cpp @@ -15,7 +15,7 @@ namespace { TEST(CreateProducer, TcpProducer) { asapo::Error err; std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("endpoint", 4, asapo::RequestHandlerType::kTcp, - SourceCredentials{"bt", "","", ""}, 3600, &err); + SourceCredentials{"bt", "", "", ""}, 3600, &err); ASSERT_THAT(dynamic_cast<asapo::ProducerImpl*>(producer.get()), Ne(nullptr)); ASSERT_THAT(err, Eq(nullptr)); } @@ -24,16 +24,16 @@ TEST(CreateProducer, ErrorBeamtime) { asapo::Error err; std::string expected_beamtimeid(asapo::kMaxMessageSize * 10, 'a'); std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("endpoint", 4, asapo::RequestHandlerType::kTcp, - SourceCredentials{expected_beamtimeid,"", "", ""}, 3600, &err); + SourceCredentials{expected_beamtimeid, "", "", ""}, 3600, &err); ASSERT_THAT(producer, Eq(nullptr)); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } TEST(CreateProducer, ErrorOnBothAutoBeamlineBeamtime) { - asapo::SourceCredentials creds{"auto", "auto","subname", "token"}; + asapo::SourceCredentials creds{"auto", "auto", "subname", "token"}; asapo::Error err; std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("endpoint", 4, asapo::RequestHandlerType::kTcp, - creds, 3600, &err); + creds, 3600, &err); ASSERT_THAT(producer, Eq(nullptr)); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } @@ -41,7 +41,7 @@ TEST(CreateProducer, ErrorOnBothAutoBeamlineBeamtime) { TEST(CreateProducer, TooManyThreads) { asapo::Error err; std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("", asapo::kMaxProcessingThreads + 1, - asapo::RequestHandlerType::kTcp, SourceCredentials{"bt","", "", ""}, 3600, &err); + asapo::RequestHandlerType::kTcp, SourceCredentials{"bt", "", "", ""}, 3600, &err); ASSERT_THAT(producer, Eq(nullptr)); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } @@ -50,7 +50,7 @@ TEST(CreateProducer, TooManyThreads) { TEST(CreateProducer, ZeroThreads) { asapo::Error err; std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("", 0, - asapo::RequestHandlerType::kTcp, SourceCredentials{"bt", "","", ""}, 3600, &err); + asapo::RequestHandlerType::kTcp, SourceCredentials{"bt", "", "", ""}, 3600, &err); ASSERT_THAT(producer, Eq(nullptr)); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } @@ -59,7 +59,7 @@ TEST(CreateProducer, ZeroThreads) { TEST(Producer, SimpleWorkflowWihoutConnection) { asapo::Error err; std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("hello", 5, asapo::RequestHandlerType::kTcp, - SourceCredentials{"bt", "","", ""}, 3600, + SourceCredentials{"bt", "", "", ""}, 3600, &err); asapo::EventHeader event_header{1, 1, "test"}; diff --git a/producer/api/cpp/unittests/test_producer_impl.cpp b/producer/api/cpp/unittests/test_producer_impl.cpp index eb989ffa1cabaf1139551cf7a294e8ba36056e07..1442d96d04f1d469a36c0d41a6ef24ad19aa0cd2 100644 --- a/producer/api/cpp/unittests/test_producer_impl.cpp +++ b/producer/api/cpp/unittests/test_producer_impl.cpp @@ -72,10 +72,10 @@ class ProducerImplTests : public testing::Test { char expected_substream[asapo::kMaxMessageSize] = "test_substream"; std::string expected_next_substream = "next_substream"; - asapo::SourceCredentials expected_credentials{"beamtime_id", "beamline","subname", "token" - }; + asapo::SourceCredentials expected_credentials{"beamtime_id", "beamline", "subname", "token" + }; asapo::SourceCredentials expected_default_credentials{ - "beamtime_id", "","", "token" + "beamtime_id", "", "", "token" }; std::string expected_credentials_str = "beamtime_id%beamline%subname%token"; @@ -386,7 +386,7 @@ TEST_F(ProducerImplTests, OKSendingSendFileRequestWithSubstream) { TEST_F(ProducerImplTests, ErrorSettingBeamtime) { std::string long_str(asapo::kMaxMessageSize * 10, 'a'); - expected_credentials = asapo::SourceCredentials{long_str,"", "", ""}; + expected_credentials = asapo::SourceCredentials{long_str, "", "", ""}; EXPECT_CALL(mock_logger, Error(testing::HasSubstr("too long"))); auto err = producer.SetCredentials(expected_credentials); @@ -397,8 +397,8 @@ TEST_F(ProducerImplTests, ErrorSettingBeamtime) { TEST_F(ProducerImplTests, ErrorSettingSecondTime) { EXPECT_CALL(mock_logger, Error(testing::HasSubstr("already"))); - producer.SetCredentials(asapo::SourceCredentials{"1", "","2", "3"}); - auto err = producer.SetCredentials(asapo::SourceCredentials{"4","", "5", "6"}); + producer.SetCredentials(asapo::SourceCredentials{"1", "", "2", "3"}); + auto err = producer.SetCredentials(asapo::SourceCredentials{"4", "", "5", "6"}); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } diff --git a/producer/api/cpp/unittests/test_request_handler_tcp.cpp b/producer/api/cpp/unittests/test_request_handler_tcp.cpp index 2a32c38c2dfc85c42fc2643ebe28ff5b22d4f6a6..b8f480184955b5c1c49ab2dc161e7b63fd3e98d8 100644 --- a/producer/api/cpp/unittests/test_request_handler_tcp.cpp +++ b/producer/api/cpp/unittests/test_request_handler_tcp.cpp @@ -166,10 +166,10 @@ void RequestHandlerTcpTests::ExpectFailConnect(bool only_once) { )); if (only_once) EXPECT_CALL(mock_logger, Debug(AllOf( - HasSubstr("cannot connect"), - HasSubstr(expected_address) - ) - )); + HasSubstr("cannot connect"), + HasSubstr(expected_address) + ) + )); if (only_once) break; } @@ -198,20 +198,19 @@ void RequestHandlerTcpTests::ExpectFailAuthorize(bool only_once) { testing::ReturnArg<2>() )); EXPECT_CALL(mock_io, CloseSocket_t(expected_sd, _)); - if (only_once) - { + if (only_once) { EXPECT_CALL(mock_logger, Debug(AllOf( - HasSubstr("disconnected"), - HasSubstr(receivers_list[i]) - ) - )); + HasSubstr("disconnected"), + HasSubstr(receivers_list[i]) + ) + )); - EXPECT_CALL(mock_logger, Error(AllOf( - HasSubstr("authorization"), - HasSubstr(expected_auth_message), - HasSubstr(receivers_list[i]) - ) - )); + EXPECT_CALL(mock_logger, Error(AllOf( + HasSubstr("authorization"), + HasSubstr(expected_auth_message), + HasSubstr(receivers_list[i]) + ) + )); } if (only_once) break; i++; @@ -239,13 +238,12 @@ void RequestHandlerTcpTests::ExpectOKAuthorize(bool only_once) { A_WriteSendDataResponse(asapo::kNetErrorNoError, expected_auth_message), testing::ReturnArg<2>() )); - if (only_once) - { + if (only_once) { EXPECT_CALL(mock_logger, Info(AllOf( - HasSubstr("authorized"), - HasSubstr(receivers_list[i]) - ) - )); + HasSubstr("authorized"), + HasSubstr(receivers_list[i]) + ) + )); } if (only_once) break; i++; @@ -268,16 +266,16 @@ void RequestHandlerTcpTests::ExpectFailSendHeader(bool only_once) { )); if (only_once) { EXPECT_CALL(mock_logger, Debug(AllOf( - HasSubstr("disconnected"), - HasSubstr(receivers_list[i]) + HasSubstr("disconnected"), + HasSubstr(receivers_list[i]) ) - )); + )); EXPECT_CALL(mock_logger, Warning(AllOf( - HasSubstr("cannot send"), - HasSubstr(receivers_list[i]) + HasSubstr("cannot send"), + HasSubstr(receivers_list[i]) ) - )); + )); } EXPECT_CALL(mock_io, CloseSocket_t(expected_sd, _)); if (only_once) break; @@ -300,10 +298,10 @@ void RequestHandlerTcpTests::ExpectFailSendFile(const asapo::ProducerErrorTempla if (client_error) { EXPECT_CALL(mock_logger, Debug(AllOf( - HasSubstr("disconnected"), - HasSubstr(receivers_list[i]) - ) - )); + HasSubstr("disconnected"), + HasSubstr(receivers_list[i]) + ) + )); EXPECT_CALL(mock_logger, Error(AllOf( HasSubstr("cannot send"), HasSubstr(receivers_list[i]) ) @@ -334,16 +332,16 @@ void RequestHandlerTcpTests::ExpectFailSend(uint64_t expected_size, bool only_on )); if (only_once) { EXPECT_CALL(mock_logger, Debug(AllOf( - HasSubstr("disconnected"), - HasSubstr(receivers_list[i]) + HasSubstr("disconnected"), + HasSubstr(receivers_list[i]) ) - )); + )); EXPECT_CALL(mock_logger, Warning(AllOf( - HasSubstr("cannot send"), - HasSubstr(receivers_list[i]) + HasSubstr("cannot send"), + HasSubstr(receivers_list[i]) ) - )); + )); } EXPECT_CALL(mock_io, CloseSocket_t(expected_sd, _)); @@ -459,10 +457,10 @@ void RequestHandlerTcpTests::ExpectOKConnect(bool only_once) { )); if (only_once) { EXPECT_CALL(mock_logger, Debug(AllOf( - HasSubstr("connected to"), - HasSubstr(expected_address) + HasSubstr("connected to"), + HasSubstr(expected_address) ) - )); + )); } if (only_once) break; i++; @@ -483,10 +481,10 @@ void RequestHandlerTcpTests::ExpectOKReceive(bool only_once, asapo::NetworkError )); if (only_once) { EXPECT_CALL(mock_logger, Debug(AllOf( - HasSubstr("sent data"), - HasSubstr(receivers_list[i]) + HasSubstr("sent data"), + HasSubstr(receivers_list[i]) ) - )); + )); } if (only_once) break; i++; @@ -815,7 +813,7 @@ TEST_F(RequestHandlerTcpTests, RetryOnReauthorize) { ExpectOKConnect(false); ExpectOKAuthorize(false); ExpectOKSendAll(false); - ExpectOKReceive(false,asapo::kNetErrorReauthorize); + ExpectOKReceive(false, asapo::kNetErrorReauthorize); request_handler.PrepareProcessingRequestLocked(); auto success = request_handler.ProcessRequestUnlocked(&request, &retry); diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in index 6a5421ce23148bda3a3fe852b1d20a134679d55b..80b43bc3bbc641f88ed3ceb8adf808aef4c64142 100644 --- a/producer/api/python/asapo_producer.pyx.in +++ b/producer/api/python/asapo_producer.pyx.in @@ -295,8 +295,10 @@ def create_producer(endpoint,beamtime_id,beamline,stream,token,nthreads,timeout_ """ :param endpoint: server endpoint (url:port) :type endpoint: string - :param beamtime_id: beamtime id + :param beamtime_id: beamtime id, can be "auto" if beamline is given, will automatically select the current beamtime id :type beamtime_id: string + :param beamline: beamline name, can be "auto" if beamtime_id is given + :type beamline: string :param stream: stream to producer data to :type stream: string :param token: authorization token diff --git a/receiver/src/request_handler_authorize.cpp b/receiver/src/request_handler_authorize.cpp index c36683a76fc29260017313a842829f84172c97ae..d158882d38756fe96e90f281f98db7ac24f4023f 100644 --- a/receiver/src/request_handler_authorize.cpp +++ b/receiver/src/request_handler_authorize.cpp @@ -76,7 +76,7 @@ Error RequestHandlerAuthorize::ProcessReAuthorization(Request* request) const { std::string old_beamtimeId = beamtime_id_; auto err = Authorize(request, cached_source_credentials_.c_str()); if (err == asapo::ReceiverErrorTemplates::kAuthorizationFailure || ( - err==nullptr && old_beamtimeId!=beamtime_id_)) { + err == nullptr && old_beamtimeId != beamtime_id_)) { return asapo::ReceiverErrorTemplates::kReAuthorizationFailure.Generate(); } return err; @@ -84,7 +84,7 @@ Error RequestHandlerAuthorize::ProcessReAuthorization(Request* request) const { bool RequestHandlerAuthorize::NeedReauthorize() const { uint64_t elapsed_ms = (uint64_t) std::chrono::duration_cast<std::chrono::milliseconds> - (system_clock::now() - last_updated_).count(); + (system_clock::now() - last_updated_).count(); return elapsed_ms >= GetReceiverConfig()->authorization_interval_ms; } diff --git a/receiver/src/request_handler_file_process.cpp b/receiver/src/request_handler_file_process.cpp index e97d2f2c1b866e6e59452a21f1fbc24e6bee7f4e..e12e42b3df844a3ababe0e8de457e646463e5fc9 100644 --- a/receiver/src/request_handler_file_process.cpp +++ b/receiver/src/request_handler_file_process.cpp @@ -21,7 +21,7 @@ Error RequestHandlerFileProcess::ProcessFileExistSituation(Request* request) con auto err_duplicate = request->CheckForDuplicates(); if (err_duplicate == nullptr) { request->SetWarningMessage("file has been overwritten"); - log__->Warning(std::string("overwriting file " )+ request->GetOfflinePath()+kPathSeparator+request->GetFileName()); + log__->Warning(std::string("overwriting file " ) + request->GetOfflinePath() + kPathSeparator + request->GetFileName()); return file_processor_->ProcessFile(request, true); } diff --git a/receiver/unittests/test_request_handler_authorizer.cpp b/receiver/unittests/test_request_handler_authorizer.cpp index c66100cfab1a868ce74aa07e21a1c3db128de5cf..5f3caa0be9192165520252a6eaf33a6c3a0c06e8 100644 --- a/receiver/unittests/test_request_handler_authorizer.cpp +++ b/receiver/unittests/test_request_handler_authorizer.cpp @@ -75,10 +75,10 @@ class AuthorizerHandlerTests : public Test { void MockRequestData(); void SetUp() override { GenericRequestHeader request_header; - expected_source_credentials = expected_beamtime_id+"%stream%token"; + expected_source_credentials = expected_beamtime_id + "%stream%token"; expect_request_string = std::string("{\"SourceCredentials\":\"") + expected_source_credentials + - "\",\"OriginHost\":\"" + - expected_producer_uri + "\"}"; + "\",\"OriginHost\":\"" + + expected_producer_uri + "\"}"; mock_request.reset(new MockRequest{request_header, 1, expected_producer_uri, nullptr}); handler.http_client__ = std::unique_ptr<asapo::HttpClient> {&mock_http_client}; @@ -144,7 +144,7 @@ class AuthorizerHandlerTests : public Test { MockAuthRequest(error, code); return handler.ProcessRequest(mock_request.get()); } - Error MockRequestAuthorization(bool error, HttpCode code = HttpCode::OK,bool set_request=true) { + Error MockRequestAuthorization(bool error, HttpCode code = HttpCode::OK, bool set_request = true) { EXPECT_CALL(*mock_request, GetOpCode()) .WillOnce(Return(asapo::kOpcodeTransferData)) ; @@ -243,7 +243,7 @@ TEST_F(AuthorizerHandlerTests, RequestAuthorizeReturnsDifferentBeamtimeId) { MockFirstAuthorization(false); expected_beamtime_id = "different_id"; - auto err = MockRequestAuthorization(false,HttpCode::OK,false); + auto err = MockRequestAuthorization(false, HttpCode::OK, false); ASSERT_THAT(err, Eq(asapo::ReceiverErrorTemplates::kReAuthorizationFailure)); } diff --git a/receiver/unittests/test_request_handler_file_process.cpp b/receiver/unittests/test_request_handler_file_process.cpp index 33960ccac8703a2f761f9e6834243513d0c56606..121c0bc764d64a2b262997c1811c120731523353 100644 --- a/receiver/unittests/test_request_handler_file_process.cpp +++ b/receiver/unittests/test_request_handler_file_process.cpp @@ -88,10 +88,10 @@ TEST_F(FileWriteHandlerTests, FileAlreadyExists_NoRecordInDb) { ); std::string ref_str; EXPECT_CALL(*mock_request, GetOfflinePath()).WillOnce - (ReturnRef(ref_str)); + (ReturnRef(ref_str)); EXPECT_CALL(*mock_request, GetFileName()).WillOnce - (Return("")); + (Return("")); EXPECT_CALL(mock_logger, Warning(HasSubstr("overwriting"))); diff --git a/receiver/unittests/test_requests_dispatcher.cpp b/receiver/unittests/test_requests_dispatcher.cpp index 82fb8e6bdb1abd107a0582d34830fd58588f2698..5b887b046e264f1a549221d18cae26a04414bb8e 100644 --- a/receiver/unittests/test_requests_dispatcher.cpp +++ b/receiver/unittests/test_requests_dispatcher.cpp @@ -154,7 +154,7 @@ class RequestsDispatcherTests : public Test { EXPECT_CALL(mock_logger, Debug(AllOf(HasSubstr("processing request"), HasSubstr(connected_uri)))); EXPECT_CALL(mock_request, Handle_t()).WillOnce( - Return(error_mode>0 ? err.release() : nullptr) + Return(error_mode > 0 ? err.release() : nullptr) ); if (error_mode == 1) { EXPECT_CALL(mock_logger, Error(AllOf(HasSubstr("error processing request from"), HasSubstr(connected_uri)))); diff --git a/tests/automatic/consumer/consumer_api/consumer_api.cpp b/tests/automatic/consumer/consumer_api/consumer_api.cpp index 8dccf3da1a8b713c7c35b1e04de4a910ad2fd900..2a85a1d351632ccba6ebf6ae131c2b589c172fb6 100644 --- a/tests/automatic/consumer/consumer_api/consumer_api.cpp +++ b/tests/automatic/consumer/consumer_api/consumer_api.cpp @@ -187,7 +187,7 @@ void TestDataset(const std::unique_ptr<asapo::DataBroker>& broker, const std::st void TestAll(const Args& args) { asapo::Error err; auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.server, ".", - asapo::SourceCredentials{args.run_name, "","", args.token}, &err); + asapo::SourceCredentials{args.run_name, "", "", args.token}, &err); broker->SetTimeout(100); auto group_id = broker->GenerateNewGroupId(&err); diff --git a/tests/automatic/consumer/next_multithread_broker/next_multithread_broker.cpp b/tests/automatic/consumer/next_multithread_broker/next_multithread_broker.cpp index 515b591eb7ecdb96b938405ea07fc4ec1a718498..3f06dda46b641e892bc57064e1d506ee7fde8a59 100644 --- a/tests/automatic/consumer/next_multithread_broker/next_multithread_broker.cpp +++ b/tests/automatic/consumer/next_multithread_broker/next_multithread_broker.cpp @@ -49,7 +49,7 @@ Args GetArgs(int argc, char* argv[]) { void TestAll(const Args& args) { asapo::Error err; - auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.server, "dummy", asapo::SourceCredentials{args.run_name,"", "", args.token}, &err); + auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.server, "dummy", asapo::SourceCredentials{args.run_name, "", "", args.token}, &err); auto group_id = broker->GenerateNewGroupId(&err); broker->SetTimeout(10000); std::vector<asapo::FileInfos>file_infos(args.nthreads); diff --git a/tests/automatic/curl_http_client/curl_http_client_command/curl_httpclient_command.cpp b/tests/automatic/curl_http_client/curl_http_client_command/curl_httpclient_command.cpp index ea34c81ae8b2462d22a4755e78f5c3e914a5418e..49efa4afae83f9f87a6a00f93e10a697126ed803 100644 --- a/tests/automatic/curl_http_client/curl_http_client_command/curl_httpclient_command.cpp +++ b/tests/automatic/curl_http_client/curl_http_client_command/curl_httpclient_command.cpp @@ -32,7 +32,7 @@ int main(int argc, char* argv[]) { auto args = GetArgs(argc, argv); asapo::Error err; - auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.uri, "", asapo::SourceCredentials{"", "","", ""}, &err); + auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.uri, "", asapo::SourceCredentials{"", "", "", ""}, &err); auto server_broker = static_cast<asapo::ServerDataBroker*>(broker.get()); asapo::HttpCode code; diff --git a/tests/automatic/full_chain/send_recv_substreams/send_recv_substreams.cpp b/tests/automatic/full_chain/send_recv_substreams/send_recv_substreams.cpp index 500b1d261dd8d6f22fbc98acca72798ea491d76c..34a1623ad0c80f60e60b985f9aacfd4f53aa3895 100644 --- a/tests/automatic/full_chain/send_recv_substreams/send_recv_substreams.cpp +++ b/tests/automatic/full_chain/send_recv_substreams/send_recv_substreams.cpp @@ -36,7 +36,7 @@ void ProcessAfterSend(asapo::GenericRequestHeader header, asapo::Error err) { BrokerPtr CreateBrokerAndGroup(const Args& args, Error* err) { auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.server, ".", - asapo::SourceCredentials{args.beamtime_id,"", "", args.token}, err); + asapo::SourceCredentials{args.beamtime_id, "", "", args.token}, err); if (*err) { return nullptr; } @@ -56,7 +56,7 @@ ProducerPtr CreateProducer(const Args& args) { asapo::Error err; auto producer = asapo::Producer::Create(args.server, 1, asapo::RequestHandlerType::kTcp, - asapo::SourceCredentials{args.beamtime_id,"", "", args.token }, 60, &err); + asapo::SourceCredentials{args.beamtime_id, "", "", args.token }, 60, &err); if(err) { std::cerr << "Cannot start producer. ProducerError: " << err << std::endl; exit(EXIT_FAILURE); diff --git a/tests/automatic/producer/beamtime_metadata/beamtime_metadata.cpp b/tests/automatic/producer/beamtime_metadata/beamtime_metadata.cpp index 0817a9d840adf258e0e34cac1bc5d4d174c7198c..d3f31d010d0b3f98af8cdc4575eb6a784a2b03a0 100644 --- a/tests/automatic/producer/beamtime_metadata/beamtime_metadata.cpp +++ b/tests/automatic/producer/beamtime_metadata/beamtime_metadata.cpp @@ -69,7 +69,7 @@ std::unique_ptr<asapo::Producer> CreateProducer(const Args& args) { auto producer = asapo::Producer::Create(args.receiver_address, 1, args.mode == 0 ? asapo::RequestHandlerType::kTcp : asapo::RequestHandlerType::kFilesystem, - asapo::SourceCredentials{args.beamtime_id, "","", ""}, 60, &err); + asapo::SourceCredentials{args.beamtime_id, "", "", ""}, 60, &err); if (err) { std::cerr << "Cannot start producer. ProducerError: " << err << std::endl; exit(EXIT_FAILURE); diff --git a/tests/manual/performance_broker_receiver/getlast_broker.cpp b/tests/manual/performance_broker_receiver/getlast_broker.cpp index adebed571edffbdd6ee3f989929a39cf73f3783f..4272ae17c239cd8a7139d03835619af80761046a 100644 --- a/tests/manual/performance_broker_receiver/getlast_broker.cpp +++ b/tests/manual/performance_broker_receiver/getlast_broker.cpp @@ -48,7 +48,7 @@ std::vector<std::thread> StartThreads(const Args& params, asapo::FileInfo fi; Error err; auto broker = asapo::DataBrokerFactory::CreateServerBroker(params.server, params.file_path, - asapo::SourceCredentials{params.beamtime_id, "","", params.token}, &err); + asapo::SourceCredentials{params.beamtime_id, "", "", params.token}, &err); broker->SetTimeout((uint64_t) params.timeout_ms); asapo::FileData data;