-
Jorge Camarero Vera authoredJorge Camarero Vera authored
Agent.cpp 9.14 KiB
/*
* @project The CERN Tape Archive (CTA)
* @copyright Copyright © 2021-2022 CERN
* @license This program is free software, distributed under the terms of the GNU General Public
* Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING". You can
* redistribute it and/or modify it under the terms of the GPL Version 3, or (at your
* option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
* PARTICULAR PURPOSE. See the GNU General Public License for more details.
*
* In applying this licence, CERN does not waive the privileges and immunities
* granted to it by virtue of its status as an Intergovernmental Organization or
* submit itself to any jurisdiction.
*/
#include "Agent.hpp"
#include "AgentRegister.hpp"
#include "RootEntry.hpp"
#include "GenericObject.hpp"
#include "common/exception/Errnum.hpp"
#include "ProtocolBuffersAlgorithms.hpp"
#include <string>
#include <sstream>
#include <ctime>
#include <cxxabi.h>
#include <google/protobuf/util/json_util.h>
cta::objectstore::Agent::Agent(GenericObject& go):
ObjectOps<serializers::Agent, serializers::Agent_t>(go.objectStore()) {
// Here we transplant the generic object into the new object
go.transplantHeader(*this);
// And interpret the header.
getPayloadFromHeader();
}
cta::objectstore::Agent::Agent(const std::string & name, Backend & os):
ObjectOps<serializers::Agent, serializers::Agent_t>(os, name), m_nextId(0) {}
void cta::objectstore::Agent::initialize() {
ObjectOps<serializers::Agent, serializers::Agent_t>::initialize();
m_payload.set_heartbeat(0);
m_payload.set_timeout_us(120*1000*1000);
m_payload.set_description("");
m_payload.set_being_garbage_collected(false);
m_payload.set_gc_needed(false);
m_payloadInterpreted = true;
}
void cta::objectstore::Agent::insertAndRegisterSelf(log::LogContext & lc) {
// We suppose initialize was already called, and that the agent name
// is set.
// We need to get hold of the agent register, which we suppose is available
RootEntry re(m_objectStore);
ScopedSharedLock reLock(re);
re.fetch();
AgentRegister ar(re.getAgentRegisterAddress(), m_objectStore);
reLock.release();
// Then we should first create a pointer to our agent
ScopedExclusiveLock arLock(ar);
ar.fetch();
ar.addAgent(getAddressIfSet());
ar.commit();
// Set the agent register as owner and backup owner
setBackupOwner(ar.getAddressIfSet());
setOwner(ar.getAddressIfSet());
// Create the agent
insert();
// And release the agent register's lock
arLock.release();
}
void cta::objectstore::Agent::removeAndUnregisterSelf(log::LogContext & lc) {
// Check that we own the proper lock
checkPayloadWritable();
// Check that we are not empty
if (!isEmpty()) {
// Log all owned objects by batches of 25.
std::list<std::string> ownershipList = getOwnershipList();
{
auto ownedObj = ownershipList.begin();
size_t currentCount=0;
size_t startIndex=0;
size_t endIndex=0;
std::string currentObjs;
while (ownedObj != ownershipList.end()) {
if (currentObjs.size()) currentObjs+= " ";
currentObjs += *(ownedObj++);
endIndex++;
if (++currentCount >= 25 || ownedObj == ownershipList.end()) {
log::ScopedParamContainer params(lc);
params.add("agentObject",getAddressIfSet())
.add("objects", currentObjs)
.add("startIndex", startIndex)
.add("endIndex", endIndex - 1)
.add("totalObjects", ownershipList.size());
lc.log(log::ERR, "In Agent::deleteAndUnregisterSelf: agent still owns objects. Here is a part of the list.");
startIndex = endIndex;
currentCount = 0;
currentObjs = "";
}
}
}
// Prepare exception to be thrown.
std::stringstream exSs;
exSs << "In Agent::removeAndUnregisterSelf: agent (agentObject=" << getAddressIfSet() << ") still owns objects. Here's the first few:";
size_t count=0;
for(auto i=ownershipList.begin(); i!=ownershipList.end(); i++) {
exSs << " " << *i;
if (++count > 3) {
exSs << " [... trimmed at 3 of " << ownershipList.size() << "]";
break;
}
}
throw AgentStillOwnsObjects(exSs.str());
}
// First delete ourselves
remove();
log::ScopedParamContainer params(lc);
params.add("agentObject", getAddressIfSet());
lc.log(log::INFO, "In Agent::removeAndUnregisterSelf(): Removed agent object.");
// Then we remove the dangling pointer about ourselves in the agent register.
// We need to get hold of the agent register, which we suppose is available
RootEntry re(m_objectStore);
re.fetchNoLock();
AgentRegister ar(re.getAgentRegisterAddress(), m_objectStore);
// Then we should first create a pointer to our agent
ScopedExclusiveLock arLock(ar);
ar.fetch();
ar.removeAgent(getAddressIfSet());
ar.commit();
arLock.release();
}
bool cta::objectstore::Agent::isEmpty() {
checkPayloadReadable();
if (m_payload.ownedobjects_size())
return false;
return true;
}
bool cta::objectstore::Agent::isBeingGarbageCollected() {
checkPayloadReadable();
return m_payload.being_garbage_collected();
}
void cta::objectstore::Agent::setBeingGarbageCollected() {
checkPayloadWritable();
m_payload.set_being_garbage_collected(true);
}
void cta::objectstore::Agent::setNeedsGarbageCollection() {
checkPayloadWritable();
m_payload.set_gc_needed(true);
}
bool cta::objectstore::Agent::needsGarbageCollection() {
checkPayloadReadable();
if (!m_payload.has_gc_needed()) return false;
return m_payload.gc_needed();
}
void cta::objectstore::Agent::garbageCollect(const std::string& presumedOwner, AgentReference & agentReference, log::LogContext & lc,
cta::catalogue::Catalogue & catalogue) {
checkPayloadWritable();
// We are here limited to checking the presumed owner and mark the agent as
// untracked in the agent register in case of match, else we do nothing
if (m_header.owner() == presumedOwner) {
// We need to get hold of the agent register, which we suppose is available
RootEntry re(m_objectStore);
ScopedSharedLock reLock(re);
re.fetch();
AgentRegister ar(re.getAgentRegisterAddress(), m_objectStore);
reLock.release();
// Then we should first create a pointer to our agent
ScopedExclusiveLock arLock(ar);
ar.fetch();
ar.untrackAgent(getAddressIfSet());
ar.commit();
arLock.release();
// We now mark ourselves as owned by the agent register
setOwner(ar.getAddressIfSet());
commit();
}
}
/*void cta::objectstore::Agent::create() {
if (!m_setupDone)
throw SetupNotDone("In Agent::create(): setup() not yet done");
RootEntry re(m_objectStore);
AgentRegister ar(re.allocateOrGetAgentRegister(*this), m_objectStore);
ar.addIntendedElement(selfName(), *this);
serializers::Agent as;
as.set_heartbeat(0);
writeChild(selfName(), as);
ar.upgradeIntendedElementToActual(selfName(), *this);
m_creationDone = true;
}*/
void cta::objectstore::Agent::addToOwnership(const std::string& name) {
checkPayloadWritable();
std::string * owned = m_payload.mutable_ownedobjects()->Add();
*owned = name;
}
void cta::objectstore::Agent::removeFromOwnership(std::string name) {
checkPayloadWritable();
serializers::removeString(m_payload.mutable_ownedobjects(), name);
}
std::list<std::string>
cta::objectstore::Agent::getOwnershipList() {
checkPayloadReadable();
std::list<std::string> ret;
for (int i=0; i<m_payload.ownedobjects_size(); i++) {
ret.push_back(m_payload.ownedobjects(i));
}
return ret;
}
std::set<std::string> cta::objectstore::Agent::getOwnershipSet() {
checkPayloadReadable();
std::set<std::string> ret;
for (const auto &oo: m_payload.ownedobjects())
ret.insert(oo);
return ret;
}
void cta::objectstore::Agent::resetOwnership(const std::set<std::string>& ownershipSet) {
checkPayloadWritable();
m_payload.mutable_ownedobjects()->Clear();
for (const auto &oo: ownershipSet)
*m_payload.mutable_ownedobjects()->Add() = oo;
}
size_t cta::objectstore::Agent::getOwnershipListSize() {
checkPayloadReadable();
return m_payload.ownedobjects_size();
}
void cta::objectstore::Agent::bumpHeartbeat() {
checkPayloadWritable();
auto heartbeat=m_payload.heartbeat()+1;
m_payload.set_heartbeat(heartbeat);
}
uint64_t cta::objectstore::Agent::getHeartbeatCount() {
checkPayloadReadable();
return m_payload.heartbeat();
}
double cta::objectstore::Agent::getTimeout() {
checkPayloadReadable();
return 0.000001 * m_payload.timeout_us();
}
void cta::objectstore::Agent::setTimeout_us(uint64_t timeout) {
checkPayloadWritable();
m_payload.set_timeout_us(timeout);
}
std::string cta::objectstore::Agent::dump() {
checkPayloadReadable();
google::protobuf::util::JsonPrintOptions options;
options.add_whitespace = true;
options.always_print_primitive_fields = true;
std::string headerDump;
google::protobuf::util::MessageToJsonString(m_payload, &headerDump, options);
return headerDump;
}