Skip to content
Snippets Groups Projects
Forked from asapo / asapo
504 commits behind the upstream repository.
test_consumer_impl.cpp 70.52 KiB
#include <gmock/gmock.h>
#include <gmock/gmock.h>
#include "gtest/gtest.h"
#include <chrono>

#include "asapo/consumer/consumer.h"
#include "asapo/consumer/consumer_error.h"
#include "asapo/io/io.h"
#include "../../../../common/cpp/src/system_io/system_io.h"
#include "../src/consumer_impl.h"
#include "../../../../common/cpp/src/http_client/curl_http_client.h"
#include "asapo/unittests/MockIO.h"
#include "asapo/unittests/MockHttpClient.h"
#include "asapo/http_client/http_error.h"
#include "mocking.h"
#include "../src/tcp_consumer_client.h"
#include "asapo/common/internal/version.h"

using asapo::ConsumerFactory;
using asapo::Consumer;
using asapo::ConsumerImpl;
using asapo::IO;
using asapo::MessageMeta;
using asapo::MessageData;
using asapo::MockIO;
using asapo::MockHttpClient;
using asapo::MockNetClient;
using asapo::HttpCode;

using ::testing::AtLeast;
using ::testing::Eq;
using ::testing::HasSubstr;
using ::testing::Ne;
using ::testing::Test;
using ::testing::_;
using ::testing::Mock;
using ::testing::NiceMock;
using ::testing::Return;
using ::testing::SetArgPointee;
using ::testing::SetArgReferee;
using testing::AllOf;
using ::testing::DoAll;
using ::testing::ElementsAre;

namespace {

TEST(FolderDataBroker, Constructor) {
    auto consumer =
    std::unique_ptr<ConsumerImpl> {new ConsumerImpl("test", "path", false,
                                                        asapo::SourceCredentials{asapo::SourceType::kProcessed,
                                                                "instance", "step", "beamtime_id", "", "", "token"})
    };
    ASSERT_THAT(dynamic_cast<asapo::SystemIO*>(consumer->io__.get()), Ne(nullptr));
    ASSERT_THAT(dynamic_cast<asapo::CurlHttpClient*>(consumer->httpclient__.get()), Ne(nullptr));
    ASSERT_THAT(consumer->net_client__.get(), Eq(nullptr));
}

const uint8_t expected_value = 1;

class ConsumerImplTests : public Test {
  public:
    std::unique_ptr<ConsumerImpl> consumer, fts_consumer;
    NiceMock<MockIO> mock_io{};
    NiceMock<MockHttpClient> mock_http_client{};
    NiceMock<MockNetClient> mock_netclient{};
    MessageMeta info;
    std::string expected_server_uri = "test:8400";
    std::string expected_broker_uri = "asapo-broker:5005";
    std::string expected_consumer_protocol = asapo::kConsumerProtocol.GetVersion();
    std::string expected_broker_protocol = asapo::kConsumerProtocol.GetBrokerVersion();
    std::string expected_broker_api = expected_broker_uri + "/" + expected_broker_protocol;
    std::string expected_fts_uri = "asapo-file-transfer:5008";
    std::string expected_token = "token";
    std::string expected_path = "/tmp/beamline/beamtime";
    std::string expected_filename = "filename";
    std::string expected_full_path = std::string("/tmp/beamline/beamtime") + asapo::kPathSeparator + expected_filename;
    std::string expected_group_id = "groupid$";
    std::string expected_data_source = "source/$.?";
    std::string expected_stream = "str $ eam$";
    std::string expected_group_id_encoded = "groupid%24";
    std::string expected_data_source_encoded = "source%2F%24.%3F";
    std::string expected_stream_encoded = "str%20%24%20eam%24";

    std::string expected_metadata = "{\"meta\":1}";
    std::string expected_query_string = "bla";
    std::string expected_folder_token = "folder_token";

    std::string expected_instance_id = "some instance";
    std::string expected_pipeline_step = "a new step";
    std::string expected_instance_id_encoded = "some%20instance";
    std::string expected_pipeline_step_encoded = "a%20new%20step";

    std::string expected_token_url_with_sourceinfo = std::string("token") +
            "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded;

    std::string expected_beamtime_id = "beamtime_id";
    uint64_t expected_message_size = 100;
    uint64_t expected_dataset_id = 1;
    static const uint64_t expected_buf_id = 123;
    std::string expected_next_stream = "nextstream";
    std::string expected_fts_query_string = "{\"Folder\":\"" + expected_path + "\",\"FileName\":\"" + expected_filename +
                                            "\"}";
    std::string expected_cookie = "Authorization=Bearer " + expected_folder_token;

    std::string expected_request_sender_details_prefix = expected_instance_id + "§" +
            expected_pipeline_step + "§" +
            expected_beamtime_id + "§" +
            expected_data_source + "§";
    std::string expected_request_sender_details_with_stream = expected_instance_id + "§" +
            expected_pipeline_step + "§" +
            expected_beamtime_id + "§" +
            expected_data_source + "§" +
            expected_stream;

    void AssertSingleFileTransfer();
    void SetUp() override {
        consumer = std::unique_ptr<ConsumerImpl> {
            new ConsumerImpl(expected_server_uri,
                             expected_path,
                             true,
            asapo::SourceCredentials{
                asapo::SourceType::kProcessed, expected_instance_id, expected_pipeline_step, expected_beamtime_id, "",
                expected_data_source, expected_token})
        };
        fts_consumer = std::unique_ptr<ConsumerImpl> {
            new ConsumerImpl(expected_server_uri,
                             expected_path,
                             false,
            asapo::SourceCredentials{
                asapo::SourceType::kProcessed, expected_instance_id, expected_pipeline_step, expected_beamtime_id, "",
                expected_data_source, expected_token})
        };
        consumer->io__ = std::unique_ptr<IO> {&mock_io};
        consumer->httpclient__ = std::unique_ptr<asapo::HttpClient> {&mock_http_client};
        consumer->net_client__ = std::unique_ptr<asapo::NetClient> {&mock_netclient};
        fts_consumer->io__ = std::unique_ptr<IO> {&mock_io};
        fts_consumer->httpclient__ = std::unique_ptr<asapo::HttpClient> {&mock_http_client};
        fts_consumer->net_client__ = std::unique_ptr<asapo::NetClient> {&mock_netclient};

        {
            ON_CALL(mock_http_client, UrlEscape_t(expected_instance_id)).WillByDefault(Return(expected_instance_id_encoded));
            ON_CALL(mock_http_client, UrlEscape_t(expected_pipeline_step)).WillByDefault(Return(expected_pipeline_step_encoded));
            ON_CALL(mock_http_client, UrlEscape_t(expected_stream)).WillByDefault(Return(expected_stream_encoded));
            ON_CALL(mock_http_client, UrlEscape_t(expected_group_id)).WillByDefault(Return(expected_group_id_encoded));
            ON_CALL(mock_http_client, UrlEscape_t(expected_data_source)).WillByDefault(Return(expected_data_source_encoded));
            ON_CALL(mock_http_client, UrlEscape_t("0")).WillByDefault(Return("0"));
            ON_CALL(mock_http_client, UrlEscape_t("")).WillByDefault(Return(""));
            ON_CALL(mock_http_client, UrlEscape_t("default")).WillByDefault(Return("default"));
            ON_CALL(mock_http_client, UrlEscape_t("stream")).WillByDefault(Return("stream"));
            ON_CALL(mock_http_client, UrlEscape_t("instance")).WillByDefault(Return("instance"));
            ON_CALL(mock_http_client, UrlEscape_t("step")).WillByDefault(Return("step"));
            ON_CALL(mock_http_client, UrlEscape_t("DefaultStep")).WillByDefault(Return("DefaultStep"));
            ON_CALL(mock_http_client, UrlEscape_t("a")).WillByDefault(Return("b"));
            ON_CALL(mock_http_client, UrlEscape_t("b")).WillByDefault(Return("b"));
        }
    }
    void TearDown() override {
        consumer->io__.release();
        consumer->httpclient__.release();
        consumer->net_client__.release();

        fts_consumer->io__.release();
        fts_consumer->httpclient__.release();
        fts_consumer->net_client__.release();

        Mock::VerifyAndClear(&mock_io);
        Mock::VerifyAndClear(&mock_http_client);
        Mock::VerifyAndClear(&mock_netclient);
    }
    void MockGet(const std::string& response, asapo::HttpCode return_code = HttpCode::OK) {
        EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_broker_api), _, _)).WillOnce(DoAll(
                    SetArgPointee<1>(return_code),
                    SetArgPointee<2>(nullptr),
                    Return(response)
                ));
    }

    void MockGetError() {
        EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_broker_api), _, _)).WillOnce(DoAll(
                    SetArgPointee<1>(HttpCode::ServiceUnavailable),
                    SetArgPointee<2>(asapo::IOErrorTemplates::kUnknownIOError.Generate().release()),
                    Return("")
                ));
    }
    void MockGetServiceUri(std::string service, std::string result) {
        EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_server_uri + "/asapo-discovery/v0.1/" + service + "?token="
                                                      + expected_token + "&protocol=" + expected_consumer_protocol), _, _)
                                            ).WillOnce(DoAll(
                                                    SetArgPointee<1>(HttpCode::OK),
                                                    SetArgPointee<2>(nullptr),
                                                    Return(result)));
    }

    void MockBeforeFTS(const std::string& expected_request_sender_details, MessageData* data);

    void MockGetFTSUri() {
        MockGetServiceUri("asapo-file-transfer", expected_fts_uri);
    }

    void ExpectFolderToken();
    void ExpectFileTransfer(const asapo::ConsumerErrorTemplate* p_err_template);
    void ExpectRepeatedFileTransfer();
    void ExpectIdList(bool error);
    void ExpectLastAckId(bool empty_response);

    void MockGetBrokerUri() {
        MockGetServiceUri("asapo-broker", expected_broker_uri);
    }
    void MockReadDataFromFile(int times = 1) {
        if (times == 0) {
            EXPECT_CALL(mock_io, GetDataFromFile_t(_, _, _)).Times(0);
            return;
        }

        auto simple_error = [] {
            return asapo::GeneralErrorTemplates::kSimpleError.Generate("s").release();
        };

        EXPECT_CALL(mock_io, GetDataFromFile_t(expected_full_path, testing::Pointee(100), _)).Times(AtLeast(times)).
        WillRepeatedly(DoAll(SetArgPointee<2>(simple_error), testing::Return(nullptr)));
    }

    //Fake info
    MessageMeta CreateFI(uint64_t buf_id = expected_buf_id) {
        MessageMeta fi;
        fi.size = expected_message_size;
        fi.id = 1;
        fi.buf_id = buf_id;
        fi.name = expected_filename;
        fi.stream = expected_stream;
        fi.timestamp = std::chrono::system_clock::now();
        return fi;
    }

    void CheckDefaultingOfCredentials(asapo::SourceCredentials credentials, std::string expectedUrlPath) {
        consumer->io__.release();
        consumer->httpclient__.release();
        consumer->net_client__.release();
        consumer = std::unique_ptr<ConsumerImpl> {
            new ConsumerImpl(expected_server_uri,
                             expected_path,
                             false,
                             std::move(credentials))
        };
        consumer->io__ = std::unique_ptr<IO> {&mock_io};
        consumer->httpclient__ = std::unique_ptr<asapo::HttpClient> {&mock_http_client};
        consumer->net_client__ = std::unique_ptr<asapo::NetClient> {&mock_netclient};
        MockGetBrokerUri();

        EXPECT_CALL(mock_http_client,
                    Get_t(expected_broker_api + expectedUrlPath, _,
                    _)).WillOnce(DoAll(
                            SetArgPointee<1>(HttpCode::OK),
                            SetArgPointee<2>(nullptr),
                            Return("")));

        consumer->GetNext(expected_group_id, &info, nullptr, "stream");
    }
};

TEST_F(ConsumerImplTests, GetMessageReturnsErrorOnWrongInput) {
    auto err = consumer->GetNext("", nullptr, nullptr, expected_stream);
    ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kWrongInput));
}

TEST_F(ConsumerImplTests, DefaultStreamIsDetector) {
    CheckDefaultingOfCredentials(
            asapo::SourceCredentials{
                asapo::SourceType::kProcessed, "instance", "step", "beamtime_id", "", "", expected_token
                },
                "/beamtime/beamtime_id/detector/stream/" + expected_group_id_encoded + "/next?id_key=message_id&token=" + expected_token
                + "&instanceid=instance&pipelinestep=step");
}

TEST_F(ConsumerImplTests, DefaultPipelineStepIsDefaultStep) {
    CheckDefaultingOfCredentials(
            asapo::SourceCredentials{
                asapo::SourceType::kProcessed, "instance", "", "beamtime_id", "a", "b", expected_token
                },
                "/beamtime/beamtime_id/b/stream/" + expected_group_id_encoded + "/next?token=" + expected_token
                + "&instanceid=instance&pipelinestep=DefaultStep");
}

TEST_F(ConsumerImplTests, AutoPipelineStepIsDefaultStep) {
    CheckDefaultingOfCredentials(
            asapo::SourceCredentials{
                asapo::SourceType::kProcessed, "instance", "auto", "beamtime_id", "a", "b", expected_token
                },
                "/beamtime/beamtime_id/b/stream/" + expected_group_id_encoded + "/next?token=" + expected_token
                + "&instanceid=instance&pipelinestep=DefaultStep");
}

/*
 * TODO: Hard to test because instance id is set over DefaultIO in ctor
 *
TEST_F(ConsumerImplTests, DefaultInstanceIdIsHostAndPid) {
    EXPECT_CALL(mock_io, GetHostName_t(_)).
    WillOnce(DoAll(testing::SetArgPointee<0>(nullptr), Return("myHostName")));
    EXPECT_CALL(mock_io, GetCurrentPid()).
    WillOnce(Return(201));

    CheckDefaultingOfCredentials(
            asapo::SourceCredentials{
                asapo::SourceType::kProcessed, "", "step", "beamtime_id", "", "", expected_token
                },
                "/beamtime/beamtime_id/detector/stream/" + expected_group_id_encoded + "/next?token=" + expected_token
                + "&instanceid=myHostName_201&pipelinestep=step");
}

TEST_F(ConsumerImplTests, AutoInstanceIdIsHostAndPid) {
    EXPECT_CALL(mock_io, GetHostName_t(_)).
    WillOnce(DoAll(testing::SetArgPointee<0>(nullptr), Return("myHostName")));
    EXPECT_CALL(mock_io, GetCurrentPid()).
    WillOnce(Return(201));

    CheckDefaultingOfCredentials(
            asapo::SourceCredentials{
                asapo::SourceType::kProcessed, "auto", "step", "beamtime_id", "", "", expected_token
                },
                "/beamtime/beamtime_id/detector/stream/" + expected_group_id_encoded + "/next?token=" + expected_token
                + "&instanceid=myHostName_201&pipelinestep=step");
}
 */

TEST_F(ConsumerImplTests, GetNextUsesCorrectUriWithStream) {
    MockGetBrokerUri();

    EXPECT_CALL(mock_http_client, Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/"
                                        +
                                        expected_stream_encoded + "/" + expected_group_id_encoded + "/next?token="
                                        + expected_token
                                        + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded, _,
                                        _)).WillOnce(DoAll(
                                                SetArgPointee<1>(HttpCode::OK),
                                                SetArgPointee<2>(nullptr),
                                                Return("")));
    consumer->GetNext(expected_group_id, &info, nullptr, expected_stream);
}

TEST_F(ConsumerImplTests, GetLastOnceUsesCorrectUri) {
    MockGetBrokerUri();

    EXPECT_CALL(mock_http_client,
                Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/"
                      + expected_stream_encoded +
                      "/" + expected_group_id_encoded + "/groupedlast?token="
                      + expected_token
                      + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded, _,
                      _)).WillOnce(DoAll(
                                       SetArgPointee<1>(HttpCode::OK),
                                       SetArgPointee<2>(nullptr),
                                       Return("")));
    consumer->GetLast(expected_group_id, &info, nullptr, expected_stream);
}

TEST_F(ConsumerImplTests, GetLastUsesCorrectUri) {
    MockGetBrokerUri();

    EXPECT_CALL(mock_http_client,
                Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/" + expected_stream_encoded +
                      "/0/last?token=" + expected_token
                      + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded, _,
                      _)).WillOnce(DoAll(
                                       SetArgPointee<1>(HttpCode::OK),
                                       SetArgPointee<2>(nullptr),
                                       Return("")));
    consumer->GetLast(&info, nullptr, expected_stream);
}

