GarbageCollector.cpp 12.9 KB
Newer Older
1
/*
 * The CERN Tape Archive (CTA) project
 * Copyright (C) 2015  CERN
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

19
#include "GarbageCollector.hpp"
20
#include "AgentReference.hpp"
21
#include "RootEntry.hpp"
22
#include <algorithm>
23
24

namespace cta { namespace objectstore {
25
// Upper bound on the number of agents one garbage collector watches at a time
// (aquireTargets() stops acquiring new targets once this many are watched).
const size_t GarbageCollector::c_maxWatchedAgentsPerGC(25);
26

27
28
/**
 * Builds a garbage collector working on the given object store.
 * The agent register address is read from the root entry, and the agent
 * register is fetched once before the constructor returns.
 * @param os the object store backend.
 * @param agentReference reference to the agent this garbage collector runs as.
 * @param catalogue catalogue handed to the per-object garbage collectors.
 */
GarbageCollector::GarbageCollector(Backend & os, AgentReference & agentReference, catalogue::Catalogue & catalogue): 
  m_objectStore(os), m_catalogue(catalogue), m_ourAgentReference(agentReference), m_agentRegister(os) {
  // Locate the agent register through the root entry (read-only access).
  RootEntry rootEntry(m_objectStore);
  ScopedSharedLock rootEntryLock(rootEntry);
  rootEntry.fetch();
  auto agentRegisterAddress = rootEntry.getAgentRegisterAddress();
  m_agentRegister.setAddress(agentRegisterAddress);
  // Drop the root entry lock before taking the agent register's.
  rootEntryLock.release();
  ScopedSharedLock agentRegisterLock(m_agentRegister);
  m_agentRegister.fetch();
}
37

38
39
40
41
/**
 * Runs one garbage collection cycle.
 * @param lc log context used by every step.
 */
void GarbageCollector::runOnePass(log::LogContext & lc) {
  // Step 1: forget watched agents which left the agent register.
  trimGoneTargets(lc);
  // Step 2: take ownership of new untracked agents, within our quota.
  aquireTargets(lc);
  // Step 3: check heartbeats and clean up the agents found dead.
  checkHeartbeats(lc);
}
43
  
44
/**
 * Stops watching agents that no longer appear in the agent register.
 * For each of them, the agent is removed from our own agent's ownership list,
 * and its watchdog is deleted and dropped from m_watchedAgents.
 * @param lc log context.
 */
void GarbageCollector::trimGoneTargets(log::LogContext & lc) {
  // Snapshot the registered agents; the register lock is released right
  // after the fetch.
  ScopedSharedLock registerLock(m_agentRegister);
  m_agentRegister.fetch();
  registerLock.release();
  const std::list<std::string> registeredAgents = m_agentRegister.getAgents();
  auto watched = m_watchedAgents.begin();
  while (watched != m_watchedAgents.end()) {
    const bool stillRegistered =
      std::find(registeredAgents.begin(), registeredAgents.end(), watched->first)
        != registeredAgents.end();
    if (stillRegistered) {
      ++watched;
      continue;
    }
    // The agent left the register: drop it from our agent's ownership...
    const std::string goneAgent = watched->first;
    Agent ourAgent(m_ourAgentReference.getAgentAddress(), m_objectStore);
    ScopedExclusiveLock ourAgentLock(ourAgent);
    ourAgent.fetch();
    ourAgent.removeFromOwnership(goneAgent);
    ourAgent.commit();
    ourAgentLock.release();
    // ...then stop watching it (the map owns the watchdog pointer).
    delete watched->second;
    watched = m_watchedAgents.erase(watched);
    log::ScopedParamContainer params(lc);
    params.add("agentAddress", goneAgent);
    lc.log(log::INFO, "In GarbageCollector::trimGoneTargets(): removed now gone agent.");
  }
}
70
  
71
/**
 * Takes ownership of untracked agents listed in the agent register, until
 * c_maxWatchedAgentsPerGC agents are watched or the candidates run out, and
 * starts an AgentWatchdog for each newly acquired agent. Our own agent is
 * never a target. Along the way:
 *  - candidates whose object no longer exists are removed from the register;
 *  - candidates not owned by the agent register are only marked as tracked.
 * The agent register stays exclusively locked for the whole function and is
 * committed at the end.
 * @param lc log context.
 */
