diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6e0e8e990b51c8e1acd3da7a8a6b41f329ff7c1e..314ff079617d01832170149e9317695f3dbf5bf5 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,9 @@ IMPROVEMENTS
 BUG FIXES
 * Producer API: fixed a segfault in Python code when sending a data object whose memory is owned by another object
 
+INTERNAL
+* Do not return an error when a memory cache slot cannot be allocated - write the data directly to disk instead
+
 
 ## 21.12.0
 
diff --git a/CMakeIncludes/dependencies.cmake b/CMakeIncludes/dependencies.cmake
index a9b19a74c0e7f8fbe0f353946fc082349ad4fe7f..38e74c10a268be2f44778cb3e428a76ae12c2ac8 100644
--- a/CMakeIncludes/dependencies.cmake
+++ b/CMakeIncludes/dependencies.cmake
@@ -1,32 +1,39 @@
-if(BUILD_PYTHON)
+if (BUILD_PYTHON)
     set(BUILD_PYTHON_PACKAGES "" CACHE STRING "which python packages to build")
     set_property(CACHE BUILD_PYTHON_PACKAGES PROPERTY STRINGS source rpm deb win)
-endif()
+endif ()
 
-set (CMAKE_PREFIX_PATH "${LIBCURL_DIR}")
-find_package (CURL REQUIRED)
-message (STATUS "Found cURL libraries: ${CURL_LIBRARIES}")
-message (STATUS "cURL include: ${CURL_INCLUDE_DIRS}")
-if(CURL_FOUND) #old FindCURL versions do not create CURL::libcurl target, so we do it here if CURL::libcurl is missing
-    if(NOT TARGET CURL::libcurl)
+set(CMAKE_PREFIX_PATH "${LIBCURL_DIR}")
+find_package(CURL REQUIRED)
+message(STATUS "Found cURL libraries: ${CURL_LIBRARIES}")
+message(STATUS "cURL include: ${CURL_INCLUDE_DIRS}")
+if (CURL_FOUND) # old FindCURL versions do not create CURL::libcurl target, so we do it here if CURL::libcurl is missing
+    if (NOT TARGET CURL::libcurl)
         add_library(CURL::libcurl UNKNOWN IMPORTED)
         set_target_properties(CURL::libcurl PROPERTIES
                 INTERFACE_INCLUDE_DIRECTORIES "${CURL_INCLUDE_DIRS}")
         set_target_properties(CURL::libcurl PROPERTIES
                 IMPORTED_LINK_INTERFACE_LANGUAGES "C"
                 IMPORTED_LOCATION "${CURL_LIBRARIES}")
-    endif()
-endif()
-
+    endif ()
+endif ()
 
+if (NOT BUILD_CLIENTS_ONLY)
+    find_package(RdKafka REQUIRED)
+    message(STATUS "Found rdkafka++ libraries: ${RDKAFKA_LIBRARIES}")
+    message(STATUS "rdkafka++ include dir : ${RDKAFKA_INCLUDE_DIR}")
+    if (WIN32)
+        message(STATUS "rdkafka++ binary dir (dll): ${RDKAFKA_BIN_DIR}")
+    endif ()
+endif ()
 
 # python is needed anyway, even if no Python packages are built (e.g. to parse test results)
 if ("${Python_EXECUTABLE}" STREQUAL "")
-    find_package (Python COMPONENTS Interpreter Development)
+    find_package(Python COMPONENTS Interpreter Development)
     if (NOT Python_FOUND)
-        message (FATAL "Cannot find Python")
-    endif()
-endif()
-message (STATUS "Using Python: ${Python_EXECUTABLE}")
+        message(FATAL_ERROR "Cannot find Python")
+    endif ()
+endif ()
+message(STATUS "Using Python: ${Python_EXECUTABLE}")
 
 include(libfabric)
\ No newline at end of file
diff --git a/CMakeModules/FindRdKafka.cmake b/CMakeModules/FindRdKafka.cmake
new file mode 100644
index 0000000000000000000000000000000000000000..84d4cd540bcc6b1977354421649b36797048abb3
--- /dev/null
+++ b/CMakeModules/FindRdKafka.cmake
@@ -0,0 +1,24 @@
+# FindRdKafka
+# -------------
+#
+# Tries to find RdKafka on the system
+#
+# Available variables
+# RDKAFKA_LIBRARIES     - Path to the C++ library
+# RDKAFKA_INCLUDE_DIR   - Path to the include dir
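+# RDKAFKA_C_LIBRARIES   - Path to the C library
+# RDKAFKA_BIN_DIR       - Path to the DLL directory (Windows only)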
+
+cmake_minimum_required(VERSION 3.7)
+
+find_path(RDKAFKA_INCLUDE_DIR librdkafka/rdkafka.h HINTS ${RdKafka_DIR}/include)
+find_library(RDKAFKA_LIBRARIES rdkafka++ HINTS ${RdKafka_DIR}/lib ${RdKafka_DIR}/lib64)
+find_library(RDKAFKA_C_LIBRARIES rdkafka HINTS ${RdKafka_DIR}/lib ${RdKafka_DIR}/lib64)
+
+include(FindPackageHandleStandardArgs)
+
+if (WIN32)
+    find_path(RDKAFKA_BIN_DIR rdkafka++.dll HINTS ${RdKafka_DIR}/bin ${RdKafka_DIR}/lib)
+    mark_as_advanced(RDKAFKA_BIN_DIR)
+    find_package_handle_standard_args(RdKafka REQUIRED_VARS RDKAFKA_INCLUDE_DIR RDKAFKA_C_LIBRARIES RDKAFKA_LIBRARIES RDKAFKA_BIN_DIR)
+else ()
+    find_package_handle_standard_args(RdKafka REQUIRED_VARS RDKAFKA_INCLUDE_DIR RDKAFKA_C_LIBRARIES RDKAFKA_LIBRARIES)
+endif ()
+
+mark_as_advanced(RDKAFKA_LIBRARIES RDKAFKA_INCLUDE_DIR)
diff --git a/CMakeModules/prepare_asapo.cmake b/CMakeModules/prepare_asapo.cmake
index bcf2db0fcd819039856d419e85e7f3d00cc8be3b..e2badd2b9ca9e49bfd2414f7da3e431a3a12cf21 100644
--- a/CMakeModules/prepare_asapo.cmake
+++ b/CMakeModules/prepare_asapo.cmake
@@ -32,12 +32,14 @@ function(prepare_asapo)
     else()
         configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/receiver_tcp.json.tpl.lin.in receiver_tcp.json.tpl @ONLY)
         configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/receiver_fabric.json.tpl.lin.in receiver_fabric.json.tpl @ONLY)
+        configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/receiver_kafka.json.tpl.lin.in receiver_kafka.json.tpl @ONLY)
         configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/authorizer_settings.json.tpl.lin authorizer.json.tpl COPYONLY)
         configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/common_scripts/start_services.sh start_services.sh COPYONLY)
         configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/common_scripts/stop_services.sh stop_services.sh COPYONLY)
 
         configure_file(${CMAKE_SOURCE_DIR}/config/nomad/receiver_tcp.nmd.in  receiver_tcp.nmd @ONLY)
         configure_file(${CMAKE_SOURCE_DIR}/config/nomad/receiver_fabric.nmd.in  receiver_fabric.nmd @ONLY)
+        configure_file(${CMAKE_SOURCE_DIR}/config/nomad/receiver_kafka.nmd.in  receiver_kafka.nmd @ONLY)
         configure_file(${CMAKE_SOURCE_DIR}/config/nomad/nginx_kill_lin.nmd nginx_kill.nmd @ONLY)
     endif()
 
diff --git a/common/cpp/CMakeLists.txt b/common/cpp/CMakeLists.txt
index 8f90ff257a0db963ffc133518e2768c020975fc7..b8ff069c4e0c3dae6784283c97a5485a08e55e22 100644
--- a/common/cpp/CMakeLists.txt
+++ b/common/cpp/CMakeLists.txt
@@ -8,6 +8,9 @@ add_subdirectory(src/data_structs)
 
 add_subdirectory(src/version)
 
+if (NOT BUILD_CLIENTS_ONLY)
+    add_subdirectory(src/kafka_client)
+endif()
 
 add_subdirectory(src/http_client)
 
diff --git a/common/cpp/include/asapo/common/error.tpp b/common/cpp/include/asapo/common/error.tpp
index 255e9e214e1e87a87841643c6534e1d1432438eb..3c2c90a662aa8e2f4397e545f93198615c75a15c 100644
--- a/common/cpp/include/asapo/common/error.tpp
+++ b/common/cpp/include/asapo/common/error.tpp
@@ -48,7 +48,7 @@ std::string ServiceError<ServiceErrorType>::ExplainPretty(uint8_t shift) const n
     }
     if (cause_err_ != nullptr) {
         err += "\n" + base_shift + shift_s + "caused by: ";
-        err += "\n" + base_shift + shift_s + cause_err_->ExplainPretty((uint8_t)(shift + 2));
+        err += "\n" + base_shift + shift_s + cause_err_->ExplainPretty(static_cast<uint8_t>(shift + 2));
     }
     return err;
 }
diff --git a/common/cpp/include/asapo/json_parser/json_parser.h b/common/cpp/include/asapo/json_parser/json_parser.h
index a2baa383dc52f4b65afadae2029b5a70cae9f255..cf123e712547fb42ef0c693c6f60404e13656405 100644
--- a/common/cpp/include/asapo/json_parser/json_parser.h
+++ b/common/cpp/include/asapo/json_parser/json_parser.h
@@ -4,6 +4,7 @@
 #include <string>
 #include <memory>
 #include <vector>
+#include <map>
 #include <string>
 
 #include "asapo/common/error.h"
@@ -20,6 +21,8 @@ class JsonParser {
     Error GetString(const std::string& name, std::string* val) const noexcept;
     Error GetArrayUInt64(const std::string& name, std::vector<uint64_t>* val) const noexcept;
     Error GetArrayString(const std::string& name, std::vector<std::string>* val) const noexcept;
+    Error GetArrayObjectMembers(const std::string& name, std::vector<std::string>* val) const noexcept;
+    Error GetDictionaryString(const std::string& name, std::map<std::string, std::string>* val) const noexcept;
     Error GetArrayRawStrings(const std::string& name, std::vector<std::string>* val) const noexcept;
     Error GetRawString(std::string* val) const noexcept;
     Error GetFlattenedString(const std::string& prefix, const std::string& separator, std::string* val) const noexcept;
diff --git a/common/cpp/include/asapo/kafka_client/kafka_client.h b/common/cpp/include/asapo/kafka_client/kafka_client.h
new file mode 100644
index 0000000000000000000000000000000000000000..a920d36e1e1e0bc3fa53256dfc8d41052965acb6
--- /dev/null
+++ b/common/cpp/include/asapo/kafka_client/kafka_client.h
@@ -0,0 +1,23 @@
+#ifndef ASAPO_KAFKA_CLIENT_H
+#define ASAPO_KAFKA_CLIENT_H
+
+#include "asapo/common/error.h"
+#include "asapo/common/data_structs.h"
+#include "asapo/kafka_client/kafka_config.h"
+#include "asapo/kafka_client/kafka_error.h"
+#include <map>
+
+namespace asapo {
+
+class KafkaClient {
+  public:
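+    // queues the given data for delivery to the given Kafka topic;
+    // returns nullptr on success, an error otherwise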
+    virtual Error Send(const std::string& data,
+                       const std::string& topic) noexcept = 0;
+    virtual ~KafkaClient() = default;
+};
+
+KafkaClient* CreateKafkaClient(const KafkaClientConfig& config, Error* err);
+
+}
+
+#endif //ASAPO_KAFKA_CLIENT_H
diff --git a/common/cpp/include/asapo/kafka_client/kafka_config.h b/common/cpp/include/asapo/kafka_client/kafka_config.h
new file mode 100644
index 0000000000000000000000000000000000000000..51bdad9d8272d8e1c56bb158a79c180261ea7a35
--- /dev/null
+++ b/common/cpp/include/asapo/kafka_client/kafka_config.h
@@ -0,0 +1,17 @@
+#ifndef ASAPO_KAFKA_CONFIG_H
+#define ASAPO_KAFKA_CONFIG_H
+
+#include <string>
+#include <map>
+
+namespace asapo {
+
+    struct KafkaClientConfig {
+        bool enabled{false};
+        std::map<std::string, std::string> global_config;
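+        // per-topic settings; the entry under the key "default" serves as the
+        // configuration template for topics created on demand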
+        std::map<std::string, std::map<std::string, std::string>> topics_config;
+    };
+
+}
+
+#endif //ASAPO_KAFKA_CONFIG_H
diff --git a/common/cpp/include/asapo/kafka_client/kafka_error.h b/common/cpp/include/asapo/kafka_client/kafka_error.h
new file mode 100644
index 0000000000000000000000000000000000000000..3ef69183d3b25e77ba6f0a4a0cee64f9721f60ae
--- /dev/null
+++ b/common/cpp/include/asapo/kafka_client/kafka_error.h
@@ -0,0 +1,43 @@
+#ifndef ASAPO_KAFKA_ERROR_H
+#define ASAPO_KAFKA_ERROR_H
+
+#include "asapo/common/error.h"
+
+namespace asapo {
+
+    enum class KafkaErrorType {
+        kQueueFullError,
+        kMessageTooLargeError,
+        kUnknownPartitionError,
+        kUnknownTopicError,
+        kGeneralError
+    };
+
+    using KafkaErrorTemplate = ServiceErrorTemplate<KafkaErrorType>;
+
+    namespace KafkaErrorTemplates {
+
+        auto const kQueueFullError = KafkaErrorTemplate {
+                "kafka queue is full", KafkaErrorType::kQueueFullError
+        };
+
+        auto const kMessageTooLargeError = KafkaErrorTemplate {
+                "kafka message is too large", KafkaErrorType::kMessageTooLargeError
+        };
+
+        auto const kUnknownPartitionError = KafkaErrorTemplate {
+                "partition is unknown in the kafka cluster", KafkaErrorType::kUnknownPartitionError
+        };
+
+        auto const kUnknownTopicError = KafkaErrorTemplate {
+                "partition is unknown in the kafka cluster", KafkaErrorType::kUnknownTopicError
+        };
+
+        auto const kGeneralError = KafkaErrorTemplate {
+                "unexpected kafka error occurred", KafkaErrorType::kGeneralError
+        };
+    }
+
+}
+
+#endif //ASAPO_KAFKA_ERROR_H
diff --git a/common/cpp/include/asapo/unittests/MockKafkaClient.h b/common/cpp/include/asapo/unittests/MockKafkaClient.h
new file mode 100644
index 0000000000000000000000000000000000000000..f51ea3a6e553fdda1aaefd85b32decb2d3851d91
--- /dev/null
+++ b/common/cpp/include/asapo/unittests/MockKafkaClient.h
@@ -0,0 +1,22 @@
+#ifndef ASAPO_MOCKKAFKACLIENT_H
+#define ASAPO_MOCKKAFKACLIENT_H
+
+#include <gtest/gtest.h>
+#include <gmock/gmock.h>
+
+#include "asapo/kafka_client/kafka_client.h"
+
+namespace asapo {
+
+class MockKafkaClient : public KafkaClient {
+  public:
+    Error Send(const std::string& data, const std::string& topic) noexcept override {
+        return Error{Send_t(data, topic)};
+    }
+
+    MOCK_METHOD(ErrorInterface *, Send_t, (const std::string& data, const std::string& topic), ());
+};
+
+}
+
+#endif //ASAPO_MOCKKAFKACLIENT_H
diff --git a/common/cpp/src/json_parser/json_parser.cpp b/common/cpp/src/json_parser/json_parser.cpp
index b25cb6b4a9d1f96226e60b4f5ccfe5fdfaf1afe3..32457c07ab127c4606e8be1ae7f54cb57a9f1083 100644
--- a/common/cpp/src/json_parser/json_parser.cpp
+++ b/common/cpp/src/json_parser/json_parser.cpp
@@ -24,6 +24,13 @@ Error JsonParser::GetArrayString(const std::string& name, std::vector<std::strin
     return rapid_json_->GetArrayString(name, val);
 }
 
+Error JsonParser::GetArrayObjectMembers(const std::string& name, std::vector<std::string>* val) const noexcept {
+    return rapid_json_->GetArrayObjectMembers(name, val);
+}
+
+Error JsonParser::GetDictionaryString(const std::string& name, std::map<std::string, std::string>* val) const noexcept {
+    return rapid_json_->GetDictionaryString(name, val);
+}
 
 Error JsonParser::GetBool(const std::string& name, bool* val) const noexcept {
     return rapid_json_->GetBool(name, val);
diff --git a/common/cpp/src/json_parser/rapid_json.cpp b/common/cpp/src/json_parser/rapid_json.cpp
index 62441650548c13facabab9a781a42cf7578e2f3f..d86d0987261ed28e4f7e5ebcf0d8dbfdd479a964 100644
--- a/common/cpp/src/json_parser/rapid_json.cpp
+++ b/common/cpp/src/json_parser/rapid_json.cpp
@@ -168,6 +168,40 @@ Error RapidJson::GetArrayString(const std::string& name, std::vector<std::string
 
 }
 
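+// collects the member names of the JSON object <name> into a vector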
+Error RapidJson::GetArrayObjectMembers(const std::string& name, std::vector<std::string>* val) const noexcept {
+    Value* json_val;
+    if (Error err = GetValuePointer(name, ValueType::kObject, &json_val)) {
+        return err;
+    }
+
+    val->clear();
+    for (auto& m : json_val->GetObject()) {
+        if (!m.name.IsString()) {
+            return GeneralErrorTemplates::kSimpleError.Generate("wrong type of object element: " + name);
+        }
+        val->push_back(m.name.GetString());
+    }
+    return nullptr;
+}
+
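+// reads the string members of the JSON object <name> into a string-to-string map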
+Error RapidJson::GetDictionaryString(const std::string& name, std::map<std::string, std::string>* val) const noexcept {
+    Value* json_val;
+    if (Error err = GetValuePointer(name, ValueType::kObject, &json_val)) {
+        return err;
+    }
+
+    val->clear();
+    for (auto& m : json_val->GetObject()) {
+        if (!m.value.IsString() || !m.name.IsString()) {
+            return GeneralErrorTemplates::kSimpleError.Generate("wrong type of dictionary element: " + name);
+        }
+        (*val)[m.name.GetString()] = m.value.GetString();
+    }
+    return nullptr;
+}
+
 RapidJson::RapidJson(const RapidJson& parent, const std::string& subname) {
     auto err = parent.GetValuePointer(subname, ValueType::kObject, &object_p_);
     if (err) {
diff --git a/common/cpp/src/json_parser/rapid_json.h b/common/cpp/src/json_parser/rapid_json.h
index b4cbc88711dfcc0f8e7ae6f21257277b9c10469e..477d6e4ee24ef6c7186ef7b0496c8edc786e10c7 100644
--- a/common/cpp/src/json_parser/rapid_json.h
+++ b/common/cpp/src/json_parser/rapid_json.h
@@ -4,6 +4,7 @@
 #include "rapidjson/document.h"
 #include "asapo/common/error.h"
 #include "asapo/io/io.h"
+#include <map>
 
 
 namespace asapo {
@@ -26,6 +27,8 @@ class RapidJson {
     Error GetString(const std::string& name, std::string* val) const noexcept;
     Error GetArrayUInt64(const std::string& name, std::vector<uint64_t>* val) const noexcept;
     Error GetArrayString(const std::string& name, std::vector<std::string>* val) const noexcept;
+    Error GetArrayObjectMembers(const std::string& name, std::vector<std::string>* val) const noexcept;
+    Error GetDictionaryString(const std::string& name, std::map<std::string, std::string>* val) const noexcept;
     Error GetArrayRawStrings(const std::string& name, std::vector<std::string>* val) const noexcept;
     Error GetRawString(std::string* val) const noexcept;
     Error GetFlattenedString(const std::string& prefix, const std::string& separator, std::string* val)const noexcept;
diff --git a/common/cpp/src/kafka_client/CMakeLists.txt b/common/cpp/src/kafka_client/CMakeLists.txt
new file mode 100644
index 0000000000000000000000000000000000000000..53d4ff666f3cb4e491d6417f12ed78be87b69aba
--- /dev/null
+++ b/common/cpp/src/kafka_client/CMakeLists.txt
@@ -0,0 +1,13 @@
+set(TARGET_NAME rdkafka_client)
+set(SOURCE_FILES
+        rdkafka_client.cpp
+        kafka_client_factory.cpp
+        ../../include/asapo/preprocessor/definitions.h)
+
+
+################################
+# Library
+################################
+add_library(${TARGET_NAME} OBJECT ${SOURCE_FILES})
+target_include_directories(${TARGET_NAME} PUBLIC ${ASAPO_CXX_COMMON_INCLUDE_DIR} ${RDKAFKA_INCLUDE_DIR})
+
+
diff --git a/common/cpp/src/kafka_client/kafka_client_factory.cpp b/common/cpp/src/kafka_client/kafka_client_factory.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..eabae085efb94ec96bfa9a8ff056256119ce8227
--- /dev/null
+++ b/common/cpp/src/kafka_client/kafka_client_factory.cpp
@@ -0,0 +1,14 @@
+#include "rdkafka_client.h"
+
+namespace asapo {
+
+KafkaClient* CreateKafkaClient(const KafkaClientConfig& config, Error* err) {
+    try {
+        return new RdKafkaClient(config);
+    }
+    catch (const std::string& errstr) {
+        (*err) = KafkaErrorTemplates::kGeneralError.Generate(errstr);
+    }
+    return nullptr;
+}
+}
diff --git a/common/cpp/src/kafka_client/rdkafka_client.cpp b/common/cpp/src/kafka_client/rdkafka_client.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..55a24cfc9cc65542ec90aa96766c4c0fe497ee01
--- /dev/null
+++ b/common/cpp/src/kafka_client/rdkafka_client.cpp
@@ -0,0 +1,87 @@
+#include "rdkafka_client.h"
+
+#include <cstring>
+#include  "asapo/common/data_structs.h"
+#include "asapo/io/io_factory.h"
+
+namespace asapo {
+
+RdKafkaClient::RdKafkaClient(const KafkaClientConfig& config) : default_topic_conf_(nullptr) {
+    std::string err;
+    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
+    for (const auto& configItem : config.global_config) {
+        if (conf->set(configItem.first, configItem.second, err) != RdKafka::Conf::CONF_OK) {
+            throw "cannot initialize kafka: " + err;
+        }
+    }
+
+    producer_ = RdKafka::Producer::create(conf, err);
+    delete conf; // create() copies the configuration, so the caller keeps ownership of conf
+    if (!producer_) {
+        throw "cannot initialize kafka: " + err;
+    }
+
+    for (const auto& topic : config.topics_config) {
+        auto topic_config = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
+        for (const auto& configItem : topic.second) {
+            if (topic_config->set(configItem.first, configItem.second, err) != RdKafka::Conf::CONF_OK) {
+                throw "cannot initialize kafka: " + err;
+            }
+        }
+        if (topic.first == "default") {
+            default_topic_conf_ = topic_config;
+        } else {
+            auto topic_obj = RdKafka::Topic::create(producer_, topic.first, topic_config, err);
+            if (!topic_obj) {
+                throw "cannot initialize kafka topic [" + topic.first + "]: " + err;
+
+            }
+            kafka_topics_[topic.first] = topic_obj;
+        }
+    }
+}
+
+RdKafkaClient::~RdKafkaClient() {
+    if (producer_) {
+        producer_->flush(1000);
+    }
+    for (const auto& topic : kafka_topics_) {
+        delete topic.second;
+    }
+    delete default_topic_conf_;
+    delete producer_;
+}
+
+Error RdKafkaClient::Send(const std::string& data, const std::string& topic_name) noexcept {
+    auto topicIt = kafka_topics_.find(topic_name);
+    RdKafka::Topic* topic;
+    if (topicIt == kafka_topics_.end()) {
+        if (!default_topic_conf_) {
+            return KafkaErrorTemplates::kUnknownTopicError.Generate();
+        }
+        std::string err;
+        topic = RdKafka::Topic::create(producer_, topic_name, default_topic_conf_, err);
+        if (!topic) {
+            return KafkaErrorTemplates::kGeneralError.Generate("Cannot create kafka topic [" + topic_name + "]: " + err);
+        }
+        kafka_topics_[topic_name] = topic;
+    }
+    else
+    {
+        topic = topicIt->second;
+    }
+
+    auto err = producer_->produce(topic, RdKafka::Topic::PARTITION_UA,
+                                 RdKafka::Producer::RK_MSG_COPY,
+                                 const_cast<void*>(static_cast<const void *>(data.data())), data.size(),
+                                 nullptr, nullptr);
+
+    switch (err) {
+        case RdKafka::ERR_NO_ERROR: return nullptr;
+        case RdKafka::ERR__QUEUE_FULL: return KafkaErrorTemplates::kQueueFullError.Generate();
+        case RdKafka::ERR_MSG_SIZE_TOO_LARGE: return KafkaErrorTemplates::kMessageTooLargeError.Generate();
+        case RdKafka::ERR__UNKNOWN_PARTITION: return KafkaErrorTemplates::kUnknownPartitionError.Generate();
+        case RdKafka::ERR__UNKNOWN_TOPIC: return KafkaErrorTemplates::kUnknownTopicError.Generate();
+        default: return KafkaErrorTemplates::kGeneralError.Generate(err2str(err));
+    }
+}
+
+}
diff --git a/common/cpp/src/kafka_client/rdkafka_client.h b/common/cpp/src/kafka_client/rdkafka_client.h
new file mode 100644
index 0000000000000000000000000000000000000000..3997ff3585b71134ce2f34fdfd3bc6683873b587
--- /dev/null
+++ b/common/cpp/src/kafka_client/rdkafka_client.h
@@ -0,0 +1,25 @@
+#ifndef ASAPO_RDKAFKA_CLIENT_H
+#define ASAPO_RDKAFKA_CLIENT_H
+
+#include <string>
+
+#include "asapo/kafka_client/kafka_client.h"
+#include "librdkafka/rdkafkacpp.h"
+
+namespace asapo {
+
+class RdKafkaClient final : public KafkaClient {
+  public:
+    RdKafkaClient(const KafkaClientConfig& config);
+    Error Send(const std::string& data, const std::string& topic) noexcept override;
+
+    virtual ~RdKafkaClient();
+  private:
+    RdKafka::Producer* producer_;
+    RdKafka::Conf* default_topic_conf_;
+    std::map<std::string, RdKafka::Topic *> kafka_topics_;
+};
+
+}
+
+#endif //ASAPO_RDKAFKA_CLIENT_H
diff --git a/common/cpp/unittests/json_parser/test_json_parser.cpp b/common/cpp/unittests/json_parser/test_json_parser.cpp
index d3017cac350dbbdd9ad9182472df99423baad8c0..ceed36b1b353aa3597e61217a30a115905de39cd 100644
--- a/common/cpp/unittests/json_parser/test_json_parser.cpp
+++ b/common/cpp/unittests/json_parser/test_json_parser.cpp
@@ -19,6 +19,7 @@ using ::testing::Return;
 using ::testing::SetArgPointee;
 using ::testing::HasSubstr;
 using ::testing::ElementsAre;
+using ::testing::Pair;
 using ::testing::DoAll;
 
 using asapo::JsonFileParser;
@@ -209,6 +210,54 @@ TEST(ParseString, StringArrayConvertToJson) {
     ASSERT_THAT(vec, ElementsAre("s1", "s2", "s3"));
 }
 
+TEST(ParseString, ObjectMemberArrayConvertToJson) {
+    std::string json = R"({"object":{"k1":"v1","k2":"v2","k3":"v3"}})";
+
+    JsonStringParser parser{json};
+
+    std::vector<std::string> vec;
+    auto err = parser.GetArrayObjectMembers("object", &vec);
+
+    ASSERT_THAT(err, Eq(nullptr));
+    ASSERT_THAT(vec, ElementsAre("k1", "k2", "k3"));
+}
+
+TEST(ParseString, DictionaryStringConvertToJson) {
+    std::string json = R"({"object":{"k1":"v1","k2":"v2","k3":"v3"}})";
+
+    JsonStringParser parser{json};
+
+    std::map<std::string, std::string> map;
+    auto err = parser.GetDictionaryString("object", &map);
+
+    ASSERT_THAT(err, Eq(nullptr));
+    ASSERT_THAT(map, ElementsAre(Pair("k1", "v1"), Pair("k2", "v2"), Pair("k3", "v3")));
+}
+
+TEST(ParseString, RawStringConvertToJson) {
+    std::string json = R"({"object":{"k1":"v1","k2":"v2","k3":"v3"}})";
+
+    JsonStringParser parser{json};
+
+    std::string value;
+    auto err = parser.GetRawString(&value);
+
+    ASSERT_THAT(err, Eq(nullptr));
+    ASSERT_THAT(json, Eq(value));
+}
+
+TEST(ParseString, ArrayRawStringConvertToJson) {
+    std::string json = R"({"array":[{"k1":"v1"},{"k2":"v2"},{"k3":"v3"}]})";
+
+    JsonStringParser parser{json};
+
+    std::vector<std::string> vec;
+    auto err = parser.GetArrayRawStrings("array", &vec);
+
+    ASSERT_THAT(err, Eq(nullptr));
+    ASSERT_THAT(vec, ElementsAre(R"({"k1":"v1"})", R"({"k2":"v2"})", R"({"k3":"v3"})"));
+}
+
 class ParseFileTests : public Test {
   public:
     NiceMock<MockIO> mock_io;
diff --git a/config/nomad/receiver_kafka.nmd.in b/config/nomad/receiver_kafka.nmd.in
new file mode 100644
index 0000000000000000000000000000000000000000..71bfe537dda1dc88d59ce61817618102e75291fc
--- /dev/null
+++ b/config/nomad/receiver_kafka.nmd.in
@@ -0,0 +1,60 @@
+variable "receiver_kafka_metadata_broker_list" {
+    type = string
+}
+
+job "receiver" {
+  datacenters = ["dc1"]
+
+  type = "service"
+
+  group "group" {
+    count = 1
+
+    task "receiver" {
+      driver = "raw_exec"
+
+      config {
+        command = "@RECEIVER_DIR@/@RECEIVER_NAME@"
+        args =  ["${NOMAD_TASK_DIR}/receiver.json"]
+      }
+
+      resources {
+        cpu    = 500 # 500 MHz
+        memory = 256 # 256MB
+        network {
+          port "recv" {}
+          port "recv_ds" {}
+          port "recv_metrics" {}
+        }
+      }
+
+      service {
+        name = "asapo-receiver"
+        port = "recv"
+        check {
+          name     = "metrics"
+          type     = "http"
+          port = "recv_metrics"
+          path     = "/metrics"
+          interval = "10s"
+          timeout  = "2s"
+          initial_status =   "passing"
+        }
+        meta {
+          metrics-port = "${NOMAD_PORT_recv_metrics}"
+        }
+      }
+
+      meta {
+        receiver_kafka_metadata_broker_list = "${var.receiver_kafka_metadata_broker_list}"
+      }
+
+      template {
+         source        = "@WORK_DIR@/receiver_kafka.json.tpl"
+         destination   = "local/receiver.json"
+         change_mode   = "signal"
+         change_signal = "SIGHUP"
+      }
+    }
+  }
+}
diff --git a/consumer/api/cpp/src/consumer_impl.cpp b/consumer/api/cpp/src/consumer_impl.cpp
index afc7142167c51b7f20f99dec5ecd8472c3a5fce5..44dd68fc21b463705e48e9ca58084cdd2edcdcd4 100644
--- a/consumer/api/cpp/src/consumer_impl.cpp
+++ b/consumer/api/cpp/src/consumer_impl.cpp
@@ -14,8 +14,6 @@
 
 #include "asapo/common/internal/version.h"
 
-using std::chrono::system_clock;
-
 namespace asapo {
 
 const std::string ConsumerImpl::kBrokerServiceName = "asapo-broker";
@@ -335,14 +333,14 @@ Error ConsumerImpl::GetRecordFromServer(std::string* response, std::string group
     std::string request_group = OpToUriCmd(op);
 
     auto baseRequestInfo = CreateBrokerApiRequest(std::move(stream), "", "");
-    uint64_t elapsed_ms = 0;
     Error no_data_error;
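+    // steady_clock is monotonic, so the elapsed time cannot jump if the system clock is adjusted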
+    auto start = std::chrono::steady_clock::now();
+    bool timeout_triggered = false;
     while (true) {
         if (interrupt_flag_) {
             return ConsumerErrorTemplates::kInterruptedTransaction.Generate("interrupted by user request");
         }
 
-        auto start = system_clock::now();
         auto err = DiscoverService(kBrokerServiceName, &current_broker_uri_);
         if (err == nullptr) {
             auto ri = PrepareRequestInfo(baseRequestInfo.api + "/" + httpclient__->UrlEscape(group_id) + "/" + request_suffix, dataset,
@@ -374,12 +372,15 @@ Error ConsumerImpl::GetRecordFromServer(std::string* response, std::string group
                 no_data_error = std::move(err);
             }
         }
+        auto elapsed_ms = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::milliseconds>
+            (std::chrono::steady_clock::now() - start).count());
         if (elapsed_ms >= timeout_ms_) {
-            return no_data_error ? std::move(no_data_error) : std::move(err);
+            if (timeout_triggered || timeout_ms_ == 0) {
+                return no_data_error ? std::move(no_data_error) : std::move(err);
+            }
+            timeout_triggered = true; // allow one more request if the previous one took longer than the timeout
         }
         std::this_thread::sleep_for(std::chrono::milliseconds(100));
-        elapsed_ms += static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::milliseconds>
-                                            (system_clock::now() - start).count());
     }
     return nullptr;
 }
@@ -460,14 +461,14 @@ Error ConsumerImpl::GetDataFromFile(MessageMeta* info, MessageData* data) {
             err = ConsumerErrorTemplates::kInterruptedTransaction.Generate("interrupted by user request");
             break;
         }
-        auto start = system_clock::now();
+        auto start = std::chrono::steady_clock::now();
         *data = io__->GetDataFromFile(info->FullName(source_path_), &info->size, &err);
         if (err == nullptr) {
             break;
         }
         std::this_thread::sleep_for(std::chrono::milliseconds(100));
         elapsed_ms += static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::milliseconds>
-                                            (system_clock::now() - start).count());
+                                            (std::chrono::steady_clock::now() - start).count());
     }
     if (err != nullptr) {
         return ConsumerErrorTemplates::kLocalIOError.Generate(std::move(err));
@@ -574,7 +575,7 @@ Error ConsumerImpl::ServiceRequestWithTimeout(const std::string& service_name,
             err = ConsumerErrorTemplates::kInterruptedTransaction.Generate("interrupted by user request");
             break;
         }
-        auto start = system_clock::now();
+        auto start = std::chrono::steady_clock::now();
         err = DiscoverService(service_name, service_uri);
         if (err == nullptr) {
             request.host = *service_uri;
@@ -585,7 +586,7 @@ Error ConsumerImpl::ServiceRequestWithTimeout(const std::string& service_name,
         }
         std::this_thread::sleep_for(std::chrono::milliseconds(100));
         elapsed_ms += static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::milliseconds>
-                                            (system_clock::now() - start).count());
+                                            (std::chrono::steady_clock::now() - start).count());
     }
     return err;
 }
diff --git a/deploy/asapo_helm_chart/asapo/configs/asapo-receiver.json b/deploy/asapo_helm_chart/asapo/configs/asapo-receiver.json
index c32c6703ad90abc586f4c47c4e3a758a56417dd8..7ae6db9d0e5f29539e8f706f8a5c7e248e9772ed 100644
--- a/deploy/asapo_helm_chart/asapo/configs/asapo-receiver.json
+++ b/deploy/asapo_helm_chart/asapo/configs/asapo-receiver.json
@@ -24,5 +24,14 @@
   },
   "Tag": "receiver",
   "ReceiveToDiskThresholdMB": {{ .Values.ownServices.receiver.receiveToDiskThresholdMB }},
-  "LogLevel": "info"
+  "LogLevel": "info",
+  "Kafka": {
+    "Enabled": true,
+    "KafkaClient": {
+      "metadata.broker.list": {{ .Values.ownServices.receiver.kafkaBrokerList }}
+    },
+    "KafkaTopics": {
+      "asapo": {}
+    }
+  }
 }
diff --git a/deploy/asapo_helm_chart/asapo/values.yaml b/deploy/asapo_helm_chart/asapo/values.yaml
index beccb7c2193ae10139e8325d3aa9ac175901bea3..946bc9394d2c4f9b089aa4a6124e15ec25fc00e8 100644
--- a/deploy/asapo_helm_chart/asapo/values.yaml
+++ b/deploy/asapo_helm_chart/asapo/values.yaml
@@ -33,6 +33,7 @@ ownServices:
       enable: true
       sizeGb: 1
     receiveToDiskThresholdMB: 200
+    kafkaBrokerList: asap3-events-01,asap3-events-02
   fileTransfer:
     serviceName: asapo-file-transfer
     replicaCount: 1
diff --git a/deploy/asapo_services/asap3.tfvars b/deploy/asapo_services/asap3.tfvars
index c9ff680fd27dba76f99ac745e1b6d5c081d0cf5d..b9d17576097e140d4382fa3e07adb3680e357d60 100644
--- a/deploy/asapo_services/asap3.tfvars
+++ b/deploy/asapo_services/asap3.tfvars
@@ -21,6 +21,7 @@ receiver_dataserver_cache_size = 30 #gb
 receiver_receive_to_disk_threshold = 50 # mb
 receiver_dataserver_nthreads = 8
 receiver_network_modes = "tcp"
+receiver_kafka_enabled = true
 
 grafana_total_memory_size = 2000
 influxdb_total_memory_size = 2000
diff --git a/deploy/asapo_services/scripts/asapo-receivers.nmd.tpl b/deploy/asapo_services/scripts/asapo-receivers.nmd.tpl
index 197ff857f0dda9ea27373702189adb715db4fa65..a109eda57063f5d2734ae25c9fd31f54eb77c5fa 100644
--- a/deploy/asapo_services/scripts/asapo-receivers.nmd.tpl
+++ b/deploy/asapo_services/scripts/asapo-receivers.nmd.tpl
@@ -100,6 +100,8 @@ job "asapo-receivers" {
         receiver_network_modes = "${receiver_network_modes}"
         perf_monitor = "${perf_monitor}"
         receiver_expose_metrics = "${receiver_expose_metrics}"
+        receiver_kafka_enabled = "${receiver_kafka_enabled}"
+        receiver_kafka_metadata_broker_list = "${receiver_kafka_metadata_broker_list}"
       }
 
       template {
diff --git a/deploy/asapo_services/scripts/receiver.json.tpl b/deploy/asapo_services/scripts/receiver.json.tpl
index da0b30cd3d668b0a1eed5b0fed60033237c1961e..3183e72ab7ef371dc799f48ceab967af43531f6a 100644
--- a/deploy/asapo_services/scripts/receiver.json.tpl
+++ b/deploy/asapo_services/scripts/receiver.json.tpl
@@ -24,5 +24,14 @@
   },
   "Tag": "{{ env "attr.unique.hostname" }}",
   "ReceiveToDiskThresholdMB": {{ env "NOMAD_META_receiver_receive_to_disk_threshold" }},
-  "LogLevel": "{{ keyOrDefault "receiver_log_level" "info" }}"
+  "LogLevel": "{{ keyOrDefault "receiver_log_level" "info" }}",
+  "Kafka": {
+    "Enabled": {{ env "NOMAD_META_receiver_kafka_enabled" }},
+    "KafkaClient": {
+      "metadata.broker.list": "{{ env "NOMAD_META_receiver_kafka_metadata_broker_list" }}"
+    },
+    "KafkaTopics": {
+      "asapo": {}
+    }
+  }
 }
diff --git a/deploy/asapo_services/scripts/templates.tf b/deploy/asapo_services/scripts/templates.tf
index ac510ba1d39901a6d2df1b9836a53c499ce3f78c..ddd5656dc49d5f6bce2f3475a69cede2367204b6 100644
--- a/deploy/asapo_services/scripts/templates.tf
+++ b/deploy/asapo_services/scripts/templates.tf
@@ -57,6 +57,8 @@ data "template_file" "asapo_receivers" {
     force_pull_images = "${var.force_pull_images}"
     perf_monitor = "${var.perf_monitor}"
     receiver_expose_metrics = "${var.receiver_expose_metrics}"
+    receiver_kafka_enabled = "${var.receiver_kafka_enabled}"
+    receiver_kafka_metadata_broker_list = "${var.receiver_kafka_metadata_broker_list}"
   }
 }
 
diff --git a/deploy/asapo_services/scripts/vars.tf b/deploy/asapo_services/scripts/vars.tf
index c7d16e5f1c2200764cf6f3b0ea26d2a35fd96423..13068f32a322ec22e78a8342b98719fdd2071fac 100644
--- a/deploy/asapo_services/scripts/vars.tf
+++ b/deploy/asapo_services/scripts/vars.tf
@@ -58,6 +58,10 @@ variable "receiver_network_modes" {}
 
 variable "receiver_expose_metrics" {}
 
+variable "receiver_kafka_enabled" {}
+
+variable "receiver_kafka_metadata_broker_list" {}
+
 variable "grafana_total_memory_size" {}
 
 variable "influxdb_total_memory_size" {}
@@ -111,4 +115,4 @@ variable "n_brokers" {}
 
 variable "n_fts" {}
 
-variable "ldap_uri" {}
\ No newline at end of file
+variable "ldap_uri" {}
diff --git a/receiver/CMakeLists.txt b/receiver/CMakeLists.txt
index 26da571232e68d58017e71141682818a343dfd38..95567b9ad51bdd4a6fea2bbf6a55541b25734b56 100644
--- a/receiver/CMakeLists.txt
+++ b/receiver/CMakeLists.txt
@@ -29,6 +29,7 @@ set(RECEIVER_CORE_FILES
         src/request_handler/request_handler_db_check_request.cpp
         src/request_handler/request_handler_db_delete_stream.cpp
         src/request_handler/request_handler_db_get_meta.cpp
+        src/request_handler/request_handler_kafka_notify.cpp
         src/request_handler/request_factory.cpp
         src/request_handler/request_handler_db.cpp
         src/request_handler/request_handler_monitoring.cpp
@@ -126,11 +127,11 @@ set(SOURCE_FILES
 GET_PROPERTY(ASAPO_COMMON_FABRIC_LIBRARIES GLOBAL PROPERTY ASAPO_COMMON_FABRIC_LIBRARIES)
 
 add_library(${TARGET_NAME} STATIC ${SOURCE_FILES} $<TARGET_OBJECTS:system_io> $<TARGET_OBJECTS:curl_http_client>
-        $<TARGET_OBJECTS:json_parser> $<TARGET_OBJECTS:logger> $<TARGET_OBJECTS:request_pool>)
+        $<TARGET_OBJECTS:json_parser> $<TARGET_OBJECTS:logger> $<TARGET_OBJECTS:request_pool> $<TARGET_OBJECTS:rdkafka_client>)
 set_target_properties(${TARGET_NAME} PROPERTIES LINKER_LANGUAGE CXX)
 target_include_directories(${TARGET_NAME} PUBLIC ${ASAPO_CXX_COMMON_INCLUDE_DIR})
-target_include_directories(${TARGET_NAME} SYSTEM PUBLIC  ${LIBFABRIC_INCLUDE_DIR})
-target_link_libraries(${TARGET_NAME} CURL::libcurl ${CMAKE_THREAD_LIBS_INIT} database
+target_include_directories(${TARGET_NAME} SYSTEM PUBLIC  ${LIBFABRIC_INCLUDE_DIR} ${RDKAFKA_INCLUDE_DIR})
+target_link_libraries(${TARGET_NAME} CURL::libcurl ${RDKAFKA_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} database
         asapo-fabric ${ASAPO_COMMON_FABRIC_LIBRARIES})
 if (ENABLE_NEW_RECEIVER_MONITORING)
 target_link_libraries(${TARGET_NAME} grpc_ingest_service_proto ${_REFLECTION} ${_GRPC_GRPCPP} ${_PROTOBUF_LIBPROTOBUF})
@@ -145,8 +146,14 @@ set_target_properties(${TARGET_NAME}-bin PROPERTIES RUNTIME_OUTPUT_DIRECTORY
         ${CMAKE_CURRENT_BINARY_DIR}$<$<CONFIG:Debug>:>
         )
 
+if (WIN32)
+    file(GLOB KAFKA_DLLS "${RDKAFKA_BIN_DIR}/rdkafka*.dll")
+    file(COPY ${KAFKA_DLLS} DESTINATION ${CMAKE_CURRENT_BINARY_DIR})
+endif()
+
 configure_file(docker/Dockerfile . COPYONLY)
 configure_file(docker/install_libfabric.sh . COPYONLY)
+configure_file(docker/install_rdkafka.sh . COPYONLY)
 
 
 ################################
@@ -180,6 +187,7 @@ set(TEST_SOURCE_FILES
         unittests/request_handler/test_request_handler_db_get_meta.cpp
         unittests/request_handler/test_request_handler_monitoring.cpp
 
+        unittests/request_handler/test_request_handler_kafka_notify.cpp
         unittests/statistics/test_statistics_sender_influx_db.cpp
         unittests/statistics/test_statistics_sender_fluentd.cpp
 
diff --git a/receiver/docker/Dockerfile b/receiver/docker/Dockerfile
index e03a3cf5c8fee45ded2c608ddc5d4ab711bd5a36..00df9b405fa013a8d48d81a3b180b23f0f76019e 100644
--- a/receiver/docker/Dockerfile
+++ b/receiver/docker/Dockerfile
@@ -1,6 +1,7 @@
 FROM ubuntu:18.04
 ADD receiver /
 ADD install_libfabric.sh install_libfabric.sh
-RUN apt update && ./install_libfabric.sh
+ADD install_rdkafka.sh install_rdkafka.sh
+RUN apt update && ./install_libfabric.sh && ./install_rdkafka.sh
 
 CMD ["/receiver","/var/lib/receiver/config.json"]
diff --git a/receiver/docker/install_rdkafka.sh b/receiver/docker/install_rdkafka.sh
new file mode 100755
index 0000000000000000000000000000000000000000..e1bb3c207aa627ca17a481d56a472dc4a94eb69a
--- /dev/null
+++ b/receiver/docker/install_rdkafka.sh
@@ -0,0 +1,15 @@
+#!/usr/bin/env bash
+
+apt install -y wget build-essential autoconf libtool make zlib1g-dev libzstd-dev
+
+wget https://github.com/edenhill/librdkafka/archive/refs/tags/v1.8.2.tar.gz
+tar xzf v1.8.2.tar.gz
+cd librdkafka-1.8.2
+
+./configure --enable-zlib --enable-zstd --disable-lz4 --disable-lz4-ext --disable-ssl --disable-gssapi --disable-sasl
+make -j 4
+make install
+ldconfig
+cd -
+rm -rf librdkafka-1.8.2
+rm v1.8.2.tar.gz
\ No newline at end of file
diff --git a/receiver/src/connection.cpp b/receiver/src/connection.cpp
index e76fbeed9f0a0613cb4f8c6c7035f42b9ac2996d..9064c642b44244ab7e0534e3f9b26d55c9575035 100644
--- a/receiver/src/connection.cpp
+++ b/receiver/src/connection.cpp
@@ -9,12 +9,12 @@
 
 namespace asapo {
 
-Connection::Connection(SocketDescriptor socket_fd, const std::string& address,
-                       SharedReceiverMonitoringClient monitoring, SharedCache cache, const std::string& receiver_tag) :
-        io__{GenerateDefaultIO()},
-        statistics__{new ReceiverStatistics},
-        log__{GetDefaultReceiverLogger()},
-        requests_dispatcher__{new RequestsDispatcher{socket_fd, address, statistics__.get(), std::move(monitoring), std::move(cache)}}  {
+Connection::Connection(SocketDescriptor socket_fd, const std::string& address, SharedReceiverMonitoringClient monitoring,
+                       SharedCache cache, KafkaClient* kafkaClient, std::string receiver_tag) :
+    io__{GenerateDefaultIO()},
+    statistics__{new ReceiverStatistics},
+    log__{GetDefaultReceiverLogger()},
+    requests_dispatcher__{new RequestsDispatcher{socket_fd, address, statistics__.get(), std::move(monitoring), std::move(cache), kafkaClient}} {
     socket_fd_ = socket_fd;
     address_ = address;
     statistics__->AddTag("connection_from", address);
diff --git a/receiver/src/connection.h b/receiver/src/connection.h
index 8b721ac75b01658b058e4e2cd9cfdbf7937fc4fe..29c158181cb9507f5b6c145ed3ebf7c4abca9a12 100644
--- a/receiver/src/connection.h
+++ b/receiver/src/connection.h
@@ -29,7 +29,7 @@ class Connection {
     int socket_fd_;
   public:
 
-    Connection(SocketDescriptor socket_fd, const std::string& address, SharedReceiverMonitoringClient monitoring, SharedCache cache, const std::string& receiver_tag);
+    Connection(SocketDescriptor socket_fd, const std::string& address, SharedReceiverMonitoringClient monitoring, SharedCache cache, KafkaClient* kafkaClient, std::string receiver_tag);
     ~Connection() = default;
 
     void Listen() const noexcept;
diff --git a/receiver/src/data_cache.cpp b/receiver/src/data_cache.cpp
index b7a62ab07479fe573026aa62b33f349cf7373251..fbfd65d953672a648b5f24fc69bab80b59eb47e9 100644
--- a/receiver/src/data_cache.cpp
+++ b/receiver/src/data_cache.cpp
@@ -6,6 +6,7 @@
 #include <cstdint>
 #include <utility>
 
+#include "receiver_error.h"
 
 namespace asapo {
 
@@ -22,7 +23,7 @@ DataCache::DataCache(uint64_t cache_size, float keepunlocked_ratio) : cache_size
     counter_ = static_cast<uint32_t>(rand() % 100 + 1);
 }
 
-void* DataCache::AllocateSlot(uint64_t size) {
+void* DataCache::AllocateSlot(uint64_t size, CacheMeta** blocking_meta) {
     auto tmp = cur_pointer_;
 
     if (cur_pointer_ + size > cache_size_) {
@@ -31,23 +32,30 @@ void* DataCache::AllocateSlot(uint64_t size) {
     auto addr = cache_.get() + cur_pointer_;
     cur_pointer_ += size;
 
-    if (!CleanOldSlots(size)) {
+    if (!CleanOldSlots(size, blocking_meta)) {
         cur_pointer_ = tmp;
         return nullptr;
     }
     return addr;
 }
 
-void* DataCache::GetFreeSlotAndLock(uint64_t size, CacheMeta** meta,
-                                    std::string beamtime, std::string source, std::string stream) {
+void* DataCache::GetFreeSlotAndLock(uint64_t size, CacheMeta** meta, std::string beamtime, std::string source, std::string stream, Error* err) {
     std::lock_guard<std::mutex> lock{mutex_};
     *meta = nullptr;
     if (!CheckAllocationSize(size)) {
+        *err = ReceiverErrorTemplates::kCacheAllocationError.Generate("size too large");
         return nullptr;
     }
-
-    auto addr = AllocateSlot(size);
+    CacheMeta* blocking_meta = nullptr;
+    auto addr = AllocateSlot(size, &blocking_meta);
     if (!addr) {
+        *err = ReceiverErrorTemplates::kCacheAllocationError.Generate("no slot available");
+        (*err)->AddDetails("curPointer", cur_pointer_)->AddDetails("cacheSize", cache_size_);
+        if (blocking_meta) {
+            (*err)->AddDetails("blockingMetaId", blocking_meta->id)->AddDetails("blockingMetaSize", blocking_meta->size);
+            uint64_t start_position = static_cast<uint64_t>(static_cast<uint8_t*>(blocking_meta->addr) - cache_.get());
+            (*err)->AddDetails("startPosition", start_position);
+        }
+        }
         return nullptr;
     }
 
@@ -56,6 +64,7 @@ void* DataCache::GetFreeSlotAndLock(uint64_t size, CacheMeta** meta,
     *meta = new CacheMeta{id, addr, size, 1, std::move(beamtime), std::move(source), std::move(stream)};
     meta_.emplace_back(std::unique_ptr<CacheMeta> {*meta});
 
+    *err = nullptr;
     return addr;
 }
 
@@ -99,7 +108,8 @@ bool Intersects(uint64_t left1, uint64_t right1, uint64_t left2, uint64_t right2
     return (left1 >= left2 && left1 < right2) || (right1 <= right2 && right1 > left2);
 }
 
-bool DataCache::CleanOldSlots(uint64_t size) {
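+// frees old slots to make room for <size> bytes; on failure, blocking_meta points
+// to the locked slot that prevented the cleanup (for error diagnostics)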
+bool DataCache::CleanOldSlots(uint64_t size, CacheMeta** blocking_meta) {
+    *blocking_meta = nullptr;
     int64_t last_del = -1;
     bool was_intersecting = false;
     for (uint64_t i = 0; i < meta_.size(); i++) {
@@ -115,7 +125,10 @@ bool DataCache::CleanOldSlots(uint64_t size) {
     }
 
     for (int i = 0; i <= last_del; i++) {
-        if (meta_[static_cast<unsigned long>(i)]->lock > 0) return false;
+        if (meta_[static_cast<unsigned long>(i)]->lock > 0)  {
+            *blocking_meta = meta_[static_cast<unsigned long>(i)].get();
+            return false;
+        }
     }
 
     if (last_del >= 0) {
diff --git a/receiver/src/data_cache.h b/receiver/src/data_cache.h
index 2c1f083866451334e7c6e8f2b37ec25096cb74ac..e58c460abfd64436ce09de9f902f1bf4d58d5abf 100644
--- a/receiver/src/data_cache.h
+++ b/receiver/src/data_cache.h
@@ -8,7 +8,7 @@
 #include <vector>
 
 #include "asapo/preprocessor/definitions.h"
-
+#include "asapo/common/error.h"
 
 namespace asapo {
 
@@ -26,7 +26,7 @@ class DataCache {
   public:
     explicit DataCache(uint64_t cache_size_gb, float keepunlocked_ratio);
     ASAPO_VIRTUAL void* GetFreeSlotAndLock(uint64_t size, CacheMeta** meta,
-                                     std::string beamtime, std::string source, std::string stream);
+                                     std::string beamtime, std::string source, std::string stream, Error* err);
     ASAPO_VIRTUAL std::vector<std::shared_ptr<const CacheMeta>> AllMetaInfosAsVector() const;
     ASAPO_VIRTUAL uint64_t GetCacheSize() const;
     ASAPO_VIRTUAL void* GetSlotToReadAndLock(uint64_t id, uint64_t data_size, CacheMeta** meta);
@@ -41,8 +41,8 @@ class DataCache {
     mutable std::mutex mutex_;
     std::deque<std::shared_ptr<CacheMeta>> meta_;
     bool SlotTooCloseToCurrentPointer(const CacheMeta* meta);
-    bool CleanOldSlots(uint64_t size);
-    void* AllocateSlot(uint64_t size);
+    bool CleanOldSlots(uint64_t size, CacheMeta** blocking_meta);
+    void* AllocateSlot(uint64_t size, CacheMeta** blocking_meta);
     bool CheckAllocationSize(uint64_t size);
     uint64_t GetNextId();
 };
diff --git a/receiver/src/main.cpp b/receiver/src/main.cpp
index fc5ce7b7c74ae58cf44c41b1c45a608c38fe3a0e..d7d5905f73b548c479eca9b9a9be1039757dce4a 100644
--- a/receiver/src/main.cpp
+++ b/receiver/src/main.cpp
@@ -6,6 +6,7 @@
 
 #include "receiver_data_server/receiver_data_server_logger.h"
 #include "asapo/common/internal/version.h"
+#include "asapo/kafka_client/kafka_client.h"
 
 #include "receiver_data_server/receiver_data_server.h"
 #include "receiver_data_server/net_server/rds_tcp_server.h"
@@ -87,11 +88,12 @@ std::vector<std::thread> StartDataServers(const asapo::ReceiverConfig* config, a
 }
 
 int StartReceiver(const asapo::ReceiverConfig* config, asapo::SharedCache cache,
-                  asapo::SharedReceiverMonitoringClient monitoring, asapo::AbstractLogger* logger) {
+                  asapo::SharedReceiverMonitoringClient monitoring, asapo::KafkaClient* kafkaClient, asapo::AbstractLogger* logger) {
     static const std::string address = "0.0.0.0:" + std::to_string(config->listen_port);
 
     logger->Info(std::string("starting receiver, version ") + asapo::kVersion);
-    auto* receiver = new asapo::Receiver(std::move(cache), std::move(monitoring));
+    auto receiver = std::unique_ptr<asapo::Receiver>{new asapo::Receiver(cache, monitoring, kafkaClient)};
+
     logger->Info("listening on " + address);
 
     asapo::Error err;
@@ -148,7 +150,20 @@ int main(int argc, char* argv[]) {
     }
 
     auto metrics_thread = StartMetricsServer(config->metrics, logger);
-    auto exit_code = StartReceiver(config, cache, monitoring, logger);
-    // todo: implement graceful exit, currently it never reaches this point
+
+    std::unique_ptr<asapo::KafkaClient> kafkaClient;
+    if (config->kafka_config.enabled) {
+        kafkaClient.reset(asapo::CreateKafkaClient(config->kafka_config, &err));
+        if (kafkaClient == nullptr) {
+            logger->Error("error initializing kafka client: " + err->Explain());
+            return EXIT_FAILURE;
+        }
+    } else {
+        logger->Info("kafka notifications disabled.");
+    }
+
+    auto exit_code = StartReceiver(config, cache, monitoring, kafkaClient.get(), logger);
+    // todo: implement graceful exit, currently it never reaches this point
     return exit_code;
 }
diff --git a/receiver/src/receiver.cpp b/receiver/src/receiver.cpp
index b120c76e3a630997908222a6c1a07728696316e1..0cb9b6dc30c6c8ca771b2e303b33b8ee0b2b8232 100644
--- a/receiver/src/receiver.cpp
+++ b/receiver/src/receiver.cpp
@@ -11,11 +11,7 @@ namespace asapo {
 
 const int Receiver::kMaxUnacceptedConnectionsBacklog = 5;
 
-Receiver::Receiver(SharedCache cache, SharedReceiverMonitoringClient monitoring):
-    cache_{std::move(cache)},
-    monitoring_{std::move(monitoring)},
-    io__{GenerateDefaultIO()},
-    log__{GetDefaultReceiverLogger()} {
+Receiver::Receiver(SharedCache cache, SharedReceiverMonitoringClient monitoring, KafkaClient* kafkaClient):
+    cache_{std::move(cache)},
+    monitoring_{std::move(monitoring)},
+    kafka_client_{kafkaClient},
+    io__{GenerateDefaultIO()},
+    log__{GetDefaultReceiverLogger()} {
 
 }
 
@@ -59,7 +55,7 @@ void Receiver::StartNewConnectionInSeparateThread(int connection_socket_fd, cons
     log__->Info(LogMessageWithFields("new connection with producer").Append("origin", HostFromUri(address)));
     auto thread = io__->NewThread("ConFd:" + std::to_string(connection_socket_fd),
     [connection_socket_fd, address, this] {
-        auto connection = std::unique_ptr<Connection>(new Connection(connection_socket_fd, address, monitoring_, cache_, GetReceiverConfig()->tag));
+        auto connection = std::unique_ptr<Connection>(new Connection(connection_socket_fd, address, monitoring_, cache_, kafka_client_, GetReceiverConfig()->tag));
         connection->Listen();
     });
 
diff --git a/receiver/src/receiver.h b/receiver/src/receiver.h
index a4e04b2faafca02c4660e8a80b73d7105c72a9d8..5af25a2351cf8a7eb6987c5500a25c7bbbd7cd85 100644
--- a/receiver/src/receiver.h
+++ b/receiver/src/receiver.h
@@ -22,11 +22,12 @@ class Receiver {
     std::vector<std::unique_ptr<std::thread>> threads_;
     SharedCache cache_;
     SharedReceiverMonitoringClient monitoring_;
+    KafkaClient* kafka_client_; // not owned - the caller keeps ownership
   public:
     static const int kMaxUnacceptedConnectionsBacklog;//TODO: Read from config
     Receiver(const Receiver&) = delete;
     Receiver& operator=(const Receiver&) = delete;
-    Receiver(SharedCache cache, asapo::SharedReceiverMonitoringClient monitoring);
+    Receiver(SharedCache cache, SharedReceiverMonitoringClient monitoring, KafkaClient* kafkaClient);
 
     void Listen(std::string listener_address, Error* err, bool exit_after_first_connection = false);
     std::unique_ptr<IO> io__;
diff --git a/receiver/src/receiver_config.cpp b/receiver/src/receiver_config.cpp
index 8514d27aaf30cc23dbcc5509849e8ff91eddccdc..6466d39a040f82cd6f8f60f64f44a0520d5a2959 100644
--- a/receiver/src/receiver_config.cpp
+++ b/receiver/src/receiver_config.cpp
@@ -17,13 +17,12 @@ Error ReceiverConfigManager::ReadConfigFromFile(std::string file_name) {
     std::string log_level;
     Error err;
 
+    std::vector<std::string> kafkaTopics;
+
     // New monitoring
     (err = parser.GetString("MonitoringServer", &config.monitoring_server_url)) ||
-
-    // Old monitoring
     (err = parser.GetString("PerformanceDbServer", &config.performance_db_uri)) ||
     (err = parser.GetBool("MonitorPerformance", &config.monitor_performance)) ||
-
     (err = parser.GetUInt64("ListenPort", &config.listen_port)) ||
     (err = parser.GetUInt64("ReceiveToDiskThresholdMB", &config.receive_to_disk_threshold_mb)) ||
     (err = parser.Embedded("DataServer").GetUInt64("ListenPort", &config.dataserver.listen_port)) ||
@@ -41,12 +40,29 @@ Error ReceiverConfigManager::ReadConfigFromFile(std::string file_name) {
     (err = parser.Embedded("DataServer").GetArrayString("NetworkMode", &config.dataserver.network_mode)) ||
     (err = parser.Embedded("Metrics").GetBool("Expose", &config.metrics.expose)) ||
     (err = parser.Embedded("Metrics").GetUInt64("ListenPort", &config.metrics.listen_port)) ||
-    (err = parser.GetString("LogLevel", &log_level));
+    (err = parser.GetString("LogLevel", &log_level)) ||
+    (err = parser.Embedded("Kafka").GetBool("Enabled", &config.kafka_config.enabled));
 
     if (err) {
         return err;
     }
 
+    if (config.kafka_config.enabled) {
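+        // the KafkaClient block is passed to librdkafka verbatim; KafkaTopics lists the
+        // topics to notify, each with an optional per-topic configuration object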
+        (err = parser.Embedded("Kafka").GetDictionaryString("KafkaClient", &config.kafka_config.global_config)) ||
+        (err = parser.Embedded("Kafka").GetArrayObjectMembers("KafkaTopics", &kafkaTopics));
+        if (err) {
+            return err;
+        }
+
+        for (const auto& topic : kafkaTopics) {
+            auto& topicConfig = config.kafka_config.topics_config[topic];
+            err = parser.Embedded("Kafka").Embedded("KafkaTopics").GetDictionaryString(topic, &topicConfig);
+            if (err) {
+                return err;
+            }
+        }
+    }
+
     config.dataserver.tag = config.tag + "_ds";
 
     config.log_level = StringToLogLevel(log_level, &err);
diff --git a/receiver/src/receiver_config.h b/receiver/src/receiver_config.h
index ec520b6dae58692fe0fb4ca154f4623cf30125aa..728f6d81c698f4bfd41e98477f1c943dfd3bf334 100644
--- a/receiver/src/receiver_config.h
+++ b/receiver/src/receiver_config.h
@@ -7,6 +7,7 @@
 
 #include "receiver_data_server/receiver_data_server_config.h"
 #include "metrics/receiver_metrics_config.h"
+#include "asapo/kafka_client/kafka_config.h"
 
 namespace asapo {
 
@@ -28,6 +29,7 @@ struct ReceiverConfig {
     ReceiverDataServerConfig dataserver;
     ReceiverMetricsConfig metrics;
     std::string discovery_server;
+    KafkaClientConfig kafka_config;
 };
 
 class ReceiverConfigManager {
diff --git a/receiver/src/receiver_error.h b/receiver/src/receiver_error.h
index dff04fc0655265a5de163b14ae28f1e532548998..eb08f560461d39ae08c364ddcaa6c6256340a49f 100644
--- a/receiver/src/receiver_error.h
+++ b/receiver/src/receiver_error.h
@@ -13,7 +13,8 @@ enum class ReceiverErrorType {
     kReAuthorizationFailure,
     kWarningDuplicatedRequest,
     kUnsupportedClient,
-    kProcessingError
+    kProcessingError,
+    kCacheAllocationError
 };
 
 using ReceiverErrorTemplate = ServiceErrorTemplate<ReceiverErrorType>;
@@ -38,6 +39,9 @@ auto const kProcessingError = ReceiverErrorTemplate {
     "processing error", ReceiverErrorType::kProcessingError
 };
 
+auto const kCacheAllocationError = ReceiverErrorTemplate {
+    "cache allocation error", ReceiverErrorType::kCacheAllocationError
+};
 
 auto const kBadRequest = ReceiverErrorTemplate {
     "Bad request", ReceiverErrorType::kBadRequest
diff --git a/receiver/src/request.cpp b/receiver/src/request.cpp
index e0c2f02720c6ebf26a11b5bcbcc09502da542cac..500bd688e9775c51a2c33ca7b6578bae497b7cbe 100644
--- a/receiver/src/request.cpp
+++ b/receiver/src/request.cpp
@@ -3,6 +3,7 @@
 #include <utility>
 #include "asapo/io/io_factory.h"
 #include "request_handler/request_handler_db_check_request.h"
+#include "receiver_logger.h"
 
 namespace asapo {
 
@@ -10,47 +11,63 @@ Request::Request(const GenericRequestHeader& header,
                  SocketDescriptor socket_fd, std::string origin_uri, DataCache* cache,
                  const RequestHandlerDbCheckRequest* db_check_handler,
                  SharedInstancedStatistics statistics) : io__{GenerateDefaultIO()},
-                                                         cache__{cache}, statistics_{std::move(statistics)}, request_header_(header),
+                                                         cache__{cache}, log__{GetDefaultReceiverLogger()}, statistics_{std::move(statistics)}, request_header_(header),
                                                          socket_fd_{socket_fd}, origin_uri_{std::move(origin_uri)},
                                                          check_duplicate_request_handler_{db_check_handler} {
     origin_host_ = HostFromUri(origin_uri_);
 }
 
+Error Request::PrepareDataBufferFromMemory() {
+    try {
+        data_buffer_.reset(new uint8_t[static_cast<size_t>(request_header_.data_size)]);
+    } catch(std::exception& e) {
+        auto err = GeneralErrorTemplates::kMemoryAllocationError.Generate(
+            std::string("cannot allocate memory for request"));
+        err->AddDetails("reason", e.what())->AddDetails("size", std::to_string(request_header_.data_size));
+        return err;
+    }
+    return nullptr;
+}
+
+Error Request::PrepareDataBufferFromCache() {
+    Error err;
+    CacheMeta* slot;
+    data_ptr = cache__->GetFreeSlotAndLock(request_header_.data_size, &slot, GetBeamtimeId(), GetDataSource(), GetStream(), &err);
+    if (err == nullptr) {
+        slot_meta_ = slot;
+    } else {
+        err->AddDetails("size", std::to_string(request_header_.data_size));
+        return err;
+    }
+    return nullptr;
+}
+
+
 Error Request::PrepareDataBufferAndLockIfNeeded() {
     if (cache__ == nullptr) {
-        try {
-            data_buffer_.reset(new uint8_t[(size_t)request_header_.data_size]);
-        } catch(std::exception& e) {
-            auto err = GeneralErrorTemplates::kMemoryAllocationError.Generate(
-                std::string("cannot allocate memory for request"));
-            err->AddDetails("reason", e.what())->AddDetails("size", std::to_string(request_header_.data_size));
-            return err;
-        }
-    } else {
-        CacheMeta* slot;
-        data_ptr = cache__->GetFreeSlotAndLock(request_header_.data_size, &slot, GetBeamtimeId(), GetDataSource(), GetStream());
-        if (data_ptr) {
-            slot_meta_ = slot;
-        } else {
-            auto err = GeneralErrorTemplates::kMemoryAllocationError.Generate("cannot allocate slot in cache");
-            err->AddDetails("size", std::to_string(request_header_.data_size));
-            return err;
-        }
+        return PrepareDataBufferFromMemory();
+    }
+    auto err = PrepareDataBufferFromCache();
+    if (err) {
+        log__->Warning(LogMessageWithFields(err).Append(RequestLog("", this)));
+        return PrepareDataBufferFromMemory();
     }
     return nullptr;
 }
 
 
 Error Request::Handle() {
+    Error err;
     for (auto handler : handlers_) {
         statistics_->StartTimer(handler->GetStatisticEntity());
-        auto err = handler->ProcessRequest(this);
+        err = handler->ProcessRequest(this);
         statistics_->StopTimer();
         if (err) {
-            return err;
+            break;
         }
     }
-    return nullptr;
+    UnlockDataBufferIfNeeded();
+    return err;
 }
 
 const RequestHandlerList& Request::GetListHandlers() const {
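
> Two behavioural changes are worth spelling out here. First, a failed cache allocation is now logged as a warning and demoted to a plain heap buffer, so the data can still travel the regular file path instead of the request failing outright. Second, `Handle()` breaks out of the handler loop on the first error but always calls `UnlockDataBufferIfNeeded()`, so a locked cache slot can no longer leak when a handler fails. A standalone sketch of the fallback policy, using hypothetical stand-in types rather than the real asapo classes:

```cpp
// Standalone sketch of the fallback policy (hypothetical stand-in types,
// not the real asapo classes): a failed cache allocation is logged and
// demoted to a plain heap buffer, so the request can still be written to
// disk instead of being rejected.
#include <cstdint>
#include <iostream>
#include <memory>
#include <string>

struct FakeCache {
    size_t capacity;
    // Returns nullptr and fills *err when no slot can be allocated.
    uint8_t* GetFreeSlotAndLock(size_t size, std::string* err) {
        if (size > capacity) {
            *err = "cache allocation error, size " + std::to_string(size);
            return nullptr;
        }
        return new uint8_t[size];  // stands in for a locked cache slot
    }
};

std::unique_ptr<uint8_t[]> PrepareBuffer(FakeCache* cache, size_t size) {
    if (cache != nullptr) {
        std::string err;
        if (uint8_t* slot = cache->GetFreeSlotAndLock(size, &err)) {
            return std::unique_ptr<uint8_t[]>(slot);  // cache path
        }
        std::cerr << "warning: " << err << ", falling back to memory\n";
    }
    return std::unique_ptr<uint8_t[]>(new uint8_t[size]);  // memory/disk path
}

int main() {
    FakeCache cache{1024};
    auto small = PrepareBuffer(&cache, 100);   // served from the "cache"
    auto large = PrepareBuffer(&cache, 4096);  // warning, then heap fallback
    return (small && large) ? 0 : 1;
}
```
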
diff --git a/receiver/src/request.h b/receiver/src/request.h
index e46d62d7cce63068f69de1e66295c480467c8059..b1b1755f41a568ce432a4ec449ce1c183b5879ed 100644
--- a/receiver/src/request.h
+++ b/receiver/src/request.h
@@ -13,6 +13,7 @@
 
 #include "asapo/preprocessor/definitions.h"
 #include "statistics/instanced_statistics_provider.h"
+#include "asapo/logger/logger.h"
 
 namespace asapo {
 
@@ -83,6 +84,10 @@ class Request {
     ASAPO_VIRTUAL const std::string& GetResponseMessage() const;
     ASAPO_VIRTUAL Error CheckForDuplicates();
     ASAPO_VIRTUAL SharedInstancedStatistics GetInstancedStatistics();
+    const AbstractLogger* log__;
+  private:
+    Error PrepareDataBufferFromMemory();
+    Error PrepareDataBufferFromCache();
   private:
     SharedInstancedStatistics statistics_;
     const GenericRequestHeader request_header_;
diff --git a/receiver/src/request_handler/request_factory.cpp b/receiver/src/request_handler/request_factory.cpp
index a0684c017171eff7880d26a3a8d2fa0a4e922128..6084f89ced7a43530f44a6baedd0d60192e81c50 100644
--- a/receiver/src/request_handler/request_factory.cpp
+++ b/receiver/src/request_handler/request_factory.cpp
@@ -34,6 +34,9 @@ void RequestFactory::AddReceiveViaBufferHandlers(std::unique_ptr<Request>& reque
     request->AddHandler(&request_handler_receivedata_);
     if (NeedFileWriteHandler(request_header)) {
         request->AddHandler(&request_handler_filewrite_);
+        if (GetReceiverConfig()->kafka_config.enabled) {
+            request->AddHandler(&request_handler_kafka_notify_);
+        }
     }
 }
 
@@ -44,6 +47,9 @@ Error RequestFactory::AddReceiveDirectToFileHandler(std::unique_ptr<Request>& re
                    "ingest mode should include kStoreInFilesystem for large files ");
     }
     request->AddHandler(&request_handler_filereceive_);
+    if (GetReceiverConfig()->kafka_config.enabled) {
+        request->AddHandler(&request_handler_kafka_notify_);
+    }
     return nullptr;
 }
 
@@ -119,8 +125,10 @@ std::unique_ptr<Request> RequestFactory::GenerateRequest(const GenericRequestHea
     return request;
 }
 
-RequestFactory::RequestFactory(SharedReceiverMonitoringClient monitoring, SharedCache cache) : monitoring_{std::move(monitoring)}, cache_{std::move(cache)} {
-
+RequestFactory::RequestFactory(SharedReceiverMonitoringClient monitoring, SharedCache cache, KafkaClient* kafka_client) :
+    monitoring_{std::move(monitoring)}, cache_{std::move(cache)},
+    request_handler_monitoring_{monitoring_},
+    request_handler_kafka_notify_{kafka_client} {
 }
 
 }
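
> With Kafka enabled, the factory appends the notify handler immediately after the file-write step on both the buffered and the direct-to-file paths, so a notification is only emitted once a file has actually landed on disk. A condensed, illustrative model of the resulting handler order (toy strings, not the real handler classes; compare the expectations in test_request_factory.cpp below):

```cpp
// Illustrative model of the handler order the factory builds when Kafka is
// enabled (toy strings, not the real handler classes).
#include <iostream>
#include <string>
#include <vector>

int main() {
    bool kafka_enabled = true;  // mirrors GetReceiverConfig()->kafka_config.enabled
    bool writes_file = true;    // mirrors NeedFileWriteHandler(request_header)

    std::vector<std::string> handlers = {"authorize", "receive_metadata", "receive_data"};
    if (writes_file) {
        handlers.push_back("file_write");
        if (kafka_enabled) {
            handlers.push_back("kafka_notify");  // only after a file write
        }
    }
    handlers.push_back("db_write");
    handlers.push_back("monitoring");

    for (const auto& h : handlers) {
        std::cout << h << "\n";
    }
    return 0;
}
```
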
diff --git a/receiver/src/request_handler/request_factory.h b/receiver/src/request_handler/request_factory.h
index 26eac85652444c2516932a6d47bbcb747f5aa554..7be05eaaa575b1d4d082dbb14a4b869754a158b0 100644
--- a/receiver/src/request_handler/request_factory.h
+++ b/receiver/src/request_handler/request_factory.h
@@ -9,6 +9,7 @@
 #include "request_handler_db_delete_stream.h"
 #include "request_handler_db_get_meta.h"
 #include "request_handler_monitoring.h"
+#include "request_handler_kafka_notify.h"
 
 #include "request_handler_file_process.h"
 #include "request_handler_db_write.h"
@@ -24,7 +25,7 @@ namespace asapo {
 
 class RequestFactory {
   public:
-    explicit RequestFactory (SharedReceiverMonitoringClient monitoring, SharedCache cache);
+    explicit RequestFactory (SharedReceiverMonitoringClient monitoring, SharedCache cache, KafkaClient* kafka_client);
     virtual std::unique_ptr<Request> GenerateRequest(const GenericRequestHeader& request_header,
                                                      SocketDescriptor socket_fd, std::string origin_uri,
                                                      const SharedInstancedStatistics& statistics, Error* err) const noexcept;
@@ -44,7 +45,7 @@ class RequestFactory {
     RequestHandlerReceiveData request_handler_receivedata_;
     RequestHandlerReceiveMetaData request_handler_receive_metadata_;
     RequestHandlerDbWrite request_handler_dbwrite_{kDBDataCollectionNamePrefix};
-    RequestHandlerMonitoring request_handler_monitoring_{monitoring_};
+    RequestHandlerMonitoring request_handler_monitoring_;
     RequestHandlerDbStreamInfo request_handler_db_stream_info_{kDBDataCollectionNamePrefix};
     RequestHandlerDbDeleteStream request_handler_delete_stream_{kDBDataCollectionNamePrefix};
     RequestHandlerDbLastStream request_handler_db_last_stream_{kDBDataCollectionNamePrefix};
@@ -54,6 +55,7 @@ class RequestFactory {
     RequestHandlerSecondaryAuthorization request_handler_secondary_authorize_{&shared_auth_cache_};
     RequestHandlerDbCheckRequest request_handler_db_check_{kDBDataCollectionNamePrefix};
 
+    RequestHandlerKafkaNotify request_handler_kafka_notify_;
     bool ReceiveDirectToFile(const GenericRequestHeader& request_header) const;
     Error AddReceiveDirectToFileHandler(std::unique_ptr<Request>& request,
                                         const GenericRequestHeader& request_header) const;
diff --git a/receiver/src/request_handler/request_handler_kafka_notify.cpp b/receiver/src/request_handler/request_handler_kafka_notify.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..d99cc2bef4d8b9be0fcbc630af2969c9ea4f8269
--- /dev/null
+++ b/receiver/src/request_handler/request_handler_kafka_notify.cpp
@@ -0,0 +1,33 @@
+#include "request_handler_kafka_notify.h"
+#include "../request.h"
+
+namespace asapo {
+
+Error RequestHandlerKafkaNotify::ProcessRequest(Request* request) const {
+    bool write_to_offline = request->GetSourceType() == SourceType::kProcessed ||
+                            static_cast<bool>(request->GetCustomData()[kPosIngestMode] & IngestModeFlags::kWriteRawDataToOffline);
+
+    if (!kafka_client_ || write_to_offline) {
+        return nullptr;
+    }
+
+    auto root_folder = request->GetOnlinePath();
+    if (root_folder.empty()) {
+        return ReceiverErrorTemplates::kBadRequest.Generate("online path not available");
+    }
+
+    std::string message = "{"
+            "\"event\":\"IN_CLOSE_WRITE\","
+            "\"path\":\"" + root_folder + kPathSeparator + request->GetFileName() + "\""
+    "}";
+
+    return kafka_client_->Send(message, "asapo");
+}
+
+StatisticEntity RequestHandlerKafkaNotify::GetStatisticEntity() const {
+    return StatisticEntity::kNetworkOutgoing;
+}
+
+RequestHandlerKafkaNotify::RequestHandlerKafkaNotify(KafkaClient* kafka_client) : kafka_client_{kafka_client} {
+}
+}
\ No newline at end of file
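
> The payload mimics an inotify `IN_CLOSE_WRITE` event and is assembled by plain string concatenation. That assumes `root_folder` and the file name never contain characters that need JSON escaping (quotes, backslashes); if that cannot be guaranteed, the path should be escaped before concatenation. A standalone sketch of the message shape (`kPathSeparator` is assumed to be a plain `/` here; asapo defines it per platform):

```cpp
// Sketch of the notification payload built above; prints
// {"event":"IN_CLOSE_WRITE","path":"online/path/raw/filename"}
#include <iostream>
#include <string>

int main() {
    const char kPathSeparator = '/';  // assumption; asapo defines this per platform
    std::string root_folder = "online/path";   // request->GetOnlinePath()
    std::string file_name = "raw/filename";    // request->GetFileName()

    std::string message = "{"
            "\"event\":\"IN_CLOSE_WRITE\","
            "\"path\":\"" + root_folder + kPathSeparator + file_name + "\""
    "}";

    std::cout << message << "\n";
    return 0;
}
```
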
diff --git a/receiver/src/request_handler/request_handler_kafka_notify.h b/receiver/src/request_handler/request_handler_kafka_notify.h
new file mode 100644
index 0000000000000000000000000000000000000000..399b01126dce0865f273b8446746c5acde53692d
--- /dev/null
+++ b/receiver/src/request_handler/request_handler_kafka_notify.h
@@ -0,0 +1,20 @@
+#ifndef ASAPO_REQUEST_HANDLER_KAFKA_NOTIFY_H
+#define ASAPO_REQUEST_HANDLER_KAFKA_NOTIFY_H
+
+#include "request_handler.h"
+#include "asapo/kafka_client/kafka_client.h"
+
+namespace asapo {
+
+class RequestHandlerKafkaNotify final : public ReceiverRequestHandler {
+  public:
+    RequestHandlerKafkaNotify(KafkaClient* kafka_client);
+    StatisticEntity GetStatisticEntity() const override;
+    Error ProcessRequest(Request* request) const override;
+  private:
+    KafkaClient* kafka_client_;
+};
+
+}
+
+#endif //ASAPO_REQUEST_HANDLER_KAFKA_NOTIFY_H
diff --git a/receiver/src/request_handler/request_handler_receive_data.cpp b/receiver/src/request_handler/request_handler_receive_data.cpp
index 7b66c3ceebc77242185812c523393da274743f4f..3103c194f86d1117c3273d0a2368326d2d5d24b3 100644
--- a/receiver/src/request_handler/request_handler_receive_data.cpp
+++ b/receiver/src/request_handler/request_handler_receive_data.cpp
@@ -24,7 +24,6 @@ Error RequestHandlerReceiveData::ProcessRequest(Request* request) const {
     if (io_err) {
         err = ReceiverErrorTemplates::kProcessingError.Generate("cannot receive data",std::move(io_err));
     }
-    request->UnlockDataBufferIfNeeded();
     if (err == nullptr) {
         log__->Debug(RequestLog("received request data", request).Append("size",request->GetDataSize()));
     }
diff --git a/receiver/src/request_handler/requests_dispatcher.cpp b/receiver/src/request_handler/requests_dispatcher.cpp
index bcb76b78b082ac090adcbf4fa3f5129ec53e7bfc..d7ef7bf1ac1154014b119708b0f73f14eb958581 100644
--- a/receiver/src/request_handler/requests_dispatcher.cpp
+++ b/receiver/src/request_handler/requests_dispatcher.cpp
@@ -8,14 +8,16 @@
 namespace asapo {
 
 RequestsDispatcher::RequestsDispatcher(SocketDescriptor socket_fd, std::string address,
-                                       ReceiverStatistics* statistics, SharedReceiverMonitoringClient monitoring, SharedCache cache) : statistics__{statistics},
+                                       ReceiverStatistics* statistics, SharedReceiverMonitoringClient monitoring, SharedCache cache, KafkaClient* kafka_client) : statistics__{statistics},
     io__{GenerateDefaultIO()},
     log__{
     GetDefaultReceiverLogger()},
 request_factory__{
     new RequestFactory{
-        std::move(monitoring), std::move(cache)}}, socket_fd_{socket_fd},
-producer_uri_{std::move(address)} {
+        std::move(monitoring), cache, kafka_client}},
+socket_fd_{socket_fd},
+producer_uri_{std::move(address)} {
 }
 
 NetworkErrorCode GetNetworkCodeFromError(const Error& err) {
diff --git a/receiver/src/request_handler/requests_dispatcher.h b/receiver/src/request_handler/requests_dispatcher.h
index bea7f7464a674cbc4e944ebe1a4a2f35bb69714c..e942b5888314c521108519f46197dfaccab35383 100644
--- a/receiver/src/request_handler/requests_dispatcher.h
+++ b/receiver/src/request_handler/requests_dispatcher.h
@@ -14,7 +14,7 @@ namespace asapo {
 
 class RequestsDispatcher {
   public:
-    RequestsDispatcher(SocketDescriptor socket_fd, std::string address, ReceiverStatistics* statistics, SharedReceiverMonitoringClient monitoring, SharedCache cache);
+    RequestsDispatcher(SocketDescriptor socket_fd, std::string address, ReceiverStatistics* statistics, SharedReceiverMonitoringClient monitoring, SharedCache cache, KafkaClient* kafka_client);
     ASAPO_VIRTUAL Error ProcessRequest(const std::unique_ptr<Request>& request) const noexcept;
     ASAPO_VIRTUAL std::unique_ptr<Request> GetNextRequest(Error* err) const noexcept;
     ASAPO_VIRTUAL ~RequestsDispatcher() = default;
diff --git a/receiver/unittests/mock_receiver_config.cpp b/receiver/unittests/mock_receiver_config.cpp
index e9f2ac61f97344f388eeff4db7f8bd0607e76c35..026eeb66891154a9eb1eb3298b9a63a931ce89bf 100644
--- a/receiver/unittests/mock_receiver_config.cpp
+++ b/receiver/unittests/mock_receiver_config.cpp
@@ -82,6 +82,9 @@ Error SetReceiverConfigWithError (const ReceiverConfig& config, std::string erro
     config_string += "," +  Key("AuthorizationServer", error_field) + "\"" + config.authorization_server + "\"";
     config_string += "," +  Key("LogLevel", error_field) + "\"" + log_level + "\"";
     config_string += "," +  Key("Tag", error_field) + "\"" + config.tag + "\"";
+    config_string += "," +  Key("Kafka", error_field) + "{";
+    config_string += Key("Enabled", error_field) + (config.kafka_config.enabled ? "true" : "false");
+    config_string += "}";
     config_string += "}";
 
 
@@ -97,8 +100,7 @@ Error SetReceiverConfigWithError (const ReceiverConfig& config, std::string erro
 }
 
 void SetReceiverConfig (const ReceiverConfig& config, std::string error_field) {
-    auto err = SetReceiverConfigWithError(config, error_field);
-    ASSERT_THAT(err, Eq(nullptr));
+    SetReceiverConfigWithError(config, error_field);
 }
 
 }
diff --git a/receiver/unittests/receiver_mocking.h b/receiver/unittests/receiver_mocking.h
index a65c216439092a3bcbd30dd8495477f60a3ed509..f08978bf5f03c1b88037a215e85df940701d0de2 100644
--- a/receiver/unittests/receiver_mocking.h
+++ b/receiver/unittests/receiver_mocking.h
@@ -137,12 +137,18 @@ class MockRequest: public Request {
 class MockDataCache: public DataCache {
   public:
     MockDataCache(): DataCache(0, 0) {};
-    MOCK_METHOD(void*, GetFreeSlotAndLock, (uint64_t size, CacheMeta** meta, std::string beamtime, std::string source, std::string stream), (override));
+    void* GetFreeSlotAndLock(uint64_t size, CacheMeta** meta, std::string beamtime, std::string source, std::string stream, Error* err) override {
+        ErrorInterface* error = nullptr;
+        auto data = GetFreeSlotAndLock_t(size, meta, beamtime, source, stream, &error);
+        err->reset(error);
+        return data;
+    }
+
+    MOCK_METHOD(void*, GetFreeSlotAndLock_t, (uint64_t size, CacheMeta** meta, std::string beamtime, std::string source, std::string stream, ErrorInterface** err));
     MOCK_METHOD(bool, UnlockSlot, (CacheMeta* meta), (override));
     MOCK_CONST_METHOD0(AllMetaInfosAsVector, std::vector<std::shared_ptr<const CacheMeta>>());
     MOCK_CONST_METHOD0(GetCacheSize, uint64_t());
     MOCK_METHOD(void*, GetSlotToReadAndLock, (uint64_t id, uint64_t data_size, CacheMeta** meta), (override));
-
 };
 
 class MockStatisticsSender: public StatisticsSender {
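
> The `MockDataCache` change above is an instance of a standard gmock idiom: `asapo::Error` is a move-only smart pointer, which gmock actions such as `SetArgPointee` cannot populate directly, so the override forwards to a mockable `_t` variant taking a raw `ErrorInterface**` and re-wraps the result. A self-contained sketch of the idiom with simplified stand-in types (link against gmock and gtest_main):

```cpp
// The "_t wrapper" idiom: adapt a move-only out-param to a raw pointer
// that gmock actions can write to, then re-wrap it in the override.
#include <cstdint>
#include <memory>

#include <gmock/gmock.h>
#include <gtest/gtest.h>

struct ErrorInterface { virtual ~ErrorInterface() = default; };
using Error = std::unique_ptr<ErrorInterface>;

struct Cache {
    virtual ~Cache() = default;
    virtual void* GetFreeSlotAndLock(uint64_t size, Error* err) = 0;
};

struct MockCache : Cache {
    void* GetFreeSlotAndLock(uint64_t size, Error* err) override {
        ErrorInterface* error = nullptr;
        auto data = GetFreeSlotAndLock_t(size, &error);
        err->reset(error);  // take ownership of whatever the action set
        return data;
    }
    MOCK_METHOD(void*, GetFreeSlotAndLock_t, (uint64_t size, ErrorInterface** err));
};

TEST(MockCacheSketch, ReturnsBufferAndNoError) {
    using ::testing::_;
    using ::testing::DoAll;
    using ::testing::Return;
    using ::testing::SetArgPointee;

    MockCache mock;
    static uint8_t buffer[8];
    EXPECT_CALL(mock, GetFreeSlotAndLock_t(8, _))
        .WillOnce(DoAll(SetArgPointee<1>(nullptr), Return(buffer)));

    Error err;
    EXPECT_EQ(mock.GetFreeSlotAndLock(8, &err), buffer);
    EXPECT_EQ(err, nullptr);
}
```
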
diff --git a/receiver/unittests/request_handler/test_request_factory.cpp b/receiver/unittests/request_handler/test_request_factory.cpp
index 05cf9bfe5ffcc5765e387205288c42622e6c9b4b..60865b2d543b075bbd4974e80be7c2bb76e352d4 100644
--- a/receiver/unittests/request_handler/test_request_factory.cpp
+++ b/receiver/unittests/request_handler/test_request_factory.cpp
@@ -5,6 +5,7 @@
 
 #include "asapo/unittests/MockIO.h"
 #include "asapo/unittests/MockDatabase.h"
+#include "asapo/unittests/MockKafkaClient.h"
 #include "../../src/connection.h"
 #include "../../src/receiver_error.h"
 #include "../../src/request.h"
@@ -27,7 +28,8 @@ namespace {
 
 class FactoryTests : public Test {
   public:
-    RequestFactory factory{nullptr, nullptr};
+    asapo::MockKafkaClient kafkaClient;
+    RequestFactory factory{nullptr, nullptr, &kafkaClient};
     Error err{nullptr};
     GenericRequestHeader generic_request_header;
     ReceiverConfig config;
@@ -49,18 +51,24 @@ TEST_F(FactoryTests, ErrorOnWrongCode) {
 }
 
 TEST_F(FactoryTests, ReturnsDataRequestOnkNetOpcodeSendCode) {
+    config.kafka_config.enabled = true;
+    SetReceiverConfig(config, "none");
+
     for (auto code : std::vector<asapo::Opcode> {asapo::Opcode::kOpcodeTransferData, asapo::Opcode::kOpcodeTransferDatasetData}) {
         generic_request_header.op_code = code;
-        auto request = factory.GenerateRequest(generic_request_header, 1, origin_uri, nullptr, &err);
+        auto request = factory.GenerateRequest(generic_request_header, 1, origin_uri, nullptr, &err);
 
         ASSERT_THAT(err, Eq(nullptr));
         ASSERT_THAT(dynamic_cast<asapo::Request*>(request.get()), Ne(nullptr));
-        ASSERT_THAT(request->GetListHandlers().size(), Eq(6));
-        ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerSecondaryAuthorization*>(request->GetListHandlers()[0]), Ne(nullptr));
+        ASSERT_THAT(request->GetListHandlers().size(), Eq(7));
+        ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerSecondaryAuthorization*>(request->GetListHandlers()[0]),
+                    Ne(nullptr));
         ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerReceiveMetaData*>(request->GetListHandlers()[1]), Ne(nullptr));
         ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerReceiveData*>(request->GetListHandlers()[2]), Ne(nullptr));
         ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerFileProcess*>(request->GetListHandlers()[3]), Ne(nullptr));
-        ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerDbWrite*>(request->GetListHandlers()[4]), Ne(nullptr));
+        ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerKafkaNotify*>(request->GetListHandlers()[4]), Ne(nullptr));
+        ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerDbWrite*>(request->GetListHandlers()[5]), Ne(nullptr));
         ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerMonitoring*>(request->GetListHandlers().back()), Ne(nullptr));
     }
 }
@@ -69,6 +77,7 @@ TEST_F(FactoryTests, ReturnsDataRequestOnkNetOpcodeSendCodeLargeFile) {
     for (auto code : std::vector<asapo::Opcode> {asapo::Opcode::kOpcodeTransferData, asapo::Opcode::kOpcodeTransferDatasetData}) {
         generic_request_header.op_code = code;
         config.receive_to_disk_threshold_mb = 0;
+        config.kafka_config.enabled = true;
         SetReceiverConfig(config, "none");
 
         generic_request_header.data_size = 1;
@@ -76,12 +85,15 @@ TEST_F(FactoryTests, ReturnsDataRequestOnkNetOpcodeSendCodeLargeFile) {
 
         ASSERT_THAT(err, Eq(nullptr));
         ASSERT_THAT(dynamic_cast<asapo::Request*>(request.get()), Ne(nullptr));
-        ASSERT_THAT(request->GetListHandlers().size(), Eq(5));
-        ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerSecondaryAuthorization*>(request->GetListHandlers()[0]), Ne(nullptr));
+        ASSERT_THAT(request->GetListHandlers().size(), Eq(6));
+        ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerSecondaryAuthorization*>(request->GetListHandlers()[0]),
+                    Ne(nullptr));
         ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerReceiveMetaData*>(request->GetListHandlers()[1]), Ne(nullptr));
         ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerFileProcess*>(request->GetListHandlers()[2]), Ne(nullptr));
-        ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerDbWrite*>(request->GetListHandlers()[3]), Ne(nullptr));
+        ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerKafkaNotify*>(request->GetListHandlers()[3]), Ne(nullptr));
+        ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerDbWrite*>(request->GetListHandlers()[4]), Ne(nullptr));
         ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerMonitoring*>(request->GetListHandlers().back()), Ne(nullptr));
+
     }
 }
 
@@ -111,6 +123,25 @@ TEST_F(FactoryTests, DoNotAddDbWriterIfNotWanted) {
     generic_request_header.custom_data[asapo::kPosIngestMode] = asapo::IngestModeFlags::kTransferData |
             asapo::IngestModeFlags::kStoreInFilesystem;
 
+    config.kafka_config.enabled = true;
+    SetReceiverConfig(config, "none");
+
+    auto request = factory.GenerateRequest(generic_request_header, 1, origin_uri, nullptr, &err);
+    ASSERT_THAT(err, Eq(nullptr));
+    ASSERT_THAT(request->GetListHandlers().size(), Eq(6));
+    ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerSecondaryAuthorization*>(request->GetListHandlers()[0]),
+                Ne(nullptr));
+    ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerReceiveMetaData*>(request->GetListHandlers()[1]), Ne(nullptr));
+    ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerReceiveData*>(request->GetListHandlers()[2]), Ne(nullptr));
+    ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerFileProcess*>(request->GetListHandlers()[3]), Ne(nullptr));
+    ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerKafkaNotify*>(request->GetListHandlers()[4]), Ne(nullptr));
+}
+
+TEST_F(FactoryTests, DoNotAddKafkaIfNotWanted) {
+    generic_request_header.custom_data[asapo::kPosIngestMode] = asapo::IngestModeFlags::kTransferData |
+        asapo::IngestModeFlags::kStoreInFilesystem;
+
+    config.kafka_config.enabled = false;
     SetReceiverConfig(config, "none");
 
     auto request = factory.GenerateRequest(generic_request_header, 1, origin_uri, nullptr, &err);
@@ -125,8 +156,7 @@ TEST_F(FactoryTests, DoNotAddDbWriterIfNotWanted) {
 
 TEST_F(FactoryTests, CachePassedToRequest) {
     auto cache = asapo::SharedCache{new asapo::DataCache{0, 0}};
-    RequestFactory factory{asapo::SharedReceiverMonitoringClient{new asapo::ReceiverMonitoringClientNoop{}}, cache};
-
+    RequestFactory factory{asapo::SharedReceiverMonitoringClient{new asapo::ReceiverMonitoringClientNoop{}}, cache, nullptr};
     auto request = factory.GenerateRequest(generic_request_header, 1, origin_uri, nullptr, &err);
     ASSERT_THAT(err, Eq(nullptr));
     ASSERT_THAT(request->cache__, Eq(cache.get()));
diff --git a/receiver/unittests/request_handler/test_request_handler_kafka_notify.cpp b/receiver/unittests/request_handler/test_request_handler_kafka_notify.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..bfe2471a2c651685cbdcf9cafaf9a99ba879e8d0
--- /dev/null
+++ b/receiver/unittests/request_handler/test_request_handler_kafka_notify.cpp
@@ -0,0 +1,66 @@
+#include <gtest/gtest.h>
+#include <gmock/gmock.h>
+
+#include "asapo/unittests/MockKafkaClient.h"
+#include "../../src/request_handler/request_factory.h"
+#include "../receiver_mocking.h"
+
+using namespace testing;
+using namespace asapo;
+
+namespace {
+
+class KafkaNotifyHandlerTests : public Test {
+  public:
+    NiceMock<MockKafkaClient> kafka_client;
+    RequestHandlerKafkaNotify handler{&kafka_client};
+    std::unique_ptr<NiceMock<MockRequest>> mock_request;
+    std::string expected_filename = std::string("raw") + asapo::kPathSeparator + "filename";
+    std::string expected_online_path = std::string("online") + asapo::kPathSeparator + "path";
+    CustomRequestData expected_custom_data {kDefaultIngestMode, 0, 0};
+    const std::string expected_topic = "asapo";
+
+    void SetUp() override {
+        GenericRequestHeader request_header;
+        mock_request.reset(new NiceMock<MockRequest> {request_header, 1, "", nullptr, nullptr});
+    }
+
+    void TearDown() override {
+    }
+};
+
+TEST_F(KafkaNotifyHandlerTests, KafkaNotifyOK) {
+    EXPECT_CALL(*mock_request, GetFileName()).WillOnce(Return(expected_filename));
+    EXPECT_CALL(*mock_request, GetOnlinePath()).WillOnce(ReturnRef(expected_online_path));
+    EXPECT_CALL(kafka_client, Send_t(HasSubstr(expected_filename), expected_topic)).WillOnce(Return(nullptr));
+    EXPECT_CALL(*mock_request, GetCustomData_t()).WillOnce(Return(expected_custom_data));
+    EXPECT_CALL(*mock_request, GetSourceType()).WillOnce(Return(SourceType::kRaw));
+
+    auto err = handler.ProcessRequest(mock_request.get());
+    ASSERT_THAT(err, Eq(nullptr));
+    Mock::VerifyAndClearExpectations(mock_request.get());
+    Mock::VerifyAndClearExpectations(&kafka_client);
+}
+
+TEST_F(KafkaNotifyHandlerTests, KafkaNotifyNotNeededForProcessed) {
+    EXPECT_CALL(*mock_request, GetSourceType()).WillOnce(Return(SourceType::kProcessed));
+
+    auto err = handler.ProcessRequest(mock_request.get());
+    ASSERT_THAT(err, Eq(nullptr));
+    Mock::VerifyAndClearExpectations(mock_request.get());
+    Mock::VerifyAndClearExpectations(&kafka_client);
+}
+
+TEST_F(KafkaNotifyHandlerTests, KafkaNotifyNotNeededForOfflineRaw) {
+    expected_custom_data[kPosIngestMode] |= IngestModeFlags::kWriteRawDataToOffline;
+    EXPECT_CALL(*mock_request, GetSourceType()).WillOnce(Return(SourceType::kRaw));
+    EXPECT_CALL(*mock_request, GetCustomData_t()).WillOnce(Return(expected_custom_data));
+
+    auto err = handler.ProcessRequest(mock_request.get());
+    ASSERT_THAT(err, Eq(nullptr));
+    Mock::VerifyAndClearExpectations(mock_request.get());
+    Mock::VerifyAndClearExpectations(&kafka_client);
+}
+
+}
\ No newline at end of file
diff --git a/receiver/unittests/request_handler/test_request_handler_receive_data.cpp b/receiver/unittests/request_handler/test_request_handler_receive_data.cpp
index 6aa087cd431edb669cad673bb3c30d3926ddeeae..4eaee78f8c42256ba7bc8593a4e7b6ac77a6eb15 100644
--- a/receiver/unittests/request_handler/test_request_handler_receive_data.cpp
+++ b/receiver/unittests/request_handler/test_request_handler_receive_data.cpp
@@ -132,44 +132,36 @@ TEST_F(ReceiveDataHandlerTests, HandleGetsMemoryFromCache) {
     request->cache__ = &mock_cache;
     asapo::CacheMeta meta;
     meta.id = expected_slot_id;
-    EXPECT_CALL(mock_cache, GetFreeSlotAndLock(data_size_, _, expected_beamtime, expected_source, expected_stream)).WillOnce(
+    EXPECT_CALL(mock_cache, GetFreeSlotAndLock_t(data_size_, _, expected_beamtime, expected_source, expected_stream,_)).WillOnce(
         DoAll(SetArgPointee<1>(&meta),
+              SetArgPointee<5>(nullptr),
               Return(&mock_cache)
              ));
-
-    EXPECT_CALL(mock_cache, UnlockSlot(&meta));
-    /*
-    EXPECT_CALL(mock_io, Receive_t(socket_fd_, _, _, _)).WillOnce(
-            DoAll(
-                    CopyStr(expected_metadata),
-                    SetArgPointee<3>(nullptr),
-                    Return(0)
-                    ));
-    */
     EXPECT_CALL(*mock_instanced_statistics, AddIncomingBytes(0));
 
     auto err = handler.ProcessRequest(request.get());
 
     ASSERT_THAT(request->GetSlotId(), Eq(expected_slot_id));
+    ASSERT_THAT(err, Eq(nullptr));
+
 }
 
 
 TEST_F(ReceiveDataHandlerTests, ErrorGetMemoryFromCache) {
     request->cache__ = &mock_cache;
 
-    EXPECT_CALL(mock_cache, GetFreeSlotAndLock(data_size_, _, expected_beamtime, expected_source, expected_stream)).WillOnce(
-        Return(nullptr)
-    );
+    EXPECT_CALL(mock_cache, GetFreeSlotAndLock_t(data_size_, _, expected_beamtime, expected_source, expected_stream,_)).WillOnce(
+        DoAll(SetArgPointee<5>(asapo::ReceiverErrorTemplates::kProcessingError.Generate().release()),
+              Return(nullptr)
+    ));
 
-    EXPECT_CALL(mock_cache, UnlockSlot(_)).Times(0);
+    EXPECT_CALL(*mock_instanced_statistics, AddIncomingBytes(0));
 
 
     auto err = handler.ProcessRequest(request.get());
 
     ASSERT_THAT(request->GetSlotId(), Eq(0));
-    ASSERT_THAT(err, Eq(asapo::GeneralErrorTemplates::kMemoryAllocationError));
+    ASSERT_THAT(err, Eq(nullptr));
 }
 
-
-
 }
diff --git a/receiver/unittests/request_handler/test_requests_dispatcher.cpp b/receiver/unittests/request_handler/test_requests_dispatcher.cpp
index 517c01fee3ecdc2249c438793d45568c62d5a052..13dd15f3d5ab242afa8cecac1e7084673c68af8b 100644
--- a/receiver/unittests/request_handler/test_requests_dispatcher.cpp
+++ b/receiver/unittests/request_handler/test_requests_dispatcher.cpp
@@ -25,7 +25,7 @@ TEST(RequestDispatcher, Constructor) {
 
     auto monitoring = asapo::SharedReceiverMonitoringClient{new asapo::ReceiverMonitoringClientNoop{}};
 
-    RequestsDispatcher dispatcher{0,  "some_address", stat.get(), monitoring, cache};
+    RequestsDispatcher dispatcher{0,  "some_address", stat.get(), monitoring, cache, nullptr};
     ASSERT_THAT(dynamic_cast<const asapo::ReceiverStatistics*>(dispatcher.statistics__), Ne(nullptr));
     ASSERT_THAT(dynamic_cast<asapo::IO*>(dispatcher.io__.get()), Ne(nullptr));
     ASSERT_THAT(dynamic_cast<asapo::RequestFactory*>(dispatcher.request_factory__.get()), Ne(nullptr));
@@ -47,7 +47,7 @@ class MockRequest: public Request {
 
 class MockRequestFactory: public asapo::RequestFactory {
   public:
-    MockRequestFactory(): RequestFactory(nullptr, nullptr) {};
+    MockRequestFactory(): RequestFactory(nullptr, nullptr, nullptr) {};
     std::unique_ptr<Request> GenerateRequest(const GenericRequestHeader& request_header,
                                              SocketDescriptor socket_fd, std::string origin_uri,
                                              const asapo::SharedInstancedStatistics& statistics,
@@ -94,7 +94,7 @@ class RequestsDispatcherTests : public Test {
         mock_instanced_statistics.reset(new StrictMock<asapo::MockInstancedStatistics>);
         test_config.authorization_interval_ms = 0;
         SetReceiverConfig(test_config, "none");
-        dispatcher = std::unique_ptr<RequestsDispatcher> {new RequestsDispatcher{0, connected_uri, &mock_statistics, nullptr, nullptr}};
+        dispatcher = std::unique_ptr<RequestsDispatcher> {new RequestsDispatcher{0, connected_uri, &mock_statistics, nullptr, nullptr, nullptr}};
         dispatcher->io__ = std::unique_ptr<asapo::IO> {&mock_io};
         dispatcher->statistics__ = &mock_statistics;
         dispatcher->request_factory__ = std::unique_ptr<asapo::RequestFactory> {&mock_factory};
diff --git a/receiver/unittests/test_config.cpp b/receiver/unittests/test_config.cpp
index 45a684ed431af29dbf865949ee0c90e2b4726965..904f4b7d51ebe9b9ddb4187c9c3b2f05eff96055 100644
--- a/receiver/unittests/test_config.cpp
+++ b/receiver/unittests/test_config.cpp
@@ -81,6 +81,8 @@ TEST_F(ConfigTests, ReadSettings) {
     ASSERT_THAT(config->receive_to_disk_threshold_mb, Eq(50));
     ASSERT_THAT(config->metrics.expose, Eq(true));
     ASSERT_THAT(config->metrics.listen_port, Eq(123));
+    ASSERT_THAT(config->kafka_config.enabled, Eq(false));
+
 }
 
 
@@ -92,7 +94,7 @@ TEST_F(ConfigTests, ErrorReadSettings) {
                                     "DataCache", "Use", "SizeGB", "ReservedShare", "DatabaseServer", "Tag",
                                     "AuthorizationServer", "AuthorizationInterval", "PerformanceDbName", "LogLevel",
                                     "NThreads", "DiscoveryServer", "AdvertiseURI", "NetworkMode", "MonitorPerformance",
-                                    "ReceiveToDiskThresholdMB", "Metrics", "Expose"};
+                                    "ReceiveToDiskThresholdMB", "Metrics", "Expose", "Enabled"};
     for (const auto& field : fields) {
         auto err = asapo::SetReceiverConfigWithError(test_config, field);
         ASSERT_THAT(err, Ne(nullptr));
diff --git a/receiver/unittests/test_connection.cpp b/receiver/unittests/test_connection.cpp
index 99a52280415fe395c4947890fd1840693525e420..48b5ff34277621e7460efa759b6b680a5e5dd7b3 100644
--- a/receiver/unittests/test_connection.cpp
+++ b/receiver/unittests/test_connection.cpp
@@ -15,7 +15,7 @@ using namespace asapo;
 namespace {
 
 TEST(Connection, Constructor) {
-    Connection connection{0, "some_address", nullptr, nullptr, "some_tag"};
+    Connection connection{0, "some_address", nullptr, nullptr, nullptr, "some_tag"};
     ASSERT_THAT(dynamic_cast<asapo::Statistics*>(connection.statistics__.get()), Ne(nullptr));
     ASSERT_THAT(dynamic_cast<asapo::IO*>(connection.io__.get()), Ne(nullptr));
     ASSERT_THAT(dynamic_cast<const asapo::AbstractLogger*>(connection.log__), Ne(nullptr));
@@ -25,7 +25,7 @@ TEST(Connection, Constructor) {
 
 class MockDispatcher: public asapo::RequestsDispatcher {
   public:
-    MockDispatcher(): asapo::RequestsDispatcher(0, "", nullptr, nullptr, nullptr) {};
+    MockDispatcher(): asapo::RequestsDispatcher(0, "", nullptr, nullptr, nullptr, nullptr) {};
     Error ProcessRequest(const std::unique_ptr<Request>& request) const noexcept override {
         return Error{ProcessRequest_t(request.get())};
     }
@@ -56,7 +56,7 @@ class ConnectionTests : public Test {
     void SetUp() override {
         asapo::SharedCache cache; /*nullptr*/
         mock_monitoring.reset(new NiceMock<asapo::MockReceiverMonitoringClient>);
-        connection = std::unique_ptr<Connection> {new Connection{0, connected_uri, mock_monitoring, cache, "some_tag"}};
+        connection = std::unique_ptr<Connection> {new Connection{0, connected_uri, mock_monitoring, cache, nullptr, "some_tag"}};
         connection->io__ = std::unique_ptr<asapo::IO> {&mock_io};
         connection->statistics__ = std::unique_ptr<asapo::ReceiverStatistics> {&mock_statictics};
         connection->log__ = &mock_logger;
diff --git a/receiver/unittests/test_datacache.cpp b/receiver/unittests/test_datacache.cpp
index 64f8177c1c6c963f4ab9097773be2b877acea133..9a7f400a33a5fe23933ae47fff64ce0841ae62ad 100644
--- a/receiver/unittests/test_datacache.cpp
+++ b/receiver/unittests/test_datacache.cpp
@@ -30,8 +30,10 @@ class DataCacheTests : public Test {
 };
 
 TEST_F(DataCacheTests, GetFreeSlotFailsDueToSize) {
-    auto addr = cache.GetFreeSlotAndLock(expected_cache_size + 1, &meta1, "b", "so", "st");
+    asapo::Error err;
+    auto addr = cache.GetFreeSlotAndLock(expected_cache_size + 1, &meta1, "b", "so", "st", &err);
     ASSERT_THAT(addr, Eq(nullptr));
+    ASSERT_THAT(err, Ne(nullptr));
 }
 
 void set_array(uint8_t* addr, uint64_t size, uint8_t val) {
@@ -41,16 +43,18 @@ void set_array(uint8_t* addr, uint64_t size, uint8_t val) {
 }
 
 TEST_F(DataCacheTests, GetFreeSlotOK) {
-    uint8_t* addr = (uint8_t*) cache.GetFreeSlotAndLock(1, &meta1, "b", "so", "st");
+    asapo::Error err;
+    uint8_t* addr = (uint8_t*) cache.GetFreeSlotAndLock(1, &meta1, "b", "so", "st", &err);
     set_array(addr, 1, 2);
     ASSERT_THAT(addr[0], Eq(2));
     ASSERT_THAT(meta1->id, Gt(0));
 }
 
 TEST_F(DataCacheTests, GetFreeSlotStartsFromLastPointer) {
-    uint8_t* ini_addr = (uint8_t*) cache.GetFreeSlotAndLock(1, &meta1, "b", "so", "st");
+    asapo::Error err;
+    uint8_t* ini_addr = (uint8_t*) cache.GetFreeSlotAndLock(1, &meta1, "b", "so", "st", &err);
     set_array(ini_addr, 1, 2);
-    uint8_t* addr = (uint8_t*) cache.GetFreeSlotAndLock(1, &meta2, "b", "so", "st");
+    uint8_t* addr = (uint8_t*) cache.GetFreeSlotAndLock(1, &meta2, "b", "so", "st", &err);
     set_array(addr, 1, 1);
     ASSERT_THAT(ini_addr[0], Eq(2));
     ASSERT_THAT(ini_addr[1], Eq(1));
@@ -58,8 +62,9 @@ TEST_F(DataCacheTests, GetFreeSlotStartsFromLastPointer) {
 }
 
 TEST_F(DataCacheTests, GetFreeSlotLocks) {
-    uint8_t* ini_addr = (uint8_t*) cache.GetFreeSlotAndLock(1, &meta1, "b", "so", "st");
-    uint8_t* addr = (uint8_t*) cache.GetFreeSlotAndLock(expected_cache_size, &meta2, "b", "so", "st");
+    asapo::Error err;
+    uint8_t* ini_addr = (uint8_t*) cache.GetFreeSlotAndLock(1, &meta1, "b", "so", "st", &err);
+    uint8_t* addr = (uint8_t*) cache.GetFreeSlotAndLock(expected_cache_size, &meta2, "b", "so", "st", &err);
     ASSERT_THAT(ini_addr, Ne(nullptr));
     ASSERT_THAT(addr, Eq(nullptr));
     ASSERT_THAT(meta1, Ne(nullptr));
@@ -67,19 +72,21 @@ TEST_F(DataCacheTests, GetFreeSlotLocks) {
 }
 
 TEST_F(DataCacheTests, GetFreeSlotStartsFromBeginIfNotFit) {
-    uint8_t* ini_addr = (uint8_t*) cache.GetFreeSlotAndLock(1, &meta1, "b", "so", "st");
+    asapo::Error err;
+    uint8_t* ini_addr = (uint8_t*) cache.GetFreeSlotAndLock(1, &meta1, "b", "so", "st", &err);
     auto id = meta1->id;
     set_array(ini_addr, 1, 2);
     cache.UnlockSlot(meta1);
-    uint8_t* addr = (uint8_t*) cache.GetFreeSlotAndLock(expected_cache_size, &meta2, "b", "so", "st");
+    uint8_t* addr = (uint8_t*) cache.GetFreeSlotAndLock(expected_cache_size, &meta2, "b", "so", "st", &err);
     set_array(addr, expected_cache_size, 1);
     ASSERT_THAT(ini_addr[0], Eq(1));
     ASSERT_THAT(id, Ne(meta2->id));
 }
 
 TEST_F(DataCacheTests, GetFreeSlotCannotWriteIfAlreadyWriting) {
-    cache.GetFreeSlotAndLock(1, &meta1, "b", "so", "st");
-    uint8_t* addr = (uint8_t*) cache.GetFreeSlotAndLock(expected_cache_size, &meta2, "b", "so", "st");
+    asapo::Error err;
+    cache.GetFreeSlotAndLock(1, &meta1, "b", "so", "st", &err);
+    uint8_t* addr = (uint8_t*) cache.GetFreeSlotAndLock(expected_cache_size, &meta2, "b", "so", "st", &err);
     ASSERT_THAT(addr, Eq(nullptr));
     ASSERT_THAT(meta2, Eq(nullptr));
 
@@ -94,8 +101,9 @@ TEST_F(DataCacheTests, PrepareToReadIdNotFound) {
 }
 
 TEST_F(DataCacheTests, PrepareToReadOk) {
+    asapo::Error err;
     auto data_size = static_cast<uint64_t>(static_cast<double>(expected_cache_size) * 0.7);
-    uint8_t* ini_addr = (uint8_t*) cache.GetFreeSlotAndLock(data_size, &meta1, "b1", "so1", "st1");
+    uint8_t* ini_addr = (uint8_t*) cache.GetFreeSlotAndLock(data_size, &meta1, "b1", "so1", "st1", &err);
 
     uint8_t* addr = (uint8_t*) cache.GetSlotToReadAndLock(meta1->id, data_size, &meta2);
     ASSERT_THAT(addr, Eq(ini_addr));
@@ -113,8 +121,9 @@ TEST_F(DataCacheTests, PrepareToReadOk) {
 
 
 TEST_F(DataCacheTests, PrepareToReadFailsIfTooCloseToCurrentPointer) {
+    asapo::Error err;
     auto data_size = static_cast<uint64_t>(static_cast<double>(expected_cache_size) * 0.9);
-    cache.GetFreeSlotAndLock(data_size, &meta1, "b", "so", "st");
+    cache.GetFreeSlotAndLock(data_size, &meta1, "b", "so", "st", &err);
 
     uint8_t* addr = (uint8_t*) cache.GetSlotToReadAndLock(meta1->id, data_size, &meta2);
     ASSERT_THAT(addr, Eq(nullptr));
@@ -123,11 +132,12 @@ TEST_F(DataCacheTests, PrepareToReadFailsIfTooCloseToCurrentPointer) {
 
 TEST_F(DataCacheTests, GetFreeSlotRemovesOldMetadataRecords) {
     DataCache cache{expected_cache_size, 0};
+    asapo::Error err;
     CacheMeta* meta3, *meta4, *meta5;
     CacheMeta* meta;
-    cache.GetFreeSlotAndLock(10, &meta1, "b1", "so1", "st1");
-    cache.GetFreeSlotAndLock(10, &meta2, "b2", "so2", "st2");
-    cache.GetFreeSlotAndLock(expected_cache_size - 30, &meta3, "b3", "so3", "st3");
+    cache.GetFreeSlotAndLock(10, &meta1, "b1", "so1", "st1", &err);
+    cache.GetFreeSlotAndLock(10, &meta2, "b2", "so2", "st2", &err);
+    cache.GetFreeSlotAndLock(expected_cache_size - 30, &meta3, "b3", "so3", "st3", &err);
     auto id1 = meta1->id;
     auto id2 = meta2->id;
     auto id3 = meta3->id;
@@ -136,11 +146,11 @@ TEST_F(DataCacheTests, GetFreeSlotRemovesOldMetadataRecords) {
     cache.UnlockSlot(meta2);
     cache.UnlockSlot(meta3);
 
-    cache.GetFreeSlotAndLock(10, &meta4, "b4", "so4", "st4");
+    cache.GetFreeSlotAndLock(10, &meta4, "b4", "so4", "st4", &err);
     auto id4 = meta4->id;
 
     cache.UnlockSlot(meta4);
-    cache.GetFreeSlotAndLock(30, &meta5, "b5", "so5", "st5");
+    cache.GetFreeSlotAndLock(30, &meta5, "b5", "so5", "st5", &err);
     uint8_t* addr1 = (uint8_t*) cache.GetSlotToReadAndLock(id1, 10, &meta);
     uint8_t* addr2 = (uint8_t*) cache.GetSlotToReadAndLock(id2, 10, &meta);
     uint8_t* addr3 = (uint8_t*) cache.GetSlotToReadAndLock(id3, expected_cache_size - 30, &meta);
@@ -157,22 +167,23 @@ TEST_F(DataCacheTests, GetFreeSlotRemovesOldMetadataRecords) {
 }
 
 TEST_F(DataCacheTests, GetFreeSlotRemovesOldWhenCrossTheBoundary) {
+    asapo::Error err;
     DataCache cache{expected_cache_size, 0};
     CacheMeta* meta1, *meta2, *meta3;
     CacheMeta* meta4, *meta5, *meta;
-    auto addr1_alloc = cache.GetFreeSlotAndLock(expected_cache_size / 3 - 1, &meta1, "b", "so", "st");
-    auto addr2_alloc = cache.GetFreeSlotAndLock(expected_cache_size / 3 - 1, &meta2, "b", "so", "st");
-    auto addr3_alloc = cache.GetFreeSlotAndLock(expected_cache_size / 3 - 1, &meta3, "b", "so", "st");
+    auto addr1_alloc = cache.GetFreeSlotAndLock(expected_cache_size / 3 - 1, &meta1, "b", "so", "st", &err);
+    auto addr2_alloc = cache.GetFreeSlotAndLock(expected_cache_size / 3 - 1, &meta2, "b", "so", "st", &err);
+    auto addr3_alloc = cache.GetFreeSlotAndLock(expected_cache_size / 3 - 1, &meta3, "b", "so", "st", &err);
     auto id1 = meta1->id;
     auto id2 = meta2->id;
     auto id3 = meta3->id;
     cache.UnlockSlot(meta1);
     cache.UnlockSlot(meta2);
     cache.UnlockSlot(meta3);
-    auto addr4_alloc = cache.GetFreeSlotAndLock(expected_cache_size / 2 + 5, &meta4, "b", "so", "st");
+    auto addr4_alloc = cache.GetFreeSlotAndLock(expected_cache_size / 2 + 5, &meta4, "b", "so", "st", &err);
     auto id4 = meta4->id;
     cache.UnlockSlot(meta4);
-    auto addr5_alloc = cache.GetFreeSlotAndLock(expected_cache_size / 2 + 5, &meta5, "b", "so", "st");
+    auto addr5_alloc = cache.GetFreeSlotAndLock(expected_cache_size / 2 + 5, &meta5, "b", "so", "st", &err);
     auto id5 = meta5->id;
 
     uint8_t* addr1 = (uint8_t*) cache.GetSlotToReadAndLock(id1, expected_cache_size / 3 - 1, &meta);
@@ -200,8 +211,9 @@ TEST_F(DataCacheTests, GetFreeSlotRemovesOldWhenCrossTheBoundary) {
 
 
 TEST_F(DataCacheTests, GetSlotToReadSizeOk) {
+    asapo::Error err;
     CacheMeta* meta;
-    cache.GetFreeSlotAndLock(expected_size, &meta1, "b", "so", "st");
+    cache.GetFreeSlotAndLock(expected_size, &meta1, "b", "so", "st", &err);
 
     uint8_t* addr1 = (uint8_t*) cache.GetSlotToReadAndLock(meta1->id, expected_size, &meta);
 
@@ -213,8 +225,9 @@ TEST_F(DataCacheTests, GetSlotToReadSizeOk) {
 }
 
 TEST_F(DataCacheTests, GetSlotToReadWrongSize) {
+    asapo::Error err;
     CacheMeta* meta;
-    cache.GetFreeSlotAndLock(expected_size, &meta1, "b", "so", "st");
+    cache.GetFreeSlotAndLock(expected_size, &meta1, "b", "so", "st", &err);
 
     uint8_t* addr1 = (uint8_t*) cache.GetSlotToReadAndLock(meta1->id, expected_size + 1, &meta);
 
@@ -225,10 +238,11 @@ TEST_F(DataCacheTests, GetSlotToReadWrongSize) {
 
 TEST_F(DataCacheTests, CannotGetFreeSlotIfNeedCleanOnebeingReaded) {
     CacheMeta* meta;
-
-    uint8_t* ini_addr = (uint8_t*) cache.GetFreeSlotAndLock(10, &meta1, "b", "so", "st");
+    asapo::Error err;
+
+    uint8_t* ini_addr = (uint8_t*) cache.GetFreeSlotAndLock(10, &meta1, "b", "so", "st", &err);
     auto res = cache.GetSlotToReadAndLock(meta1->id, 10, &meta);
-    uint8_t* addr = (uint8_t*) cache.GetFreeSlotAndLock(expected_cache_size, &meta2, "b", "so", "st");
+    uint8_t* addr = (uint8_t*) cache.GetFreeSlotAndLock(expected_cache_size, &meta2, "b", "so", "st", &err);
 
     ASSERT_THAT(ini_addr, Ne(nullptr));
     ASSERT_THAT(res, Eq(ini_addr));
@@ -237,51 +251,55 @@ TEST_F(DataCacheTests, CannotGetFreeSlotIfNeedCleanOnebeingReaded) {
 
 
 TEST_F(DataCacheTests, CanGetFreeSlotIfWasUnlocked) {
+    asapo::Error err;
     CacheMeta* meta;
-    cache.GetFreeSlotAndLock(10, &meta1, "b", "so", "st");
+    cache.GetFreeSlotAndLock(10, &meta1, "b", "so", "st", &err);
     cache.UnlockSlot(meta1);
     cache.GetSlotToReadAndLock(meta1->id, 10, &meta);
     cache.UnlockSlot(meta);
-    auto addr = cache.GetFreeSlotAndLock(expected_cache_size, &meta2, "b", "so", "st");
+    auto addr = cache.GetFreeSlotAndLock(expected_cache_size, &meta2, "b", "so", "st", &err);
 
     ASSERT_THAT(addr, Ne(nullptr));
 }
 
 TEST_F(DataCacheTests, IncreasLockForEveryRead) {
+    asapo::Error err;
     CacheMeta* meta;
-    cache.GetFreeSlotAndLock(10, &meta1, "b", "so", "st");
+    cache.GetFreeSlotAndLock(10, &meta1, "b", "so", "st", &err);
     cache.GetSlotToReadAndLock(meta1->id, 10, &meta);
     cache.GetSlotToReadAndLock(meta1->id, 10, &meta);
     cache.UnlockSlot(meta);
-    auto addr = cache.GetFreeSlotAndLock(expected_cache_size, &meta2, "b", "so", "st");
+    auto addr = cache.GetFreeSlotAndLock(expected_cache_size, &meta2, "b", "so", "st", &err);
 
     ASSERT_THAT(addr, Eq(nullptr));
 }
 
 TEST_F(DataCacheTests, DecreasLockForEveryUnlock) {
+    asapo::Error err;
     CacheMeta* meta;
-    cache.GetFreeSlotAndLock(10, &meta1, "b", "so", "st");
+    cache.GetFreeSlotAndLock(10, &meta1, "b", "so", "st", &err);
     cache.UnlockSlot(meta1);
 
     cache.GetSlotToReadAndLock(meta1->id, 10, &meta);
     cache.GetSlotToReadAndLock(meta1->id, 10, &meta);
     cache.UnlockSlot(meta);
     cache.UnlockSlot(meta);
-    auto addr = cache.GetFreeSlotAndLock(expected_cache_size, &meta2, "b", "so", "st");
+    auto addr = cache.GetFreeSlotAndLock(expected_cache_size, &meta2, "b", "so", "st", &err);
 
     ASSERT_THAT(addr, Ne(nullptr));
 }
 
 
 TEST_F(DataCacheTests, GetFreeSlotCreatesCorrectIds) {
+    asapo::Error err;
     CacheMeta* meta3, *meta4;
-    cache.GetFreeSlotAndLock(10, &meta1, "b", "so", "st");
+    cache.GetFreeSlotAndLock(10, &meta1, "b", "so", "st", &err);
     std::this_thread::sleep_for(std::chrono::milliseconds(100));
-    cache.GetFreeSlotAndLock(10, &meta2, "b", "so", "st");
+    cache.GetFreeSlotAndLock(10, &meta2, "b", "so", "st", &err);
     std::this_thread::sleep_for(std::chrono::milliseconds(10));
-    cache.GetFreeSlotAndLock(10, &meta3, "b", "so", "st");
+    cache.GetFreeSlotAndLock(10, &meta3, "b", "so", "st", &err);
     std::this_thread::sleep_for(std::chrono::milliseconds(1));
-    cache.GetFreeSlotAndLock(10, &meta4, "b", "so", "st");
+    cache.GetFreeSlotAndLock(10, &meta4, "b", "so", "st", &err);
 
     auto c1 = static_cast<uint32_t>(meta1->id);
     auto c2 = static_cast<uint32_t>(meta2->id);
@@ -306,61 +324,4 @@ TEST_F(DataCacheTests, GetFreeSlotCreatesCorrectIds) {
 
 }
 
-/*
-TEST_F(DataCacheTests, AllMetaInfosAsVector) {
-    cache.GetFreeSlotAndLock(20, &meta1, "b1", "so1", "st1");
-    cache.GetFreeSlotAndLock(30, &meta1, "b1", "so1", "st2");
-    cache.GetFreeSlotAndLock(40, &meta1, "b1", "so2", "st1");
-    cache.GetFreeSlotAndLock(50, &meta1, "b2", "so2", "st1");
-    cache.GetFreeSlotAndLock(60, &meta1, "b1", "so1", "st1");
-
-    auto metas = cache.AllMetaInfosAsVector();
-
-    EXPECT_THAT(metas.size(), Eq(5));
-
-    ASSERT_THAT(metas.size(), Ge(1));
-    EXPECT_THAT(metas[0]->beamtime, Eq("b1"));
-    EXPECT_THAT(metas[0]->source, Eq("so1"));
-    EXPECT_THAT(metas[0]->stream, Eq("st1"));
-
-    ASSERT_THAT(metas.size(), Ge(2));
-    EXPECT_THAT(metas[1]->beamtime, Eq("b1"));
-    EXPECT_THAT(metas[1]->source, Eq("so1"));
-    EXPECT_THAT(metas[1]->stream, Eq("st2"));
-
-    ASSERT_THAT(metas.size(), Ge(3));
-    EXPECT_THAT(metas[2]->beamtime, Eq("b1"));
-    EXPECT_THAT(metas[2]->source, Eq("so2"));
-    EXPECT_THAT(metas[2]->stream, Eq("st1"));
-
-    ASSERT_THAT(metas.size(), Ge(4));
-    EXPECT_THAT(metas[3]->beamtime, Eq("b2"));
-    EXPECT_THAT(metas[3]->source, Eq("so2"));
-    EXPECT_THAT(metas[3]->stream, Eq("st1"));
-
-    ASSERT_THAT(metas.size(), Ge(5));
-    EXPECT_THAT(metas[4]->beamtime, Eq("b1"));
-    EXPECT_THAT(metas[4]->source, Eq("so1"));
-    EXPECT_THAT(metas[4]->stream, Eq("st1"));
-}
-
-TEST_F(DataCacheTests, AllMetaInfosAsVector_deleted) {
-    cache.GetFreeSlotAndLock(20, &meta1, "b1", "so1", "st1");
-    cache.UnlockSlot(meta1);
-    cache.GetFreeSlotAndLock(30, &meta1, "b1", "so1", "st2");
-    cache.UnlockSlot(meta1);
-    cache.GetFreeSlotAndLock(40, &meta1, "b1", "so2", "st1");
-    cache.UnlockSlot(meta1);
-
-    cache.GetFreeSlotAndLock(expected_cache_size, &meta2, "b3", "so3", "st3");
-
-    auto metas = cache.AllMetaInfosAsVector();
-
-    ASSERT_THAT(metas.size(), Eq(1));
-    EXPECT_THAT(metas[0]->beamtime, Eq("b1"));
-    EXPECT_THAT(metas[0]->source, Eq("so1"));
-    EXPECT_THAT(metas[0]->stream, Eq("st1"));
-}
- */
-
 }
diff --git a/receiver/unittests/test_receiver.cpp b/receiver/unittests/test_receiver.cpp
index 1d032201124db3a5a4644a9485d3bd57c7280fa0..23249877d13ae835ff9f41f915ddbb0dd1458a86 100644
--- a/receiver/unittests/test_receiver.cpp
+++ b/receiver/unittests/test_receiver.cpp
@@ -12,7 +12,7 @@ using namespace asapo;
 namespace {
 
 TEST(Receiver, Constructor) {
-    asapo::Receiver receiver(nullptr, nullptr);
+    asapo::Receiver receiver(nullptr, nullptr, nullptr);
     ASSERT_THAT(dynamic_cast<const asapo::AbstractLogger*>(receiver.log__), Ne(nullptr));
     ASSERT_THAT(dynamic_cast<asapo::IO*>(receiver.io__.get()), Ne(nullptr));
 }
@@ -31,7 +31,7 @@ class StartListenerFixture : public testing::Test {
     Error err;
     ::testing::NiceMock<asapo::MockLogger> mock_logger;
     ::testing::NiceMock<asapo::MockIO> mock_io;
-    asapo::Receiver receiver{nullptr, nullptr};
+    asapo::Receiver receiver{nullptr, nullptr, nullptr};
 
     void SetUp() override {
         err = nullptr;
diff --git a/receiver/unittests/test_request.cpp b/receiver/unittests/test_request.cpp
index 39f3d6b6813c6b7097a846415a84a18a7e2f84d8..09df5cc7c7806699fdd427c6dfdaae98a870fe66 100644
--- a/receiver/unittests/test_request.cpp
+++ b/receiver/unittests/test_request.cpp
@@ -70,6 +70,7 @@ class RequestTests : public Test {
                   Return(0)
                  ));
     }
+    void MockAllocateRequestSlot();
     void TearDown() override {
         request->io__.release();
     }
@@ -77,8 +78,19 @@ class RequestTests : public Test {
 };
 
 
-TEST_F(RequestTests, HandleProcessesRequests) {
+void RequestTests::MockAllocateRequestSlot() {
+    request->cache__ = &mock_cache;
+    // static so that the slot-meta pointer stored in the request stays valid
+    // after this helper returns (it is handed back via SetArgPointee<1>)
+    static asapo::CacheMeta meta;
+    EXPECT_CALL(mock_cache, GetFreeSlotAndLock_t(data_size_, _, _, _, _, _)).WillOnce(
+        DoAll(SetArgPointee<1>(&meta),
+              SetArgPointee<5>(nullptr),
+              Return(&mock_cache)
+        ));
+    request->PrepareDataBufferAndLockIfNeeded();
+}
 
+TEST_F(RequestTests, HandleProcessesRequests) {
     MockReqestHandler mock_request_handler;
 
     EXPECT_CALL(mock_request_handler, ProcessRequest_t(_)).WillOnce(
@@ -87,12 +99,14 @@ TEST_F(RequestTests, HandleProcessesRequests) {
         Return(new asapo::IOError("Test Send Error", "", asapo::IOErrorType::kUnknownIOError))
     );
 
+    MockAllocateRequestSlot();
     request->AddHandler(&mock_request_handler);
     request->AddHandler(&mock_request_handler);
 
     EXPECT_CALL(*mock_instanced_statistics, StartTimer(asapo::StatisticEntity::kDisk)).Times(2);
 
     EXPECT_CALL(*mock_instanced_statistics, StopTimer()).Times(2);
+    EXPECT_CALL(mock_cache, UnlockSlot(_));
 
     auto err = request->Handle();
 
diff --git a/tests/automatic/producer_receiver/CMakeLists.txt b/tests/automatic/producer_receiver/CMakeLists.txt
index aae8d4bf65ba9db9753972c71493c900cb5a8310..c3689a65a9c31061619848dbf12a02b393981075 100644
--- a/tests/automatic/producer_receiver/CMakeLists.txt
+++ b/tests/automatic/producer_receiver/CMakeLists.txt
@@ -1,5 +1,6 @@
 add_subdirectory(transfer_single_file)
 add_subdirectory(transfer_single_file_bypass_buffer)
+add_subdirectory(transfer_single_file_with_kafka)
 
 if (BUILD_PYTHON)
     add_subdirectory(transfer_single_file_write_to_raw)
diff --git a/tests/automatic/producer_receiver/transfer_single_file_with_kafka/CMakeLists.txt b/tests/automatic/producer_receiver/transfer_single_file_with_kafka/CMakeLists.txt
new file mode 100644
index 0000000000000000000000000000000000000000..daf1cbec1ffc1308ca52bfbce65ef2bc19a0e037
--- /dev/null
+++ b/tests/automatic/producer_receiver/transfer_single_file_with_kafka/CMakeLists.txt
@@ -0,0 +1,20 @@
+set(TARGET_NAME transfer-single-file_kafka)
+if (NOT WIN32)
+    set(SOURCE_FILES kafka_mock.cpp)
+
+    add_executable(${TARGET_NAME} ${SOURCE_FILES})
+    target_link_libraries(${TARGET_NAME} ${RDKAFKA_C_LIBRARIES})
+    target_include_directories(${TARGET_NAME} PUBLIC ${RDKAFKA_INCLUDE_DIR})
+
+    # use generator expression to get rid of VS adding Debug/Release folders
+    set_target_properties(${TARGET_NAME} PROPERTIES RUNTIME_OUTPUT_DIRECTORY
+            ${CMAKE_CURRENT_BINARY_DIR}$<$<CONFIG:Debug>:>
+            )
+
+    ################################
+    # Testing
+    ################################
+    prepare_asapo()
+
+    add_script_test("${TARGET_NAME}" "$<TARGET_FILE:dummy-data-producer>" nomem)
+endif()
\ No newline at end of file
diff --git a/tests/automatic/producer_receiver/transfer_single_file_with_kafka/check_linux.sh b/tests/automatic/producer_receiver/transfer_single_file_with_kafka/check_linux.sh
new file mode 100644
index 0000000000000000000000000000000000000000..6af384ac67285ab62fff4094197c7bea4bcdf572
--- /dev/null
+++ b/tests/automatic/producer_receiver/transfer_single_file_with_kafka/check_linux.sh
@@ -0,0 +1,71 @@
+#!/usr/bin/env bash
+
+set -e
+
+trap Cleanup EXIT
+
+database_name=db_test
+beamtime_id=asapo_test
+beamline=test
+receiver_root_folder=/tmp/asapo/receiver/files
+facility=test_facility
+year=2019
+receiver_folder=${receiver_root_folder}/${facility}/gpfs/${beamline}/${year}/data/${beamtime_id}
+receiver_folder_online=${receiver_root_folder}/beamline/${beamline}/current
+
+Cleanup() {
+    echo cleanup
+    set +e
+    nomad stop receiver
+    nomad run receiver_tcp.nmd
+    while true
+    do
+      sleep 1
+      curl --silent 127.0.0.1:8400/asapo-discovery/v0.1/asapo-receiver?protocol=v0.1 --stderr - | grep 127.0.0.1  || continue
+      echo receiver started
+      break
+    done
+    rm -rf ${receiver_root_folder}
+    echo "db.dropDatabase()" | mongo ${beamtime_id}_detector
+    influx -database ${database_name} -execute "drop series from statistics, RequestsRate"
+}
+
+rm -f bootstrap
+
+./transfer-single-file_kafka ${receiver_folder_online}/raw/1 & KAFKA_PID=$!
+
+echo "Started the kafka listener"
+
+while [ ! -f bootstrap ]; do
+    if ! kill -0 $KAFKA_PID > /dev/null 2>&1; then
+        echo Kafka listener exited unexpectedly
+        exit 1
+    fi
+    sleep 1
+done
+
+BOOTSTRAP=$(cat bootstrap)
+
+echo "Read kafka bootstrap: ${BOOTSTRAP}"
+
+nomad stop receiver
+nomad run -var receiver_kafka_metadata_broker_list="${BOOTSTRAP}" receiver_kafka.nmd
+while true
+do
+  sleep 1
+  curl --silent 127.0.0.1:8400/asapo-discovery/v0.1/asapo-receiver?protocol=v0.1 --stderr - | grep 127.0.0.1  || continue
+  echo receiver started
+  break
+done
+
+mkdir -p ${receiver_folder}
+mkdir -p ${receiver_folder_online}
+
+$1 localhost:8400 ${beamtime_id} 100 1 1  100 30
+
+ls -ln ${receiver_folder_online}/raw/1 | awk '{ print $5 }' | grep 100000
+
+wait $KAFKA_PID
+RESULT=$?
+
+echo "Mock kafka returned $RESULT"
diff --git a/tests/automatic/producer_receiver/transfer_single_file_with_kafka/kafka_mock.cpp b/tests/automatic/producer_receiver/transfer_single_file_with_kafka/kafka_mock.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..8da0c966103be4de971edc3451273a7559374536
--- /dev/null
+++ b/tests/automatic/producer_receiver/transfer_single_file_with_kafka/kafka_mock.cpp
@@ -0,0 +1,92 @@
+#define _GNU_SOURCE /* for asprintf */
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "librdkafka/rdkafka.h"
+#include "librdkafka/rdkafka_mock.h"
+
+int main(int argc, char *argv[]) {
+
+    char *expectedmsg;
+    asprintf(&expectedmsg, "{\"event\":\"IN_CLOSE_WRITE\",\"path\":\"%s\"}", argc > 1 ? argv[argc - 1] : "processed/1");
+
+    rd_kafka_conf_t *conf = rd_kafka_conf_new();
+    char errstr[256];
+
+    if(rd_kafka_conf_set(conf, "client.id", "MOCK", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
+        fprintf(stderr, "Failed to set kafka config: %s\n", errstr);
+        return EXIT_FAILURE;
+    }
+    /*if(rd_kafka_conf_set(conf, "bootstrap.servers", "127.0.0.1:23456", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
+        fprintf(stderr, "Failed to set kafka config: %s\n", errstr);
+        return EXIT_FAILURE;
+    }*/
+
+    rd_kafka_t *rkproducer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
+    if(!rkproducer) {
+        fprintf(stderr, "Failed to create kafka producer: %s\n", errstr);
+        return EXIT_FAILURE;
+    }
+
+    rd_kafka_mock_cluster_t *mcluster = rd_kafka_mock_cluster_new(rkproducer, 1);
+    if(!mcluster) {
+        fprintf(stderr, "Failed to create kafka cluster: %s\n", errstr);
+        return EXIT_FAILURE;
+    }
+
+    const char* bootstrap = rd_kafka_mock_cluster_bootstraps(mcluster);
+
+    FILE *fp = fopen("bootstrap", "w");
+    fprintf(fp, "%s", bootstrap);
+    fclose(fp);
+
+    conf = rd_kafka_conf_new();
+    if(rd_kafka_conf_set(conf, "client.id", "MOCK_CONSUMER", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
+        fprintf(stderr, "Failed to set kafka config: %s\n", errstr);
+        return EXIT_FAILURE;
+    }
+    if(rd_kafka_conf_set(conf, "group.id", "asapo", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
+        fprintf(stderr, "Failed to set kafka config: %s\n", errstr);
+        return EXIT_FAILURE;
+    }
+    if(rd_kafka_conf_set(conf, "bootstrap.servers", bootstrap, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
+        fprintf(stderr, "Failed to set kafka config: %s\n", errstr);
+        return EXIT_FAILURE;
+    }
+
+    rd_kafka_t *rkconsumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
+
+    if(!rkconsumer) {
+        fprintf(stderr, "Failed to create kafka consumer: %s\n", errstr);
+        return EXIT_FAILURE;
+    }
+
+    rd_kafka_poll_set_consumer(rkconsumer);
+    rd_kafka_topic_partition_list_t* subscription = rd_kafka_topic_partition_list_new(1);
+    rd_kafka_topic_partition_list_add(subscription, "asapo", RD_KAFKA_PARTITION_UA);
+    rd_kafka_resp_err_t err = rd_kafka_subscribe(rkconsumer, subscription);
+    if (err) {
+        fprintf(stderr, "Failed to subscribe to topic: %s\n", rd_kafka_err2str(err));
+        return EXIT_FAILURE;
+    }
+
+
+    rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(rkconsumer, 30000);
+
+    if(!rkmessage) {
+        fprintf(stderr, "No kafka message received\n");
+        return EXIT_FAILURE;
+    } else {
+        if (rkmessage->err) {
+            fprintf(stderr, "Got error: %s\n", rd_kafka_message_errstr(rkmessage));
+            return EXIT_FAILURE;
+        } else {
+            if (rkmessage->len == strlen(expectedmsg) &&
+                    !strncmp((const char *)rkmessage->payload, expectedmsg, rkmessage->len)) {
+                fprintf(stdout, "Kafka message is correct: %.*s\n", (int)rkmessage->len, (const char *)rkmessage->payload);
+                return EXIT_SUCCESS;
+            } else {
+                fprintf(stderr, "Kafka message is incorrect: %.*s (expected %s)\n", (int)rkmessage->len, (const char *)rkmessage->payload, expectedmsg);
+                return EXIT_FAILURE;
+            }
+        }
+    }
+}
diff --git a/tests/automatic/settings/receiver_fabric.json.tpl.lin.in b/tests/automatic/settings/receiver_fabric.json.tpl.lin.in
index 7316f51ee18429a5b9a7c3ab42aa80e3a07d3b0b..ff23cda05e003c5f20dfee4a716a61743bf8892e 100644
--- a/tests/automatic/settings/receiver_fabric.json.tpl.lin.in
+++ b/tests/automatic/settings/receiver_fabric.json.tpl.lin.in
@@ -25,5 +25,8 @@
   "ListenPort": {{ env "NOMAD_PORT_recv" }},
   "Tag": "{{ env "NOMAD_ADDR_recv" }}",
   "ReceiveToDiskThresholdMB":50,
-  "LogLevel" : "debug"
+  "LogLevel" : "debug",
+  "Kafka" : {
+    "Enabled" : false
+  }
   }
diff --git a/tests/automatic/settings/receiver_kafka.json.tpl.lin.in b/tests/automatic/settings/receiver_kafka.json.tpl.lin.in
new file mode 100644
index 0000000000000000000000000000000000000000..f9d0fae47e197697f9174e37b2dfb87ce634e7ba
--- /dev/null
+++ b/tests/automatic/settings/receiver_kafka.json.tpl.lin.in
@@ -0,0 +1,37 @@
+{
+  "PerformanceDbServer":"localhost:8086",
+  "MonitorPerformance": true,
+  "PerformanceDbName": "db_test",
+  "DatabaseServer":"auto",
+  "DiscoveryServer": "localhost:8400/asapo-discovery",
+  "DataServer": {
+    "AdvertiseURI": "127.0.0.1:{{ env "NOMAD_PORT_recv_ds" }}",
+    "NThreads": 2,
+    "ListenPort": {{ env "NOMAD_PORT_recv_ds" }},
+    "NetworkMode": ["tcp"]
+  },
+  "Metrics": {
+    "Expose": true,
+    "ListenPort": {{ env "NOMAD_PORT_recv_metrics" }}
+  },
+  "DataCache": {
+    "Use": @RECEIVER_USE_CACHE@,
+    "SizeGB": 1,
+    "ReservedShare": 10
+  },
+  "AuthorizationServer": "localhost:8400/asapo-authorizer",
+  "AuthorizationInterval": 1000,
+  "ListenPort": {{ env "NOMAD_PORT_recv" }},
+  "Tag": "{{ env "NOMAD_ADDR_recv" }}",
+  "ReceiveToDiskThresholdMB":50,
+  "LogLevel" : "debug",
+  "Kafka" : {
+    "Enabled" : true,
+    "KafkaClient": {
+      "metadata.broker.list": "{{ env "NOMAD_META_receiver_kafka_metadata_broker_list" }}"
+    },
+    "KafkaTopics": {
+      "asapo": {}
+    }
+  }
+}
diff --git a/tests/automatic/settings/receiver_tcp.json.tpl.lin.in b/tests/automatic/settings/receiver_tcp.json.tpl.lin.in
index bc26dd6b832b65a0f3ac91023f2e13ada2f626c7..386a9db3cd48193d1d335f938f0a16272670e99e 100644
--- a/tests/automatic/settings/receiver_tcp.json.tpl.lin.in
+++ b/tests/automatic/settings/receiver_tcp.json.tpl.lin.in
@@ -25,5 +25,8 @@
   "ListenPort": {{ env "NOMAD_PORT_recv" }},
   "Tag": "{{ env "NOMAD_ADDR_recv" }}",
   "ReceiveToDiskThresholdMB":50,
-  "LogLevel" : "debug"
+  "LogLevel" : "debug",
+  "Kafka" : {
+    "Enabled" : false
+  }
   }
diff --git a/tests/automatic/settings/receiver_tcp.json.tpl.win.in b/tests/automatic/settings/receiver_tcp.json.tpl.win.in
index be0b21a5a546672f2ca022acfc0ab2c0300f6d83..de39191ca2c7fb1ed0219d28dd5c47187d811429 100644
--- a/tests/automatic/settings/receiver_tcp.json.tpl.win.in
+++ b/tests/automatic/settings/receiver_tcp.json.tpl.win.in
@@ -25,5 +25,8 @@
   },
   "Tag": "{{ env "NOMAD_ADDR_recv" }}",
   "ReceiveToDiskThresholdMB":50,
-  "LogLevel" : "debug"
+  "LogLevel" : "debug",
+  "Kafka" : {
+    "Enabled" : false
+  }
 }