TEST_F(ConsumerImplTests, GetMessageReturnsEndOfStreamFromHttpClient) {
    MockGetBrokerUri();

    EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).WillOnce(DoAll(
                SetArgPointee<1>(HttpCode::Conflict),
                SetArgPointee<2>(nullptr),
                Return("{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"\"}")));

    auto err = consumer->GetNext(expected_group_id, &info, nullptr, expected_stream);

    auto err_data = static_cast<const asapo::ConsumerErrorData*>(err->GetCustomData());

    ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kEndOfStream));
    ASSERT_THAT(err_data->id, Eq(1));
    ASSERT_THAT(err_data->id_max, Eq(1));
    ASSERT_THAT(err_data->next_stream, Eq(""));
}

TEST_F(ConsumerImplTests, GetMessageReturnsStreamFinishedFromHttpClient) {
    MockGetBrokerUri();

    EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).WillOnce(DoAll(
                SetArgPointee<1>(HttpCode::Conflict),
                SetArgPointee<2>(nullptr),
                Return("{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"" + expected_next_stream
                       + "\"}")));

    auto err = consumer->GetNext(expected_group_id, &info, nullptr, expected_stream);

    auto err_data = static_cast<const asapo::ConsumerErrorData*>(err->GetCustomData());

    ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kStreamFinished));
    ASSERT_THAT(err_data->id, Eq(1));
    ASSERT_THAT(err_data->id_max, Eq(1));
    ASSERT_THAT(err_data->next_stream, Eq(expected_next_stream));
}

TEST_F(ConsumerImplTests, GetMessageReturnsNoDataFromHttpClient) {
    MockGetBrokerUri();

    EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).WillOnce(DoAll(
                SetArgPointee<1>(HttpCode::Conflict),
                SetArgPointee<2>(nullptr),
                Return("{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":2,\"next_stream\":\"""\"}")));

    auto err = consumer->GetNext(expected_group_id, &info, nullptr, expected_stream);
    auto err_data = static_cast<const asapo::ConsumerErrorData*>(err->GetCustomData());

    ASSERT_THAT(err_data, Ne(nullptr));
    ASSERT_THAT(err_data->id, Eq(1));
    ASSERT_THAT(err_data->id_max, Eq(2));
    ASSERT_THAT(err_data->next_stream, Eq(""));

    ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kNoData));
}

TEST_F(ConsumerImplTests, GetMessageReturnsNotAuthorized) {
    MockGetBrokerUri();

    EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).WillOnce(DoAll(
                SetArgPointee<1>(HttpCode::Unauthorized),
                SetArgPointee<2>(nullptr),
                Return("")));

    auto err = consumer->GetNext(expected_group_id, &info, nullptr, expected_stream);

    ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kWrongInput));
}

TEST_F(ConsumerImplTests, GetMessageReturnsWrongResponseFromHttpClient) {

    MockGetBrokerUri();

    EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).WillOnce(DoAll(
                SetArgPointee<1>(HttpCode::Conflict),
                SetArgPointee<2>(nullptr),
                Return("id")));

    auto err = consumer->GetNext(expected_group_id, &info, nullptr, expected_stream);

    ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kInterruptedTransaction));
    ASSERT_THAT(err->Explain(), HasSubstr("malformed"));
}

TEST_F(ConsumerImplTests, GetMessageReturnsIfBrokerAddressNotFound) {
    EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_server_uri + "/asapo-discovery/v0.1/asapo-broker"), _,
                                        _)).Times(AtLeast(2)).WillRepeatedly(DoAll(
                                                    SetArgPointee<1>(HttpCode::NotFound),
                                                    SetArgPointee<2>(nullptr),
                                                    Return("")));

    consumer->SetTimeout(100);
    auto err = consumer->GetNext(expected_group_id, &info, nullptr, expected_stream);

    ASSERT_THAT(err->Explain(), AllOf(HasSubstr(expected_server_uri), HasSubstr("unavailable")));
}

TEST_F(ConsumerImplTests, GetMessageReturnsUnsupportedClient) {
    EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_server_uri + "/asapo-discovery/v0.1/asapo-broker"), _,
                                        _)).Times(AtLeast(2)).WillRepeatedly(DoAll(
                                                    SetArgPointee<1>(HttpCode::UnsupportedMediaType),
                                                    SetArgPointee<2>(nullptr),
                                                    Return("")));

    consumer->SetTimeout(100);
    auto err = consumer->GetNext(expected_group_id, &info, nullptr, expected_stream);

    ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kUnsupportedClient));
}

TEST_F(ConsumerImplTests, GetMessageReturnsIfBrokerUriEmpty) {
    EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_server_uri + "/asapo-discovery/v0.1/asapo-broker"), _,
                                        _)).Times(AtLeast(2)).WillRepeatedly(DoAll(
                                                    SetArgPointee<1>(HttpCode::OK),
                                                    SetArgPointee<2>(nullptr),
                                                    Return("")));

    consumer->SetTimeout(100);
    auto err = consumer->GetNext(expected_group_id, &info, nullptr, expected_stream);

    ASSERT_THAT(err->Explain(), AllOf(HasSubstr(expected_server_uri), HasSubstr("unavailable")));
}

TEST_F(ConsumerImplTests, GetDoNotCallBrokerUriIfAlreadyFound) {
    MockGetBrokerUri();
    MockGet("error_response");

    consumer->SetTimeout(100);
    consumer->GetNext(expected_group_id, &info, nullptr, expected_stream);
    Mock::VerifyAndClearExpectations(&mock_http_client);

    EXPECT_CALL(mock_http_client,
                Get_t(_, _, _)).Times(0);
    MockGet("error_response");
    consumer->GetNext(expected_group_id, &info, nullptr, expected_stream);
}

TEST_F(ConsumerImplTests, GetBrokerUriAgainAfterConnectionError) {
    MockGetBrokerUri();
    MockGetError();

    consumer->SetTimeout(0);
    consumer->GetNext(expected_group_id, &info, nullptr, expected_stream);
    Mock::VerifyAndClearExpectations(&mock_http_client);

    MockGetBrokerUri();
    MockGet("error_response");
    consumer->GetNext(expected_group_id, &info, nullptr, expected_stream);
}

TEST_F(ConsumerImplTests, GetMessageReturnsEofStreamFromHttpClientUntilTimeout) {
    MockGetBrokerUri();

    EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).Times(AtLeast(2)).WillRepeatedly(DoAll(
                SetArgPointee<1>(HttpCode::Conflict),
                SetArgPointee<2>(nullptr),
                Return("{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"""\"}")));

    consumer->SetTimeout(300);
    auto err = consumer->GetNext(expected_group_id, &info, nullptr, expected_stream);

    ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kEndOfStream));
}

TEST_F(ConsumerImplTests, GetMessageReturnsNoDataAfterTimeoutEvenIfOtherErrorOccured) {
    MockGetBrokerUri();
    consumer->SetTimeout(300);

    EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).WillOnce(DoAll(
                SetArgPointee<1>(HttpCode::Conflict),
                SetArgPointee<2>(nullptr),
                Return("{\"op\":\"get_record_by_id\",\"id\":" + std::to_string(expected_dataset_id) +
                       ",\"id_max\":2,\"next_stream\":\"""\"}")));

    EXPECT_CALL(mock_http_client, Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded +
                                        "/stream/0/"
                                        + std::to_string(expected_dataset_id) + "?token=" + expected_token
                                        + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded
                                        , _, _)).Times(AtLeast(1)).WillRepeatedly(DoAll(
                                                    SetArgPointee<1>(HttpCode::ServiceUnavailable),
                                                    SetArgPointee<2>(nullptr),
                                                    Return("")));

    consumer->SetTimeout(300);
    auto err = consumer->GetNext(expected_group_id, &info, nullptr, "stream");

    ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kNoData));
}

TEST_F(ConsumerImplTests, GetNextMessageReturnsImmediatelyOnTransferError) {
    MockGetBrokerUri();

    EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).WillOnce(DoAll(
                SetArgPointee<1>(HttpCode::InternalServerError),
                SetArgPointee<2>(asapo::HttpErrorTemplates::kTransferError.Generate("sss").release()),
                Return("")));

    consumer->SetTimeout(300);
    auto err = consumer->GetNext(expected_group_id, &info, nullptr, expected_stream);

    ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kInterruptedTransaction));
    ASSERT_THAT(err->Explain(), HasSubstr("sss"));
}

ACTION(AssignArg2) {
    *arg2 = asapo::HttpErrorTemplates::kConnectionError.Generate().release();
}

