GarbageCollector.cpp 12.1 KB
Newer Older
1
/*
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
 * The CERN Tape Archive (CTA) project
 * Copyright (C) 2015  CERN
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

19
#include "GarbageCollector.hpp"
20
#include "AgentReference.hpp"
21
#include "RootEntry.hpp"
22
#include <algorithm>
#include <memory>
23
24

namespace cta { namespace objectstore {
25
// Upper bound on the number of agents this garbage collector keeps in m_watchedAgents;
// aquireTargets() stops adopting new agents once this many are watched.
const size_t GarbageCollector::c_maxWatchedAgentsPerGC{5};
26

27
28
GarbageCollector::GarbageCollector(Backend & os, AgentReference & agentReference, catalogue::Catalogue & catalogue): 
  m_objectStore(os), m_catalogue(catalogue), m_ourAgentReference(agentReference), m_agentRegister(os) {
  // Resolve the agent register through the root entry, which only needs to be read.
  RootEntry rootEntry(m_objectStore);
  ScopedSharedLock rootEntryLock(rootEntry);
  rootEntry.fetch();
  const auto registerAddress = rootEntry.getAgentRegisterAddress();
  m_agentRegister.setAddress(registerAddress);
  // Drop the root entry lock before locking the register itself.
  rootEntryLock.release();
  // Load an initial copy of the agent register (shared lock, read-only).
  ScopedSharedLock registerLock(m_agentRegister);
  m_agentRegister.fetch();
}
37

38
39
40
41
// Runs one garbage-collection cycle: forget watched agents that left the register,
// adopt untracked agents into the slots that frees up, then check heartbeats of
// everything we watch (cleaning up the dead ones).
void GarbageCollector::runOnePass(log::LogContext & lc) {
  this->trimGoneTargets(lc);
  this->aquireTargets(lc);
  this->checkHeartbeats(lc);
}
43
  
44
void GarbageCollector::trimGoneTargets(log::LogContext & lc) {
  // Refresh the agent register; the shared lock is only held while fetching,
  // the agent list is then read from the in-memory copy.
  ScopedSharedLock registerLock(m_agentRegister);
  m_agentRegister.fetch();
  registerLock.release();
  const std::list<std::string> registeredAgents = m_agentRegister.getAgents();
  auto stillRegistered = [&registeredAgents](const std::string & address) {
    return std::find(registeredAgents.begin(), registeredAgents.end(), address)
        != registeredAgents.end();
  };
  // Any watched agent no longer listed in the register is dropped: removed from
  // our agent's ownership list, its watchdog freed and its map entry erased.
  auto watched = m_watchedAgents.begin();
  while (watched != m_watchedAgents.end()) {
    if (stillRegistered(watched->first)) {
      ++watched;
      continue;
    }
    const std::string removedAgent = watched->first;
    Agent ourAgent(m_ourAgentReference.getAgentAddress(), m_objectStore);
    ScopedExclusiveLock ourAgentLock(ourAgent);
    ourAgent.fetch();
    ourAgent.removeFromOwnership(removedAgent);
    ourAgent.commit();
    ourAgentLock.release();
    delete watched->second;
    watched = m_watchedAgents.erase(watched);
    log::ScopedParamContainer params(lc);
    params.add("agentAddress", removedAgent);
    lc.log(log::INFO, "In GarbageCollector::trimGoneTargets(): removed now gone agent.");
  }
}
70
  
71
void GarbageCollector::aquireTargets(log::LogContext & lc) {
  // Adopt untracked agents listed in the agent register until c_maxWatchedAgentsPerGC
  // agents are watched. The register stays exclusively locked for the whole pass and
  // is committed again at the end so that removeAgent()/trackAgent() changes made on
  // the skip paths are persisted too.
  ScopedExclusiveLock arLock(m_agentRegister);
  m_agentRegister.fetch();
  // Get the list of untracked agents
  std::list<std::string> candidatesList = m_agentRegister.getUntrackedAgents();
  std::list<std::string>::const_iterator c = candidatesList.begin();
  // We can now take ownership of new agents, up to our max...
  // and we don't monitor ourselves!
  for (;m_watchedAgents.size() < c_maxWatchedAgentsPerGC
         && c!=candidatesList.end(); c++) {
    // We don't monitor ourselves
    if (*c != m_ourAgentReference.getAgentAddress()) {
      // So we have a candidate we might want to monitor
      // First, check that the agent entry exists, and that ownership
      // is indeed pointing to the agent register
      Agent ag(*c, m_objectStore);
      if (!ag.exists()) {
        // This is a dangling pointer to a dead object:
        // remove it in the agentRegister.
        m_agentRegister.removeAgent(*c);
        continue;
      }
      ScopedExclusiveLock agLock(ag);
      ag.fetch();
      // Check that the actual owner is the agent register.
      // otherwise, it should not be listed as an agent to monitor
      if (ag.getOwner() != m_agentRegister.getAddressIfSet()) {
        m_agentRegister.trackAgent(ag.getAddressIfSet());
        agLock.release();
        continue;
      }
      // We are now interested in tracking this agent. So we will transfer its
      // ownership. We already have an exclusive lock on the agent.
      // Lock ours
      Agent ourAgent(m_ourAgentReference.getAgentAddress(), m_objectStore);
      ScopedExclusiveLock oaLock(ourAgent);
      ourAgent.fetch();
      ourAgent.addToOwnership(ag.getAddressIfSet());
      ourAgent.commit();
      // We now have a pointer to the agent, we can make the ownership official
      ag.setOwner(ourAgent.getAddressIfSet());
      ag.commit();
      log::ScopedParamContainer params(lc);
      params.add("agentAddress", ag.getAddressIfSet())
            .add("gcAgentAddress", ourAgent.getAddressIfSet());
      lc.log(log::INFO, "In GarbageCollector::aquireTargets(): started tracking an untracked agent");
      // Agent is officially ours, we can remove it from the untracked agent's
      // list
      m_agentRegister.trackAgent(ag.getAddressIfSet());
      m_agentRegister.commit();
      // Agent is now officially ours, let's track it. We have to release the
      // lock to the agent before constructing the watchdog, which builds
      // its own agent objects (and needs to lock the object store representation).
      const std::string agentName = ag.getAddressIfSet();
      const double timeout = ag.getTimeout();
      agLock.release();
      // Fully build the watchdog before publishing it in m_watchedAgents. With
      // `m_watchedAgents[agentName] = new AgentWatchdog(...)`, pre-C++17 evaluation
      // order lets operator[] insert a null entry before the constructor throws, and
      // checkHeartbeats() would later dereference it. The unique_ptr also frees the
      // watchdog if setTimeout() or the map insertion throws.
      std::unique_ptr<AgentWatchdog> watchdog(new AgentWatchdog(agentName, m_objectStore));
      watchdog->setTimeout(timeout);
      // Single lookup keyed on the cached name (no need to query the released agent).
      AgentWatchdog *& slot = m_watchedAgents[agentName];
      // m_watchedAgents owns its watchdogs. A freshly inserted entry holds nullptr, so
      // this only frees a stale watchdog that the overwrite would otherwise leak.
      delete slot;
      slot = watchdog.release();
    }
  }
  // Commit all the modifications to the agent register
  m_agentRegister.commit();
}
 
136
void GarbageCollector::checkHeartbeats(log::LogContext & lc) {
  // Walk the watched agents; any whose watchdog reports it dead is garbage collected,
  // dropped from our agent's ownership list and forgotten. Exceptions from any of
  // these steps propagate to the caller and abort the pass.
  auto watched = m_watchedAgents.begin();
  while (watched != m_watchedAgents.end()) {
    if (watched->second->checkAlive()) {
      ++watched;
      continue;
    }
    // Dead agent: return its objects, then release our ownership of it.
    cleanupDeadAgent(watched->first, lc);
    Agent ourAgent(m_ourAgentReference.getAgentAddress(), m_objectStore);
    ScopedExclusiveLock ourAgentLock(ourAgent);
    ourAgent.fetch();
    ourAgent.removeFromOwnership(watched->first);
    ourAgent.commit();
    // The map owns the watchdog: free it before dropping the entry.
    delete watched->second;
    watched = m_watchedAgents.erase(watched);
  }
}

Eric Cano's avatar
Eric Cano committed
157
158
159
160
161
162
163
164
165
void GarbageCollector::cleanupDeadAgent(const std::string & address, log::LogContext & lc) {
  // Check that we are still owners of the agent (sanity check).
  Agent agent(address, m_objectStore);
  ScopedExclusiveLock agLock(agent);
  agent.fetch();
  log::ScopedParamContainer params(lc);
  params.add("agentAddress", agent.getAddressIfSet())
       .add("gcAgentAddress", m_ourAgentReference.getAgentAddress());
  if (agent.getOwner() != m_ourAgentReference.getAgentAddress()) {
166
   log::ScopedParamContainer params(lc);
Eric Cano's avatar
Eric Cano committed
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
   lc.log(log::WARNING, "In GarbageCollector::cleanupDeadAgent(): skipping agent which is not owned by this garbage collector as thought.");
   // The agent is removed from our ownership by the calling function: we're done.
   return;
  }
  lc.log(log::INFO, "In GarbageCollector::cleanupDeadAgent(): will cleanup dead agent.");
  // Return all objects owned by the agent to their respective backup owners
  auto ownedObjects = agent.getOwnershipList();
  for (auto obj = ownedObjects.begin(); obj!= ownedObjects.end(); obj++) {
   // Find the object
   GenericObject go(*obj, m_objectStore);
   log::ScopedParamContainer params2(lc);
   params2.add("objectAddress", go.getAddressIfSet());
   // If the object does not exist, we're done.
   if (go.exists()) {
     ScopedExclusiveLock goLock(go);
     go.fetch();
     // Call GenericOpbject's garbage collect method, which in turn will
     // delegate to the object type's garbage collector.
185
     go.garbageCollectDispatcher(goLock, address, m_ourAgentReference, lc, m_catalogue);
Eric Cano's avatar
Eric Cano committed
186
187
188
     lc.log(log::INFO, "In GarbageCollector::cleanupDeadAgent(): garbage collected owned object.");
   } else {
     lc.log(log::INFO, "In GarbageCollector::cleanupDeadAgent(): skipping garbage collection of now gone object.");
189
   }
Eric Cano's avatar
Eric Cano committed
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
   // In all cases, relinquish ownership for this object
   agent.removeFromOwnership(*obj);
   agent.commit();
  }
  // We now processed all the owned objects. We can delete the agent's entry
  agent.removeAndUnregisterSelf();
  lc.log(log::INFO, "In GarbageCollector::cleanupDeadAgent(): agent entry removed.");
}

void GarbageCollector::reinjectOwnedObject(log::LogContext& lc) {
  // We have to release the agents we were following. Performance is not an issue, so
  // we go in small steps.
  // First check the agents are indeed owned by us and still exist. Every watched
  // agent is classified into one of the lists below and removed from m_watchedAgents.
  std::list<std::string> stillTrackedAgents;
  std::list<std::string> goneAgents;
  std::list<std::string> notReallyOwnedAgents;
  std::list<std::string> inaccessibleAgents;
  {
    auto a = m_watchedAgents.begin();
    while(a!=m_watchedAgents.end()) {
      // Copy the address: the map entry (and its key) is erased at the end of this
      // iteration, while the parameter container below is still alive.
      const std::string agentAddress = a->first;
      log::ScopedParamContainer params(lc);
      params.add("agentAddress", agentAddress);
      // Check the agent is there, and ours.
      if (!m_objectStore.exists(agentAddress)) {
        goneAgents.emplace_back(agentAddress);
        lc.log(log::INFO, "In GarbageCollector::reinjectOwnedObject(): agent not present anymore.");
      } else {
        try {
          Agent ag(agentAddress, m_objectStore);
          ScopedSharedLock agl(ag);
          ag.fetch();
          if (ag.getOwner() == m_ourAgentReference.getAgentAddress()) {
            stillTrackedAgents.emplace_back(agentAddress);
            lc.log(log::INFO, "In GarbageCollector::reinjectOwnedObject(): agent still owned by us.");
          } else {
            params.add("currentOwner", ag.getOwner());
            notReallyOwnedAgents.emplace_back(agentAddress);
            lc.log(log::ERR, "In GarbageCollector::reinjectOwnedObject(): agent not owned by us.");
          }
        } catch (cta::exception::Exception & ex) {
          params.add("ExceptionMessage", ex.getMessageValue());
          lc.log(log::ERR, "In GarbageCollector::reinjectOwnedObject(): agent inaccessible.");
          inaccessibleAgents.emplace_back(agentAddress);
        }
      }
      // m_watchedAgents owns its watchdogs (trimGoneTargets()/checkHeartbeats() delete
      // them before erasing too): free it here, otherwise it leaks.
      delete a->second;
      a=m_watchedAgents.erase(a);
    }
  }
  {
    // We now have an overview of the situation. We can update the agent register based on that.
    ScopedExclusiveLock arLock(m_agentRegister);
    m_agentRegister.fetch();
    for (auto &sta: stillTrackedAgents) {
      m_agentRegister.untrackAgent(sta);
      log::ScopedParamContainer params(lc);
      params.add("agentAddress", sta);
      lc.log(log::INFO, "In GarbageCollector::reinjectOwnedObject(): untracked agent in registry.");
    }
    for (auto &ga: goneAgents) {
      m_agentRegister.removeAgent(ga);
      log::ScopedParamContainer params(lc);
      params.add("agentAddress", ga);
      lc.log(log::INFO, "In GarbageCollector::reinjectOwnedObject(): removed gone agent from registry.");
    }
    // This is all we are going to do. Other agents cannot be acted upon.
    m_agentRegister.commit();
  }
  // We can now remove ownership from the agents we still owned
  for (auto & sta: stillTrackedAgents) {
    log::ScopedParamContainer params(lc);
    params.add("agentAddress", sta);
    Agent ag (sta, m_objectStore);
    ScopedExclusiveLock agl(ag);
    ag.fetch();
    // Re-check under the exclusive lock: ownership may have changed since the shared-lock check.
    if (ag.getOwner() == m_ourAgentReference.getAgentAddress()) {
      ag.setOwner(m_agentRegister.getAddressIfSet());
      ag.commit();
      lc.log(log::INFO, "In GarbageCollector::reinjectOwnedObject(): changed agent ownership to registry.");
    } else {
      params.add("newOwner", ag.getOwner());
      lc.log(log::ERR, "In GarbageCollector::reinjectOwnedObject(): skipping agent whose ownership we lost last minute.");
    }
  }
  // We can now cleanup our own agent and remove it.
  Agent ourAg(m_ourAgentReference.getAgentAddress(), m_objectStore);
  ScopedExclusiveLock ourAgL(ourAg);
  ourAg.fetch();
  std::list<std::string> allAgents;
  allAgents.splice(allAgents.end(), stillTrackedAgents);
  allAgents.splice(allAgents.end(), notReallyOwnedAgents);
  allAgents.splice(allAgents.end(), inaccessibleAgents);
  allAgents.splice(allAgents.end(), goneAgents);
  for (auto & a: allAgents) {
    log::ScopedParamContainer params(lc);
    params.add("agentAddress", a);
    ourAg.removeFromOwnership(a);
    lc.log(log::ERR, "In GarbageCollector::reinjectOwnedObject(): removed agent from our ownership.");
  }
  ourAg.commit();
}

292
293
294



295
}}