void GarbageCollector::aquireTargets(log::LogContext & lc) {
  ScopedExclusiveLock arLock(m_agentRegister);
  m_agentRegister.fetch();
  // Get the list of untracked agents
  std::list<std::string> candidatesList = m_agentRegister.getUntrackedAgents();
  std::list<std::string>::const_iterator c = candidatesList.begin();
  // We can now take ownership of new agents, up to our max...
  for (; m_watchedAgents.size() < c_maxWatchedAgentsPerGC
         && c != candidatesList.end(); ++c) {
    // We don't monitor ourselves
    if (*c == m_ourAgentReference.getAgentAddress()) continue;
    // So we have a candidate we might want to monitor.
    // First, check that the agent entry exists, and that ownership
    // is indeed pointing to the agent register.
    Agent ag(*c, m_objectStore);
    ScopedExclusiveLock agLock;
    Agent ourAgent(m_ourAgentReference.getAgentAddress(), m_objectStore);
    ScopedExclusiveLock oaLock;
    try {
      if (!ag.exists()) {
        // This is a dangling pointer to a dead object:
        // remove it in the agentRegister.
        m_agentRegister.removeAgent(*c);
        continue;
      }
      agLock.lock(ag);
      ag.fetch();
      // Check that the actual owner is the agent register.
      // Otherwise, it should not be listed as an agent to monitor.
      if (ag.getOwner() != m_agentRegister.getAddressIfSet()) {
        m_agentRegister.trackAgent(ag.getAddressIfSet());
        agLock.release();
        continue;
      }
      // We are now interested in tracking this agent, so we transfer its
      // ownership. We already hold an exclusive lock on the agent; lock ours.
      oaLock.lock(ourAgent);
      ourAgent.fetch();
      ourAgent.addToOwnership(ag.getAddressIfSet());
      ourAgent.commit();
      // We now have a pointer to the agent, we can make the ownership official.
      ag.setOwner(ourAgent.getAddressIfSet());
      ag.commit();
    } catch (cta::exception::Exception &) {
      // This can happen if the agent disappears under our feet. If the object
      // is still present, we have a real problem: propagate it.
      if (m_objectStore.exists(*c)) throw;
      // The agent is gone: let go of this candidate this time. We must skip
      // the rest of the iteration, which would otherwise log it as tracked and
      // start a watchdog on an agent whose ownership transfer did not complete.
      // NOTE(review): if the failure happened after ourAgent.commit(), our
      // agent keeps a reference to the vanished agent in its ownership list.
      continue;
    }
    log::ScopedParamContainer params(lc);
    params.add("agentAddress", ag.getAddressIfSet())
          .add("gcAgentAddress", ourAgent.getAddressIfSet());
    lc.log(log::INFO, "In GarbageCollector::aquireTargets(): started tracking an untracked agent");
    // Agent is officially ours, we can remove it from the untracked agents' list.
    m_agentRegister.trackAgent(ag.getAddressIfSet());
    m_agentRegister.commit();
    // We have to release the lock on the agent before constructing the
    // watchdog, which builds its own agent objects (and needs to lock the
    // object store representation).
    const std::string agentName = ag.getAddressIfSet();
    const double timeout = ag.getTimeout();
    agLock.release();
    // Build the watchdog before touching the map, so that a throwing
    // constructor cannot leave a (null) entry behind in m_watchedAgents.
    AgentWatchdog * watchdog = new AgentWatchdog(agentName, m_objectStore);
    m_watchedAgents[agentName] = watchdog;
    watchdog->setTimeout(timeout);
  }
  // Commit all the modifications to the agent register
  m_agentRegister.commit();
}
 
146
/**
 * Checks the heartbeat of every watched agent.
 * An agent found dead is cleaned up with cleanupDeadAgent(), removed from our
 * own agent's ownership list, and its watchdog is deleted and forgotten.
 * If a check fails with an exception, it is tolerated only when the agent no
 * longer exists (it will be trimmed on the next pass); otherwise it is rethrown.
 * @param lc log context.
 */
