From 7cdda5c6bab64f1f12f049844f9f5c5d83123f70 Mon Sep 17 00:00:00 2001
From: George Sedov <george.sedov@desy.de>
Date: Thu, 13 Jan 2022 13:45:10 +0100
Subject: [PATCH] Send kafka notifications only for online files

---
 .../request_handler_kafka_notify.cpp          | 15 ++++----
 .../test_request_handler_kafka_notify.cpp     | 37 +++++++++++++++----
 .../check_linux.sh                            |  9 +++--
 3 files changed, 43 insertions(+), 18 deletions(-)

diff --git a/receiver/src/request_handler/request_handler_kafka_notify.cpp b/receiver/src/request_handler/request_handler_kafka_notify.cpp
index c94cce9a4..75302e38d 100644
--- a/receiver/src/request_handler/request_handler_kafka_notify.cpp
+++ b/receiver/src/request_handler/request_handler_kafka_notify.cpp
@@ -1,19 +1,20 @@
 #include "request_handler_kafka_notify.h"
 #include "../request.h"
-#include "file_processors/file_processor.h"
 
 namespace asapo {
 
 Error RequestHandlerKafkaNotify::ProcessRequest(Request* request) const {
-    if (!kafka_client_) {
-        //client was not initialized, ignore
+    bool online = request->GetSourceType() != SourceType::kProcessed &&
+                  !static_cast<bool>(request->GetCustomData()[kPosIngestMode] & IngestModeFlags::kWriteRawDataToOffline);
+
+    if (!kafka_client_ || !online) {
+        //client was not initialized or file written to offline
         return nullptr;
     }
 
-    std::string root_folder;
-
-    if (auto err = GetRootFolder(request, &root_folder)){
-        return err;
+    auto root_folder = request->GetOnlinePath();
+    if (root_folder.empty()) {
+        return ReceiverErrorTemplates::kBadRequest.Generate("online path not available");
     }
 
     std::string message = "{"
diff --git a/receiver/unittests/request_handler/test_request_handler_kafka_notify.cpp b/receiver/unittests/request_handler/test_request_handler_kafka_notify.cpp
index 458214f86..fdd4c299b 100644
--- a/receiver/unittests/request_handler/test_request_handler_kafka_notify.cpp
+++ b/receiver/unittests/request_handler/test_request_handler_kafka_notify.cpp
@@ -15,19 +15,14 @@ class KafkaNotifyHandlerTests : public Test {
     NiceMock<MockKafkaClient> kafka_client;
     RequestHandlerKafkaNotify handler{&kafka_client};
     std::unique_ptr<NiceMock<MockRequest>> mock_request;
-    std::string expected_filename = std::string("processed") + asapo::kPathSeparator + "filename";
-    std::string expected_offline_path = std::string("offline") + asapo::kPathSeparator + "path";
+    std::string expected_filename = std::string("raw") + asapo::kPathSeparator + "filename";
+    std::string expected_online_path = std::string("online") + asapo::kPathSeparator + "path";
     CustomRequestData expected_custom_data {kDefaultIngestMode, 0, 0};
     const std::string expected_topic = "asapo";
 
     void SetUp() override {
         GenericRequestHeader request_header;
         mock_request.reset(new NiceMock<MockRequest> {request_header, 1, "", nullptr});
-        EXPECT_CALL(*mock_request, GetFileName()).Times(2).WillRepeatedly(Return(expected_filename));
-        EXPECT_CALL(*mock_request, GetSourceType()).Times(2).WillRepeatedly(Return(SourceType::kProcessed));
-        EXPECT_CALL(*mock_request, GetCustomData_t()).WillOnce(Return(expected_custom_data));
-        EXPECT_CALL(*mock_request, GetOfflinePath()).WillOnce(ReturnRef(expected_offline_path));
-        EXPECT_CALL(kafka_client, Send_t(HasSubstr(expected_filename), expected_topic)).WillOnce(Return(nullptr));
     }
 
     void TearDown() override {
@@ -35,9 +30,37 @@ class KafkaNotifyHandlerTests : public Test {
 };
 
 TEST_F(KafkaNotifyHandlerTests, KafkaNotifyOK) {
+    EXPECT_CALL(*mock_request, GetFileName()).WillOnce(Return(expected_filename));
+    EXPECT_CALL(*mock_request, GetOnlinePath()).WillOnce(ReturnRef(expected_online_path));
+    EXPECT_CALL(kafka_client, Send_t(HasSubstr(expected_filename), expected_topic)).WillOnce(Return(nullptr));
+    EXPECT_CALL(*mock_request, GetCustomData_t()).WillOnce(Return(expected_custom_data));
+    EXPECT_CALL(*mock_request, GetSourceType()).WillOnce(Return(SourceType::kRaw));
+
+    auto err = handler.ProcessRequest(mock_request.get());
+    ASSERT_THAT(err, Eq(nullptr));
+    Mock::VerifyAndClearExpectations(mock_request.get());
+    Mock::VerifyAndClearExpectations(&kafka_client);
+}
+
+TEST_F(KafkaNotifyHandlerTests, KafkaNotifyNotNeededForProcessed) {
+    EXPECT_CALL(*mock_request, GetSourceType()).WillOnce(Return(SourceType::kProcessed));
+
+
     auto err = handler.ProcessRequest(mock_request.get());
     ASSERT_THAT(err, Eq(nullptr));
     Mock::VerifyAndClearExpectations(mock_request.get());
+    Mock::VerifyAndClearExpectations(&kafka_client);
+}
+
+TEST_F(KafkaNotifyHandlerTests, KafkaNotifyNotNeededForOfflineRaw) {
+    EXPECT_CALL(*mock_request, GetSourceType()).WillOnce(Return(SourceType::kRaw));
+    EXPECT_CALL(*mock_request, GetCustomData_t()).WillOnce(Return(expected_custom_data));
+    expected_custom_data[kPosIngestMode] |= IngestModeFlags::kWriteRawDataToOffline;
+
+    auto err = handler.ProcessRequest(mock_request.get());
+    ASSERT_THAT(err, Eq(nullptr));
     Mock::VerifyAndClearExpectations(mock_request.get());
+    Mock::VerifyAndClearExpectations(&kafka_client);
 }
+
 }
\ No newline at end of file
diff --git a/tests/automatic/producer_receiver/transfer_single_file_with_kafka/check_linux.sh b/tests/automatic/producer_receiver/transfer_single_file_with_kafka/check_linux.sh
index 4aaa80ddc..6af384ac6 100644
--- a/tests/automatic/producer_receiver/transfer_single_file_with_kafka/check_linux.sh
+++ b/tests/automatic/producer_receiver/transfer_single_file_with_kafka/check_linux.sh
@@ -11,7 +11,7 @@ receiver_root_folder=/tmp/asapo/receiver/files
 facility=test_facility
 year=2019
 receiver_folder=${receiver_root_folder}/${facility}/gpfs/${beamline}/${year}/data/${beamtime_id}
-
+receiver_folder_online=${receiver_root_folder}/beamline/${beamline}/current
 
 Cleanup() {
     echo cleanup
@@ -32,7 +32,7 @@ Cleanup() {
 
 rm -f bootstrap
 
-./transfer-single-file_kafka ${receiver_folder}/processed/1 & KAFKA_PID=$!
+./transfer-single-file_kafka ${receiver_folder_online}/raw/1 & KAFKA_PID=$!
 
 echo "Started the kafka listener"
 
@@ -59,10 +59,11 @@ do
 done
 
 mkdir -p ${receiver_folder}
+mkdir -p ${receiver_folder_online}
 
-$1 localhost:8400 ${beamtime_id} 100 1 1  0 30
+$1 localhost:8400 ${beamtime_id} 100 1 1  100 30
 
-ls -ln ${receiver_folder}/processed/1 | awk '{ print $5 }'| grep 100000
+ls -ln ${receiver_folder_online}/raw/1 | awk '{ print $5 }'| grep 100000
 
 wait $KAFKA_PID
 RESULT=$?
-- 
GitLab