TEST_F(ConsumerImplTests, GetNextRetriesIfConnectionHttpClientErrorUntilTimeout) {
    EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_server_uri + "/asapo-discovery/v0.1/asapo-broker"), _,
                                        _)).Times(AtLeast(2)).WillRepeatedly(DoAll(
                                                    SetArgPointee<1>(HttpCode::OK),
                                                    SetArgPointee<2>(nullptr),
                                                    Return(expected_broker_uri)));

    EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).Times(AtLeast(2)).WillRepeatedly(DoAll(
                SetArgPointee<1>(HttpCode::Conflict),
                AssignArg2(),
                Return("")));

    consumer->SetTimeout(300);
    auto err = consumer->GetNext(expected_group_id, &info, nullptr, expected_stream);

    ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kUnavailableService));
}

TEST_F(ConsumerImplTests, GetNextMessageReturnsImmediatelyOnFinshedStream) {
    MockGetBrokerUri();

    EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).WillOnce(DoAll(
                SetArgPointee<1>(HttpCode::Conflict),
                SetArgPointee<2>(nullptr),
                Return("{\"op\":\"get_record_by_id\",\"id\":2,\"id_max\":2,\"next_stream\":\"next\"}")));

    consumer->SetTimeout(300);
    auto err = consumer->GetNext(expected_group_id, &info, nullptr, expected_stream);

    ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kStreamFinished));
}

TEST_F(ConsumerImplTests, GetMessageReturnsMessageMeta) {
    MockGetBrokerUri();

    auto to_send = CreateFI();
    auto json = to_send.Json();

    MockGet(json);

    auto err = consumer->GetNext(expected_group_id, &info, nullptr, expected_stream);

    ASSERT_THAT(err, Eq(nullptr));

    ASSERT_THAT(info.name, Eq(to_send.name));
    ASSERT_THAT(info.size, Eq(to_send.size));
    ASSERT_THAT(info.id, Eq(to_send.id));
    ASSERT_THAT(info.timestamp, Eq(to_send.timestamp));
}

TEST_F(ConsumerImplTests, GetMessageReturnsParseError) {
    MockGetBrokerUri();
    MockGet("error_response");

    auto err = consumer->GetNext(expected_group_id, &info, nullptr, expected_stream);

    ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kInterruptedTransaction));
}

TEST_F(ConsumerImplTests, GetMessageReturnsIfNoDataNeeded) {
    MockGetBrokerUri();
    MockGet("error_response");

    EXPECT_CALL(mock_netclient, GetData_t(_, _, _)).Times(0);
    EXPECT_CALL(mock_io, GetDataFromFile_t(_, _, _)).Times(0);

    consumer->GetNext(expected_group_id, &info, nullptr, expected_stream);
}

TEST_F(ConsumerImplTests, GetMessageTriesToGetDataFromMemoryCache) {
    MockGetBrokerUri();
    auto to_send = CreateFI();
    auto json = to_send.Json();
    MockGet(json);
    MessageData data;

    EXPECT_CALL(mock_netclient, GetData_t(&info, expected_request_sender_details_with_stream, &data)).WillOnce(Return(nullptr));
    MockReadDataFromFile(0);

    consumer->GetNext(expected_group_id, &info, &data, expected_stream);

    ASSERT_THAT(info.buf_id, Eq(expected_buf_id));

}

TEST_F(ConsumerImplTests, GetMessageCallsReadFromFileIfCannotReadFromCache) {
    MockGetBrokerUri();
    auto to_send = CreateFI();
    auto json = to_send.Json();
    MockGet(json);

    MessageData data;

    EXPECT_CALL(mock_netclient, GetData_t(&info, expected_request_sender_details_with_stream,
                                          &data)).WillOnce(Return(asapo::IOErrorTemplates::kUnknownIOError.Generate().release()));
    MockReadDataFromFile();

    consumer->GetNext(expected_group_id, &info, &data, expected_stream);
    ASSERT_THAT(info.buf_id, Eq(0));
}

TEST_F(ConsumerImplTests, GetMessageReturnsErrorIfCannotReadFromCache) {
    MockGetBrokerUri();
    auto to_send = CreateFI();
    to_send.ingest_mode = asapo::kCacheOnlyIngestMode;
    to_send.buf_id = 0;
    auto json = to_send.Json();
    MockGet(json);

    MessageData data;

    auto err = consumer->GetNext(expected_group_id, &info, &data, expected_stream);
    ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kDataNotInCache));
}

TEST_F(ConsumerImplTests, GetMessageCallsReadFromFileIfZeroBufId) {
    MockGetBrokerUri();
    auto to_send = CreateFI(0);
    auto json = to_send.Json();
    MockGet(json);

    MessageData data;
    EXPECT_CALL(mock_netclient, GetData_t(_, _, _)).Times(0);

    MockReadDataFromFile();

    consumer->GetNext(expected_group_id, &info, &data, expected_stream);
}

TEST_F(ConsumerImplTests, GetMessageCallsRetriesReadFromFile) {
    MockGetBrokerUri();
    auto to_send = CreateFI(0);
    auto json = to_send.Json();
    MockGet(json);
    consumer->SetTimeout(200);

    MessageData data;

    EXPECT_CALL(mock_netclient, GetData_t(_, _, _)).Times(0);

    MockReadDataFromFile(2);

    consumer->GetNext(expected_group_id, &info, &data, expected_stream);
}

TEST_F(ConsumerImplTests, GenerateNewGroupIdReturnsErrorCreateGroup) {
    MockGetBrokerUri();

    EXPECT_CALL(mock_http_client, Post_t(HasSubstr("creategroup"), _, "", _, _)).WillOnce(DoAll(
                SetArgPointee<3>(HttpCode::BadRequest),
                SetArgPointee<4>(nullptr),
                Return("")));

    consumer->SetTimeout(100);
    asapo::Error err;
    auto groupid = consumer->GenerateNewGroupId(&err);
    ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kWrongInput));
    ASSERT_THAT(groupid, Eq(""));
}

TEST_F(ConsumerImplTests, GenerateNewGroupIdReturnsGroupID) {
    MockGetBrokerUri();

    EXPECT_CALL(mock_http_client, Post_t(expected_broker_api + "/creategroup?token=" + expected_token, _, "", _,
                                         _)).WillOnce(DoAll(
                                                 SetArgPointee<3>(HttpCode::OK),
                                                 SetArgPointee<4>(nullptr),
                                                 Return(expected_group_id)));

    consumer->SetTimeout(100);
    asapo::Error err;
    auto groupid = consumer->GenerateNewGroupId(&err);
    ASSERT_THAT(err, Eq(nullptr));
    ASSERT_THAT(groupid, Eq(expected_group_id));
}

TEST_F(ConsumerImplTests, ResetCounterByDefaultUsesCorrectUri) {
    MockGetBrokerUri();
    consumer->SetTimeout(100);

    EXPECT_CALL(mock_http_client,
                Post_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/stream/" +
                       expected_group_id_encoded +
                       "/resetcounter?token=" + expected_token + "&value=0", _, _, _, _)).WillOnce(DoAll(
                                   SetArgPointee<3>(HttpCode::OK),
                                   SetArgPointee<4>(nullptr),
                                   Return("")));
    auto err = consumer->ResetLastReadMarker(expected_group_id, "stream");
    ASSERT_THAT(err, Eq(nullptr));
}

TEST_F(ConsumerImplTests, ResetCounterUsesCorrectUri) {
    MockGetBrokerUri();
    consumer->SetTimeout(100);

    EXPECT_CALL(mock_http_client,
                Post_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/"
                       +
                       expected_stream_encoded + "/" +
                       expected_group_id_encoded +
                       "/resetcounter?token=" + expected_token + "&value=10", _, _, _, _)).WillOnce(DoAll(
                                   SetArgPointee<3>(HttpCode::OK),
                                   SetArgPointee<4>(nullptr),
                                   Return("")));
    auto err = consumer->SetLastReadMarker(expected_group_id, 10, expected_stream);
    ASSERT_THAT(err, Eq(nullptr));
}

TEST_F(ConsumerImplTests, GetCurrentSizeUsesCorrectUri) {
    MockGetBrokerUri();
    consumer->SetTimeout(100);

    EXPECT_CALL(mock_http_client,
                Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/"
                      +
                      expected_stream_encoded + "/size?token="
                      + expected_token + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded, _, _)).WillOnce(DoAll(
                                  SetArgPointee<1>(HttpCode::OK),
                                  SetArgPointee<2>(nullptr),
                                  Return("{\"size\":10}")));
    asapo::Error err;
    auto size = consumer->GetCurrentSize(expected_stream, &err);
    ASSERT_THAT(err, Eq(nullptr));
    ASSERT_THAT(size, Eq(10));
}

TEST_F(ConsumerImplTests, GetCurrentSizeErrorOnWrongResponce) {
    MockGetBrokerUri();
    consumer->SetTimeout(100);

    EXPECT_CALL(mock_http_client, Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded +
                                        "/" + expected_stream_encoded + "/size?token="
                                        + expected_token
                                        + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded, _, _)).WillRepeatedly(DoAll(
                                                    SetArgPointee<1>(HttpCode::Unauthorized),
                                                    SetArgPointee<2>(nullptr),
                                                    Return("")));
    asapo::Error err;
    auto size = consumer->GetCurrentSize(expected_stream, &err);
    ASSERT_THAT(err, Ne(nullptr));
    ASSERT_THAT(size, Eq(0));
}

TEST_F(ConsumerImplTests, GetNDataErrorOnWrongParse) {
    MockGetBrokerUri();
    consumer->SetTimeout(100);

    EXPECT_CALL(mock_http_client, Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded +
                                        "/stream/size?token="
                                        + expected_token_url_with_sourceinfo, _, _)).WillOnce(DoAll(
                                                    SetArgPointee<1>(HttpCode::OK),
                                                    SetArgPointee<2>(nullptr),
                                                    Return("{\"siz\":10}")));
    asapo::Error err;
    auto size = consumer->GetCurrentSize("stream", &err);
    ASSERT_THAT(err, Ne(nullptr));
    ASSERT_THAT(size, Eq(0));
}

TEST_F(ConsumerImplTests, GetByIdUsesCorrectUri) {
    MockGetBrokerUri();
    consumer->SetTimeout(100);
    auto to_send = CreateFI();
    auto json = to_send.Json();

    EXPECT_CALL(mock_http_client, Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded +
                                        "/stream/0/"
                                        + std::to_string(
                                            expected_dataset_id) + "?token="
                                        + expected_token
                                        + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded, _,
                                        _)).WillOnce(DoAll(
                                                SetArgPointee<1>(HttpCode::OK),
                                                SetArgPointee<2>(nullptr),
                                                Return(json)));

    auto err = consumer->GetById(expected_dataset_id, &info, nullptr, "stream");

    ASSERT_THAT(err, Eq(nullptr));
    ASSERT_THAT(info.name, Eq(to_send.name));
}

TEST_F(ConsumerImplTests, GetByIdTimeouts) {
    MockGetBrokerUri();
    consumer->SetTimeout(10);

    EXPECT_CALL(mock_http_client, Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded +
                                        "/stream/0/"
                                        + std::to_string(expected_dataset_id) + "?token="
                                        + expected_token
                                        + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded, _, _)).WillOnce(DoAll(
                                                    SetArgPointee<1>(HttpCode::Conflict),
                                                    SetArgPointee<2>(nullptr),
                                                    Return("")));

    auto err = consumer->GetById(expected_dataset_id, &info, nullptr, "stream");

    ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kNoData));
}

TEST_F(ConsumerImplTests, GetByIdReturnsEndOfStream) {
    MockGetBrokerUri();
    consumer->SetTimeout(10);

    EXPECT_CALL(mock_http_client, Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded +
                                        "/stream/0/"
                                        + std::to_string(expected_dataset_id) + "?token="
                                        + expected_token_url_with_sourceinfo, _, _)).WillOnce(DoAll(
                                                    SetArgPointee<1>(HttpCode::Conflict),
                                                    SetArgPointee<2>(nullptr),
                                                    Return("{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"""\"}")));

    auto err = consumer->GetById(expected_dataset_id, &info, nullptr, "stream");

    ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kEndOfStream));
}

TEST_F(ConsumerImplTests, GetByIdReturnsEndOfStreamWhenIdTooLarge) {
    MockGetBrokerUri();
    consumer->SetTimeout(10);

    EXPECT_CALL(mock_http_client, Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded +
                                        "/stream/0/"
                                        + std::to_string(expected_dataset_id) + "?token="
                                        + expected_token
                                        + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded, _, _)).WillOnce(DoAll(
                                                    SetArgPointee<1>(HttpCode::Conflict),
                                                    SetArgPointee<2>(nullptr),
                                                    Return("{\"op\":\"get_record_by_id\",\"id\":100,\"id_max\":1,\"next_stream\":\"""\"}")));

    auto err = consumer->GetById(expected_dataset_id, &info, nullptr, "stream");
    ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kEndOfStream));
}

TEST_F(ConsumerImplTests, GetBeamtimeMetaDataOK) {
    MockGetBrokerUri();
    consumer->SetTimeout(100);

    EXPECT_CALL(mock_http_client, Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded +
                                        "/default/0/meta/0?token="
                                        + expected_token_url_with_sourceinfo, _,
                                        _)).WillOnce(DoAll(
                                                SetArgPointee<1>(HttpCode::OK),
                                                SetArgPointee<2>(nullptr),
                                                Return(expected_metadata)));

    asapo::Error err;
    auto res = consumer->GetBeamtimeMeta(&err);

    ASSERT_THAT(err, Eq(nullptr));
    ASSERT_THAT(res, Eq(expected_metadata));

}

TEST_F(ConsumerImplTests, GetStreamMetaDataOK) {
    MockGetBrokerUri();
    consumer->SetTimeout(100);

    EXPECT_CALL(mock_http_client, Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded +
                                        "/" + expected_stream_encoded + "/0/meta/1?token="
                                        + expected_token
                                        + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded, _,
                                        _)).WillOnce(DoAll(
                                                SetArgPointee<1>(HttpCode::OK),
                                                SetArgPointee<2>(nullptr),
                                                Return(expected_metadata)));

    asapo::Error err;
    auto res = consumer->GetStreamMeta(expected_stream, &err);

    ASSERT_THAT(err, Eq(nullptr));
    ASSERT_THAT(res, Eq(expected_metadata));
}

TEST_F(ConsumerImplTests, QueryMessagesReturnError) {
    MockGetBrokerUri();

    EXPECT_CALL(mock_http_client, Post_t(HasSubstr("querymessages"), _, expected_query_string, _, _)).WillOnce(DoAll(
                SetArgPointee<3>(HttpCode::BadRequest),
                SetArgPointee<4>(nullptr),
                Return("error in query")));

    consumer->SetTimeout(1000);
    asapo::Error err;
    auto messages = consumer->QueryMessages(expected_query_string, expected_stream, &err);

    ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kWrongInput));
    ASSERT_THAT(err->Explain(), HasSubstr("query"));
    ASSERT_THAT(messages.size(), Eq(0));
}

TEST_F(ConsumerImplTests, QueryMessagesReturnEmptyResults) {
    MockGetBrokerUri();

    EXPECT_CALL(mock_http_client, Post_t(HasSubstr("querymessages"), _, expected_query_string, _, _)).WillOnce(DoAll(
                SetArgPointee<3>(HttpCode::OK),
                SetArgPointee<4>(nullptr),
                Return("[]")));

    consumer->SetTimeout(100);
    asapo::Error err;
    auto messages = consumer->QueryMessages(expected_query_string, expected_stream, &err);

    ASSERT_THAT(err, Eq(nullptr));
    ASSERT_THAT(messages.size(), Eq(0));
}

TEST_F(ConsumerImplTests, QueryMessagesWrongResponseArray) {

    MockGetBrokerUri();

    auto rec1 = CreateFI();
    auto rec2 = CreateFI();
    auto json1 = rec1.Json();
    auto json2 = rec2.Json();
    auto responce_string = json1 + "," + json2 + "]"; // no [ at the beginning


    EXPECT_CALL(mock_http_client, Post_t(HasSubstr("querymessages"), _, expected_query_string, _, _)).WillOnce(DoAll(
                SetArgPointee<3>(HttpCode::OK),
                SetArgPointee<4>(nullptr),
                Return(responce_string)));

    consumer->SetTimeout(100);
    asapo::Error err;
    auto messages = consumer->QueryMessages(expected_query_string, expected_stream, &err);

    ASSERT_THAT(err, Ne(nullptr));
    ASSERT_THAT(messages.size(), Eq(0));
    ASSERT_THAT(err->Explain(), HasSubstr("response"));
}