void GarbageCollector::checkHeartbeats(log::LogContext & lc) {
  auto watched = m_watchedAgents.begin();
  while (watched != m_watchedAgents.end()) {
    try {
      if (watched->second->checkAlive()) {
        // Healthy agent: nothing to do.
        ++watched;
        continue;
      }
      // Dead agent: garbage collect what it owned, then release our ownership.
      cleanupDeadAgent(watched->first, lc);
      Agent ourAgent(m_ourAgentReference.getAgentAddress(), m_objectStore);
      ScopedExclusiveLock ourAgentLock(ourAgent);
      ourAgent.fetch();
      ourAgent.removeFromOwnership(watched->first);
      ourAgent.commit();
      delete watched->second;
      watched = m_watchedAgents.erase(watched);
    } catch (cta::exception::Exception &) {
      // A failure on an agent which is still present is a real problem.
      if (watched->second->checkExists()) throw;
      // The agent vanished at the wrong time; the next pass will trim it.
      ++watched;
    }
  }
}

Eric Cano's avatar
Eric Cano committed
177
178
179
180
181
182
183
184
185
void GarbageCollector::cleanupDeadAgent(const std::string & address, log::LogContext & lc) {
  // Check that we are still owners of the agent (sanity check).
  Agent agent(address, m_objectStore);
  ScopedExclusiveLock agLock(agent);
  agent.fetch();
  log::ScopedParamContainer params(lc);
  params.add("agentAddress", agent.getAddressIfSet())
       .add("gcAgentAddress", m_ourAgentReference.getAgentAddress());
  if (agent.getOwner() != m_ourAgentReference.getAgentAddress()) {
186
   log::ScopedParamContainer params(lc);
Eric Cano's avatar
Eric Cano committed
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
   lc.log(log::WARNING, "In GarbageCollector::cleanupDeadAgent(): skipping agent which is not owned by this garbage collector as thought.");
   // The agent is removed from our ownership by the calling function: we're done.
   return;
  }
  lc.log(log::INFO, "In GarbageCollector::cleanupDeadAgent(): will cleanup dead agent.");
  // Return all objects owned by the agent to their respective backup owners
  auto ownedObjects = agent.getOwnershipList();
  for (auto obj = ownedObjects.begin(); obj!= ownedObjects.end(); obj++) {
   // Find the object
   GenericObject go(*obj, m_objectStore);
   log::ScopedParamContainer params2(lc);
   params2.add("objectAddress", go.getAddressIfSet());
   // If the object does not exist, we're done.
   if (go.exists()) {
     ScopedExclusiveLock goLock(go);
     go.fetch();
     // Call GenericOpbject's garbage collect method, which in turn will
     // delegate to the object type's garbage collector.
205
     go.garbageCollectDispatcher(goLock, address, m_ourAgentReference, lc, m_catalogue);
Eric Cano's avatar
Eric Cano committed
206
207
208
     lc.log(log::INFO, "In GarbageCollector::cleanupDeadAgent(): garbage collected owned object.");
   } else {
     lc.log(log::INFO, "In GarbageCollector::cleanupDeadAgent(): skipping garbage collection of now gone object.");
209
   }
Eric Cano's avatar
Eric Cano committed
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
   // In all cases, relinquish ownership for this object
   agent.removeFromOwnership(*obj);
   agent.commit();
  }
  // We now processed all the owned objects. We can delete the agent's entry
  agent.removeAndUnregisterSelf();
  lc.log(log::INFO, "In GarbageCollector::cleanupDeadAgent(): agent entry removed.");
}

/**
 * Releases all the agents watched by this garbage collector.
 *  1. Classifies each watched agent as gone, still owned by us, owned by
 *     someone else, or inaccessible. Its watchdog is deleted and
 *     m_watchedAgents is emptied.
 *  2. In the agent register, untracks the agents still owned by us and
 *     removes the gone ones.
 *  3. Hands the agents still owned by us back to the agent register.
 *  4. Removes every formerly watched agent from our own agent's ownership list.
 * @param lc log context.
 */
