Skip to content
Snippets Groups Projects
Commit 7cdda5c6 authored by George Sedov's avatar George Sedov
Browse files

Send kafka notifications only for online files

parent 54e963aa
No related branches found
No related tags found
No related merge requests found
#include "request_handler_kafka_notify.h"
#include "../request.h"
#include "file_processors/file_processor.h"
namespace asapo {
Error RequestHandlerKafkaNotify::ProcessRequest(Request* request) const {
if (!kafka_client_) {
//client was not initialized, ignore
bool online = request->GetSourceType() != SourceType::kProcessed &&
!static_cast<bool>(request->GetCustomData()[kPosIngestMode] & IngestModeFlags::kWriteRawDataToOffline);
if (!kafka_client_ || !online) {
//client was not initialized or file written to offline
return nullptr;
}
std::string root_folder;
if (auto err = GetRootFolder(request, &root_folder)){
return err;
auto root_folder = request->GetOnlinePath();
if (root_folder.empty()) {
return ReceiverErrorTemplates::kBadRequest.Generate("online path not available");
}
std::string message = "{"
......
......@@ -15,19 +15,14 @@ class KafkaNotifyHandlerTests : public Test {
NiceMock<MockKafkaClient> kafka_client;
RequestHandlerKafkaNotify handler{&kafka_client};
std::unique_ptr<NiceMock<MockRequest>> mock_request;
std::string expected_filename = std::string("processed") + asapo::kPathSeparator + "filename";
std::string expected_offline_path = std::string("offline") + asapo::kPathSeparator + "path";
std::string expected_filename = std::string("raw") + asapo::kPathSeparator + "filename";
std::string expected_online_path = std::string("online") + asapo::kPathSeparator + "path";
CustomRequestData expected_custom_data {kDefaultIngestMode, 0, 0};
const std::string expected_topic = "asapo";
void SetUp() override {
GenericRequestHeader request_header;
mock_request.reset(new NiceMock<MockRequest> {request_header, 1, "", nullptr});
EXPECT_CALL(*mock_request, GetFileName()).Times(2).WillRepeatedly(Return(expected_filename));
EXPECT_CALL(*mock_request, GetSourceType()).Times(2).WillRepeatedly(Return(SourceType::kProcessed));
EXPECT_CALL(*mock_request, GetCustomData_t()).WillOnce(Return(expected_custom_data));
EXPECT_CALL(*mock_request, GetOfflinePath()).WillOnce(ReturnRef(expected_offline_path));
EXPECT_CALL(kafka_client, Send_t(HasSubstr(expected_filename), expected_topic)).WillOnce(Return(nullptr));
}
void TearDown() override {
......@@ -35,9 +30,37 @@ class KafkaNotifyHandlerTests : public Test {
};
TEST_F(KafkaNotifyHandlerTests, KafkaNotifyOK) {
EXPECT_CALL(*mock_request, GetFileName()).WillOnce(Return(expected_filename));
EXPECT_CALL(*mock_request, GetOnlinePath()).WillOnce(ReturnRef(expected_online_path));
EXPECT_CALL(kafka_client, Send_t(HasSubstr(expected_filename), expected_topic)).WillOnce(Return(nullptr));
EXPECT_CALL(*mock_request, GetCustomData_t()).WillOnce(Return(expected_custom_data));
EXPECT_CALL(*mock_request, GetSourceType()).WillOnce(Return(SourceType::kRaw));
auto err = handler.ProcessRequest(mock_request.get());
ASSERT_THAT(err, Eq(nullptr));
Mock::VerifyAndClearExpectations(mock_request.get());
Mock::VerifyAndClearExpectations(&kafka_client);
}
TEST_F(KafkaNotifyHandlerTests, KafkaNotifyNotNeededForProcessed) {
EXPECT_CALL(*mock_request, GetSourceType()).WillOnce(Return(SourceType::kProcessed));
auto err = handler.ProcessRequest(mock_request.get());
ASSERT_THAT(err, Eq(nullptr));
Mock::VerifyAndClearExpectations(mock_request.get());
Mock::VerifyAndClearExpectations(&kafka_client);
}
TEST_F(KafkaNotifyHandlerTests, KafkaNotifyNotNeededForOfflineRaw) {
EXPECT_CALL(*mock_request, GetSourceType()).WillOnce(Return(SourceType::kRaw));
EXPECT_CALL(*mock_request, GetCustomData_t()).WillOnce(Return(expected_custom_data));
expected_custom_data[kPosIngestMode] |= IngestModeFlags::kWriteRawDataToOffline;
auto err = handler.ProcessRequest(mock_request.get());
ASSERT_THAT(err, Eq(nullptr));
Mock::VerifyAndClearExpectations(mock_request.get());
Mock::VerifyAndClearExpectations(&kafka_client);
}
}
\ No newline at end of file
......@@ -11,7 +11,7 @@ receiver_root_folder=/tmp/asapo/receiver/files
facility=test_facility
year=2019
receiver_folder=${receiver_root_folder}/${facility}/gpfs/${beamline}/${year}/data/${beamtime_id}
receiver_folder_online=${receiver_root_folder}/beamline/${beamline}/current
Cleanup() {
echo cleanup
......@@ -32,7 +32,7 @@ Cleanup() {
rm -f bootstrap
./transfer-single-file_kafka ${receiver_folder}/processed/1 & KAFKA_PID=$!
./transfer-single-file_kafka ${receiver_folder_online}/raw/1 & KAFKA_PID=$!
echo "Started the kafka listener"
......@@ -59,10 +59,11 @@ do
done
mkdir -p ${receiver_folder}
mkdir -p ${receiver_folder_online}
$1 localhost:8400 ${beamtime_id} 100 1 1 0 30
$1 localhost:8400 ${beamtime_id} 100 1 1 100 30
ls -ln ${receiver_folder}/processed/1 | awk '{ print $5 }'| grep 100000
ls -ln ${receiver_folder_online}/raw/1 | awk '{ print $5 }'| grep 100000
wait $KAFKA_PID
RESULT=$?
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment