Commit b09b0803 authored by Eric Cano's avatar Eric Cano
Browse files

Added missing error handling when de/serializing protobufs.

parent 1c995087
......@@ -132,6 +132,10 @@ std::string computeSHA1Base64(
const google::protobuf::Message& msg) {
std::string buffer;
msg.SerializeToString(&buffer);
if (!msg.SerializeToString(&buffer)) {
throw cta::exception::Exception(std::string("In mediachanger::computeSHA1Base64(): could not serialize: ")+
msg.InitializationErrorString());
}
return computeSHA1Base64(buffer.c_str(),buffer.size());
}
......
......@@ -407,14 +407,24 @@ ArchiveRequest::AsyncJobOwnerUpdater* ArchiveRequest::asyncUpdateJobOwner(uint16
[this, copyNumber, owner, previousOwner, &retRef](const std::string &in)->std::string {
// We have a locked and fetched object, so we just need to work on its representation.
serializers::ObjectHeader oh;
oh.ParseFromString(in);
if (!oh.ParseFromString(in)) {
// Use a the tolerant parser to assess the situation.
oh.ParsePartialFromString(in);
throw cta::exception::Exception(std::string("In ArchiveRequest::asyncUpdateJobOwner(): could not parse header: ")+
oh.InitializationErrorString());
}
if (oh.type() != serializers::ObjectType::ArchiveRequest_t) {
std::stringstream err;
err << "In ArchiveRequest::asyncUpdateJobOwner()::lambda(): wrong object type: " << oh.type();
throw cta::exception::Exception(err.str());
}
serializers::ArchiveRequest payload;
payload.ParseFromString(oh.payload());
if (!payload.ParseFromString(oh.payload())) {
// Use a the tolerant parser to assess the situation.
payload.ParsePartialFromString(oh.payload());
throw cta::exception::Exception(std::string("In ArchiveRequest::asyncUpdateJobOwner(): could not parse payload: ")+
payload.InitializationErrorString());
}
// Find the copy number and change the owner.
auto *jl=payload.mutable_jobs();
for (auto j=jl->begin(); j!=jl->end(); j++) {
......
......@@ -37,7 +37,13 @@ void GenericObject::fetch() {
throw NotLocked("In ObjectOps::fetch(): object not locked");
m_existingObject = true;
// Get the header from the object store. We don't care for the type
m_header.ParseFromString(m_objectStore.read(getAddressIfSet()));
auto objData=m_objectStore.read(getAddressIfSet());
if (!m_header.ParseFromString(objData)) {
// Use a the tolerant parser to assess the situation.
m_header.ParsePartialFromString(objData);
throw cta::exception::Exception(std::string("In GenericObject::fetch: could not parse header: ") +
m_header.InitializationErrorString());
}
m_headerInterpreted = true;
}
......
......@@ -296,12 +296,23 @@ public:
protected:
void getPayloadFromHeader () {
m_payload.ParseFromString(m_header.payload());
if (!m_payload.ParseFromString(m_header.payload())) {
// Use a the tolerant parser to assess the situation.
m_header.ParsePartialFromString(m_header.payload());
throw cta::exception::Exception(std::string("In <ObjectOps") + typeid(PayloadType).name() +
">::getPayloadFromHeader(): could not parse payload: " + m_header.InitializationErrorString());
}
m_payloadInterpreted = true;
}
void getHeaderFromObjectStore () {
m_header.ParseFromString(m_objectStore.read(getAddressIfSet()));
auto objData=m_objectStore.read(getAddressIfSet());
if (!m_header.ParseFromString(objData)) {
// Use a the tolerant parser to assess the situation.
m_header.ParsePartialFromString(objData);
throw cta::exception::Exception(std::string("In <ObjectOps") + typeid(PayloadType).name() +
">::getHeaderFromObjectStore(): could not parse header: " + m_header.InitializationErrorString());
}
if (m_header.type() != payloadTypeId) {
std::stringstream err;
err << "In ObjectOps::getHeaderFromObjectStore wrong object type: "
......
......@@ -49,7 +49,10 @@ std::string castor::messages::computeSHA1Base64(const std::string &data) {
std::string castor::messages::computeSHA1Base64(
const google::protobuf::Message& msg) {
std::string buffer;
msg.SerializeToString(&buffer);
if (!msg.SerializeToString(&buffer)) {
throw cta::exception::Exception(std::string("In castor::messages::computeSHA1Base64(): could not serialize: ")+
msg.InitializationErrorString());
}
return computeSHA1Base64(buffer.c_str(),buffer.size());
}
......
......@@ -286,12 +286,14 @@ void RecallReportPacker::WorkerThread::run(){
// Cross check that the queue is indeed empty.
while (m_parent.m_fifo.size()) {
// There is at least one extra report we missed.
// The drive status reports are not a problem though.
cta::log::ScopedParamContainer spc(m_parent.m_lc);
std::unique_ptr<Report> missedReport(m_parent.m_fifo.pop());
spc.add("ReportType", typeid(*missedReport).name());
if (missedReport->goingToEnd())
spc.add("goingToEnd", "true");
m_parent.m_lc.log(cta::log::ERR, "Popping missed report (memory leak)");
if (typeid(*missedReport) != typeid(RecallReportPacker::ReportDriveStatus))
m_parent.m_lc.log(cta::log::ERR, "Popping missed report (memory leak)");
}
m_parent.m_lc.log(cta::log::DEBUG, "Finishing RecallReportPacker thread");
}
......
......@@ -274,7 +274,13 @@ SubprocessHandler::ProcessingStatus DriveHandler::processEvent() {
// Read from the socket pair
try {
serializers::WatchdogMessage message;
message.ParseFromString(m_socketPair->receive());
auto datagram=m_socketPair->receive();
if (!message.ParseFromString(datagram)) {
// Use a the tolerant parser to assess the situation.
message.ParsePartialFromString(datagram);
throw cta::exception::Exception(std::string("In SubprocessHandler::ProcessingStatus(): could not parse message: ")+
message.InitializationErrorString());
}
// Logs are processed in all cases
processLogs(message);
// If we report bytes, process the report (this is a heartbeat)
......
......@@ -37,7 +37,10 @@ void DriveHandlerProxy::addLogParams(const std::string& unitName, const std::lis
lp->set_value(p.getValue());
}
std::string buffer;
watchdogMessage.SerializeToString(&buffer);
if (!watchdogMessage.SerializeToString(&buffer)) {
throw cta::exception::Exception(std::string("In DriveHandlerProxy::addLogParams(): could not serialize: ")+
watchdogMessage.InitializationErrorString());
}
m_socketPair.send(buffer);
}
......@@ -50,7 +53,10 @@ void DriveHandlerProxy::deleteLogParams(const std::string& unitName, const std::
*lpn = pn;
}
std::string buffer;
watchdogMessage.SerializeToString(&buffer);
if (!watchdogMessage.SerializeToString(&buffer)) {
throw cta::exception::Exception(std::string("In DriveHandlerProxy::deleteLogParams(): could not serialize: ")+
watchdogMessage.InitializationErrorString());
}
m_socketPair.send(buffer);
}
......@@ -66,7 +72,10 @@ void DriveHandlerProxy::reportHeartbeat(uint64_t totalTapeBytesMoved, uint64_t t
watchdogMessage.set_totaltapebytesmoved(totalTapeBytesMoved);
watchdogMessage.set_totaldiskbytesmoved(totalDiskBytesMoved);
std::string buffer;
watchdogMessage.SerializeToString(&buffer);
if (!watchdogMessage.SerializeToString(&buffer)) {
throw cta::exception::Exception(std::string("In DriveHandlerProxy::reportHeartbeat(): could not serialize: ")+
watchdogMessage.InitializationErrorString());
}
m_socketPair.send(buffer);
}
......@@ -80,7 +89,10 @@ void DriveHandlerProxy::reportState(const cta::tape::session::SessionState state
watchdogMessage.set_sessiontype((uint32_t)type);
watchdogMessage.set_vid(vid);
std::string buffer;
watchdogMessage.SerializeToString(&buffer);
if (!watchdogMessage.SerializeToString(&buffer)) {
throw cta::exception::Exception(std::string("In DriveHandlerProxy::reportState(): could not serialize: ")+
watchdogMessage.InitializationErrorString());
}
m_socketPair.send(buffer);
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment