From d1564b4906fe282207c1dd62733c4edf86e58359 Mon Sep 17 00:00:00 2001 From: Sergey Yakubov <sergey.yakubov@desy.de> Date: Tue, 15 Feb 2022 11:47:48 +0100 Subject: [PATCH] update clients --- .../src/asapo_authorizer/server/authorize.go | 5 + .../asapo_broker/server/process_request.go | 2 - .../cpp/include/asapo/common/data_structs.h | 381 +++++++++--------- .../data_structs/test_data_structs.cpp | 17 +- .../api/cpp/include/asapo/consumer/consumer.h | 4 +- consumer/api/cpp/src/consumer_c_glue.cpp | 4 +- consumer/api/cpp/src/consumer_impl.cpp | 8 +- consumer/api/cpp/src/consumer_impl.h | 4 +- .../api/cpp/unittests/test_consumer_impl.cpp | 22 +- consumer/api/python/asapo_consumer.pxd | 2 +- consumer/api/python/asapo_consumer.pyx.in | 4 +- .../dummy_data_producer.cpp | 1 - producer/api/cpp/src/producer_request.cpp | 1 - producer/api/cpp/src/request_handler_tcp.cpp | 3 + .../unittests/test_request_handler_tcp.cpp | 1 + producer/api/python/asapo_producer.pxd | 2 +- producer/api/python/asapo_producer.pyx.in | 7 - tests/automatic/support/getnext/getnext.cpp | 2 +- 18 files changed, 226 insertions(+), 244 deletions(-) diff --git a/authorizer/src/asapo_authorizer/server/authorize.go b/authorizer/src/asapo_authorizer/server/authorize.go index 45e3dcc4e..42927d0a0 100644 --- a/authorizer/src/asapo_authorizer/server/authorize.go +++ b/authorizer/src/asapo_authorizer/server/authorize.go @@ -62,6 +62,11 @@ func getSourceCredentials(request authorizationRequest) (SourceCredentials, erro creds.BeamtimeId = "auto" } + log.WithFields(map[string]interface{}{ + "creds": request.SourceCredentials, + }).Debug("received credentials") + + if creds.InstanceId == "auto" || creds.PipelineStep == "auto" { return SourceCredentials{}, errors.New("InstanceId and PipelineStep must be already set on client side") } diff --git a/broker/src/asapo_broker/server/process_request.go b/broker/src/asapo_broker/server/process_request.go index f27e6227e..76f85640b 100644 --- a/broker/src/asapo_broker/server/process_request.go +++ b/broker/src/asapo_broker/server/process_request.go @@ -116,9 +116,7 @@ func processRequest(w http.ResponseWriter, r *http.Request, op string, extra_par monitoring.SendBrokerRequest( consumerInstanceId, pipelineStepId, - op, - beamtimeId, datasource, stream, diff --git a/common/cpp/include/asapo/common/data_structs.h b/common/cpp/include/asapo/common/data_structs.h index 172e6d88a..7c06ccc69 100644 --- a/common/cpp/include/asapo/common/data_structs.h +++ b/common/cpp/include/asapo/common/data_structs.h @@ -9,6 +9,9 @@ #include "error.h" +#include "asapo/preprocessor/deprecated.h" + + namespace asapo { const std::string kFinishStreamKeyword = "asapo_finish_stream"; @@ -22,45 +25,44 @@ std::chrono::system_clock::time_point TimePointfromNanosec(uint64_t nanoseconds_ std::string IsoDateFromEpochNanosecs(uint64_t time_from_epoch_nanosec); uint64_t NanosecsEpochFromISODate(std::string date_time); -std::string HostFromUri(const std::string& uri); - +std::string HostFromUri(const std::string &uri); -bool TimeFromJson(const JsonStringParser& parser, const std::string& name, std::chrono::system_clock::time_point* val); +bool TimeFromJson(const JsonStringParser &parser, const std::string &name, std::chrono::system_clock::time_point *val); class MessageMeta { - public: - std::string name; - std::chrono::system_clock::time_point timestamp; - uint64_t size{0}; - uint64_t id{0}; - std::string source; - std::string metadata; - uint64_t buf_id{0}; - std::string stream; // might be "unknownStream" for older datasets - uint64_t dataset_substream{0}; - std::string Json() const; - bool SetFromJson(const std::string& json_string); - std::string FullName(const std::string& base_path) const; + public: + std::string name; + std::chrono::system_clock::time_point timestamp; + uint64_t size{0}; + uint64_t id{0}; + std::string source; + std::string metadata; + uint64_t buf_id{0}; + std::string stream; // might be "unknownStream" for older datasets + uint64_t dataset_substream{0}; + std::string Json() const; + bool SetFromJson(const std::string &json_string); + std::string FullName(const std::string &base_path) const; }; struct StreamInfo { - uint64_t last_id{0}; - std::string name; - bool finished{false}; - std::string next_stream; - std::chrono::system_clock::time_point timestamp_created; - std::chrono::system_clock::time_point timestamp_lastentry; - std::string Json() const; - bool SetFromJson(const std::string& json_string); + uint64_t last_id{0}; + std::string name; + bool finished{false}; + std::string next_stream; + std::chrono::system_clock::time_point timestamp_created; + std::chrono::system_clock::time_point timestamp_lastentry; + std::string Json() const; + bool SetFromJson(const std::string &json_string); }; using StreamInfos = std::vector<StreamInfo>; -inline bool operator==(const MessageMeta& lhs, const MessageMeta& rhs) { +inline bool operator==(const MessageMeta &lhs, const MessageMeta &rhs) { return (lhs.name == rhs.name && - lhs.id == rhs.id && - lhs.timestamp == rhs.timestamp && - lhs.size == rhs.size); + lhs.id == rhs.id && + lhs.timestamp == rhs.timestamp && + lhs.size == rhs.size); } using MessageData = std::unique_ptr<uint8_t[]>; @@ -71,194 +73,207 @@ using MessageMetas = std::vector<MessageMeta>; using IdList = std::vector<uint64_t>; struct DataSet { - uint64_t id; - uint64_t expected_size; - MessageMetas content; - bool SetFromJson(const std::string& json_string); + uint64_t id; + uint64_t expected_size; + MessageMetas content; + bool SetFromJson(const std::string &json_string); }; using SubDirList = std::vector<std::string>; enum class SourceType { - kProcessed, - kRaw + kProcessed, + kRaw }; -Error GetSourceTypeFromString(std::string stype, SourceType* type); +Error GetSourceTypeFromString(std::string stype, SourceType *type); std::string GetStringFromSourceType(SourceType type); struct SourceCredentials { - SourceCredentials(SourceType type, - std::string instanceId, - std::string pipelineStep, - std::string beamtimeId, - std::string beamline, - std::string data_source, - std::string token) : - instance_id{std::move(instanceId)}, - pipeline_step{std::move(pipelineStep)}, - beamtime_id{std::move(beamtimeId)}, - beamline{std::move(beamline)}, - data_source{std::move(data_source)}, - user_token{std::move(token)}, - type{type} {}; - SourceCredentials() {}; - static const std::string kDefaultInstanceId; - static const std::string kDefaultPipelineStep; - static const std::string kDefaultDataSource; - static const std::string kDefaultBeamline; - static const std::string kDefaultBeamtimeId; - std::string instance_id; - std::string pipeline_step; - std::string beamtime_id; - std::string beamline; - std::string data_source; - std::string user_token; - SourceType type = SourceType::kProcessed; - std::string GetString() { - return (type == SourceType::kRaw ? std::string("raw") : std::string("processed")) - + "%" + instance_id + "%" + pipeline_step - + "%" + beamtime_id + "%" + beamline + "%" + data_source + "%" + user_token; - }; + ASAPO_DEPRECATED("obsolates 01.03.2023, use SourceCredentials with instanceId/pipelineStep instead") SourceCredentials(SourceType type, + std::string beamtimeId, + std::string beamline, + std::string data_source, + std::string token) : + instance_id{kDefaultInstanceId}, + pipeline_step{kDefaultPipelineStep}, + beamtime_id{std::move(beamtimeId)}, + beamline{std::move(beamline)}, + data_source{std::move(data_source)}, + user_token{std::move(token)}, + type{type} {}; + SourceCredentials(SourceType type, + std::string instanceId, + std::string pipelineStep, + std::string beamtimeId, + std::string beamline, + std::string data_source, + std::string token) : + instance_id{std::move(instanceId)}, + pipeline_step{std::move(pipelineStep)}, + beamtime_id{std::move(beamtimeId)}, + beamline{std::move(beamline)}, + data_source{std::move(data_source)}, + user_token{std::move(token)}, + type{type} {}; + SourceCredentials() {}; + static const std::string kDefaultInstanceId; + static const std::string kDefaultPipelineStep; + static const std::string kDefaultDataSource; + static const std::string kDefaultBeamline; + static const std::string kDefaultBeamtimeId; + std::string instance_id; + std::string pipeline_step; + std::string beamtime_id; + std::string beamline; + std::string data_source; + std::string user_token; + SourceType type = SourceType::kProcessed; + std::string GetString() { + return (type == SourceType::kRaw ? std::string("raw") : std::string("processed")) + + "%" + instance_id + "%" + pipeline_step + + "%" + beamtime_id + "%" + beamline + "%" + data_source + "%" + user_token; + }; }; struct DeleteStreamOptions { - private: - enum DeleteStreamFlags : uint64_t { - kDeleteMeta = 1 << 0, - kErrorOnNotFound = 1 << 1, - }; - public: - DeleteStreamOptions() = default; - DeleteStreamOptions(bool delete_meta, bool error_on_not_exist): delete_meta{delete_meta}, error_on_not_exist{error_on_not_exist} {}; - bool delete_meta{true}; - bool error_on_not_exist{true}; - uint64_t Encode() { - uint64_t flag = 0; - flag = delete_meta ? flag | DeleteStreamFlags::kDeleteMeta : flag; - flag = error_on_not_exist ? flag | DeleteStreamFlags::kErrorOnNotFound : flag; - return flag; - }; - void Decode(uint64_t flag) { - delete_meta = (flag & DeleteStreamFlags::kDeleteMeta) > 0; - error_on_not_exist = (flag & DeleteStreamFlags::kErrorOnNotFound) > 0; - }; - std::string Json() { - return std::string("{\"ErrorOnNotExist\":") + (error_on_not_exist ? "true" : "false") + ",\"DeleteMeta\":" - + (delete_meta ? "true" : "false") + "}"; - } + private: + enum DeleteStreamFlags : uint64_t { + kDeleteMeta = 1 << 0, + kErrorOnNotFound = 1 << 1, + }; + public: + DeleteStreamOptions() = default; + DeleteStreamOptions(bool delete_meta, bool error_on_not_exist) : delete_meta{delete_meta}, + error_on_not_exist{error_on_not_exist} {}; + bool delete_meta{true}; + bool error_on_not_exist{true}; + uint64_t Encode() { + uint64_t flag = 0; + flag = delete_meta ? flag | DeleteStreamFlags::kDeleteMeta : flag; + flag = error_on_not_exist ? flag | DeleteStreamFlags::kErrorOnNotFound : flag; + return flag; + }; + void Decode(uint64_t flag) { + delete_meta = (flag & DeleteStreamFlags::kDeleteMeta) > 0; + error_on_not_exist = (flag & DeleteStreamFlags::kErrorOnNotFound) > 0; + }; + std::string Json() { + return std::string("{\"ErrorOnNotExist\":") + (error_on_not_exist ? "true" : "false") + ",\"DeleteMeta\":" + + (delete_meta ? "true" : "false") + "}"; + } }; enum IngestModeFlags : uint64_t { - kTransferData = 1 << 0, - kTransferMetaDataOnly = 1 << 1, - kStoreInFilesystem = 1 << 2, - kStoreInDatabase = 1 << 3, - kWriteRawDataToOffline = 1 << 4, + kTransferData = 1 << 0, + kTransferMetaDataOnly = 1 << 1, + kStoreInFilesystem = 1 << 2, + kStoreInDatabase = 1 << 3, + kWriteRawDataToOffline = 1 << 4, }; const uint64_t kDefaultIngestMode = kTransferData | kStoreInFilesystem | kStoreInDatabase; enum class MetaIngestOp : uint64_t { - kInsert = 1, - kReplace = 2, - kUpdate = 3, + kInsert = 1, + kReplace = 2, + kUpdate = 3, }; struct MetaIngestMode { - MetaIngestOp op; - bool upsert; - MetaIngestMode() = default; - MetaIngestMode(MetaIngestOp aOp, bool aUpsert): op(aOp), upsert(aUpsert) {}; - uint64_t Encode() { - return static_cast<uint64_t>(op) + 10 * static_cast<uint64_t>(upsert); - } - void Decode(uint64_t code) { - upsert = code > 10; - uint64_t val = code - (upsert ? 10 : 0); - if (val <= 3) { - op = static_cast<MetaIngestOp>(val); - } - } + MetaIngestOp op; + bool upsert; + MetaIngestMode() = default; + MetaIngestMode(MetaIngestOp aOp, bool aUpsert) : op(aOp), upsert(aUpsert) {}; + uint64_t Encode() { + return static_cast<uint64_t>(op) + 10 * static_cast<uint64_t>(upsert); + } + void Decode(uint64_t code) { + upsert = code > 10; + uint64_t val = code - (upsert ? 10 : 0); + if (val <= 3) { + op = static_cast<MetaIngestOp>(val); + } + } }; class ClientProtocol { - private: - std::string version_; - std::string discovery_version_; - std::string name_; - public: - ClientProtocol(std::string version, std::string name, std::string discovery_version) : version_{version}, - name_{name} { - discovery_version_ = discovery_version; - }; - ClientProtocol() = delete; - virtual std::string GetString() = 0; - const std::string& GetVersion() const { - return version_; - } - const std::string& GetDiscoveryVersion() const { - return discovery_version_; - } - const std::string& GetName() const { - return name_; - } + private: + std::string version_; + std::string discovery_version_; + std::string name_; + public: + ClientProtocol(std::string version, std::string name, std::string discovery_version) : version_{version}, + name_{name} { + discovery_version_ = discovery_version; + }; + ClientProtocol() = delete; + virtual std::string GetString() = 0; + const std::string &GetVersion() const { + return version_; + } + const std::string &GetDiscoveryVersion() const { + return discovery_version_; + } + const std::string &GetName() const { + return name_; + } }; class ConsumerProtocol final : public ClientProtocol { - private: - std::string authorizer_version_; - std::string file_transfer_service_version_; - std::string broker_version_; - std::string rds_version_; - public: - ConsumerProtocol(std::string version, - std::string discovery_version, - std::string authorizer_version, - std::string file_transfer_service_version, - std::string broker_version, - std::string rds_version) - : ClientProtocol(version, "consumer protocol", discovery_version) { - authorizer_version_ = authorizer_version; - file_transfer_service_version_ = file_transfer_service_version; - broker_version_ = broker_version; - rds_version_ = rds_version; - } - const std::string& GetAuthorizerVersion() const { - return authorizer_version_; - } - const std::string& GetFileTransferServiceVersion() const { - return file_transfer_service_version_; - } - const std::string& GetRdsVersion() const { - return rds_version_; - } - const std::string& GetBrokerVersion() const { - return broker_version_; - }; - ConsumerProtocol() = delete; - std::string GetString() override { - return std::string(); - } + private: + std::string authorizer_version_; + std::string file_transfer_service_version_; + std::string broker_version_; + std::string rds_version_; + public: + ConsumerProtocol(std::string version, + std::string discovery_version, + std::string authorizer_version, + std::string file_transfer_service_version, + std::string broker_version, + std::string rds_version) + : ClientProtocol(version, "consumer protocol", discovery_version) { + authorizer_version_ = authorizer_version; + file_transfer_service_version_ = file_transfer_service_version; + broker_version_ = broker_version; + rds_version_ = rds_version; + } + const std::string &GetAuthorizerVersion() const { + return authorizer_version_; + } + const std::string &GetFileTransferServiceVersion() const { + return file_transfer_service_version_; + } + const std::string &GetRdsVersion() const { + return rds_version_; + } + const std::string &GetBrokerVersion() const { + return broker_version_; + }; + ConsumerProtocol() = delete; + std::string GetString() override { + return std::string(); + } }; class ProducerProtocol final : public ClientProtocol { - private: - std::string receiver_version_; - public: - ProducerProtocol(std::string version, - std::string discovery_version, - std::string receiver_version) - : ClientProtocol(version, "producer protocol", discovery_version) { - receiver_version_ = receiver_version; - }; - const std::string& GetReceiverVersion() const { - return receiver_version_; - } - ProducerProtocol() = delete; - std::string GetString() override { - return std::string(); - } + private: + std::string receiver_version_; + public: + ProducerProtocol(std::string version, + std::string discovery_version, + std::string receiver_version) + : ClientProtocol(version, "producer protocol", discovery_version) { + receiver_version_ = receiver_version; + }; + const std::string &GetReceiverVersion() const { + return receiver_version_; + } + ProducerProtocol() = delete; + std::string GetString() override { + return std::string(); + } }; } diff --git a/common/cpp/unittests/data_structs/test_data_structs.cpp b/common/cpp/unittests/data_structs/test_data_structs.cpp index de730e9fa..2fcd83ea1 100644 --- a/common/cpp/unittests/data_structs/test_data_structs.cpp +++ b/common/cpp/unittests/data_structs/test_data_structs.cpp @@ -237,22 +237,9 @@ TEST(SourceCredentials, ConvertToString) { std::string expected1 = "raw%instance%step%beamtime%beamline%source%token"; std::string expected2 = "processed%instance%step%beamtime%beamline%source%token"; - auto res1 = sc.GetString(asapo::SourceCredentialsVersion::NewVersion); + auto res1 = sc.GetString(); sc.type = asapo::SourceType::kProcessed; - auto res2 = sc.GetString(asapo::SourceCredentialsVersion::NewVersion); - - ASSERT_THAT(res1, Eq(expected1)); - ASSERT_THAT(res2, Eq(expected2)); -} - -TEST(SourceCredentials, ConvertToString_OldFormat) { - auto sc = SourceCredentials{SourceType::kRaw, "instance", "step", "beamtime", "beamline", "source", "token"}; - std::string expected1 = "raw%beamtime%beamline%source%token"; - std::string expected2 = "processed%beamtime%beamline%source%token"; - - auto res1 = sc.GetString(asapo::SourceCredentialsVersion::OldVersion); - sc.type = asapo::SourceType::kProcessed; - auto res2 = sc.GetString(asapo::SourceCredentialsVersion::OldVersion); + auto res2 = sc.GetString(); ASSERT_THAT(res1, Eq(expected1)); ASSERT_THAT(res2, Eq(expected2)); diff --git a/consumer/api/cpp/include/asapo/consumer/consumer.h b/consumer/api/cpp/include/asapo/consumer/consumer.h index efe1a2d1b..2527ca21c 100644 --- a/consumer/api/cpp/include/asapo/consumer/consumer.h +++ b/consumer/api/cpp/include/asapo/consumer/consumer.h @@ -81,8 +81,8 @@ class Consumer { //! This will only have an effect if no previous connection attempted was made on this Consumer. virtual void ForceNoRdma() = 0; - //! Should be done before files are transferred, if set to true, connected services will receive the InstanceId and PipelineStep of this Consumer. - virtual Error EnableNewMonitoringApiFormat(bool enabled) = 0; + //! Should be done before files are transferred, if set to true, connected services will not receive the InstanceId and PipelineStep of this Consumer. + virtual Error DisableMonitoring(bool disable) = 0; //! Returns the current network connection type /*! diff --git a/consumer/api/cpp/src/consumer_c_glue.cpp b/consumer/api/cpp/src/consumer_c_glue.cpp index aac8998bb..240bfcb90 100644 --- a/consumer/api/cpp/src/consumer_c_glue.cpp +++ b/consumer/api/cpp/src/consumer_c_glue.cpp @@ -240,8 +240,8 @@ extern "C" { return static_cast<AsapoNetworkConnectionType>(consumer->handle->CurrentConnectionType()); } -//! wraps asapo::Consumer::EnableNewMonitoringApiFormat() -/// \copydoc asapo::Consumer::EnableNewMonitoringApiFormat() +//! wraps asapo::Consumer::DisableMonitoring() +/// \copydoc asapo::Consumer::DisableMonitoring() /// \param[in] consumer the handle of the consumer concerned /// \param[in] enabled set this to true if the new API format should be used /// \param[out] error will contain a pointer to an AsapoErrorHandle if a problem occured, NULL else. diff --git a/consumer/api/cpp/src/consumer_impl.cpp b/consumer/api/cpp/src/consumer_impl.cpp index 44dd68fc2..beb12a9fd 100644 --- a/consumer/api/cpp/src/consumer_impl.cpp +++ b/consumer/api/cpp/src/consumer_impl.cpp @@ -182,12 +182,8 @@ void ConsumerImpl::ForceNoRdma() { should_try_rdma_first_ = false; } -Error ConsumerImpl::EnableNewMonitoringApiFormat(bool enabled) { - if (enabled && (request_sender_details_prefix_.length()-5 /*spare a few bytes for stream*/) >= kMaxMessageSize) { - return GeneralErrorTemplates::kSimpleError.Generate("Source credentials are too long"); - } - - use_new_api_format_ = enabled; +Error ConsumerImpl::DisableMonitoring(bool disable) { + use_new_api_format_ = !disable; return nullptr; } diff --git a/consumer/api/cpp/src/consumer_impl.h b/consumer/api/cpp/src/consumer_impl.h index 971b04c60..6928e7cf2 100644 --- a/consumer/api/cpp/src/consumer_impl.h +++ b/consumer/api/cpp/src/consumer_impl.h @@ -91,7 +91,7 @@ class ConsumerImpl final : public asapo::Consumer { Error DeleteStream(std::string stream, DeleteStreamOptions options) override; void SetTimeout(uint64_t timeout_ms) override; void ForceNoRdma() override; - Error EnableNewMonitoringApiFormat(bool enabled) override; + Error DisableMonitoring(bool disable) override; NetworkConnectionType CurrentConnectionType() const override; @@ -163,7 +163,7 @@ class ConsumerImpl final : public asapo::Consumer { bool has_filesystem_; SourceCredentials source_credentials_; std::string data_source_encoded_; - bool use_new_api_format_ = false; + bool use_new_api_format_ = true; std::string request_sender_details_prefix_; uint64_t timeout_ms_ = 0; bool should_try_rdma_first_ = true; diff --git a/consumer/api/cpp/unittests/test_consumer_impl.cpp b/consumer/api/cpp/unittests/test_consumer_impl.cpp index 846ee69a8..2fee68271 100644 --- a/consumer/api/cpp/unittests/test_consumer_impl.cpp +++ b/consumer/api/cpp/unittests/test_consumer_impl.cpp @@ -26,7 +26,6 @@ using asapo::MockIO; using asapo::MockHttpClient; using asapo::MockNetClient; using asapo::HttpCode; -using asapo::SourceCredentialsVersion; using ::testing::AtLeast; using ::testing::Eq; @@ -138,9 +137,6 @@ class ConsumerImplTests : public Test { fts_consumer->httpclient__ = std::unique_ptr<asapo::HttpClient> {&mock_http_client}; fts_consumer->net_client__ = std::unique_ptr<asapo::NetClient> {&mock_netclient}; - asapo::Error err1 = consumer->EnableNewMonitoringApiFormat(true); - asapo::Error err2 = fts_consumer->EnableNewMonitoringApiFormat(true); - { ON_CALL(mock_http_client, UrlEscape_t(expected_instance_id)).WillByDefault(Return(expected_instance_id_encoded)); ON_CALL(mock_http_client, UrlEscape_t(expected_pipeline_step)).WillByDefault(Return(expected_pipeline_step_encoded)); @@ -236,7 +232,7 @@ class ConsumerImplTests : public Test { return fi; } - void CheckDefaultingOfCredentials(asapo::SourceCredentials credentials, SourceCredentialsVersion formatVersion, std::string expectedUrlPath) { + void CheckDefaultingOfCredentials(asapo::SourceCredentials credentials, std::string expectedUrlPath) { consumer->io__.release(); consumer->httpclient__.release(); consumer->net_client__.release(); @@ -249,8 +245,6 @@ class ConsumerImplTests : public Test { consumer->io__ = std::unique_ptr<IO> {&mock_io}; consumer->httpclient__ = std::unique_ptr<asapo::HttpClient> {&mock_http_client}; consumer->net_client__ = std::unique_ptr<asapo::NetClient> {&mock_netclient}; - consumer->EnableNewMonitoringApiFormat(formatVersion == asapo::SourceCredentialsVersion::NewVersion); - MockGetBrokerUri(); EXPECT_CALL(mock_http_client, @@ -273,24 +267,16 @@ TEST_F(ConsumerImplTests, DefaultStreamIsDetector) { CheckDefaultingOfCredentials( asapo::SourceCredentials{ asapo::SourceType::kProcessed, "instance", "step", "beamtime_id", "", "", expected_token - }, asapo::SourceCredentialsVersion::NewVersion, + }, "/beamtime/beamtime_id/detector/stream/" + expected_group_id_encoded + "/next?token=" + expected_token + "&instanceid=instance&pipelinestep=step"); } -TEST_F(ConsumerImplTests, DefaultStreamIsDetector_OldFormat) { - CheckDefaultingOfCredentials( - asapo::SourceCredentials{ - asapo::SourceType::kProcessed, "instance", "step", "beamtime_id", "", "", expected_token - }, asapo::SourceCredentialsVersion::OldVersion, - "/beamtime/beamtime_id/detector/stream/" + expected_group_id_encoded + "/next?token=" + expected_token); -} - TEST_F(ConsumerImplTests, DefaultPipelineStepIsDefaultStep) { CheckDefaultingOfCredentials( asapo::SourceCredentials{ asapo::SourceType::kProcessed, "instance", "", "beamtime_id", "a", "b", expected_token - }, asapo::SourceCredentialsVersion::NewVersion, + }, "/beamtime/beamtime_id/b/stream/" + expected_group_id_encoded + "/next?token=" + expected_token + "&instanceid=instance&pipelinestep=DefaultStep"); } @@ -299,7 +285,7 @@ TEST_F(ConsumerImplTests, AutoPipelineStepIsDefaultStep) { CheckDefaultingOfCredentials( asapo::SourceCredentials{ asapo::SourceType::kProcessed, "instance", "auto", "beamtime_id", "a", "b", expected_token - }, asapo::SourceCredentialsVersion::NewVersion, + }, "/beamtime/beamtime_id/b/stream/" + expected_group_id_encoded + "/next?token=" + expected_token + "&instanceid=instance&pipelinestep=DefaultStep"); } diff --git a/consumer/api/python/asapo_consumer.pxd b/consumer/api/python/asapo_consumer.pxd index 751388ed4..1306b95b8 100644 --- a/consumer/api/python/asapo_consumer.pxd +++ b/consumer/api/python/asapo_consumer.pxd @@ -70,7 +70,7 @@ cdef extern from "asapo/asapo_consumer.h" namespace "asapo" nogil: Consumer() except + void SetTimeout(uint64_t timeout_ms) void ForceNoRdma() - Error EnableNewMonitoringApiFormat(bool enabled) + Error DisableMonitoring(bool enabled) NetworkConnectionType CurrentConnectionType() Error GetNext(string group_id, MessageMeta* info, MessageData* data,string stream) Error GetLast(MessageMeta* info, MessageData* data, string stream) diff --git a/consumer/api/python/asapo_consumer.pyx.in b/consumer/api/python/asapo_consumer.pyx.in index 38cfe0362..268554ba6 100644 --- a/consumer/api/python/asapo_consumer.pyx.in +++ b/consumer/api/python/asapo_consumer.pyx.in @@ -199,10 +199,10 @@ cdef class PyConsumer: self.c_consumer.get().SetTimeout(timeout) def force_no_rdma(self): self.c_consumer.get().ForceNoRdma() - def enable_new_monitoring_api_format(self, bool enabled): + def disable_monitoring(self, bool disabled): cdef Error err with nogil: - err = self.c_consumer.get().EnableNewMonitoringApiFormat(enabled) + err = self.c_consumer.get().DisableMonitoring(disabled) if err: throw_exception(err) return diff --git a/examples/producer/dummy-data-producer/dummy_data_producer.cpp b/examples/producer/dummy-data-producer/dummy_data_producer.cpp index 085eca43f..ce71b475a 100644 --- a/examples/producer/dummy-data-producer/dummy_data_producer.cpp +++ b/examples/producer/dummy-data-producer/dummy_data_producer.cpp @@ -225,7 +225,6 @@ std::unique_ptr<asapo::Producer> CreateProducer(const Args& args) { producer->EnableLocalLog(true); producer->SetLogLevel(asapo::LogLevel::Info); - producer->EnableNewMonitoringApiFormat(false); return producer; } diff --git a/producer/api/cpp/src/producer_request.cpp b/producer/api/cpp/src/producer_request.cpp index dbb0408b6..3b6d584c4 100644 --- a/producer/api/cpp/src/producer_request.cpp +++ b/producer/api/cpp/src/producer_request.cpp @@ -32,7 +32,6 @@ ProducerRequest::ProducerRequest( } else { strcpy(header.api_version, "v0.0"); } - } bool ProducerRequest::NeedSend() const { diff --git a/producer/api/cpp/src/request_handler_tcp.cpp b/producer/api/cpp/src/request_handler_tcp.cpp index 97c932b33..53cc9e4c4 100644 --- a/producer/api/cpp/src/request_handler_tcp.cpp +++ b/producer/api/cpp/src/request_handler_tcp.cpp @@ -4,6 +4,8 @@ #include "asapo/io/io_factory.h" #include "producer_request.h" +#include "asapo/common/internal/version.h" + namespace asapo { RequestHandlerTcp::RequestHandlerTcp(ReceiverDiscoveryService* discovery_service, uint64_t thread_id, @@ -17,6 +19,7 @@ RequestHandlerTcp::RequestHandlerTcp(ReceiverDiscoveryService* discovery_service Error RequestHandlerTcp::Authorize(const std::string& source_credentials) { GenericRequestHeader header{kOpcodeAuthorize, 0, 0, source_credentials.size(), ""}; + strcpy(header.api_version, kProducerProtocol.GetReceiverVersion().c_str()); Error err; io__->Send(sd_, &header, sizeof(header), &err); if (err) { diff --git a/producer/api/cpp/unittests/test_request_handler_tcp.cpp b/producer/api/cpp/unittests/test_request_handler_tcp.cpp index c01ef5be1..7051f3a88 100644 --- a/producer/api/cpp/unittests/test_request_handler_tcp.cpp +++ b/producer/api/cpp/unittests/test_request_handler_tcp.cpp @@ -169,6 +169,7 @@ MATCHER_P5(M_CheckSendRequest, op_code, file_id, file_size, message, stream, ((asapo::GenericRequestHeader *) arg)->data_id == uint64_t(file_id) && ((asapo::GenericRequestHeader *) arg)->data_size == uint64_t(file_size) && strcmp(((asapo::GenericRequestHeader *) arg)->message, message) == 0 && + strcmp(((asapo::GenericRequestHeader *) arg)->api_version, "v0.6") == 0 && strcmp(((asapo::GenericRequestHeader *) arg)->stream, stream) == 0; } diff --git a/producer/api/python/asapo_producer.pxd b/producer/api/python/asapo_producer.pxd index 1349e5cde..359758a8a 100644 --- a/producer/api/python/asapo_producer.pxd +++ b/producer/api/python/asapo_producer.pxd @@ -116,7 +116,7 @@ cdef extern from "asapo/asapo_producer.h" namespace "asapo" nogil: void SetLogLevel(LogLevel level) uint64_t GetRequestsQueueSize() uint64_t GetRequestsQueueVolumeMb() - Error EnableNewMonitoringApiFormat(bool enabled) + Error DisableMonitoring(bool enabled) void SetRequestsQueueLimits(uint64_t size, uint64_t volume) Error WaitRequestsFinished(uint64_t timeout_ms) Error SendStreamFinishedFlag(string stream, uint64_t last_id, string next_stream, RequestCallback callback) diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in index bb001d97d..f41e7024a 100644 --- a/producer/api/python/asapo_producer.pyx.in +++ b/producer/api/python/asapo_producer.pyx.in @@ -416,13 +416,6 @@ cdef class PyProducer: return self.c_producer.get().GetRequestsQueueSize() def get_requests_queue_volume_mb(self): return self.c_producer.get().GetRequestsQueueVolumeMb() - def enable_new_monitoring_api_format(self, bool enabled): - cdef Error err - with nogil: - err = self.c_producer.get().EnableNewMonitoringApiFormat(enabled) - if err: - throw_exception(err) - return def set_requests_queue_limits(self,uint64_t size = 0, uint64_t volume_mb = 0): return self.c_producer.get().SetRequestsQueueLimits(size,volume_mb) def wait_requests_finished(self,timeout_ms): diff --git a/tests/automatic/support/getnext/getnext.cpp b/tests/automatic/support/getnext/getnext.cpp index 21bfa8cec..0c3f6c14b 100644 --- a/tests/automatic/support/getnext/getnext.cpp +++ b/tests/automatic/support/getnext/getnext.cpp @@ -116,7 +116,7 @@ StartThreads(const Args& params, std::vector<int>* nfiles, std::vector<int>* err //consumer->ForceNoRdma(); consumer->SetTimeout((uint64_t) params.timeout_ms); - consumer->EnableNewMonitoringApiFormat(true); + consumer->DisableMonitoring(true); asapo::MessageData data; lock.lock(); -- GitLab