TEST_F(ConsumerImplTests, QueryMessagesWrongResponseRecorsd) {

    MockGetBrokerUri();

    auto responce_string = R"([{"bla":1},{"err":}])";

    EXPECT_CALL(mock_http_client, Post_t(HasSubstr("querymessages"), _, expected_query_string, _, _)).WillOnce(DoAll(
                SetArgPointee<3>(HttpCode::OK),
                SetArgPointee<4>(nullptr),
                Return(responce_string)));

    consumer->SetTimeout(100);
    asapo::Error err;
    auto messages = consumer->QueryMessages(expected_query_string, expected_stream, &err);

    ASSERT_THAT(err, Ne(nullptr));
    ASSERT_THAT(messages.size(), Eq(0));
    ASSERT_THAT(err->Explain(), HasSubstr("response"));
}

TEST_F(ConsumerImplTests, QueryMessagesReturnRecords) {

    MockGetBrokerUri();

    auto rec1 = CreateFI();
    auto rec2 = CreateFI();
    rec2.name = "ttt";
    auto json1 = rec1.Json();
    auto json2 = rec2.Json();
    auto responce_string = "[" + json1 + "," + json2 + "]";

    EXPECT_CALL(mock_http_client,
                Post_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/stream/0" +
                "/querymessages?token=" + expected_token_url_with_sourceinfo, _, expected_query_string, _, _)).WillOnce(DoAll(
                                   SetArgPointee<3>(HttpCode::OK),
                                   SetArgPointee<4>(nullptr),
                                   Return(responce_string)));

    consumer->SetTimeout(100);
    asapo::Error err;
    auto messages = consumer->QueryMessages(expected_query_string, "stream", &err);

    ASSERT_THAT(err, Eq(nullptr));
    ASSERT_THAT(messages.size(), Eq(2));

    ASSERT_THAT(messages[0].name, Eq(rec1.name));
    ASSERT_THAT(messages[1].name, Eq(rec2.name));
}

TEST_F(ConsumerImplTests, GetNextDatasetUsesCorrectUri) {
    MockGetBrokerUri();

    EXPECT_CALL(mock_http_client, Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded +
                                        "/stream/" +
                                        expected_group_id_encoded + "/next?token="
                                        + expected_token + "&dataset=true&minsize=0"
                                        + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded, _,
                                        _)).WillOnce(DoAll(
                                                SetArgPointee<1>(HttpCode::OK),
                                                SetArgPointee<2>(nullptr),
                                                Return("")));
    asapo::Error err;
    consumer->GetNextDataset(expected_group_id, 0, "stream", &err);
}

TEST_F(ConsumerImplTests, GetNextErrorOnEmptyStream) {
    MessageData data;
    auto err = consumer->GetNext(expected_group_id, &info, &data, "");
    ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kWrongInput));
}

TEST_F(ConsumerImplTests, GetDataSetReturnsMessageMetas) {
    asapo::Error err;
    MockGetBrokerUri();

    auto to_send1 = CreateFI();
    auto json1 = to_send1.Json();
    auto to_send2 = CreateFI();
    to_send2.id = 2;
    auto json2 = to_send2.Json();
    auto to_send3 = CreateFI();
    to_send3.id = 3;
    auto json3 = to_send3.Json();

    auto json = std::string("{") +
                "\"_id\":1," +
                "\"size\":3," +
                "\"messages\":[" + json1 + "," + json2 + "," + json3 + "]" +
                "}";

    MockGet(json);

    auto dataset = consumer->GetNextDataset(expected_group_id, 0, expected_stream, &err);

    ASSERT_THAT(err, Eq(nullptr));

    ASSERT_THAT(dataset.id, Eq(1));
    ASSERT_THAT(dataset.content.size(), Eq(3));
    ASSERT_THAT(dataset.content[0].id, Eq(to_send1.id));
    ASSERT_THAT(dataset.content[1].id, Eq(to_send2.id));
    ASSERT_THAT(dataset.content[2].id, Eq(to_send3.id));
}

TEST_F(ConsumerImplTests, GetDataSetReturnsPartialMessageMetas) {
    asapo::Error err;
    MockGetBrokerUri();

    auto to_send1 = CreateFI();
    auto json1 = to_send1.Json();
    auto to_send2 = CreateFI();
    to_send2.id = 2;
    auto json2 = to_send2.Json();
    auto to_send3 = CreateFI();
    to_send3.id = 3;
    auto json3 = to_send3.Json();

    auto json = std::string("{") +
                "\"_id\":1," +
                "\"size\":3," +
                "\"messages\":[" + json1 + "," + json2 + "]" +
                "}";

    MockGet(json, asapo::HttpCode::PartialContent);

    auto dataset = consumer->GetNextDataset(expected_group_id, 0, expected_stream, &err);

    ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kPartialData));

    auto err_data = static_cast<const asapo::PartialErrorData*>(err->GetCustomData());
    ASSERT_THAT(err_data->expected_size, Eq(3));
    ASSERT_THAT(err_data->id, Eq(1));

    ASSERT_THAT(dataset.id, Eq(1));
    ASSERT_THAT(dataset.content.size(), Eq(2));
    ASSERT_THAT(dataset.content[0].id, Eq(to_send1.id));
    ASSERT_THAT(dataset.content[1].id, Eq(to_send2.id));
}

TEST_F(ConsumerImplTests, GetDataSetByIdReturnsPartialMessageMetas) {
    asapo::Error err;
    MockGetBrokerUri();

    auto to_send1 = CreateFI();
    auto json1 = to_send1.Json();
    auto to_send2 = CreateFI();
    to_send2.id = 2;
    auto json2 = to_send2.Json();
    auto to_send3 = CreateFI();
    to_send3.id = 3;
    auto json3 = to_send3.Json();

    auto json = std::string("{") +
                "\"_id\":1," +
                "\"size\":3," +
                "\"messages\":[" + json1 + "," + json2 + "]" +
                "}";

    MockGet(json, asapo::HttpCode::PartialContent);

    auto dataset = consumer->GetDatasetById(1, 0, expected_stream, &err);

    ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kPartialData));
    auto err_data = static_cast<const asapo::PartialErrorData*>(err->GetCustomData());
    ASSERT_THAT(err_data->expected_size, Eq(3));
    ASSERT_THAT(err_data->id, Eq(1));

    ASSERT_THAT(dataset.id, Eq(1));
    ASSERT_THAT(dataset.content.size(), Eq(2));
    ASSERT_THAT(dataset.content[0].id, Eq(to_send1.id));
    ASSERT_THAT(dataset.content[1].id, Eq(to_send2.id));
}

TEST_F(ConsumerImplTests, GetDataSetReturnsParseError) {
    MockGetBrokerUri();
    MockGet("error_response");

    asapo::Error err;
    auto dataset = consumer->GetNextDataset(expected_group_id, 0, expected_stream, &err);
    ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kInterruptedTransaction));
    ASSERT_THAT(dataset.content.size(), Eq(0));
    ASSERT_THAT(dataset.id, Eq(0));

}

TEST_F(ConsumerImplTests, GetLastDatasetUsesCorrectUri) {
    MockGetBrokerUri();

    EXPECT_CALL(mock_http_client,
                Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/"
                      +
                      expected_stream_encoded + "/0/last?token="
                      + expected_token + "&dataset=true&minsize=1"
                                        + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded, _,
                      _)).WillOnce(DoAll(
                                       SetArgPointee<1>(HttpCode::OK),
                                       SetArgPointee<2>(nullptr),
                                       Return("")));
    asapo::Error err;
    consumer->GetLastDataset(1, expected_stream, &err);
}

TEST_F(ConsumerImplTests, GetLastDatasetInGroupUsesCorrectUri) {
    MockGetBrokerUri();

    EXPECT_CALL(mock_http_client,
                Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/"
                      +
                      expected_stream_encoded + "/" + expected_group_id_encoded + "/groupedlast?token="
                      + expected_token + "&dataset=true&minsize=1"
                      + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded, _,
                      _)).WillOnce(DoAll(
                                       SetArgPointee<1>(HttpCode::OK),
                                       SetArgPointee<2>(nullptr),
                                       Return("")));
    asapo::Error err;
    consumer->GetLastDataset(expected_group_id, 1, expected_stream, &err);
}


TEST_F(ConsumerImplTests, GetDatasetByIdUsesCorrectUri) {
    MockGetBrokerUri();

    EXPECT_CALL(mock_http_client, Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded +
                                        "/stream/0/"
                                        + std::to_string(expected_dataset_id) + "?token="
                                        + expected_token
                                        + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded
                                        + "&dataset=true" + "&minsize=0", _,
                                        _)).WillOnce(DoAll(
                                                SetArgPointee<1>(HttpCode::OK),
                                                SetArgPointee<2>(nullptr),
                                                Return("")));
    asapo::Error err;
    consumer->GetDatasetById(expected_dataset_id, 0, "stream", &err);
}

TEST_F(ConsumerImplTests, DeleteStreamUsesCorrectUri) {
    MockGetBrokerUri();
    std::string expected_delete_stream_query_string = "{\"ErrorOnNotExist\":true,\"DeleteMeta\":true}";
    EXPECT_CALL(mock_http_client, Post_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/"
                                         + expected_stream_encoded + "/delete"
                                         + "?token=" + expected_token
                                         + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded, _,
                                         expected_delete_stream_query_string, _, _)).WillOnce(DoAll(
                                                     SetArgPointee<3>(HttpCode::OK),
                                                     SetArgPointee<4>(nullptr),
                                                     Return("")
                                                 ));

    asapo::DeleteStreamOptions opt;
    opt.delete_meta = true;
    opt.error_on_not_exist = true;
    auto err = consumer->DeleteStream(expected_stream, opt);
    ASSERT_THAT(err, Eq(nullptr));

}

TEST_F(ConsumerImplTests, GetStreamListUsesCorrectUri) {
    MockGetBrokerUri();
    std::string return_streams =
        std::string(
            R"({"streams":[{"lastId":123,"name":"test","timestampCreated":1000000,"timestampLast":1000,"finished":false,"nextStream":""},)")
        +
        R"({"lastId":124,"name":"test1","timestampCreated":2000000,"timestampLast":2000,"finished":true,"nextStream":"next"}]})";
    EXPECT_CALL(mock_http_client,
                Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/0/streams"
                      + "?token=" + expected_token + "&from=" + expected_stream_encoded + "&filter=all"
                      + "&detailed=true", _,
                      _)).WillOnce(DoAll(
                                       SetArgPointee<1>(HttpCode::OK),
                                       SetArgPointee<2>(nullptr),
                                       Return(return_streams)));

    asapo::Error err;
    auto streams = consumer->GetStreamList(expected_stream, asapo::StreamFilter::kAllStreams, &err);
    ASSERT_THAT(err, Eq(nullptr));
    ASSERT_THAT(streams.size(), Eq(2));
    ASSERT_THAT(streams.size(), 2);
    ASSERT_THAT(streams[0].Json(),
                R"({"lastId":123,"name":"test","timestampCreated":1000000,"timestampLast":1000,"finished":false,"nextStream":""})");
    ASSERT_THAT(streams[1].Json(),
                R"({"lastId":124,"name":"test1","timestampCreated":2000000,"timestampLast":2000,"finished":true,"nextStream":"next"})");
}

TEST_F(ConsumerImplTests, GetStreamListUsesCorrectUriWithoutFrom) {
    MockGetBrokerUri();
    EXPECT_CALL(mock_http_client,
                Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/0/streams"
                + "?token=" + expected_token
                + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded
                + "&filter=finished&detailed=true", _,
                      _)).WillOnce(DoAll(
                                       SetArgPointee<1>(HttpCode::OK),
                                       SetArgPointee<2>(nullptr),
                                       Return("")));

    asapo::Error err;
    auto streams = consumer->GetStreamList("", asapo::StreamFilter::kFinishedStreams, &err);
}

void ConsumerImplTests::MockBeforeFTS(const std::string& expected_request_sender_details, MessageData* data) {
    auto to_send = CreateFI();
    auto json = to_send.Json();
    MockGet(json);

    EXPECT_CALL(mock_netclient, GetData_t(&info, expected_request_sender_details,
                                          data)).WillOnce(Return(asapo::IOErrorTemplates::kUnknownIOError.Generate().release()));
}

void ConsumerImplTests::ExpectFolderToken() {
    std::string expected_folder_query_string = "{\"Folder\":\"" + expected_path + "\",\"BeamtimeId\":\"" +
                                               expected_beamtime_id
                                               + "\",\"Token\":\"" + expected_token + "\"}";

    EXPECT_CALL(mock_http_client, Post_t(HasSubstr(expected_server_uri + "/asapo-authorizer/v0.2/folder"), _,
                                         expected_folder_query_string, _, _)).WillOnce(DoAll(
                                                     SetArgPointee<3>(HttpCode::OK),
                                                     SetArgPointee<4>(nullptr),
                                                     Return(expected_folder_token)
                                                 ));

}

ACTION_P(AssignArg3, assign) {
    if (assign) {
        asapo::MessageData data = asapo::MessageData{new uint8_t[1]};
        data[0] = expected_value;
        *arg3 = std::move(data);
    }
}

void ConsumerImplTests::ExpectFileTransfer(const asapo::ConsumerErrorTemplate* p_err_template) {
    EXPECT_CALL(mock_http_client, PostReturnArray_t(HasSubstr(expected_fts_uri + "/v0.2/transfer"),
                                                    expected_cookie,
                                                    expected_fts_query_string,
                                                    _,
                                                    expected_message_size,
                                                    _)).WillOnce(DoAll(
                                                            SetArgPointee<5>(HttpCode::OK),
                                                            AssignArg3(p_err_template == nullptr),
                                                            Return(p_err_template == nullptr ? nullptr : p_err_template->Generate().release())
                                                            ));
}

void ConsumerImplTests::ExpectRepeatedFileTransfer() {
    EXPECT_CALL(mock_http_client, PostReturnArray_t(HasSubstr(expected_fts_uri + "/v0.2/transfer"),
                                                    expected_cookie,
                                                    expected_fts_query_string,
                                                    _,
                                                    expected_message_size,
                                                    _)).
    WillOnce(DoAll(
                 SetArgPointee<5>(HttpCode::Unauthorized),
                 Return(nullptr))).
    WillOnce(DoAll(
                 SetArgPointee<5>(HttpCode::OK),
                 Return(nullptr)
             ));
}

void ConsumerImplTests::AssertSingleFileTransfer() {
    asapo::MessageData data = asapo::MessageData{new uint8_t[1]};
    MockGetBrokerUri();
    MockBeforeFTS(expected_request_sender_details_with_stream, &data);
    ExpectFolderToken();
    MockGetFTSUri();
    ExpectFileTransfer(nullptr);

    fts_consumer->GetNext(expected_group_id, &info, &data, expected_stream);

    ASSERT_THAT(data[0], Eq(expected_value));
    Mock::VerifyAndClearExpectations(&mock_http_client);
    Mock::VerifyAndClearExpectations(&mock_netclient);
    Mock::VerifyAndClearExpectations(&mock_io);
}

TEST_F(ConsumerImplTests, GetMessageUsesFileTransferServiceIfCannotReadFromCache) {
    AssertSingleFileTransfer();
}

TEST_F(ConsumerImplTests, FileTransferReadsFileSize) {
    AssertSingleFileTransfer();
    EXPECT_CALL(mock_http_client, Post_t(HasSubstr("sizeonly=true"),
                                         expected_cookie, expected_fts_query_string, _, _)).WillOnce(DoAll(

                                                     SetArgPointee<3>(HttpCode::OK),
                                                     SetArgPointee<4>(nullptr),
                                                     Return("{\"file_size\":5}")
                                                 ));

    EXPECT_CALL(mock_http_client, PostReturnArray_t(HasSubstr(expected_fts_uri + "/v0.2/transfer"),
                                                    expected_cookie,
                                                    expected_fts_query_string,
                                                    _,
                                                    5,
                                                    _)).WillOnce(DoAll(
                                                            SetArgPointee<5>(HttpCode::OK),
                                                            AssignArg3(false),
                                                            Return(nullptr)
                                                            ));

    MessageData data;
    info.size = 0;
    info.buf_id = 0;
    auto err = fts_consumer->RetrieveData(&info, &data);
}

TEST_F(ConsumerImplTests, GetMessageReusesTokenAndUri) {
    AssertSingleFileTransfer();

    asapo::MessageData data = asapo::MessageData{new uint8_t[1]};
    MockBeforeFTS(expected_request_sender_details_with_stream, &data);
    ExpectFileTransfer(nullptr);

    auto err = fts_consumer->GetNext(expected_group_id, &info, &data, expected_stream);
}

TEST_F(ConsumerImplTests, GetMessageTriesToGetTokenAgainIfTransferFailed) {
    AssertSingleFileTransfer();

    asapo::MessageData data;
    MockBeforeFTS(expected_request_sender_details_with_stream, &data);
    ExpectRepeatedFileTransfer();
    ExpectFolderToken();

    auto err = fts_consumer->GetNext(expected_group_id, &info, &data, expected_stream);
}

TEST_F(ConsumerImplTests, AcknowledgeUsesCorrectUri) {
    MockGetBrokerUri();
    auto expected_acknowledge_command = "{\"Op\":\"ackmessage\"}";
    EXPECT_CALL(mock_http_client, Post_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/"
                                         +
                                         expected_stream_encoded + "/" +
                                         expected_group_id_encoded
                                         + "/" + std::to_string(expected_dataset_id) + "?token="
                                         + expected_token_url_with_sourceinfo, _, expected_acknowledge_command, _, _)).WillOnce(DoAll(
                                                     SetArgPointee<3>(HttpCode::OK),
                                                     SetArgPointee<4>(nullptr),
                                                     Return("")));

    auto err = consumer->Acknowledge(expected_group_id, expected_dataset_id, expected_stream);

    ASSERT_THAT(err, Eq(nullptr));
}

void ConsumerImplTests::ExpectIdList(bool error) {
    MockGetBrokerUri();
    EXPECT_CALL(mock_http_client,
                Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/"
                      +
                      expected_stream_encoded + "/" +
                      expected_group_id_encoded + "/nacks?token=" + expected_token + "&from=1&to=0",
                      _,
                      _)).WillOnce(DoAll(
                                       SetArgPointee<1>(HttpCode::OK),
                                       SetArgPointee<2>(nullptr),
                                       Return(error ? "" : "{\"unacknowledged\":[1,2,3]}")));
}

TEST_F(ConsumerImplTests, GetUnAcknowledgedListReturnsIds) {
    ExpectIdList(false);
    asapo::Error err;
    auto list = consumer->GetUnacknowledgedMessages(expected_group_id, 1, 0, expected_stream, &err);

    ASSERT_THAT(list, ElementsAre(1, 2, 3));
    ASSERT_THAT(err, Eq(nullptr));
}

void ConsumerImplTests::ExpectLastAckId(bool empty_response) {
    EXPECT_CALL(mock_http_client, Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/"
                                        +
                                        expected_stream_encoded + "/" +
                                        expected_group_id_encoded + "/lastack?token=" + expected_token
                                        + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded, _, _)).WillOnce(DoAll(
                                                    SetArgPointee<1>(HttpCode::OK),
                                                    SetArgPointee<2>(nullptr),
                                                    Return(empty_response ? "{\"lastAckId\":0}" : "{\"lastAckId\":1}")));
}

TEST_F(ConsumerImplTests, GetLastAcknowledgeUsesOk) {
    MockGetBrokerUri();
    ExpectLastAckId(false);

    asapo::Error err;
    auto ind = consumer->GetLastAcknowledgedMessage(expected_group_id, expected_stream, &err);
    ASSERT_THAT(err, Eq(nullptr));
    ASSERT_THAT(ind, Eq(1));
}

TEST_F(ConsumerImplTests, GetLastAcknowledgeReturnsNoData) {
    MockGetBrokerUri();
    ExpectLastAckId(true);

    asapo::Error err;
    auto ind = consumer->GetLastAcknowledgedMessage(expected_group_id, expected_stream, &err);
    ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kNoData));
    ASSERT_THAT(ind, Eq(0));
}

TEST_F(ConsumerImplTests, GetByIdErrorsForId0) {

    auto err = consumer->GetById(0, &info, nullptr, expected_stream);

    ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kWrongInput));
}

TEST_F(ConsumerImplTests, ResendNacks) {
    MockGetBrokerUri();

    EXPECT_CALL(mock_http_client, Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded +
                                        "/stream/"
                                        + expected_group_id_encoded + "/next?token="
                                        + expected_token
                                        + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded
                                        + "&resend_nacks=true&delay_ms=10000&resend_attempts=3", _,
                                        _)).WillOnce(DoAll(
                                                SetArgPointee<1>(HttpCode::OK),
                                                SetArgPointee<2>(nullptr),
                                                Return("")));

    consumer->SetResendNacs(true, 10000, 3);
    consumer->GetNext(expected_group_id, &info, nullptr, "stream");
}

TEST_F(ConsumerImplTests, NegativeAcknowledgeUsesCorrectUri) {
    MockGetBrokerUri();
    auto expected_neg_acknowledge_command = R"({"Op":"negackmessage","Params":{"DelayMs":10000}})";
    EXPECT_CALL(mock_http_client, Post_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/"
                                         +
                                         expected_stream_encoded + "/" +
                                         expected_group_id_encoded
                                         + "/" + std::to_string(expected_dataset_id) + "?token="
                                         + expected_token
                                         + "&instanceid=" + expected_instance_id_encoded + "&pipelinestep=" + expected_pipeline_step_encoded
                                         , _, expected_neg_acknowledge_command, _, _)).WillOnce(
                                             DoAll(
                                                 SetArgPointee<3>(HttpCode::OK),
                                                 SetArgPointee<4>(nullptr),
                                                 Return("")));

    auto err = consumer->NegativeAcknowledge(expected_group_id, expected_dataset_id, 10000, expected_stream);

    ASSERT_THAT(err, Eq(nullptr));
}

TEST_F(ConsumerImplTests, CanInterruptOperation) {
    EXPECT_CALL(mock_http_client, Get_t(_, _, _)).Times(AtLeast(1)).WillRepeatedly(DoAll(
                SetArgPointee<1>(HttpCode::ServiceUnavailable),
                SetArgPointee<2>(nullptr),
                Return("")));

    auto start = std::chrono::system_clock::now();
    asapo::Error err;
    auto exec = [this, &err]() {
        consumer->SetTimeout(10000);
        err = consumer->GetNext("", &info, nullptr, expected_stream);
    };
    auto thread = std::thread(exec);
    std::this_thread::sleep_for(std::chrono::milliseconds(100));

    consumer->InterruptCurrentOperation();

    thread.join();

    auto elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() -
                      start).count();
    ASSERT_THAT(elapsed_ms, testing::Lt(1000));
    ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kInterruptedTransaction));

}

TEST_F(ConsumerImplTests, GetCurrentDataSetCounteUsesCorrectUri) {
    MockGetBrokerUri();
    consumer->SetTimeout(100);

    EXPECT_CALL(mock_http_client,
                Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/"
                      +
                      expected_stream_encoded + "/size?token="
                      + expected_token + "&incomplete=true", _, _)).WillOnce(DoAll(
                                  SetArgPointee<1>(HttpCode::OK),
                                  SetArgPointee<2>(nullptr),
                                  Return("{\"size\":10}")));
    asapo::Error err;
    auto size = consumer->GetCurrentDatasetCount(expected_stream, true, &err);
    ASSERT_THAT(err, Eq(nullptr));
    ASSERT_THAT(size, Eq(10));
}

TEST_F(ConsumerImplTests, GetVersionInfoClientOnly) {
    std::string client_info;
    auto err = consumer->GetVersionInfo(&client_info, nullptr, nullptr);
    ASSERT_THAT(err, Eq(nullptr));
    ASSERT_THAT(client_info, HasSubstr(std::string(asapo::kVersion)));
    ASSERT_THAT(client_info, HasSubstr(asapo::kConsumerProtocol.GetVersion()));
}
TEST_F(ConsumerImplTests, GetVersionInfoWithServer) {

    std::string result =
        R"({"softwareVersion":"20.03.1, build 7a9294ad","clientSupported":"no", "clientProtocol":{"versionInfo":"v0.2"}})";

    EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_server_uri +
                                                  "/asapo-discovery/v0.1/version?token=token&client=consumer&protocol=" + expected_consumer_protocol), _,
                                        _)).WillOnce(DoAll(
                                                SetArgPointee<1>(HttpCode::OK),
                                                SetArgPointee<2>(nullptr),
                                                Return(result)));

    std::string client_info, server_info;
    auto err = consumer->GetVersionInfo(&client_info, &server_info, nullptr);
    ASSERT_THAT(err, Eq(nullptr));
    ASSERT_THAT(server_info, HasSubstr("20.03.1"));
    ASSERT_THAT(server_info, HasSubstr("v0.2"));
}

}