From b09b0803da77f394ba6c3e385eb177ec9e4ea20b Mon Sep 17 00:00:00 2001
From: Eric Cano <Eric.Cano@cern.ch>
Date: Tue, 25 Jul 2017 17:24:05 +0200
Subject: [PATCH] Added missing error handling when de/serializing protobufs.

---
 mediachanger/messages.cpp                     |  4 ++++
 objectstore/ArchiveRequest.cpp                | 14 +++++++++++--
 objectstore/GenericObject.cpp                 |  8 +++++++-
 objectstore/ObjectOps.hpp                     | 15 ++++++++++++--
 tapeserver/castor/messages/messages.cpp       |  5 ++++-
 .../tapeserver/daemon/RecallReportPacker.cpp  |  4 +++-
 tapeserver/daemon/DriveHandler.cpp            |  8 +++++++-
 tapeserver/daemon/DriveHandlerProxy.cpp       | 20 +++++++++++++++----
 8 files changed, 66 insertions(+), 12 deletions(-)

diff --git a/mediachanger/messages.cpp b/mediachanger/messages.cpp
index 42db8a5ad1..e2fd5a7ffc 100644
--- a/mediachanger/messages.cpp
+++ b/mediachanger/messages.cpp
@@ -132,6 +132,10 @@ std::string computeSHA1Base64(
   const google::protobuf::Message& msg) {
   std::string buffer;
   msg.SerializeToString(&buffer);
+  if (!msg.SerializeToString(&buffer)) {
+    throw cta::exception::Exception(std::string("In mediachanger::computeSHA1Base64(): could not serialize: ")+
+        msg.InitializationErrorString());
+  }
   return computeSHA1Base64(buffer.c_str(),buffer.size());
 }
 