void GarbageCollector::reinjectOwnedObject(log::LogContext& lc) {
  // We have to release the agents we were following. Performance is not an
  // issue, so we go in small steps.
  // First check the agents are indeed owned by us and still exist.
  std::list<std::string> stillTrackedAgents;
  std::list<std::string> goneAgents;
  std::list<std::string> notReallyOwnedAgents;
  std::list<std::string> inaccessibleAgents;
  {
    auto a = m_watchedAgents.begin();
    while (a != m_watchedAgents.end()) {
      auto & agentAddress = a->first;
      log::ScopedParamContainer params(lc);
      params.add("agentAddress", agentAddress);
      // Check the agent is there, and ours.
      if (!m_objectStore.exists(agentAddress)) {
        goneAgents.emplace_back(agentAddress);
        lc.log(log::INFO, "In GarbageCollector::reinjectOwnedObject(): agent not present anymore.");
      } else {
        try {
          Agent ag(agentAddress, m_objectStore);
          ScopedSharedLock agl(ag);
          ag.fetch();
          if (ag.getOwner() == m_ourAgentReference.getAgentAddress()) {
            stillTrackedAgents.emplace_back(agentAddress);
            lc.log(log::INFO, "In GarbageCollector::reinjectOwnedObject(): agent still owned by us.");
          } else {
            params.add("currentOwner", ag.getOwner());
            notReallyOwnedAgents.emplace_back(agentAddress);
            lc.log(log::ERR, "In GarbageCollector::reinjectOwnedObject(): agent not owned by us.");
          }
        } catch (cta::exception::Exception & ex) {
          params.add("ExceptionMessage", ex.getMessageValue());
          lc.log(log::ERR, "In GarbageCollector::reinjectOwnedObject(): agent inaccessible.");
          inaccessibleAgents.emplace_back(a->first);
        }
      }
      // We stop watching this agent. The map owns its watchdog: free it before
      // dropping the entry, as trimGoneTargets() and checkHeartbeats() do.
      // (agentAddress refers to the map key and is not used past this point.)
      delete a->second;
      a = m_watchedAgents.erase(a);
    }
  }
  {
    // We now have an overview of the situation. We can update the agent register based on that.
    ScopedExclusiveLock arLock(m_agentRegister);
    m_agentRegister.fetch();
    for (auto &sta: stillTrackedAgents) {
      m_agentRegister.untrackAgent(sta);
      log::ScopedParamContainer params(lc);
      params.add("agentAddress", sta);
      lc.log(log::INFO, "In GarbageCollector::reinjectOwnedObject(): untracked agent in registry.");
    }
    for (auto &ga: goneAgents) {
      m_agentRegister.removeAgent(ga);
      log::ScopedParamContainer params(lc);
      params.add("agentAddress", ga);
      lc.log(log::INFO, "In GarbageCollector::reinjectOwnedObject(): removed gone agent from registry.");
    }
    // This is all we are going to do. Other agents cannot be acted upon.
    m_agentRegister.commit();
  }
  // We can now remove ownership from the agents we still owned
  for (auto & sta: stillTrackedAgents) {
    log::ScopedParamContainer params(lc);
    params.add("agentAddress", sta);
    Agent ag (sta, m_objectStore);
    ScopedExclusiveLock agl(ag);
    ag.fetch();
    if (ag.getOwner() == m_ourAgentReference.getAgentAddress()) {
      ag.setOwner(m_agentRegister.getAddressIfSet());
      ag.commit();
      lc.log(log::INFO, "In GarbageCollector::reinjectOwnedObject(): changed agent ownership to registry.");
    } else {
      params.add("newOwner", ag.getOwner());
      lc.log(log::ERR, "In GarbageCollector::reinjectOwnedObject(): skipping agent whose ownership we lost last minute.");
    }
  }
  // We can now cleanup our own agent and remove it.
  Agent ourAg(m_ourAgentReference.getAgentAddress(), m_objectStore);
  ScopedExclusiveLock ourAgL(ourAg);
  ourAg.fetch();
  std::list<std::string> allAgents;
  allAgents.splice(allAgents.end(), stillTrackedAgents);
  allAgents.splice(allAgents.end(), notReallyOwnedAgents);
  allAgents.splice(allAgents.end(), inaccessibleAgents);
  allAgents.splice(allAgents.end(), goneAgents);
  for (auto & a: allAgents) {
    log::ScopedParamContainer params(lc);
    params.add("agentAddress", a);
    ourAg.removeFromOwnership(a);
    // Routine step, performed for every formerly watched agent: not an error.
    // Abnormal situations were already logged at ERR level above.
    lc.log(log::INFO, "In GarbageCollector::reinjectOwnedObject(): removed agent from our ownership.");
  }
  ourAg.commit();
}

312
313
314



315
}}