diff --git a/CHANGELOG.md b/CHANGELOG.md
index bf89287502ee13cf412ece5aa49a12d1e1070201..c56a4a3309a2b283ad7cb4fa93160ed9029c7920 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,10 @@
-## 21.09.0 (in progress)
+## 21.12.0 (in progress)
+
+FEATURES
+* Consumer API: Get last within a consumer group returns a message only once
+
+
+## 21.09.0
 
 FEATURES
 * Producer API: C client
@@ -15,6 +21,7 @@ BUG FIXES
 INTERNAL
 * Improved authoriation service caching
 * Added profiling for Go services
+* Added metrics and alerts for asapo services
 
 ## 21.06.0
 
diff --git a/CMakeIncludes/prepare_version_tag.cmake b/CMakeIncludes/prepare_version_tag.cmake
index e2a41293f288770013d83c7664018edbea285a6e..59de5c91e18dec4151309f5bcf23e60e4199c576 100644
--- a/CMakeIncludes/prepare_version_tag.cmake
+++ b/CMakeIncludes/prepare_version_tag.cmake
@@ -20,7 +20,6 @@ execute_process(COMMAND git rev-parse --short=10 HEAD
 string(STRIP ${ASAPO_VERSION_COMMIT} ASAPO_VERSION_COMMIT)
 
 if (${BRANCH} STREQUAL "master")
-    SET (ASAPO_VERSION_IN_DOCS ${ASAPO_TAG})
     SET (ASAPO_VERSION ${ASAPO_TAG})
     SET (ASAPO_VERSION_COMMIT "")
     SET (ASAPO_VERSION_DOCKER_SUFFIX "")
@@ -38,10 +37,6 @@ else()
     SET (ASAPO_WHEEL_VERSION ${ASAPO_VERSION})
 endif()
 
-string(REGEX REPLACE "\\.0([0-9]+)\\."
-        ".\\1." ASAPO_WHEEL_VERSION_IN_DOCS
-        ${ASAPO_VERSION_IN_DOCS})
-
 message("Asapo Version: " ${ASAPO_VERSION})
 message("Python Asapo Version: " ${PYTHON_ASAPO_VERSION})
 message("Asapo commit: " ${ASAPO_VERSION_COMMIT})
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 36302965720bf9265d21426f3afa9bb1aba779f4..b65a84e1e4c9d3f50a75dbae7e5709f682748138 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -3,16 +3,12 @@ project(ASAPO)
 
 set(CMAKE_MODULE_PATH ${PROJECT_SOURCE_DIR}/CMakeModules/ ${PROJECT_SOURCE_DIR}/CMakeIncludes/)
 
-set (ASAPO_VERSION_IN_DOCS 21.06.0) # is overwritten in master branch
-set (ASAPO_EXAMPLES_DIR .)
-#set (ASAPO_EXAMPLES_DIR ./frozen_versions/${ASAPO_VERSION_IN_DOCS})
-
 #protocol version changes if one of the microservice API's change
-set (ASAPO_CONSUMER_PROTOCOL "v0.4")
+set (ASAPO_CONSUMER_PROTOCOL "v0.5")
 set (ASAPO_PRODUCER_PROTOCOL "v0.4")
 set (ASAPO_DISCOVERY_API_VER "v0.1")
 set (ASAPO_AUTHORIZER_API_VER "v0.2")
-set (ASAPO_BROKER_API_VER "v0.4")
+set (ASAPO_BROKER_API_VER "v0.5")
 set (ASAPO_FILE_TRANSFER_SERVICE_API_VER "v0.2")
 set (ASAPO_RECEIVER_API_VER "v0.4")
 set (ASAPO_RDS_API_VER "v0.1")
diff --git a/PROTOCOL-VERSIONS.md b/PROTOCOL-VERSIONS.md
index 3b87af27833e8b8bfeab57e857ca531cac18a631..f91b0d2fb04d1393bab62174ebaad43ff84561ab 100644
--- a/PROTOCOL-VERSIONS.md
+++ b/PROTOCOL-VERSIONS.md
@@ -10,6 +10,7 @@
 ### Consumer Protocol
 | Release      | used by client      | Supported by server  | Status           |
 | ------------ | ------------------- | -------------------- | ---------------- |
+| v0.5         |   |    | In development  |
 | v0.4         | 21.06.0 - 21.09.0   | 21.06.0  - 21.09.0   | Current version  |
 | v0.3         | 21.03.3 - 21.03.3   | 21.03.3  - 21.09.0   | Deprecates from 01.07.2022  |
 | v0.2         | 21.03.2 - 21.03.2   | 21.03.2  - 21.09.0   | Deprecates from 01.06.2022  |
diff --git a/VERSIONS.md b/VERSIONS.md
index 05d9509333bb681295b0a3f1a6c4f852e62c49d7..438a72968942edef808eba10d776fe10aa0df84a 100644
--- a/VERSIONS.md
+++ b/VERSIONS.md
@@ -2,7 +2,8 @@
 
 | Release      | API changed\*\* |  Protocol | Supported by server from/to | Status              |Comment|
 | ------------ | ----------- | -------- | ------------------------- | --------------------- | ------- |
-| 21.09.0      | No         |  v0.4     | 21.09.0/21.09.0           | current version              |beamline token for raw |
+| 21.12.0      | No          |  v0.4     | 21.12.0/21.12.0           | in development  |      |
+| 21.09.0      | No          |  v0.4     | 21.09.0/21.09.0           | current version              |beamline token for raw |
 | 21.06.0      | Yes         |  v0.3     | 21.06.0/21.09.0           | deprecates 01.09.2022         |arbitrary characters|
 | 21.03.3      | No          |  v0.2     | 21.03.2/21.09.0           | deprecates 01.07.2022        |bugfix in server|
 | 21.03.2      | Yes         |  v0.2     | 21.03.2/21.09.0           | deprecates 01.07.2022        |bugfixes, add delete_stream|
@@ -13,6 +14,7 @@
 
 | Release      | API changed\*\* |  Protocol | Supported by server from/to | Status         |Comment|
 | ------------ | ----------- | --------- | ------------------------- | ---------------- | ------- |
+| 21.12.0      | Yes         |  v0.5      | 21.12.0/21.12.0           | in development  | |
 | 21.09.0      | No         |  v0.4      | 21.06.0/21.09.0           | current version  | |
 | 21.06.0      | Yes         |  v0.4     | 21.06.0/21.09.0           |   |arbitrary characters, bugfixes |
 | 21.03.3      | Yes         |  v0.3     | 21.03.3/21.09.0           | deprecates 01.06.2022  |bugfix in server, error type for dublicated ack|
diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go
index 37eb03008fd1ab8df9b4f8c187e5542459ce5c11..0291da19f76a6235dbfcc947306038414a7b53e4 100644
--- a/broker/src/asapo_broker/database/mongodb.go
+++ b/broker/src/asapo_broker/database/mongodb.go
@@ -65,6 +65,9 @@ const inprocess_collection_name_prefix = "inprocess_"
 const meta_collection_name = "meta"
 const pointer_collection_name = "current_location"
 const pointer_field_name = "current_pointer"
+const last_message_collection_name = "last_messages"
+const last_message_field_name = "last_message"
+
 const no_session_msg = "database client not created"
 const already_connected_msg = "already connected"
 
@@ -74,6 +77,19 @@ const stream_filter_all = "all"
 const stream_filter_finished = "finished"
 const stream_filter_unfinished = "unfinished"
 
+const (
+	field_op_inc int = iota
+	field_op_set
+)
+
+type fieldChangeRequest struct {
+	collectionName string
+	fieldName string
+	op        int
+	max_ind   int
+	val       int
+}
+
 var dbSessionLock sync.Mutex
 var dbClientLock sync.RWMutex
 
@@ -217,20 +233,26 @@ func (db *Mongodb) setCounter(request Request, ind int) (err error) {
 	return
 }
 
-func (db *Mongodb) errorWhenCannotIncrementField(request Request, max_ind int) (err error) {
+func (db *Mongodb) errorWhenCannotSetField(request Request, max_ind int) error {
 	if res, err := db.getRecordFromDb(request, max_ind, max_ind); err == nil {
-		if err := checkStreamFinished(request, max_ind, max_ind, res); err != nil {
-			return err
+		if err2 := checkStreamFinished(request, max_ind, max_ind, res); err2 != nil {
+			return err2
 		}
 	}
 	return &DBError{utils.StatusNoData, encodeAnswer(max_ind, max_ind, "")}
 }
 
-func (db *Mongodb) incrementField(request Request, max_ind int, res interface{}) (err error) {
-	update := bson.M{"$inc": bson.M{pointer_field_name: 1}}
+func (db *Mongodb) changeField(request Request, change fieldChangeRequest, res interface{}) (err error) {
+	var update bson.M
+	if change.op == field_op_inc {
+		update = bson.M{"$inc": bson.M{change.fieldName: 1}}
+	} else if change.op == field_op_set {
+		update = bson.M{"$set": bson.M{change.fieldName: change.val}}
+	}
+
 	opts := options.FindOneAndUpdate().SetUpsert(true).SetReturnDocument(options.After)
-	q := bson.M{"_id": request.GroupId + "_" + request.Stream, pointer_field_name: bson.M{"$lt": max_ind}}
-	c := db.client.Database(request.DbName).Collection(pointer_collection_name)
+	q := bson.M{"_id": request.GroupId + "_" + request.Stream, change.fieldName: bson.M{"$lt": change.max_ind}}
+	c := db.client.Database(request.DbName).Collection(change.collectionName)
 
 	err = c.FindOneAndUpdate(context.TODO(), q, update, opts).Decode(res)
 	if err != nil {
@@ -242,7 +264,7 @@ func (db *Mongodb) incrementField(request Request, max_ind int, res interface{})
 			if err2 := c.FindOneAndUpdate(context.TODO(), q, update, opts).Decode(res); err2 == nil {
 				return nil
 			}
-			return db.errorWhenCannotIncrementField(request, max_ind)
+			return db.errorWhenCannotSetField(request, change.max_ind)
 		}
 		return &DBError{utils.StatusTransactionInterrupted, err.Error()}
 	}
@@ -421,7 +443,11 @@ func (db *Mongodb) getCurrentPointer(request Request) (LocationPointer, int, err
 	}
 
 	var curPointer LocationPointer
-	err = db.incrementField(request, max_ind, &curPointer)
+	err = db.changeField(request, fieldChangeRequest{
+		collectionName: pointer_collection_name,
+		fieldName: pointer_field_name,
+		op:        field_op_inc,
+		max_ind:   max_ind}, &curPointer)
 	if err != nil {
 		return LocationPointer{}, 0, err
 	}
@@ -631,6 +657,26 @@ func (db *Mongodb) getLastRecord(request Request) ([]byte, error) {
 	return db.getRecordByIDRaw(request, max_ind, max_ind)
 }
 
+func (db *Mongodb) getLastRecordInGroup(request Request) ([]byte, error) {
+	max_ind, err := db.getMaxIndex(request, false)
+	if err != nil {
+		return nil, err
+	}
+
+	var res map[string]interface{}
+	err = db.changeField(request, fieldChangeRequest{
+		collectionName: last_message_collection_name,
+		fieldName: last_message_field_name,
+		op:        field_op_set,
+		max_ind:   max_ind,
+		val:       max_ind,
+	}, &res)
+	if err != nil {
+		return nil, err
+	}
+	return db.getRecordByIDRaw(request, max_ind, max_ind)
+}
+
 func getSizeFilter(request Request) bson.M {
 	filter := bson.M{}
 	if request.ExtraParam == "false" { // do not return incomplete datasets
@@ -704,7 +750,7 @@ func (db *Mongodb) getMeta(request Request) ([]byte, error) {
 		logger.Debug(log_str)
 		return nil, &DBError{utils.StatusNoData, err.Error()}
 	}
-	userMeta,ok:=res["meta"]
+	userMeta, ok := res["meta"]
 	if !ok {
 		log_str := "error getting meta for " + id + " in " + request.DbName + " : cannot parse database response"
 		logger.Error(log_str)
@@ -1040,6 +1086,8 @@ func (db *Mongodb) ProcessRequest(request Request) (answer []byte, err error) {
 		return db.getRecordByID(request)
 	case "last":
 		return db.getLastRecord(request)
+	case "groupedlast":
+		return db.getLastRecordInGroup(request)
 	case "resetcounter":
 		return db.resetCounter(request)
 	case "size":
diff --git a/broker/src/asapo_broker/database/mongodb_test.go b/broker/src/asapo_broker/database/mongodb_test.go
index c81813611b03dbed3b7124ca1fa8227ce88ca897..09bf8ab1cb9f16605f5261ba17e29929b08b6e08 100644
--- a/broker/src/asapo_broker/database/mongodb_test.go
+++ b/broker/src/asapo_broker/database/mongodb_test.go
@@ -445,6 +445,60 @@ func TestMongoDBGetNextAfterGetLastCorrect(t *testing.T) {
 
 }
 
+func TestMongoDBGetGetLastInGroupCorrect(t *testing.T) {
+	db.Connect(dbaddress)
+	defer cleanup()
+	db.insertRecord(dbname, collection, &rec1)
+	db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"}) // to check it does not influence groupedlast
+
+	res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "groupedlast", ExtraParam: ""})
+	assert.Nil(t, err)
+	assert.Equal(t, string(rec1_expect), string(res))
+
+	// first record - ok, then error
+	res, err = db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "groupedlast", ExtraParam: ""})
+	assert.NotNil(t, err)
+	if err != nil {
+		assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
+		assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"\"}", err.(*DBError).Message)
+	}
+	// second record - ok, then error
+	db.insertRecord(dbname, collection, &rec2)
+	res, err = db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "groupedlast", ExtraParam: ""})
+	assert.Nil(t, err)
+	assert.Equal(t, string(rec2_expect), string(res))
+	res, err = db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "groupedlast", ExtraParam: ""})
+	assert.NotNil(t, err)
+
+	// stream finished - immediately error
+	db.insertRecord(dbname, collection, &rec_finished3)
+	res, err = db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "groupedlast", ExtraParam: ""})
+	assert.NotNil(t, err)
+	if err != nil {
+		assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
+		assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":2,\"id_max\":2,\"next_stream\":\"next1\"}", err.(*DBError).Message)
+	}
+
+}
+
+func TestMongoDBGetGetLastInGroupImmediateErrorOnFinishStream(t *testing.T) {
+	db.Connect(dbaddress)
+	defer cleanup()
+	db.insertRecord(dbname, collection, &rec1)
+	db.insertRecord(dbname, collection, &rec2)
+	db.insertRecord(dbname, collection, &rec_finished3)
+	_, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "groupedlast", ExtraParam: ""})
+	assert.NotNil(t, err)
+	_, err = db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "groupedlast", ExtraParam: ""})
+	assert.NotNil(t, err)
+	if err != nil {
+		assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
+		assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":2,\"id_max\":2,\"next_stream\":\"next1\"}", err.(*DBError).Message)
+	}
+}
+
+
+
 func TestMongoDBGetSize(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
diff --git a/broker/src/asapo_broker/server/get_commands_test.go b/broker/src/asapo_broker/server/get_commands_test.go
index 3f34750b20451d69d8fc62f456070e905f1327b7..17f3aea5a32a1308f3ec4e7e49f5d894cd6c2aea 100644
--- a/broker/src/asapo_broker/server/get_commands_test.go
+++ b/broker/src/asapo_broker/server/get_commands_test.go
@@ -47,6 +47,7 @@ var testsGetCommand = []struct {
 	{"id", expectedSource,expectedStream, "", expectedStream + "/0/1","","1"},
 	{"meta", expectedSource,"default", "", "default/0/meta/0","","0"},
 	{"nacks",expectedSource, expectedStream, expectedGroupID, expectedStream + "/" + expectedGroupID + "/nacks","","0_0"},
+	{"groupedlast", expectedSource,expectedStream, expectedGroupID, expectedStream + "/" + expectedGroupID + "/groupedlast","",""},
 	{"next", expectedSource,expectedStream, expectedGroupID, expectedStream + "/" + expectedGroupID + "/next","",""},
 	{"next", expectedSource,expectedStream, expectedGroupID, expectedStream + "/" +
 		expectedGroupID + "/next","&resend_nacks=true&delay_ms=10000&resend_attempts=3","10000_3"},
diff --git a/broker/src/asapo_broker/server/get_groupedlast.go b/broker/src/asapo_broker/server/get_groupedlast.go
new file mode 100644
index 0000000000000000000000000000000000000000..747d6fcf0938dd1e1cb9ed78c9b8599f084bb002
--- /dev/null
+++ b/broker/src/asapo_broker/server/get_groupedlast.go
@@ -0,0 +1,9 @@
+package server
+
+import (
+	"net/http"
+)
+
+func routeGetGroupedLast(w http.ResponseWriter, r *http.Request) {
+	processRequest(w, r, "groupedlast", "", true)
+}
diff --git a/broker/src/asapo_broker/server/listroutes.go b/broker/src/asapo_broker/server/listroutes.go
index c6064e117787ebc6a1f97e69a2a4734e34db69c5..3c99b1a8a85f5a6dd4599df35d7e80f143df807a 100644
--- a/broker/src/asapo_broker/server/listroutes.go
+++ b/broker/src/asapo_broker/server/listroutes.go
@@ -35,6 +35,12 @@ var listRoutes = utils.Routes{
 		"/{apiver}/beamtime/{beamtime}/{datasource}/{stream}/0/last",
 		routeGetLast,
 	},
+	utils.Route{
+		"GetGroupedLast",
+		"Get",
+		"/{apiver}/beamtime/{beamtime}/{datasource}/{stream}/{groupid}/groupedlast",
+		routeGetGroupedLast,
+	},
 	utils.Route{
 		"GetLastAck",
 		"Get",
diff --git a/consumer/api/c/include/asapo/consumer_c.h b/consumer/api/c/include/asapo/consumer_c.h
index bc73c17e2ec3c7f8e9207e068c0549ca315dc5b2..dcab928d981c79f63a417c02174af91080388163 100644
--- a/consumer/api/c/include/asapo/consumer_c.h
+++ b/consumer/api/c/include/asapo/consumer_c.h
@@ -134,6 +134,11 @@ AsapoDataSetHandle asapo_consumer_get_last_dataset(AsapoConsumerHandle consumer,
         uint64_t min_size,
         const char* stream,
         AsapoErrorHandle* error);
+AsapoDataSetHandle asapo_consumer_get_last_dataset_ingroup(AsapoConsumerHandle consumer,
+        AsapoStringHandle group_id,
+        uint64_t min_size,
+        const char* stream,
+        AsapoErrorHandle* error);
 AsapoDataSetHandle asapo_consumer_get_dataset_by_id(AsapoConsumerHandle consumer,
         uint64_t id,
         uint64_t min_size,
@@ -144,11 +149,17 @@ int asapo_consumer_get_by_id(AsapoConsumerHandle consumer,
                              AsapoMessageMetaHandle* info,
                              AsapoMessageDataHandle* data,
                              const char* stream, AsapoErrorHandle* error);
-
 int asapo_consumer_get_last(AsapoConsumerHandle consumer,
                             AsapoMessageMetaHandle* info,
                             AsapoMessageDataHandle* data,
                             const char* stream, AsapoErrorHandle* error);
+
+int asapo_consumer_get_last_ingroup(AsapoConsumerHandle consumer,
+                                    AsapoStringHandle group_id,
+                                    AsapoMessageMetaHandle* info,
+                                    AsapoMessageDataHandle* data,
+                                    const char* stream, AsapoErrorHandle* error);
+
 int asapo_consumer_get_next(AsapoConsumerHandle consumer,
                             AsapoStringHandle group_id,
                             AsapoMessageMetaHandle* info,
diff --git a/consumer/api/cpp/include/asapo/consumer/consumer.h b/consumer/api/cpp/include/asapo/consumer/consumer.h
index f7a645f467a4d20feae40bbd045fe92eb0d5f74a..026d4c324cd583068173a6c9eb65428bb3711c54 100644
--- a/consumer/api/cpp/include/asapo/consumer/consumer.h
+++ b/consumer/api/cpp/include/asapo/consumer/consumer.h
@@ -88,7 +88,7 @@ class Consumer {
     virtual NetworkConnectionType CurrentConnectionType() const = 0;
 
     //! Get list of streams with filter, set from to "" to get all streams
-    virtual StreamInfos GetStreamList(std::string from,  StreamFilter filter, Error* err) = 0;
+    virtual StreamInfos GetStreamList(std::string from, StreamFilter filter, Error* err) = 0;
 
     //! Delete stream
     /*!
@@ -177,6 +177,17 @@ class Consumer {
     */
     virtual DataSet GetLastDataset(uint64_t min_size, std::string stream, Error* err) = 0;
 
+    //! Receive last available unique dataset which has min_size messages within a group. Will not return same dataset twice but EndOfStream error.
+    /*!
+      \param group_id - group id to use; the same dataset is returned only once per group,
+                        afterwards an EndOfStream error is returned until a newer dataset arrives
+      \param min_size - wait until dataset has min_size messages (0 for maximum size)
+      \param stream - stream to use
+      \param err - will be set if the data cannot be read or the dataset has fewer than min_size messages, nullptr otherwise.
+      \return DataSet - information about the dataset
+    */
+    virtual DataSet GetLastDataset(std::string group_id, uint64_t min_size, std::string stream, Error* err) = 0;
+
     //! Receive dataset by id.
     /*!
       \param id - dataset id
@@ -214,6 +225,16 @@ class Consumer {
       \return Error if both pointers are nullptr or data cannot be read, nullptr otherwise.
     */
     virtual Error GetLast(MessageMeta* info, MessageData* data, std::string stream) = 0;
+    //! Receive last available unique message within group. Will not return same message twice but EndOfStream error
+    /*!
+      \param group_id - group id to use.
+      \param info -  where to store message metadata. Can be set to nullptr if only message data is needed.
+      \param data - where to store message data. Can be set to nullptr if only message metadata is needed.
+      \param stream - stream to use
+      \return Error if both pointers are nullptr, no data or data cannot be read, nullptr otherwise.
+    */
+    virtual Error GetLast(std::string group_id, MessageMeta* info, MessageData* data, std::string stream) = 0;
+
 
     //! Get all messages matching the query.
     /*!
diff --git a/consumer/api/cpp/src/consumer_c_glue.cpp b/consumer/api/cpp/src/consumer_c_glue.cpp
index 969804d432dfcfc09a82ab6ddc0913a606ea3546..83b83d2a1235ad4db8d02afa6fab63c1b1fa4f69 100644
--- a/consumer/api/cpp/src/consumer_c_glue.cpp
+++ b/consumer/api/cpp/src/consumer_c_glue.cpp
@@ -365,6 +365,21 @@ extern "C" {
         return handle_or_null_t(result, error, std::move(err),
                                 &asapo::ConsumerErrorTemplates::kPartialData);
     }
+//! wraps asapo::Consumer::GetLastDataset()
+/// \copydoc asapo::Consumer::GetLastDataset()
+/// \param[in] consumer the consumer that is acted upon
+/// the returned data set must be freed with asapo_free_handle() after use.
+    AsapoDataSetHandle asapo_consumer_get_last_dataset_ingroup(AsapoConsumerHandle consumer,
+            AsapoStringHandle group_id,
+            uint64_t min_size,
+            const char* stream,
+            AsapoErrorHandle* error) {
+        asapo::Error err;
+        auto result = new asapo::DataSet(consumer->handle->GetLastDataset(*group_id->handle, min_size, stream, &err));
+        return handle_or_null_t(result, error, std::move(err),
+                                &asapo::ConsumerErrorTemplates::kPartialData);
+    }
+
 
 //! wraps asapo::Consumer::GetLastAcknowledgedMessage()
 /// \copydoc asapo::Consumer::GetLastAcknowledgedMessage()
@@ -429,6 +444,22 @@ extern "C" {
         return process_error(error, std::move(err));
     }
 
+//! wraps asapo::Consumer::GetLast()
+/// \copydoc asapo::Consumer::GetLast()
+/// \param[in] consumer the consumer that is acted upon
+/// if data are retrieved (data != NULL) they must be freed with asapo_free_handle()
+    int asapo_consumer_get_last_ingroup(AsapoConsumerHandle consumer,
+                                        AsapoStringHandle group_id,
+                                        AsapoMessageMetaHandle* info,
+                                        AsapoMessageDataHandle* data,
+                                        const char* stream,
+                                        AsapoErrorHandle* error) {
+        dataGetterStart;
+        auto err = consumer->handle->GetLast(*group_id->handle, fi, data ? &d : nullptr, stream);
+        dataGetterStop;
+        return process_error(error, std::move(err));
+    }
+
 //! wraps asapo::Consumer::GetNext()
 /// \copydoc asapo::Consumer::GetNext()
 /// \param[in] consumer the consumer that is acted upon
diff --git a/consumer/api/cpp/src/consumer_impl.cpp b/consumer/api/cpp/src/consumer_impl.cpp
index 659ad07a261974ce1cf0dba75cddc107cdb463a7..02cd9b9818c012c4c69326d694a999beca43c90d 100644
--- a/consumer/api/cpp/src/consumer_impl.cpp
+++ b/consumer/api/cpp/src/consumer_impl.cpp
@@ -352,6 +352,16 @@ Error ConsumerImpl::GetNext(std::string group_id, MessageMeta* info, MessageData
                                 data);
 }
 
+Error ConsumerImpl::GetLast(std::string group_id, MessageMeta* info, MessageData* data, std::string stream) {
+    return GetMessageFromServer(GetMessageServerOperation::GetLastInGroup,
+                                0,
+                                std::move(group_id),
+                                std::move(stream),
+                                info,
+                                data);
+}
+
+
 Error ConsumerImpl::GetLast(MessageMeta* info, MessageData* data, std::string stream) {
     return GetMessageFromServer(GetMessageServerOperation::GetLast,
                                 0,
@@ -365,10 +375,12 @@ std::string ConsumerImpl::OpToUriCmd(GetMessageServerOperation op) {
     switch (op) {
     case GetMessageServerOperation::GetNext:
         return "next";
+    case GetMessageServerOperation::GetLastInGroup:
+        return "groupedlast";
     case GetMessageServerOperation::GetLast:
         return "last";
     default:
-        return "last";
+        return "error";
     }
 }
 
@@ -683,6 +695,12 @@ DataSet ConsumerImpl::GetLastDataset(uint64_t min_size, std::string stream, Erro
     return GetDatasetFromServer(GetMessageServerOperation::GetLast, 0, "0", std::move(stream), min_size, err);
 }
 
+DataSet ConsumerImpl::GetLastDataset(std::string group_id, uint64_t min_size, std::string stream, Error* err) {
+    return GetDatasetFromServer(GetMessageServerOperation::GetLastInGroup, 0, std::move(group_id), std::move(stream),
+                                min_size, err);
+}
+
+
 DataSet ConsumerImpl::GetDatasetFromServer(GetMessageServerOperation op,
                                            uint64_t id,
                                            std::string group_id, std::string stream,
diff --git a/consumer/api/cpp/src/consumer_impl.h b/consumer/api/cpp/src/consumer_impl.h
index b4d228021c5297cdf2573db47d67eaf3a58f1f09..e8228e60aacdb135a0bb560266269fe50578020c 100644
--- a/consumer/api/cpp/src/consumer_impl.h
+++ b/consumer/api/cpp/src/consumer_impl.h
@@ -14,7 +14,8 @@ namespace asapo {
 enum class GetMessageServerOperation {
     GetNext,
     GetLast,
-    GetID
+    GetID,
+    GetLastInGroup,
 };
 
 enum class OutputDataMode {
@@ -51,7 +52,6 @@ Error ProcessRequestResponce(const RequestInfo& request, const Error& server_err
 Error ConsumerErrorFromNoDataResponse(const std::string& response);
 Error ConsumerErrorFromPartialDataResponse(const std::string& response);
 DataSet DecodeDatasetFromResponse(std::string response, Error* err);
-
 class ConsumerImpl final : public asapo::Consumer {
   public:
     explicit ConsumerImpl(std::string server_uri, std::string source_path, bool has_filesystem,
@@ -76,6 +76,7 @@ class ConsumerImpl final : public asapo::Consumer {
     Error GetNext(std::string group_id, MessageMeta* info, MessageData* data, std::string stream) override;
 
     Error GetLast(MessageMeta* info, MessageData* data, std::string stream) override;
+    Error GetLast(std::string group_id, MessageMeta* info, MessageData* data, std::string stream) override;
 
     std::string GenerateNewGroupId(Error* err) override;
     std::string GetBeamtimeMeta(Error* err) override;
@@ -98,6 +99,7 @@ class ConsumerImpl final : public asapo::Consumer {
     DataSet GetNextDataset(std::string group_id, uint64_t min_size, std::string stream, Error* err) override;
 
     DataSet GetLastDataset(uint64_t min_size, std::string stream, Error* err) override;
+    DataSet GetLastDataset(std::string group_id, uint64_t min_size, std::string stream, Error* err) override;
 
     DataSet GetDatasetById(uint64_t id, uint64_t min_size, std::string stream, Error* err) override;
 
diff --git a/consumer/api/cpp/unittests/test_consumer_impl.cpp b/consumer/api/cpp/unittests/test_consumer_impl.cpp
index 0b783b42bf055819ae9f07860b4bdb3d34414c66..ef3ea07bf8e8abcd633ae2833c5d5748137c286e 100644
--- a/consumer/api/cpp/unittests/test_consumer_impl.cpp
+++ b/consumer/api/cpp/unittests/test_consumer_impl.cpp
@@ -153,7 +153,8 @@ class ConsumerImplTests : public Test {
     }
     void MockGetServiceUri(std::string service, std::string result) {
         EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_server_uri + "/asapo-discovery/v0.1/" + service + "?token="
-                                                      + expected_token + "&protocol=" + expected_consumer_protocol), _,
+                                                      + expected_token + "&protocol=" + expected_consumer_protocol),
+                                            _,
                                             _)).WillOnce(DoAll(
                                                     SetArgPointee<1>(HttpCode::OK),
                                                     SetArgPointee<2>(nullptr),
@@ -239,22 +240,39 @@ TEST_F(ConsumerImplTests, DefaultStreamIsDetector) {
 TEST_F(ConsumerImplTests, GetNextUsesCorrectUriWithStream) {
     MockGetBrokerUri();
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/"
-                                        +
-                                        expected_stream_encoded + "/" + expected_group_id_encoded + "/next?token="
-                                        + expected_token, _,
-                                        _)).WillOnce(DoAll(
-                                                SetArgPointee<1>(HttpCode::OK),
-                                                SetArgPointee<2>(nullptr),
-                                                Return("")));
+    EXPECT_CALL(mock_http_client,
+                Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/"
+                      +
+                      expected_stream_encoded + "/" + expected_group_id_encoded + "/next?token="
+                      + expected_token, _,
+                      _)).WillOnce(DoAll(
+                                       SetArgPointee<1>(HttpCode::OK),
+                                       SetArgPointee<2>(nullptr),
+                                       Return("")));
     consumer->GetNext(expected_group_id, &info, nullptr, expected_stream);
 }
 
+TEST_F(ConsumerImplTests, GetLastOnceUsesCorrectUri) {
+    MockGetBrokerUri();
+
+    EXPECT_CALL(mock_http_client,
+                Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/"
+                      + expected_stream_encoded +
+                      "/" + expected_group_id_encoded + "/groupedlast?token="
+                      + expected_token, _,
+                      _)).WillOnce(DoAll(
+                                       SetArgPointee<1>(HttpCode::OK),
+                                       SetArgPointee<2>(nullptr),
+                                       Return("")));
+    consumer->GetLast(expected_group_id, &info, nullptr, expected_stream);
+}
+
 TEST_F(ConsumerImplTests, GetLastUsesCorrectUri) {
     MockGetBrokerUri();
 
     EXPECT_CALL(mock_http_client,
-                Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/" + expected_stream_encoded +
+                Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/"
+                      + expected_stream_encoded +
                       "/0/last?token="
                       + expected_token, _,
                       _)).WillOnce(DoAll(
@@ -373,7 +391,6 @@ TEST_F(ConsumerImplTests, GetMessageReturnsUnsupportedClient) {
     ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kUnsupportedClient));
 }
 
-
 TEST_F(ConsumerImplTests, GetMessageReturnsIfBrokerUriEmpty) {
     EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_server_uri + "/asapo-discovery/v0.1/asapo-broker"), _,
                                         _)).Times(AtLeast(2)).WillRepeatedly(DoAll(
@@ -603,7 +620,6 @@ TEST_F(ConsumerImplTests, GetMessageCallsRetriesReadFromFile) {
     consumer->GetNext(expected_group_id, &info, &data, expected_stream);
 }
 
-
 TEST_F(ConsumerImplTests, GenerateNewGroupIdReturnsErrorCreateGroup) {
     MockGetBrokerUri();
 
@@ -654,14 +670,15 @@ TEST_F(ConsumerImplTests, ResetCounterUsesCorrectUri) {
     MockGetBrokerUri();
     consumer->SetTimeout(100);
 
-    EXPECT_CALL(mock_http_client, Post_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/"
-                                         +
-                                         expected_stream_encoded + "/" +
-                                         expected_group_id_encoded +
-                                         "/resetcounter?token=" + expected_token + "&value=10", _, _, _, _)).WillOnce(DoAll(
-                                                     SetArgPointee<3>(HttpCode::OK),
-                                                     SetArgPointee<4>(nullptr),
-                                                     Return("")));
+    EXPECT_CALL(mock_http_client,
+                Post_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/"
+                       +
+                       expected_stream_encoded + "/" +
+                       expected_group_id_encoded +
+                       "/resetcounter?token=" + expected_token + "&value=10", _, _, _, _)).WillOnce(DoAll(
+                                   SetArgPointee<3>(HttpCode::OK),
+                                   SetArgPointee<4>(nullptr),
+                                   Return("")));
     auto err = consumer->SetLastReadMarker(expected_group_id, 10, expected_stream);
     ASSERT_THAT(err, Eq(nullptr));
 }
@@ -670,13 +687,14 @@ TEST_F(ConsumerImplTests, GetCurrentSizeUsesCorrectUri) {
     MockGetBrokerUri();
     consumer->SetTimeout(100);
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/"
-                                        +
-                                        expected_stream_encoded + "/size?token="
-                                        + expected_token, _, _)).WillOnce(DoAll(
-                                                    SetArgPointee<1>(HttpCode::OK),
-                                                    SetArgPointee<2>(nullptr),
-                                                    Return("{\"size\":10}")));
+    EXPECT_CALL(mock_http_client,
+                Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/"
+                      +
+                      expected_stream_encoded + "/size?token="
+                      + expected_token, _, _)).WillOnce(DoAll(
+                                  SetArgPointee<1>(HttpCode::OK),
+                                  SetArgPointee<2>(nullptr),
+                                  Return("{\"size\":10}")));
     asapo::Error err;
     auto size = consumer->GetCurrentSize(expected_stream, &err);
     ASSERT_THAT(err, Eq(nullptr));
@@ -950,12 +968,11 @@ TEST_F(ConsumerImplTests, GetNextDatasetUsesCorrectUri) {
 }
 
 TEST_F(ConsumerImplTests, GetNextErrorOnEmptyStream) {
-    MessageData  data;
+    MessageData data;
     auto err = consumer->GetNext(expected_group_id, &info, &data, "");
     ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kWrongInput));
 }
 
-
 TEST_F(ConsumerImplTests, GetDataSetReturnsMessageMetas) {
     asapo::Error err;
     MockGetBrokerUri();
@@ -1073,18 +1090,36 @@ TEST_F(ConsumerImplTests, GetDataSetReturnsParseError) {
 TEST_F(ConsumerImplTests, GetLastDatasetUsesCorrectUri) {
     MockGetBrokerUri();
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/"
-                                        +
-                                        expected_stream_encoded + "/0/last?token="
-                                        + expected_token + "&dataset=true&minsize=1", _,
-                                        _)).WillOnce(DoAll(
-                                                SetArgPointee<1>(HttpCode::OK),
-                                                SetArgPointee<2>(nullptr),
-                                                Return("")));
+    EXPECT_CALL(mock_http_client,
+                Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/"
+                      +
+                      expected_stream_encoded + "/0/last?token="
+                      + expected_token + "&dataset=true&minsize=1", _,
+                      _)).WillOnce(DoAll(
+                                       SetArgPointee<1>(HttpCode::OK),
+                                       SetArgPointee<2>(nullptr),
+                                       Return("")));
     asapo::Error err;
     consumer->GetLastDataset(1, expected_stream, &err);
 }
 
+TEST_F(ConsumerImplTests, GetLastDatasetInGroupUsesCorrectUri) {
+    MockGetBrokerUri();
+
+    EXPECT_CALL(mock_http_client,
+                Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/"
+                      +
+                      expected_stream_encoded + "/" + expected_group_id_encoded + "/groupedlast?token="
+                      + expected_token + "&dataset=true&minsize=1", _,
+                      _)).WillOnce(DoAll(
+                                       SetArgPointee<1>(HttpCode::OK),
+                                       SetArgPointee<2>(nullptr),
+                                       Return("")));
+    asapo::Error err;
+    consumer->GetLastDataset(expected_group_id, 1, expected_stream, &err);
+}
+
+
 TEST_F(ConsumerImplTests, GetDatasetByIdUsesCorrectUri) {
     MockGetBrokerUri();
 
@@ -1103,14 +1138,15 @@ TEST_F(ConsumerImplTests, GetDatasetByIdUsesCorrectUri) {
 TEST_F(ConsumerImplTests, DeleteStreamUsesCorrectUri) {
     MockGetBrokerUri();
     std::string expected_delete_stream_query_string = "{\"ErrorOnNotExist\":true,\"DeleteMeta\":true}";
-    EXPECT_CALL(mock_http_client, Post_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/"
-                                         + expected_stream_encoded + "/delete"
-                                         + "?token=" + expected_token, _,
-                                         expected_delete_stream_query_string, _, _)).WillOnce(DoAll(
-                                                     SetArgPointee<3>(HttpCode::OK),
-                                                     SetArgPointee<4>(nullptr),
-                                                     Return("")
-                                                 ));
+    EXPECT_CALL(mock_http_client,
+                Post_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/"
+                       + expected_stream_encoded + "/delete"
+                       + "?token=" + expected_token, _,
+                       expected_delete_stream_query_string, _, _)).WillOnce(DoAll(
+                                   SetArgPointee<3>(HttpCode::OK),
+                                   SetArgPointee<4>(nullptr),
+                                   Return("")
+                               ));
 
     asapo::DeleteStreamOptions opt;
     opt.delete_meta = true;
@@ -1291,15 +1327,16 @@ TEST_F(ConsumerImplTests, GetMessageTriesToGetTokenAgainIfTransferFailed) {
 TEST_F(ConsumerImplTests, AcknowledgeUsesCorrectUri) {
     MockGetBrokerUri();
     auto expected_acknowledge_command = "{\"Op\":\"ackmessage\"}";
-    EXPECT_CALL(mock_http_client, Post_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/"
-                                         +
-                                         expected_stream_encoded + "/" +
-                                         expected_group_id_encoded
-                                         + "/" + std::to_string(expected_dataset_id) + "?token="
-                                         + expected_token, _, expected_acknowledge_command, _, _)).WillOnce(DoAll(
-                                                     SetArgPointee<3>(HttpCode::OK),
-                                                     SetArgPointee<4>(nullptr),
-                                                     Return("")));
+    EXPECT_CALL(mock_http_client,
+                Post_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/"
+                       +
+                       expected_stream_encoded + "/" +
+                       expected_group_id_encoded
+                       + "/" + std::to_string(expected_dataset_id) + "?token="
+                       + expected_token, _, expected_acknowledge_command, _, _)).WillOnce(DoAll(
+                                   SetArgPointee<3>(HttpCode::OK),
+                                   SetArgPointee<4>(nullptr),
+                                   Return("")));
 
     auto err = consumer->Acknowledge(expected_group_id, expected_dataset_id, expected_stream);
 
@@ -1308,13 +1345,16 @@ TEST_F(ConsumerImplTests, AcknowledgeUsesCorrectUri) {
 
 void ConsumerImplTests::ExpectIdList(bool error) {
     MockGetBrokerUri();
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/"
-                                        +
-                                        expected_stream_encoded + "/" +
-                                        expected_group_id_encoded + "/nacks?token=" + expected_token + "&from=1&to=0", _, _)).WillOnce(DoAll(
-                                                    SetArgPointee<1>(HttpCode::OK),
-                                                    SetArgPointee<2>(nullptr),
-                                                    Return(error ? "" : "{\"unacknowledged\":[1,2,3]}")));
+    EXPECT_CALL(mock_http_client,
+                Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/"
+                      +
+                      expected_stream_encoded + "/" +
+                      expected_group_id_encoded + "/nacks?token=" + expected_token + "&from=1&to=0",
+                      _,
+                      _)).WillOnce(DoAll(
+                                       SetArgPointee<1>(HttpCode::OK),
+                                       SetArgPointee<2>(nullptr),
+                                       Return(error ? "" : "{\"unacknowledged\":[1,2,3]}")));
 }
 
 TEST_F(ConsumerImplTests, GetUnAcknowledgedListReturnsIds) {
@@ -1327,13 +1367,14 @@ TEST_F(ConsumerImplTests, GetUnAcknowledgedListReturnsIds) {
 }
 
 void ConsumerImplTests::ExpectLastAckId(bool empty_response) {
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/"
-                                        +
-                                        expected_stream_encoded + "/" +
-                                        expected_group_id_encoded + "/lastack?token=" + expected_token, _, _)).WillOnce(DoAll(
-                                                    SetArgPointee<1>(HttpCode::OK),
-                                                    SetArgPointee<2>(nullptr),
-                                                    Return(empty_response ? "{\"lastAckId\":0}" : "{\"lastAckId\":1}")));
+    EXPECT_CALL(mock_http_client,
+                Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/"
+                      +
+                      expected_stream_encoded + "/" +
+                      expected_group_id_encoded + "/lastack?token=" + expected_token, _, _)).WillOnce(DoAll(
+                                  SetArgPointee<1>(HttpCode::OK),
+                                  SetArgPointee<2>(nullptr),
+                                  Return(empty_response ? "{\"lastAckId\":0}" : "{\"lastAckId\":1}")));
 }
 
 TEST_F(ConsumerImplTests, GetLastAcknowledgeUsesOk) {
@@ -1382,16 +1423,17 @@ TEST_F(ConsumerImplTests, ResendNacks) {
 TEST_F(ConsumerImplTests, NegativeAcknowledgeUsesCorrectUri) {
     MockGetBrokerUri();
     auto expected_neg_acknowledge_command = R"({"Op":"negackmessage","Params":{"DelayMs":10000}})";
-    EXPECT_CALL(mock_http_client, Post_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/"
-                                         +
-                                         expected_stream_encoded + "/" +
-                                         expected_group_id_encoded
-                                         + "/" + std::to_string(expected_dataset_id) + "?token="
-                                         + expected_token, _, expected_neg_acknowledge_command, _, _)).WillOnce(
-                                             DoAll(
-                                                 SetArgPointee<3>(HttpCode::OK),
-                                                 SetArgPointee<4>(nullptr),
-                                                 Return("")));
+    EXPECT_CALL(mock_http_client,
+                Post_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/"
+                       +
+                       expected_stream_encoded + "/" +
+                       expected_group_id_encoded
+                       + "/" + std::to_string(expected_dataset_id) + "?token="
+                       + expected_token, _, expected_neg_acknowledge_command, _, _)).WillOnce(
+                           DoAll(
+                               SetArgPointee<3>(HttpCode::OK),
+                               SetArgPointee<4>(nullptr),
+                               Return("")));
 
     auto err = consumer->NegativeAcknowledge(expected_group_id, expected_dataset_id, 10000, expected_stream);
 
@@ -1424,25 +1466,24 @@ TEST_F(ConsumerImplTests, CanInterruptOperation) {
 
 }
 
-
 TEST_F(ConsumerImplTests, GetCurrentDataSetCounteUsesCorrectUri) {
     MockGetBrokerUri();
     consumer->SetTimeout(100);
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/"
-                                        +
-                                        expected_stream_encoded + "/size?token="
-                                        + expected_token + "&incomplete=true", _, _)).WillOnce(DoAll(
-                                                    SetArgPointee<1>(HttpCode::OK),
-                                                    SetArgPointee<2>(nullptr),
-                                                    Return("{\"size\":10}")));
+    EXPECT_CALL(mock_http_client,
+                Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/"
+                      +
+                      expected_stream_encoded + "/size?token="
+                      + expected_token + "&incomplete=true", _, _)).WillOnce(DoAll(
+                                  SetArgPointee<1>(HttpCode::OK),
+                                  SetArgPointee<2>(nullptr),
+                                  Return("{\"size\":10}")));
     asapo::Error err;
     auto size = consumer->GetCurrentDatasetCount(expected_stream, true, &err);
     ASSERT_THAT(err, Eq(nullptr));
     ASSERT_THAT(size, Eq(10));
 }
 
-
 TEST_F(ConsumerImplTests, GetVersionInfoClientOnly) {
     std::string client_info;
     auto err = consumer->GetVersionInfo(&client_info, nullptr, nullptr);
@@ -1470,5 +1511,4 @@ TEST_F(ConsumerImplTests, GetVersionInfoWithServer) {
     ASSERT_THAT(server_info, HasSubstr("v0.2"));
 }
 
-
 }
diff --git a/consumer/api/python/asapo_consumer.pxd b/consumer/api/python/asapo_consumer.pxd
index 1347d886237eb27133ff20d87f2a11d3e1e7a54f..f9b808d0ea360edcfa2fa40bd857f0c048d81edf 100644
--- a/consumer/api/python/asapo_consumer.pxd
+++ b/consumer/api/python/asapo_consumer.pxd
@@ -71,6 +71,7 @@ cdef extern from "asapo/asapo_consumer.h" namespace "asapo" nogil:
         NetworkConnectionType CurrentConnectionType()
         Error GetNext(string group_id, MessageMeta* info, MessageData* data,string stream)
         Error GetLast(MessageMeta* info, MessageData* data, string stream)
+        Error GetLast(string group_id, MessageMeta* info, MessageData* data, string stream)
         Error GetById(uint64_t id, MessageMeta* info, MessageData* data, string stream)
         uint64_t GetCurrentSize(string stream, Error* err)
         uint64_t GetCurrentDatasetCount(string stream, bool include_incomplete, Error* err)
@@ -86,6 +87,7 @@ cdef extern from "asapo/asapo_consumer.h" namespace "asapo" nogil:
         MessageMetas QueryMessages(string query, string stream, Error* err)
         DataSet GetNextDataset(string group_id, uint64_t min_size, string stream, Error* err)
         DataSet GetLastDataset(uint64_t min_size, string stream, Error* err)
+        DataSet GetLastDataset(string group_id, uint64_t min_size, string stream, Error* err)
         DataSet GetDatasetById(uint64_t id, uint64_t min_size, string stream, Error* err)
         Error RetrieveData(MessageMeta* info, MessageData* data)
         vector[StreamInfo] GetStreamList(string from_stream, StreamFilter filter, Error* err)
diff --git a/consumer/api/python/asapo_consumer.pyx.in b/consumer/api/python/asapo_consumer.pyx.in
index f4232d7c74a914ee65d19a50f82d9782ac7fc977..685d4ddd147f6f056be493c0b69e9234eb1ade24 100644
--- a/consumer/api/python/asapo_consumer.pyx.in
+++ b/consumer/api/python/asapo_consumer.pyx.in
@@ -130,9 +130,12 @@ cdef class PyConsumer:
         if op == "next":
             with nogil:
                 err =  self.c_consumer.get().GetNext(b_group_id, &info, p_data,b_stream)
-        elif op == "last":
+        elif op == "last" and group_id == "":
             with nogil:
                 err =  self.c_consumer.get().GetLast(&info, p_data, b_stream)
+        elif op == "last":
+            with nogil:
+                err =  self.c_consumer.get().GetLast(b_group_id,&info, p_data, b_stream)
         elif op == "id":
             with nogil:
                 err =  self.c_consumer.get().GetById(id, &info, p_data, b_stream)
@@ -149,8 +152,8 @@ cdef class PyConsumer:
         return arr,meta
     def get_next(self, group_id, meta_only = True, stream = "default"):
         return self._op("next",group_id,stream,meta_only,0)
-    def get_last(self, meta_only = True, stream = "default"):
-        return self._op("last","",stream,meta_only,0)
+    def get_last(self, meta_only = True, stream = "default", group_id = ""):
+        return self._op("last",group_id,stream,meta_only,0)
     def get_by_id(self,uint64_t id,meta_only = True, stream = "default"):
         return self._op("id","",stream,meta_only,id)
     def retrieve_data(self,meta):
@@ -349,9 +352,12 @@ cdef class PyConsumer:
         if op == "next":
             with nogil:
                 dataset = self.c_consumer.get().GetNextDataset(b_group_id, min_size,b_stream, &err)
-        elif op == "last":
+        elif op == "last" and group_id == "":
             with nogil:
                 dataset = self.c_consumer.get().GetLastDataset(min_size,b_stream, &err)
+        elif op == "last":
+            with nogil:
+                dataset = self.c_consumer.get().GetLastDataset(b_group_id, min_size,b_stream, &err)
         elif op == "id":
             with nogil:
                 dataset = self.c_consumer.get().GetDatasetById(id, min_size,b_stream, &err)
@@ -364,8 +370,8 @@ cdef class PyConsumer:
         return res
     def get_next_dataset(self, group_id, min_size = 0, stream = "default"):
         return self._op_dataset("next",group_id,stream,min_size,0)
-    def get_last_dataset(self, min_size = 0,  stream = "default"):
-        return self._op_dataset("last","0",stream,min_size,0)
+    def get_last_dataset(self, min_size = 0,  stream = "default", group_id = ""):
+        return self._op_dataset("last",group_id,stream,min_size,0)
     def get_dataset_by_id(self, uint64_t id, min_size = 0, stream = "default"):
         return self._op_dataset("id","0",stream,min_size,id)
     def get_beamtime_meta(self):
diff --git a/deploy/asapo_services/asapo_overwrite_vars.tfvars b/deploy/asapo_services/asapo_overwrite_vars.tfvars
index 3fbc457e1f1efac85d25fcfa476a10ad0cc78c09..be7a6281ed1e03a235de848fb79fd88eeb7822ba 100644
--- a/deploy/asapo_services/asapo_overwrite_vars.tfvars
+++ b/deploy/asapo_services/asapo_overwrite_vars.tfvars
@@ -2,6 +2,8 @@ elk_logs = false
 
 asapo_monitor = true
 asapo_monitor_alert = true
+asapo_alert_email = "xxx"
+asapo_alert_email_smart_host = "xxx:25"
 
 receiver_total_memory_size = 500
 receiver_dataserver_cache_size = 0 #gb
diff --git a/deploy/asapo_services/scripts/alert.yml.tpl b/deploy/asapo_services/scripts/alert.yml.tpl
index cc6ca5acc1be919aac459db2587b67e4253aa99a..af0b8b3b0e52543ec0349efd2ff49eff82a447ab 100644
--- a/deploy/asapo_services/scripts/alert.yml.tpl
+++ b/deploy/asapo_services/scripts/alert.yml.tpl
@@ -1,80 +1,108 @@
 groups:
-- name: prometheus_alerts
-  rules:
-- name: asapo-nomad-alerts
-  rules:
-  - alert: asapo-services
-    expr: sum(nomad_nomad_job_summary_running{exported_job="asapo-services"}) < 2 or absent(nomad_nomad_job_summary_running{exported_job="asapo-services"})
-    for: 1s
-    labels:
-      severity: fatal
-      group: asapo-cluster
-  - alert: asapo-monitoring
-    expr: sum(nomad_nomad_job_summary_running{exported_job="asapo-monitoring"}) < 2  or absent(nomad_nomad_job_summary_running{exported_job="asapo-monitoring"})
-    for: 1s
-    labels:
-      severity: fatal
-      group: asapo-cluster
-  - alert: asapo-mongo
-    expr: nomad_nomad_job_summary_running{exported_job="asapo-mongo"} < 1  or absent(nomad_nomad_job_summary_running{exported_job="asapo-mongo"})
-    for: 1s
-    labels:
-      severity: fatal
-      group: asapo-cluster
-  - alert: asapo-receivers-incomplete
-    expr: (nomad_nomad_job_summary_running{exported_job="asapo-receivers"} < {{ env "NOMAD_META_n_receivers" }} and sum (nomad_nomad_job_summary_running{exported_job="asapo-receivers"}) > 0)  or absent(nomad_nomad_job_summary_running{exported_job="asapo-receivers"})
-    for: 30s
-    labels:
-      severity: warn
-      group: asapo-cluster
-  - alert: asapo-receivers-absent
-    expr: nomad_nomad_job_summary_running{exported_job="asapo-receivers"} < 1 or absent(nomad_nomad_job_summary_running{exported_job="asapo-receivers"})
-    for: 1s
-    labels:
-      severity: fatal
-      group: asapo-cluster
-  - alert: asapo-nginx
-    expr: nomad_nomad_job_summary_running{exported_job="asapo-nginx"} < 1  or absent(nomad_nomad_job_summary_running{exported_job="asapo-nginx"})
-    for: 1s
-    labels:
-      severity: fatal
-      group: asapo-cluster
-  - alert: asapo-fluentd
-    expr: nomad_nomad_job_summary_running{exported_job="asapo-logging", task_group="fluentd"} < 1  or absent(nomad_nomad_job_summary_running{exported_job="asapo-logging", task_group="fluentd"})
-    for: 1s
-    labels:
-      severity: fatal
-      group: asapo-cluster
+  - name: asapo-nomad-alerts
+    rules:
+      - alert: asapo-services
+        expr: sum(nomad_nomad_job_summary_running{exported_job="asapo-services"}) < 2 or absent(nomad_nomad_job_summary_running{exported_job="asapo-services"})
+        for: 10s
+        labels:
+          severity: fatal
+          group: asapo-cluster
+      - alert: asapo-monitoring
+        expr: sum(nomad_nomad_job_summary_running{exported_job="asapo-monitoring"}) < 2  or absent(nomad_nomad_job_summary_running{exported_job="asapo-monitoring"})
+        for: 10s
+        labels:
+          severity: fatal
+          group: asapo-cluster
+      - alert: asapo-mongo
+        expr: nomad_nomad_job_summary_running{exported_job="asapo-mongo"} < 1  or absent(nomad_nomad_job_summary_running{exported_job="asapo-mongo"})
+        for: 10s
+        labels:
+          severity: fatal
+          group: asapo-cluster
+      - alert: asapo-receivers-incomplete
+        expr: (nomad_nomad_job_summary_running{exported_job="asapo-receivers"} < {{ env "NOMAD_META_n_receivers" }} and sum (nomad_nomad_job_summary_running{exported_job="asapo-receivers"}) > 0)  or absent(nomad_nomad_job_summary_running{exported_job="asapo-receivers"})
+        for: 60s
+        labels:
+          severity: warn
+          group: asapo-cluster
+      - alert: asapo-receivers-absent
+        expr: nomad_nomad_job_summary_running{exported_job="asapo-receivers"} < 1 or absent(nomad_nomad_job_summary_running{exported_job="asapo-receivers"})
+        for: 10s
+        labels:
+          severity: fatal
+          group: asapo-cluster
+      - alert: asapo-nginx
+        expr: nomad_nomad_job_summary_running{exported_job="asapo-nginx"} < 1  or absent(nomad_nomad_job_summary_running{exported_job="asapo-nginx"})
+        for: 10s
+        labels:
+          severity: fatal
+          group: asapo-cluster
+      - alert: asapo-fts-incomplete
+        expr: (nomad_nomad_job_summary_running{exported_job="asapo-file-transfer"} < {{ env "NOMAD_META_n_fts" }} and sum (nomad_nomad_job_summary_running{exported_job="asapo-file-transfer"}) > 0)  or absent(nomad_nomad_job_summary_running{exported_job="asapo-file-transfer"})
+        for: 60s
+        labels:
+          severity: warn
+          group: asapo-cluster
+      - alert: asapo-fts-absent
+        expr: nomad_nomad_job_summary_running{exported_job="asapo-file-transfer"} < 1 or absent(nomad_nomad_job_summary_running{exported_job="asapo-file-transfer"})
+        for: 10s
+        labels:
+          severity: fatal
+          group: asapo-cluster
+      - alert: asapo-brokers-incomplete
+        expr: (nomad_nomad_job_summary_running{exported_job="asapo-brokers"} < {{ env "NOMAD_META_n_brokers" }} and sum (nomad_nomad_job_summary_running{exported_job="asapo-brokers"}) > 0)  or absent(nomad_nomad_job_summary_running{exported_job="asapo-brokers"})
+        for: 60s
+        labels:
+          severity: warn
+          group: asapo-cluster
+      - alert: asapo-brokers-absent
+        expr: nomad_nomad_job_summary_running{exported_job="asapo-brokers"} < 1 or absent(nomad_nomad_job_summary_running{exported_job="asapo-brokers"})
+        for: 10s
+        labels:
+          severity: fatal
+          group: asapo-cluster
+      - alert: asapo-fluentd
+        expr: nomad_nomad_job_summary_running{exported_job="asapo-logging", task_group="fluentd"} < 1  or absent(nomad_nomad_job_summary_running{exported_job="asapo-logging", task_group="fluentd"})
+        for: 10s
+        labels:
+          severity: fatal
+          group: asapo-cluster
 
-- name: asapo-consul-alerts
-  rules:
-    - alert: asapo-discovery
-      expr: sum (up{job="asapo-discovery"}) < 1 or absent(up{job="asapo-discovery"})
-      for: 1s
-      labels:
-        severity: fatal
-        group: asapo-cluster
-    - alert: asapo-brokers-incomplete
-      expr: (sum (up{job="asapo-broker"}) < {{ env "NOMAD_META_n_brokers" }} and sum (up{job="asapo-broker"}) > 0) or absent(up{job="asapo-broker"})
-      for: 30s
-      labels:
-        severity: warn
-        group: asapo-cluster
-    - alert: asapo-brokers-absent
-      expr: sum (up{job="asapo-broker"}) == 0 or absent(up{job="asapo-broker"})
-      for: 1s
-      labels:
-        severity: fatal
-        group: asapo-cluster
-    - alert: asapo-receivers-incomplete
-      expr: (sum (up{job="asapo-receiver"}) < {{ env "NOMAD_META_n_receivers" }} and sum (up{job="asapo-receiver"}) > 0) or absent(up{job="asapo-receiver"})
-      for: 30s
-      labels:
-        severity: warn
-        group: asapo-cluster
-    - alert: asapo-receivers-absent
-      expr: sum (up{job="asapo-receiver"}) == 0 or absent(up{job="asapo-receiver"})
-      for: 1s
-      labels:
-        severity: fatal
-        group: asapo-cluster
+  - name: asapo-consul-alerts
+    rules:
+      - alert: asapo-discovery
+        expr: sum (up{job="asapo-discovery"}) < 1 or absent(up{job="asapo-discovery"})
+        for: 10s
+        labels:
+          severity: fatal
+          group: asapo-cluster
+      - alert: asapo-mongodb-monitor
+        expr: sum (up{job="asapo-mongodb-monitor"}) < 1 or absent(up{job="asapo-mongodb-monitor"})
+        for: 10s
+        labels:
+          severity: fatal
+          group: asapo-cluster
+      - alert: asapo-brokers-incomplete
+        expr: (sum (up{job="asapo-broker"}) < {{ env "NOMAD_META_n_brokers" }} and sum (up{job="asapo-broker"}) > 0) or absent(up{job="asapo-broker"})
+        for: 60s
+        labels:
+          severity: warn
+          group: asapo-cluster
+      - alert: asapo-brokers-absent
+        expr: sum (up{job="asapo-broker"}) == 0 or absent(up{job="asapo-broker"})
+        for: 10s
+        labels:
+          severity: fatal
+          group: asapo-cluster
+      - alert: asapo-receivers-incomplete
+        expr: (sum (up{job="asapo-receiver"}) < {{ env "NOMAD_META_n_receivers" }} and sum (up{job="asapo-receiver"}) > 0) or absent(up{job="asapo-receiver"})
+        for: 60s
+        labels:
+          severity: warn
+          group: asapo-cluster
+      - alert: asapo-receivers-absent
+        expr: sum (up{job="asapo-receiver"}) == 0 or absent(up{job="asapo-receiver"})
+        for: 10s
+        labels:
+          severity: fatal
+          group: asapo-cluster
diff --git a/deploy/asapo_services/scripts/asapo-mongo.nmd.tpl b/deploy/asapo_services/scripts/asapo-mongo.nmd.tpl
index 7fefa0d18462177b604ed607ece288d3255e23e2..6b18019fd6603b76f484a58299d5e147d06d4fd1 100644
--- a/deploy/asapo_services/scripts/asapo-mongo.nmd.tpl
+++ b/deploy/asapo_services/scripts/asapo-mongo.nmd.tpl
@@ -22,6 +22,52 @@ job "asapo-mongo" {
       mode = "delay"
     }
 
+    network {
+      port "mongo" {
+        static = "${mongo_port}"
+      }
+      port "mongo_monitor" {
+        to = 9216
+      }
+    }
+
+    task "mongo-monitor" {
+      lifecycle {
+        hook = "poststart"
+        sidecar = true
+      }
+
+      driver = "docker"
+      user = "${asapo_user}"
+
+      config {
+        security_opt = ["no-new-privileges"]
+        userns_mode = "host"
+        image = "yakser/mongodb-exporter"
+        args = [
+          "--mongodb.uri=mongodb://$${NOMAD_ADDR_mongo}"
+        ]
+        ports = ["mongo_monitor"]
+      }
+
+      service {
+        port = "mongo_monitor"
+        name = "asapo-mongodb-monitor"
+        check {
+          name = "alive"
+          type     = "http"
+          path     = "/"
+          interval = "10s"
+          timeout  = "1s"
+        }
+        check_restart {
+          limit = 2
+          grace = "6000s"
+          ignore_warnings = false
+        }
+      }
+    }
+
     task "mongo" {
       driver = "docker"
       user = "${asapo_user}"
@@ -36,11 +82,6 @@ job "asapo-mongo" {
 
       resources {
         memory = "${mongo_total_memory_size}"
-        network {
-          port "mongo" {
-          static = "${mongo_port}"
-          }
-        }
       }
 
       service {
diff --git a/deploy/asapo_services/scripts/asapo-monitoring.nmd.tpl b/deploy/asapo_services/scripts/asapo-monitoring.nmd.tpl
index cc05f38db20ebe00c17b78ee60beb5423aac48f6..677e5413fa0000c45754de72151909eb135b79d4 100644
--- a/deploy/asapo_services/scripts/asapo-monitoring.nmd.tpl
+++ b/deploy/asapo_services/scripts/asapo-monitoring.nmd.tpl
@@ -122,6 +122,7 @@ job "asapo-monitoring" {
       meta {
         n_brokers = "${n_brokers}"
         n_receivers = "${n_receivers}"
+        n_fts = "${n_fts}"
       }
       service {
         name = "prometheus"
diff --git a/deploy/asapo_services/scripts/prometheus.yml.tpl b/deploy/asapo_services/scripts/prometheus.yml.tpl
index 49238b7f739614d3e09083c48b308446d288f6d1..853ac19ec2c69434b947a98ca5c4451f23d068a3 100644
--- a/deploy/asapo_services/scripts/prometheus.yml.tpl
+++ b/deploy/asapo_services/scripts/prometheus.yml.tpl
@@ -20,7 +20,6 @@ rule_files:
 # A scrape configuration containing exactly one endpoint to scrape:
 # Here it's Prometheus itself.
 scrape_configs:
-  # The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
   - job_name: "prometheus"
     consul_sd_configs:
       - server: '{{ env "NOMAD_IP_prometheus_ui" }}:8500'
@@ -38,19 +37,13 @@ scrape_configs:
     metrics_path: /v1/metrics
     params:
       format: ['prometheus']
-  - job_name: discovery
+  - job_name: asapo
     consul_sd_configs:
       - server: '{{ env "NOMAD_IP_prometheus_ui" }}:8500'
         services:
           - 'asapo-discovery'
-    relabel_configs:
-      - source_labels: [__meta_consul_service]
-        target_label: job
-  - job_name: broker
-    consul_sd_configs:
-      - server: '{{ env "NOMAD_IP_prometheus_ui" }}:8500'
-        services:
           - 'asapo-broker'
+          - 'asapo-mongodb-monitor'
     relabel_configs:
       - source_labels: [__meta_consul_service]
         target_label: job
diff --git a/deploy/asapo_services/scripts/templates.tf b/deploy/asapo_services/scripts/templates.tf
index 17d68d3781155c9c6f49081bbd6ec99deff14288..5cc67dc9e132bd0f6149d4abb34fdca2440362df 100644
--- a/deploy/asapo_services/scripts/templates.tf
+++ b/deploy/asapo_services/scripts/templates.tf
@@ -109,6 +109,7 @@ data "template_file" "asapo_monitoring" {
   vars = {
     n_brokers = "${var.n_brokers}"
     n_receivers = "${var.n_receivers}"
+    n_fts = "${var.n_fts}"
     service_dir = "${var.service_dir}"
     scripts_dir = "${var.job_scripts_dir}"
     asapo_monitor = "${var.asapo_monitor}"
diff --git a/deploy/nomad_consul_package/CMakeLists.txt b/deploy/nomad_consul_package/CMakeLists.txt
index 75914935d6050e12f61d5a1e2cc0fc81568d620c..3d6c5f962786c2f0784faaa2cd0c7dc89ad98423 100644
--- a/deploy/nomad_consul_package/CMakeLists.txt
+++ b/deploy/nomad_consul_package/CMakeLists.txt
@@ -4,10 +4,14 @@ PROJECT(ASAPO_HA)
 
 add_subdirectory (Packages)
 
+set (CONSUL_VERSION 1.10.3)
+set (NOMAD_VERSION 1.1.6)
+set (TERRAFORM_VERSION 1.0.8)
+
 INCLUDE(ExternalProject)
 ExternalProject_Add(
         consul
-        URL https://releases.hashicorp.com/consul/1.7.3/consul_1.7.3_linux_amd64.zip
+        URL https://releases.hashicorp.com/consul/${CONSUL_VERSION}/consul_${CONSUL_VERSION}_linux_amd64.zip
         PATCH_COMMAND ""
         CONFIGURE_COMMAND ""
         CMAKE_COMMAND ""
@@ -18,7 +22,7 @@ ExternalProject_Add(
 
 ExternalProject_Add(
         nomad
-        URL https://releases.hashicorp.com/nomad/0.11.3/nomad_0.11.3_linux_amd64.zip
+        URL https://releases.hashicorp.com/nomad/${NOMAD_VERSION}/nomad_${NOMAD_VERSION}_linux_amd64.zip
         PATCH_COMMAND ""
         CONFIGURE_COMMAND ""
         CMAKE_COMMAND ""
@@ -29,7 +33,7 @@ ExternalProject_Add(
 
 ExternalProject_Add(
         terraform
-        URL https://releases.hashicorp.com/terraform/0.12.26/terraform_0.12.26_linux_amd64.zip
+        URL https://releases.hashicorp.com/terraform/${TERRAFORM_VERSION}/terraform_${TERRAFORM_VERSION}_linux_amd64.zip
         PATCH_COMMAND ""
         CONFIGURE_COMMAND ""
         CMAKE_COMMAND ""
diff --git a/deploy/nomad_consul_package/Packages/CMakeLists.txt b/deploy/nomad_consul_package/Packages/CMakeLists.txt
index 1111e982e154882693c05345ab2e3583b96ec414..dc5c675efd23e26451debfcd4158d1c59dbb6777 100644
--- a/deploy/nomad_consul_package/Packages/CMakeLists.txt
+++ b/deploy/nomad_consul_package/Packages/CMakeLists.txt
@@ -13,8 +13,8 @@ SET(CPACK_PACKAGE_ARCHITECTURE "amd64")
 SET(CPACK_PACKAGE_NAME "asapo-ha")
 SET(CPACK_PACKAGE_VENDOR "DESY")
 SET(CPACK_PACKAGE_VERSION_MAJOR "1")
-SET(CPACK_PACKAGE_VERSION_MINOR "7")
-SET(CPACK_PACKAGE_VERSION_PATCH "3")
+SET(CPACK_PACKAGE_VERSION_MINOR "8")
+SET(CPACK_PACKAGE_VERSION_PATCH "0")
 
 SET(CPACK_PACKAGE_DESCRIPTION_SUMMARY "Nomad and Consul for Asapo Debian package")
 
diff --git a/discovery/src/asapo_discovery/protocols/hard_coded_consumer.go b/discovery/src/asapo_discovery/protocols/hard_coded_consumer.go
index 8c91240a3c47dfa0c0c492e08e15645632f10150..ec0a5e03ad4835f25cf891f25ae66cc53f58c844 100644
--- a/discovery/src/asapo_discovery/protocols/hard_coded_consumer.go
+++ b/discovery/src/asapo_discovery/protocols/hard_coded_consumer.go
@@ -12,6 +12,14 @@ func getTimefromDate(date string) time.Time{
 
 func GetSupportedConsumerProtocols() []Protocol {
 	return []Protocol{
+		Protocol{"v0.5",
+			map[string]string{
+				"Discovery": "v0.1",
+				"Authorizer": "v0.2",
+				"Broker": "v0.5",
+				"File Transfer": "v0.2",
+				"Data cache service": "v0.1",
+			}, &protocolValidatorCurrent{}},
 		Protocol{"v0.4",
 			map[string]string{
 				"Discovery": "v0.1",
@@ -19,7 +27,7 @@ func GetSupportedConsumerProtocols() []Protocol {
 				"Broker": "v0.4",
 				"File Transfer": "v0.2",
 				"Data cache service": "v0.1",
-			}, &protocolValidatorCurrent{}},
+			}, &protocolValidatorDeprecated{getTimefromDate("2022-12-01")}},
 		Protocol{"v0.3",
 			map[string]string{
 				"Discovery": "v0.1",
diff --git a/discovery/src/asapo_discovery/protocols/protocol_test.go b/discovery/src/asapo_discovery/protocols/protocol_test.go
index fbf1bc205d66e7a0241b83f75dfcda8134046171..f7933b7a848af1257f9ae8c0073c8047093fdb2e 100644
--- a/discovery/src/asapo_discovery/protocols/protocol_test.go
+++ b/discovery/src/asapo_discovery/protocols/protocol_test.go
@@ -15,7 +15,8 @@ type protocolTest struct {
 
 var protocolTests = []protocolTest{
 // consumer
-	{"consumer", "v0.4", true, "current", "v0.4"},
+	{"consumer", "v0.5", true, "current", "v0.5"},
+	{"consumer", "v0.4", true, "deprecates", "v0.4"},
 	{"consumer", "v0.3", true, "deprecates", "v0.3"},
 	{"consumer", "v0.2", true, "deprecates", "v0.2"},
 	{"consumer", "v0.1", true, "deprecates", "v0.1"},
diff --git a/docs/site/changelog/2021-10-14-21.09.0.md b/docs/site/changelog/2021-10-14-21.09.0.md
index 9f9cd26ccd531b51325036f01e2d53eefe05303c..c33445c2ff04eb5b50c599309226239e000db6c3 100644
--- a/docs/site/changelog/2021-10-14-21.09.0.md
+++ b/docs/site/changelog/2021-10-14-21.09.0.md
@@ -22,3 +22,4 @@ BUG FIXES
 INTERNAL
 * Improved authoriation service caching
 * Added profiling for Go services
+* Added metrics and alerts for asapo services
diff --git a/docs/site/examples/cpp/acknowledgements.cpp b/docs/site/examples/cpp/acknowledgements.cpp
index ea69b8cd9a82fcb0011f365d96c46afc040477b4..d8992ea60452fff6c5f15422cda94e658099ac1d 100644
--- a/docs/site/examples/cpp/acknowledgements.cpp
+++ b/docs/site/examples/cpp/acknowledgements.cpp
@@ -128,5 +128,5 @@ int main(int argc, char* argv[]) {
     }
     // print snippet_end
 
-   return EXIT_SUCCESS;
+    return EXIT_SUCCESS;
 }
diff --git a/docs/site/examples/cpp/next_stream.cpp b/docs/site/examples/cpp/next_stream.cpp
index bf26937ea402a1b4548116d3ea41b1b4d1890920..41ecf9f5f68c0b32841efae24e85ee9cb06ff578 100644
--- a/docs/site/examples/cpp/next_stream.cpp
+++ b/docs/site/examples/cpp/next_stream.cpp
@@ -96,7 +96,9 @@ int main(int argc, char* argv[]) {
             // when the stream finishes, we look for the info on the next stream
             auto streams = consumer->GetStreamList("", asapo::StreamFilter::kAllStreams, &err);
             // first, we find the stream with our name in the list of streams
-            auto stream = std::find_if(streams.begin(), streams.end(), [&stream_name](const asapo::StreamInfo& s) { return s.name == stream_name; });
+            auto stream = std::find_if(streams.begin(), streams.end(), [&stream_name](const asapo::StreamInfo & s) {
+                return s.name == stream_name;
+            });
 
             // then we look if the field 'nextStream' is set and not empty
             if (stream != streams.end() && !stream->next_stream.empty()) {
@@ -121,5 +123,5 @@ int main(int argc, char* argv[]) {
     } while (1);
     // read_stream snippet_end
 
-   return EXIT_SUCCESS;
+    return EXIT_SUCCESS;
 }
diff --git a/docs/site/versioned_examples/version-21.09.0/cpp/acknowledgements.cpp b/docs/site/versioned_examples/version-21.09.0/cpp/acknowledgements.cpp
index 1f63d3f10c2780e1a4b942d3d33cbbb3cb453df7..d3613147dd2db1faf8ab03f34d5e0ad2ddf8935a 100644
--- a/docs/site/versioned_examples/version-21.09.0/cpp/acknowledgements.cpp
+++ b/docs/site/versioned_examples/version-21.09.0/cpp/acknowledgements.cpp
@@ -124,5 +124,5 @@ int main(int argc, char* argv[]) {
         std::cout << "file name: " << mm.name << std::endl;
     }
 
-   return EXIT_SUCCESS;
+    return EXIT_SUCCESS;
 }
diff --git a/docs/site/versioned_examples/version-21.09.0/cpp/consume.cpp b/docs/site/versioned_examples/version-21.09.0/cpp/consume.cpp
index a802034c998ea78b11a2829ff7e3d21372bba7b1..aa55634693e10446dbe03f6313cec75215836272 100644
--- a/docs/site/versioned_examples/version-21.09.0/cpp/consume.cpp
+++ b/docs/site/versioned_examples/version-21.09.0/cpp/consume.cpp
@@ -14,7 +14,7 @@ int main(int argc, char* argv[]) {
 
     auto endpoint = "localhost:8400";
     auto beamtime = "asapo_test";
-    
+
     // test token. In production it is created during the start of the beamtime
     auto token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJl"
                  "eHAiOjk1NzE3MTAyMTYsImp0aSI6ImMzaXFhbGpmN"
@@ -26,28 +26,26 @@ int main(int argc, char* argv[]) {
     //set it according to your configuration.
     auto path_to_files = "/var/tmp/asapo/global_shared/data/test_facility/gpfs/test/2019/data/asapo_test";
 
-    auto credentials = asapo::SourceCredentials
-            {
-                asapo::SourceType::kProcessed, // should be kProcessed or kRaw, kProcessed writes to the core FS
-                beamtime,                      // the folder should exist
-                "",                            // can be empty or "auto", if beamtime_id is given
-                "test_source",                 // source
-                token                          // athorization token
-            };
+    auto credentials = asapo::SourceCredentials {
+        asapo::SourceType::kProcessed, // should be kProcessed or kRaw, kProcessed writes to the core FS
+        beamtime,                      // the folder should exist
+        "",                            // can be empty or "auto", if beamtime_id is given
+        "test_source",                 // source
+        token                          // authorization token
+    };
 
     auto consumer = asapo::ConsumerFactory::CreateConsumer
-        (endpoint,
-         path_to_files,
-         true,             // True if the path_to_files is accessible locally, False otherwise
-         credentials,      // same as for producer
-         &err);
+                    (endpoint,
+                     path_to_files,
+                     true,             // True if the path_to_files is accessible locally, False otherwise
+                     credentials,      // same as for producer
+                     &err);
 
     exit_if_error("Cannot create consumer", err);
     consumer->SetTimeout(5000); // How long do you want to wait on non-finished stream for a message.
 
     // you can get info about the streams in the beamtime
-    for (const auto& stream : consumer->GetStreamList("", asapo::StreamFilter::kAllStreams, &err))
-    {
+    for (const auto& stream : consumer->GetStreamList("", asapo::StreamFilter::kAllStreams, &err)) {
         std::cout << "Stream name: " << stream.name << std::endl;
         std::cout << "LastId: " << stream.last_id << std::endl;
         std::cout << "Stream finished: " << stream.finished << std::endl;
@@ -60,24 +58,24 @@ int main(int argc, char* argv[]) {
 
     asapo::MessageMeta mm;
     asapo::MessageData data;
-    
+
     do {
         // GetNext is the main function to get messages from streams. You would normally call it in loop.
         // you can either manually compare the mm.id to the stream.last_id, or wait for the error to happen
         err = consumer->GetNext(group_id, &mm, &data, "default");
-        
+
         if (err && err == asapo::ConsumerErrorTemplates::kStreamFinished) {
             // all the messages in the stream were processed
             std::cout << "stream finished" << std::endl;
             break;
         }
-        
+
         if (err && err == asapo::ConsumerErrorTemplates::kEndOfStream) {
             // not-finished stream timeout, or wrong or empty stream
             std::cout << "stream ended" << std::endl;
             break;
         }
-        
+
         exit_if_error("Cannot get next record", err);
 
         std::cout << "id: " << mm.id << std::endl;
diff --git a/docs/site/versioned_examples/version-21.09.0/cpp/consume_dataset.cpp b/docs/site/versioned_examples/version-21.09.0/cpp/consume_dataset.cpp
index a5f8be96630de3d2713b863972dd9b33b187470e..f869b9cd121231d4ad8ff3dd28c0ce5867242ab1 100644
--- a/docs/site/versioned_examples/version-21.09.0/cpp/consume_dataset.cpp
+++ b/docs/site/versioned_examples/version-21.09.0/cpp/consume_dataset.cpp
@@ -14,7 +14,7 @@ int main(int argc, char* argv[]) {
 
     auto endpoint = "localhost:8400";
     auto beamtime = "asapo_test";
-    
+
     auto token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJl"
                  "eHAiOjk1NzE3MTAyMTYsImp0aSI6ImMzaXFhbGpmN"
                  "DNhbGZwOHJua20wIiwic3ViIjoiYnRfYXNhcG9fdG"
@@ -35,15 +35,15 @@ int main(int argc, char* argv[]) {
 
     asapo::DataSet ds;
     asapo::MessageData data;
-    
+
     do {
         ds = consumer->GetNextDataset(group_id, 0, "default", &err);
-        
+
         if (err && err == asapo::ConsumerErrorTemplates::kStreamFinished) {
             std::cout << "stream finished" << std::endl;
             break;
         }
-        
+
         if (err && err == asapo::ConsumerErrorTemplates::kEndOfStream) {
             std::cout << "stream ended" << std::endl;
             break;
@@ -51,9 +51,8 @@ int main(int argc, char* argv[]) {
         exit_if_error("Cannot get next record", err);
 
         std::cout << "Dataset Id: " << ds.id << std::endl;
-        
-        for(int i = 0; i < ds.content.size(); i++)
-        {
+
+        for(int i = 0; i < ds.content.size(); i++) {
             err = consumer->RetrieveData(&ds.content[i], &data);
             exit_if_error("Cannot get dataset content", err);
 
diff --git a/docs/site/versioned_examples/version-21.09.0/cpp/metadata.cpp b/docs/site/versioned_examples/version-21.09.0/cpp/metadata.cpp
index b64351fbe7433d8b6946cf52f75a2d25e7d5ced5..2a9e5dda2e07c42852dd8fb41fc7b24fe3550d11 100644
--- a/docs/site/versioned_examples/version-21.09.0/cpp/metadata.cpp
+++ b/docs/site/versioned_examples/version-21.09.0/cpp/metadata.cpp
@@ -45,15 +45,15 @@ int main(int argc, char* argv[]) {
     // sample beamtime metadata. You can add any data you want, with any level of complexity
     // in this example we use strings and ints, and one nested structure
     auto beamtime_metadata = "{"
-    "   \"name\": \"beamtime name\","
-    "   \"condition\": \"beamtime condition\","
-    "   \"intvalue1\": 5,"
-    "   \"intvalue2\": 10,"
-    "   \"structure\": {"
-    "       \"structint1\": 20,"
-    "       \"structint2\": 30"
-    "   }"
-    "}";
+                             "   \"name\": \"beamtime name\","
+                             "   \"condition\": \"beamtime condition\","
+                             "   \"intvalue1\": 5,"
+                             "   \"intvalue2\": 10,"
+                             "   \"structure\": {"
+                             "       \"structint1\": 20,"
+                             "       \"structint2\": 30"
+                             "   }"
+                             "}";
 
     // send the metadata
     // with this call the new metadata will completely replace the one that's already there
@@ -62,9 +62,9 @@ int main(int argc, char* argv[]) {
 
     // we can update the existing metadata if we want, by modifying the existing fields, or adding new ones
     auto beamtime_metadata_update = "{"
-    "    \"condition\": \"updated beamtime condition\","
-    "    \"newintvalue\": 15"
-    "}";
+                                    "    \"condition\": \"updated beamtime condition\","
+                                    "    \"newintvalue\": 15"
+                                    "}";
 
     // send the metadata in the 'kUpdate' mode
     err = producer->SendBeamtimeMetadata(beamtime_metadata_update, asapo::MetaIngestMode{asapo::MetaIngestOp::kUpdate, true}, &ProcessAfterSend);
@@ -72,22 +72,23 @@ int main(int argc, char* argv[]) {
 
     // sample stream metadata
     auto stream_metadata = "{"
-    "    \"name\": \"stream name\","
-    "    \"condition\": \"stream condition\","
-    "    \"intvalue\": 44"
-    "}";
+                           "    \"name\": \"stream name\","
+                           "    \"condition\": \"stream condition\","
+                           "    \"intvalue\": 44"
+                           "}";
 
     // works the same way: for the initial set we use 'kReplace' the stream metadata, but update is also possible
     // update works exactly the same as for beamtime, but here we will only do 'kReplace'
-    err = producer->SendStreamMetadata(stream_metadata, asapo::MetaIngestMode{asapo::MetaIngestOp::kUpdate, true}, "default", &ProcessAfterSend);
+    err = producer->SendStreamMetadata(stream_metadata, asapo::MetaIngestMode{asapo::MetaIngestOp::kUpdate, true},
+                                       "default", &ProcessAfterSend);
     exit_if_error("Cannot send metadata", err);
 
     // sample message metadata
     auto message_metadata = "{"
-    "    \"name\": \"message name\","
-    "    \"condition\": \"message condition\","
-    "    \"somevalue\": 55"
-    "}";
+                            "    \"name\": \"message name\","
+                            "    \"condition\": \"message condition\","
+                            "    \"somevalue\": 55"
+                            "}";
 
     std::string data_string = "hello";
     auto send_size = data_string.size() + 1;
diff --git a/docs/site/versioned_examples/version-21.09.0/cpp/next_stream.cpp b/docs/site/versioned_examples/version-21.09.0/cpp/next_stream.cpp
index 85ae4602f3694b6c4eaa68914f16d366910fb40b..964689eec3a88a37285b833224dd8209b3bcbfb8 100644
--- a/docs/site/versioned_examples/version-21.09.0/cpp/next_stream.cpp
+++ b/docs/site/versioned_examples/version-21.09.0/cpp/next_stream.cpp
@@ -93,7 +93,9 @@ int main(int argc, char* argv[]) {
             // when the stream finishes, we look for the info on the next stream
             auto streams = consumer->GetStreamList("", asapo::StreamFilter::kAllStreams, &err);
             // first, we find the stream with our name in the list of streams
-            auto stream = std::find_if(streams.begin(), streams.end(), [&stream_name](const asapo::StreamInfo& s) { return s.name == stream_name; });
+            auto stream = std::find_if(streams.begin(), streams.end(), [&stream_name](const asapo::StreamInfo & s) {
+                return s.name == stream_name;
+            });
 
             // then we look if the field 'nextStream' is set and not empty
             if (stream != streams.end() && !stream->next_stream.empty()) {
@@ -117,5 +119,5 @@ int main(int argc, char* argv[]) {
         std::cout << "Message #" << mm.id << ", message content: " << reinterpret_cast<char const*>(data.get()) << std::endl;
     } while (1);
 
-   return EXIT_SUCCESS;
+    return EXIT_SUCCESS;
 }
diff --git a/docs/site/versioned_examples/version-21.09.0/cpp/pipeline.cpp b/docs/site/versioned_examples/version-21.09.0/cpp/pipeline.cpp
index e399e6fe5dd62463f3eaf0e818067644ef4e528f..d1022cd8c1f7bf20afd1bbc846b7fc079e595286 100644
--- a/docs/site/versioned_examples/version-21.09.0/cpp/pipeline.cpp
+++ b/docs/site/versioned_examples/version-21.09.0/cpp/pipeline.cpp
@@ -51,42 +51,43 @@ int main(int argc, char* argv[]) {
 
     asapo::MessageMeta mm;
     asapo::MessageData data;
-    
+
     do {
         // we expect the message to be in the 'default' stream already
         err = consumer->GetNext(group_id, &mm, &data, "default");
-        
+
         if (err && err == asapo::ConsumerErrorTemplates::kStreamFinished) {
             std::cout << "stream finished" << std::endl;
             break;
         }
-        
+
         if (err && err == asapo::ConsumerErrorTemplates::kEndOfStream) {
             std::cout << "stream ended" << std::endl;
             break;
         }
         exit_if_error("Cannot get next record", err);
-        
+
         // work on our data
         auto processed_string = std::string(reinterpret_cast<char const*>(data.get())) + " processed";
         auto send_size = processed_string.size() + 1;
         auto buffer = asapo::MessageData(new uint8_t[send_size]);
         memcpy(buffer.get(), processed_string.c_str(), send_size);
-        
+
         // you may use the same filename, if you want to rewrite the source file. This will result in warning, but it is a valid usecase
         asapo::MessageHeader message_header{mm.id, send_size, std::string("processed/test_file_") + std::to_string(mm.id)};
-        err = producer->Send(message_header, std::move(buffer), asapo::kDefaultIngestMode, pipelined_stream_name, &ProcessAfterSend);
+        err = producer->Send(message_header, std::move(buffer), asapo::kDefaultIngestMode, pipelined_stream_name,
+                             &ProcessAfterSend);
         exit_if_error("Cannot send message", err);
     } while (1);
 
 
-    err = producer->WaitRequestsFinished(2000); 
+    err = producer->WaitRequestsFinished(2000);
     exit_if_error("Producer exit on timeout", err);
 
     // the meta from the last iteration corresponds to the last message
     auto last_id = mm.id;
-    
-    err = producer->SendStreamFinishedFlag("pipelined",last_id, "", &ProcessAfterSend);
+
+    err = producer->SendStreamFinishedFlag("pipelined", last_id, "", &ProcessAfterSend);
     exit_if_error("Cannot finish stream", err);
 
     // you can remove the source stream if you do not need it anymore
diff --git a/docs/site/versioned_examples/version-21.09.0/cpp/produce.cpp b/docs/site/versioned_examples/version-21.09.0/cpp/produce.cpp
index 9faa61106eff6e55c502df383854e9275b1fe111..aee2a9b74db8f5ae583b331f593032cceafc3c68 100644
--- a/docs/site/versioned_examples/version-21.09.0/cpp/produce.cpp
+++ b/docs/site/versioned_examples/version-21.09.0/cpp/produce.cpp
@@ -29,14 +29,13 @@ int main(int argc, char* argv[]) {
     auto endpoint = "localhost:8400";
     auto beamtime = "asapo_test";
 
-    auto credentials = asapo::SourceCredentials
-            {
-                asapo::SourceType::kProcessed, // should be kProcessed or kRaw, kProcessed writes to the core FS
-                beamtime,                      // the folder should exist
-                "",                            // can be empty or "auto", if beamtime_id is given
-                "test_source",                 // source
-                ""                             // athorization token
-            };
+    auto credentials = asapo::SourceCredentials {
+        asapo::SourceType::kProcessed, // should be kProcessed or kRaw, kProcessed writes to the core FS
+        beamtime,                      // the folder should exist
+        "",                            // can be empty or "auto", if beamtime_id is given
+        "test_source",                 // source
+        ""                             // authorization token
+    };
 
     auto producer = asapo::Producer::Create(endpoint,
                                             1,                               // number of threads. Increase, if the sending speed seems slow
@@ -63,7 +62,7 @@ int main(int argc, char* argv[]) {
     // add the following at the end of the script
 
     err = producer->WaitRequestsFinished(2000); // will synchronously wait for all the data to be sent.
-                                                // Use it when no more data is expected.
+    // Use it when no more data is expected.
     exit_if_error("Producer exit on timeout", err);
 
     // you may want to mark the stream as finished
diff --git a/docs/site/versioned_examples/version-21.09.0/cpp/produce_dataset.cpp b/docs/site/versioned_examples/version-21.09.0/cpp/produce_dataset.cpp
index e2ea32e77bd291cafeae3ed1d9febdbed9f57ba4..aaefeed40380abe86134488700ae632f4cf09f44 100644
--- a/docs/site/versioned_examples/version-21.09.0/cpp/produce_dataset.cpp
+++ b/docs/site/versioned_examples/version-21.09.0/cpp/produce_dataset.cpp
@@ -27,7 +27,7 @@ int main(int argc, char* argv[]) {
     auto beamtime = "asapo_test";
 
     auto credentials = asapo::SourceCredentials{asapo::SourceType::kProcessed, beamtime, "", "test_source", ""};
-    
+
     auto producer = asapo::Producer::Create(endpoint, 1, asapo::RequestHandlerType::kTcp, credentials, 60000, &err);
     exit_if_error("Cannot start producer", err);
 
diff --git a/docs/site/versioned_examples/version-21.09.0/cpp/query.cpp b/docs/site/versioned_examples/version-21.09.0/cpp/query.cpp
index 945b5211a1f0a3c41603617f78b25f7d8aa118fc..243e8bd36b062f04307e83d34b370e590fac1f36 100644
--- a/docs/site/versioned_examples/version-21.09.0/cpp/query.cpp
+++ b/docs/site/versioned_examples/version-21.09.0/cpp/query.cpp
@@ -61,9 +61,9 @@ int main(int argc, char* argv[]) {
     // let's start with producing some messages with metadata
     for (uint64_t i = 1; i <= 10; i++) {
         auto message_metadata = "{"
-        "    \"condition\": \"condition #" + std::to_string(i) + "\","
-        "    \"somevalue\": " + std::to_string(i * 10) +
-        "}";
+                                "    \"condition\": \"condition #" + std::to_string(i) + "\","
+                                "    \"somevalue\": " + std::to_string(i * 10) +
+                                "}";
 
         std::string to_send = "message#" + std::to_string(i);
         auto send_size = to_send.size() + 1;
@@ -102,10 +102,13 @@ int main(int argc, char* argv[]) {
     std::cout << "Message with 30 < somevalue < 60" << std::endl;
     PrintMessages(metadatas, consumer);
 
-    auto now = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
-    auto fifteen_minutes_ago = std::chrono::duration_cast<std::chrono::nanoseconds>((std::chrono::system_clock::now() - std::chrono::minutes(15)).time_since_epoch()).count();
+    auto now = std::chrono::duration_cast<std::chrono::nanoseconds>
+               (std::chrono::system_clock::now().time_since_epoch()).count();
+    auto fifteen_minutes_ago = std::chrono::duration_cast<std::chrono::nanoseconds>((std::chrono::system_clock::now() -
+                               std::chrono::minutes(15)).time_since_epoch()).count();
     std::cout << now << " " << fifteen_minutes_ago << std::endl;
-    metadatas = consumer->QueryMessages("timestamp < " + std::to_string(now) + " AND timestamp > " + std::to_string(fifteen_minutes_ago), "default", &err);
+    metadatas = consumer->QueryMessages("timestamp < " + std::to_string(now) + " AND timestamp > " + std::to_string(
+                                            fifteen_minutes_ago), "default", &err);
     exit_if_error("Cannot query messages", err);
     std::cout << "Messages in the last 15 minutes" << std::endl;
     PrintMessages(metadatas, consumer);
diff --git a/tests/automatic/consumer/consumer_api/consumer_api.c b/tests/automatic/consumer/consumer_api/consumer_api.c
index 0ec0990021aa97fa665e1ec7770ed50ad13f687d..d9996dddedfb6461620e0c0a267275efbd311742 100644
--- a/tests/automatic/consumer/consumer_api/consumer_api.c
+++ b/tests/automatic/consumer/consumer_api/consumer_api.c
@@ -29,6 +29,15 @@ void test_datasets(AsapoConsumerHandle consumer, AsapoStringHandle group_id) {
     asapo_free_handle(&md);
     asapo_free_handle(&dataset);
 
+// get last in group
+    dataset = asapo_consumer_get_last_dataset_ingroup(consumer,group_id, 0, "default", &err);
+    EXIT_IF_ERROR("asapo_consumer_get_last_dataset_ingroup", err);
+    asapo_free_handle(&dataset);
+    AsapoDataSetHandle ds_ig = asapo_consumer_get_last_dataset_ingroup(consumer,group_id, 0, "default", &err);
+    ASSERT_TRUE(ds_ig == NULL,"returns null in case of error");
+    ASSERT_TRUE(asapo_error_get_type(err) == kEndOfStream,"asapo_consumer_get_last_dataset_ingroup second time end of stream error");
+
+
 // get by id
     dataset = asapo_consumer_get_dataset_by_id(consumer, 8,0, "default", &err);
     EXIT_IF_ERROR("asapo_consumer_get_last_dataset", err);
@@ -114,6 +123,12 @@ void test_single(AsapoConsumerHandle consumer, AsapoStringHandle group_id) {
     ASSERT_EQ_INT(10,asapo_message_meta_get_id(md),"id");
     ASSERT_EQ_STRING("10",asapo_message_meta_get_name(md),"id");
 
+//last in group
+    asapo_consumer_get_last_ingroup(consumer, group_id, &md, NULL, "default",&err);
+    EXIT_IF_ERROR("asapo_consumer_get_last_ingroup", err);
+    asapo_consumer_get_last_ingroup(consumer, group_id, &md, NULL, "default",&err);
+    ASSERT_TRUE(asapo_error_get_type(err) == kEndOfStream,"asapo_consumer_get_last_ingroup second time end of stream error");
+
 //id
     asapo_consumer_get_by_id(consumer,8, &md, NULL, "default",&err);
     EXIT_IF_ERROR("asapo_consumer_get_by_id", err);
diff --git a/tests/automatic/consumer/consumer_api/consumer_api.cpp b/tests/automatic/consumer/consumer_api/consumer_api.cpp
index 9107d44ac36aadc981ca3f08537077c464479e37..4e1f99ec0320f10b265715a2c3632bffad4c5f6e 100644
--- a/tests/automatic/consumer/consumer_api/consumer_api.cpp
+++ b/tests/automatic/consumer/consumer_api/consumer_api.cpp
@@ -58,6 +58,13 @@ void TestSingle(const std::unique_ptr<asapo::Consumer>& consumer, const std::str
     M_AssertTrue(fi.name == "10", "GetLast filename");
     M_AssertTrue(fi.metadata == "{\"test\":10}", "GetLast metadata");
 
+    err = consumer->GetLast(group_id, &fi, nullptr, "default");
+    M_AssertTrue(err == nullptr, "GetLast inside group no error");
+    M_AssertTrue(fi.name == "10", "GetLast inside group filename");
+
+    err = consumer->GetLast(group_id, &fi, nullptr, "default");
+    M_AssertTrue(err == asapo::ConsumerErrorTemplates::kEndOfStream, "GetLast inside group error second time");
+
     err = consumer->GetNext(group_id, &fi, nullptr, "default");
     M_AssertTrue(err == nullptr, "GetNext2 no error");
     M_AssertTrue(fi.name == "2", "GetNext2 filename");
@@ -254,6 +261,11 @@ void TestDataset(const std::unique_ptr<asapo::Consumer>& consumer, const std::st
     M_AssertTrue(dataset.content[0].name == "10_1", "GetLastDataset filename");
     M_AssertTrue(dataset.content[0].metadata == "{\"test\":10}", "GetLastDataset metadata");
 
+    consumer->GetLastDataset(group_id, 0, "default", &err);
+    M_AssertTrue(err == nullptr, "GetLastDataset in group no error");
+    consumer->GetLastDataset(group_id, 0, "default", &err);
+    M_AssertTrue(err == asapo::ConsumerErrorTemplates::kEndOfStream, "GetLastDataset in group error second time");
+
     dataset = consumer->GetNextDataset(group_id, 0, "default", &err);
     M_AssertTrue(err == nullptr, "GetNextDataset2 no error");
     M_AssertTrue(dataset.content[0].name == "2_1", "GetNextDataSet2 filename");
diff --git a/tests/automatic/consumer/consumer_api_python/consumer_api.py b/tests/automatic/consumer/consumer_api_python/consumer_api.py
index 18a8dd3814210a20761dd626bf46325519fdb00c..86c32a9f32b43aaef6c52f6dac010a633a77dfd0 100644
--- a/tests/automatic/consumer/consumer_api_python/consumer_api.py
+++ b/tests/automatic/consumer/consumer_api_python/consumer_api.py
@@ -76,6 +76,16 @@ def check_single(consumer, group_id):
     assert_metaname(meta, "5", "get last1")
     assert_usermetadata(meta, "get last1")
 
+# get last in group
+    _, meta = consumer.get_last(meta_only=True,group_id=group_id)
+    assert_metaname(meta, "5", "get last in group")
+    try:
+        consumer.get_last(meta_only=True,group_id=group_id)
+    except asapo_consumer.AsapoEndOfStreamError:
+        pass
+    else:
+        exit_on_noerr("get last in group error second time")
+
     try:
         consumer.get_by_id(30, meta_only=True)
     except asapo_consumer.AsapoEndOfStreamError:
@@ -326,6 +336,17 @@ def check_dataset(consumer, group_id):
     assert_eq(res['expected_size'], 3, "get_last_dataset1 size ")
     assert_metaname(res['content'][2], "10_3", "get get_last_dataset1 name3")
 
+# get last dataset in group
+    res = consumer.get_last_dataset(group_id=group_id)
+    assert_eq(res['id'], 10, "get_last_dataset in group")
+    try:
+        consumer.get_last_dataset(group_id=group_id)
+    except asapo_consumer.AsapoEndOfStreamError:
+        pass
+    else:
+        exit_on_noerr("get last dataset in group error second time")
+
+
     res = consumer.get_next_dataset(group_id)
     assert_eq(res['id'], 3, "get_next_dataset3")