diff --git a/objectstore/ArchiveRequest.cpp b/objectstore/ArchiveRequest.cpp
index 65d83cc729..e7ac5d1695 100644
--- a/objectstore/ArchiveRequest.cpp
+++ b/objectstore/ArchiveRequest.cpp
@@ -407,14 +407,24 @@ ArchiveRequest::AsyncJobOwnerUpdater* ArchiveRequest::asyncUpdateJobOwner(uint16
       [this, copyNumber, owner, previousOwner, &retRef](const std::string &in)->std::string {
         // We have a locked and fetched object, so we just need to work on its representation.
         serializers::ObjectHeader oh;
-        oh.ParseFromString(in);
+        if (!oh.ParseFromString(in)) {
+          // Use a the tolerant parser to assess the situation.
+          oh.ParsePartialFromString(in);
+          throw cta::exception::Exception(std::string("In ArchiveRequest::asyncUpdateJobOwner(): could not parse header: ")+
+            oh.InitializationErrorString());
+        }
         if (oh.type() != serializers::ObjectType::ArchiveRequest_t) {
           std::stringstream err;
           err << "In ArchiveRequest::asyncUpdateJobOwner()::lambda(): wrong object type: " << oh.type();
           throw cta::exception::Exception(err.str());
         }
         serializers::ArchiveRequest payload;
-        payload.ParseFromString(oh.payload());
+        if (!payload.ParseFromString(oh.payload())) {
+          // Use a the tolerant parser to assess the situation.
+          payload.ParsePartialFromString(oh.payload());
+          throw cta::exception::Exception(std::string("In ArchiveRequest::asyncUpdateJobOwner(): could not parse payload: ")+
+            payload.InitializationErrorString());
+        }
         // Find the copy number and change the owner.
         auto *jl=payload.mutable_jobs();
         for (auto j=jl->begin(); j!=jl->end(); j++) {
diff --git a/objectstore/GenericObject.cpp b/objectstore/GenericObject.cpp
index 00763ee794..2c2f5332d8 100644
--- a/objectstore/GenericObject.cpp
+++ b/objectstore/GenericObject.cpp
@@ -37,7 +37,13 @@ void GenericObject::fetch() {
     throw NotLocked("In ObjectOps::fetch(): object not locked");
   m_existingObject = true;
   // Get the header from the object store. We don't care for the type
-  m_header.ParseFromString(m_objectStore.read(getAddressIfSet()));
+  auto objData=m_objectStore.read(getAddressIfSet());
+  if (!m_header.ParseFromString(objData)) {
+    // Use a the tolerant parser to assess the situation.
+    m_header.ParsePartialFromString(objData);
+    throw cta::exception::Exception(std::string("In GenericObject::fetch: could not parse header: ") + 
+      m_header.InitializationErrorString());
+  }
   m_headerInterpreted = true;
 }
 
diff --git a/objectstore/ObjectOps.hpp b/objectstore/ObjectOps.hpp
index bbb50d82ff..b43e4f9423 100644
--- a/objectstore/ObjectOps.hpp
+++ b/objectstore/ObjectOps.hpp
@@ -296,12 +296,23 @@ public:
 protected:
   
   void getPayloadFromHeader () {
-    m_payload.ParseFromString(m_header.payload());
+    if (!m_payload.ParseFromString(m_header.payload())) {
+      // Use a the tolerant parser to assess the situation.
+      m_header.ParsePartialFromString(m_header.payload());
+      throw cta::exception::Exception(std::string("In <ObjectOps") + typeid(PayloadType).name() + 
+              ">::getPayloadFromHeader(): could not parse payload: " + m_header.InitializationErrorString());
+    }
     m_payloadInterpreted = true;
   }
 
   void getHeaderFromObjectStore () {
-    m_header.ParseFromString(m_objectStore.read(getAddressIfSet()));
+    auto objData=m_objectStore.read(getAddressIfSet());
+    if (!m_header.ParseFromString(objData)) {
+      // Use a the tolerant parser to assess the situation.
+      m_header.ParsePartialFromString(objData);
+      throw cta::exception::Exception(std::string("In <ObjectOps") + typeid(PayloadType).name() + 
+              ">::getHeaderFromObjectStore(): could not parse header: " + m_header.InitializationErrorString());
+    }
     if (m_header.type() != payloadTypeId) {
       std::stringstream err;
       err << "In ObjectOps::getHeaderFromObjectStore wrong object type: "
diff --git a/tapeserver/castor/messages/messages.cpp b/tapeserver/castor/messages/messages.cpp
index f8dc1e3d40..e15871702b 100644
--- a/tapeserver/castor/messages/messages.cpp
+++ b/tapeserver/castor/messages/messages.cpp
@@ -49,7 +49,10 @@ std::string castor::messages::computeSHA1Base64(const std::string &data) {
 std::string castor::messages::computeSHA1Base64(
   const google::protobuf::Message& msg) {
   std::string buffer;
-  msg.SerializeToString(&buffer);
+  if (!msg.SerializeToString(&buffer)) {
+    throw cta::exception::Exception(std::string("In castor::messages::computeSHA1Base64(): could not serialize: ")+
+        msg.InitializationErrorString());
+  }
   return computeSHA1Base64(buffer.c_str(),buffer.size());
 }
 
diff --git a/tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.cpp b/tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.cpp
index fc19a2aa01..a2d9799bdb 100644
--- a/tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.cpp
+++ b/tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.cpp
@@ -286,12 +286,14 @@ void RecallReportPacker::WorkerThread::run(){
   // Cross check that the queue is indeed empty.
   while (m_parent.m_fifo.size()) {
     // There is at least one extra report we missed.
+    // The drive status reports are not a problem though.
     cta::log::ScopedParamContainer spc(m_parent.m_lc);
     std::unique_ptr<Report> missedReport(m_parent.m_fifo.pop());
     spc.add("ReportType", typeid(*missedReport).name());
     if (missedReport->goingToEnd())
       spc.add("goingToEnd", "true");
-    m_parent.m_lc.log(cta::log::ERR, "Popping missed report (memory leak)");
+    if (typeid(*missedReport) != typeid(RecallReportPacker::ReportDriveStatus))
+      m_parent.m_lc.log(cta::log::ERR, "Popping missed report (memory leak)");
   }
   m_parent.m_lc.log(cta::log::DEBUG, "Finishing RecallReportPacker thread");
 }
diff --git a/tapeserver/daemon/DriveHandler.cpp b/tapeserver/daemon/DriveHandler.cpp
index 21c6891fb7..1aaff7138c 100644
--- a/tapeserver/daemon/DriveHandler.cpp
+++ b/tapeserver/daemon/DriveHandler.cpp
@@ -274,7 +274,13 @@ SubprocessHandler::ProcessingStatus DriveHandler::processEvent() {
   // Read from the socket pair 
   try {
     serializers::WatchdogMessage message;
-    message.ParseFromString(m_socketPair->receive());
+    auto datagram=m_socketPair->receive();
+    if (!message.ParseFromString(datagram)) {
+      // Use a the tolerant parser to assess the situation.
+      message.ParsePartialFromString(datagram);
+      throw cta::exception::Exception(std::string("In SubprocessHandler::ProcessingStatus(): could not parse message: ")+
+        message.InitializationErrorString());
+    }
     // Logs are processed in all cases
     processLogs(message);
     // If we report bytes, process the report (this is a heartbeat)
diff --git a/tapeserver/daemon/DriveHandlerProxy.cpp b/tapeserver/daemon/DriveHandlerProxy.cpp
index c98906495f..b461ea58ad 100644
--- a/tapeserver/daemon/DriveHandlerProxy.cpp
+++ b/tapeserver/daemon/DriveHandlerProxy.cpp
@@ -37,7 +37,10 @@ void DriveHandlerProxy::addLogParams(const std::string& unitName, const std::lis
     lp->set_value(p.getValue());
   }
   std::string buffer;
-  watchdogMessage.SerializeToString(&buffer);
+  if (!watchdogMessage.SerializeToString(&buffer)) {
+    throw cta::exception::Exception(std::string("In DriveHandlerProxy::addLogParams(): could not serialize: ")+
+        watchdogMessage.InitializationErrorString());
+  }
   m_socketPair.send(buffer);
 }
 
@@ -50,7 +53,10 @@ void DriveHandlerProxy::deleteLogParams(const std::string& unitName, const std::
     *lpn = pn;
   }
   std::string buffer;
-  watchdogMessage.SerializeToString(&buffer);
+  if (!watchdogMessage.SerializeToString(&buffer)) {
+    throw cta::exception::Exception(std::string("In DriveHandlerProxy::deleteLogParams(): could not serialize: ")+
+        watchdogMessage.InitializationErrorString());
+  }
   m_socketPair.send(buffer);
 }
 
@@ -66,7 +72,10 @@ void DriveHandlerProxy::reportHeartbeat(uint64_t totalTapeBytesMoved, uint64_t t
   watchdogMessage.set_totaltapebytesmoved(totalTapeBytesMoved);
   watchdogMessage.set_totaldiskbytesmoved(totalDiskBytesMoved);
   std::string buffer;
-  watchdogMessage.SerializeToString(&buffer);
+  if (!watchdogMessage.SerializeToString(&buffer)) {
+    throw cta::exception::Exception(std::string("In DriveHandlerProxy::reportHeartbeat(): could not serialize: ")+
+        watchdogMessage.InitializationErrorString());
+  }
   m_socketPair.send(buffer);
 }
 
@@ -80,7 +89,10 @@ void DriveHandlerProxy::reportState(const cta::tape::session::SessionState state
   watchdogMessage.set_sessiontype((uint32_t)type);
   watchdogMessage.set_vid(vid);
   std::string buffer;
-  watchdogMessage.SerializeToString(&buffer);
+  if (!watchdogMessage.SerializeToString(&buffer)) {
+    throw cta::exception::Exception(std::string("In DriveHandlerProxy::reportState(): could not serialize: ")+
+        watchdogMessage.InitializationErrorString());
+  }
   m_socketPair.send(buffer);
 }
 
-- 
GitLab