GarbageCollector.cpp 7.81 KB
Newer Older
1
/*
 * The CERN Tape Archive (CTA) project
 * Copyright (C) 2015  CERN
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

19
20
#include "GarbageCollector.hpp"
#include "RootEntry.hpp"
21
//#include "FIFO.hpp"
22
#include <algorithm>
23
24

namespace cta { namespace objectstore {
25
// Upper bound on the number of agents one garbage collector watches at a time:
// aquireTargets() stops adopting untracked agents once m_watchedAgents reaches
// this size.
const size_t GarbageCollector::c_maxWatchedAgentsPerGC = 2;
26

27
28
29
30
31
/**
 * Builds a garbage collector operating on the given object store on behalf of
 * the given agent. The address of the agent register is read from the root
 * entry, then the register itself is fetched. The timeout handed to the
 * watchdogs starts at 5.0.
 * @param os the object store backend to work on
 * @param agent the agent object representing this garbage collector
 */
GarbageCollector::GarbageCollector(Backend & os, Agent & agent):
  m_objectStore(os),
  m_ourAgent(agent),
  m_agentRegister(os),
  m_timeout(5.0) {
  // Locate the agent register through the root entry (shared lock is enough,
  // we only read).
  RootEntry rootEntry(m_objectStore);
  ScopedSharedLock rootEntryLock(rootEntry);
  rootEntry.fetch();
  m_agentRegister.setAddress(rootEntry.getAgentRegisterAddress());
  rootEntryLock.release();
  // Load the current content of the agent register.
  ScopedSharedLock registerLock(m_agentRegister);
  m_agentRegister.fetch();
}
37
38
39
40

void GarbageCollector::setTimeout(double timeout) {
  m_timeout = timeout;
}
41
42
43
44
45
46
47
48
  
void GarbageCollector::runOnePass() {
  // Bump our own heart beat
  {
    ScopedExclusiveLock lock (m_ourAgent);
    m_ourAgent.fetch();
    m_ourAgent.bumpHeartbeat();
    m_ourAgent.commit();
49
  }
50
51
52
53
  trimGoneTargets();
  aquireTargets();
  checkHeartbeats();
}
54
  
55
56
57
58
59
60
61
62
63
64
void GarbageCollector::trimGoneTargets() {
  ScopedSharedLock arLock(m_agentRegister);
  m_agentRegister.fetch();
  arLock.release();
  std::list<std::string> agentList = m_agentRegister.getAgents();
  for (std::map<std::string, AgentWatchdog * >::iterator wa
        = m_watchedAgents.begin();
      wa != m_watchedAgents.end();) {
    if (agentList.end() == std::find(agentList.begin(), agentList.end(), wa->first)) {
      ScopedExclusiveLock oaLock(m_ourAgent);
65
      m_ourAgent.fetch();
66
      m_ourAgent.removeFromOwnership(wa->first);
67
      m_ourAgent.commit();
68
69
70
71
72
      oaLock.release();
      delete wa->second;
      m_watchedAgents.erase(wa++);
    } else {
      wa++;
73
74
    }
  }
75
}
76
  
77
78
79
80
81
82
83
84
void GarbageCollector::aquireTargets() {
  ScopedExclusiveLock arLock(m_agentRegister);
  m_agentRegister.fetch();
  // Get the list of untracked agents
  std::list<std::string> candidatesList = m_agentRegister.getUntrackedAgents();
  std::list<std::string>::const_iterator c = candidatesList.begin();
  // We can now take ownership of new agents, up to our max...
  // and we don't monitor ourselves!
85
86
  for (;m_watchedAgents.size() < c_maxWatchedAgentsPerGC
         && c!=candidatesList.end(); c++) {
87
    // We don't monitor ourselves
88
    if (*c != m_ourAgent.getAddressIfSet()) {
89
90
91
92
93
94
95
96
97
98
99
100
101
102
      // So we have a candidate we might want to monitor
      // First, check that the agent entry exists, and that ownership
      // is indeed pointing to the agent register
      Agent ag(*c, m_objectStore);
      if (!ag.exists()) {
        // This is a dangling pointer to a dead object:
        // remove it in the agentRegister.
        m_agentRegister.removeAgent(*c);
        continue;
      }
      ScopedExclusiveLock agLock(ag);
      ag.fetch();
      // Check that the actual owner is the agent register.
      // otherwise, it should not be listed as an agent to monitor
103
104
      if (ag.getOwner() != m_agentRegister.getAddressIfSet()) {
        m_agentRegister.trackAgent(ag.getAddressIfSet());
105
106
107
108
109
110
111
112
        agLock.release();
        continue;
      }
      // We are now interested in tracking this agent. So we will transfer its
      // ownership. We alredy have an exclusive lock on the agent.
      // Lock ours
      ScopedExclusiveLock oaLock(m_ourAgent);
      m_ourAgent.fetch();
113
      m_ourAgent.addToOwnership(ag.getAddressIfSet());
114
115
      m_ourAgent.commit();
      // We now have a pointer to the agent, we can make the ownership official
116
      ag.setOwner(m_ourAgent.getAddressIfSet());
117
118
119
      ag.commit();
      // And we can remove the now dangling pointer from the agent register
      // (we hold an exclusive lock all along)
120
      m_agentRegister.trackAgent(ag.getAddressIfSet());
121
      m_agentRegister.commit();
122
123
      // Agent is officially our, we can remove it from the untracked agent's
      // list
124
      m_agentRegister.trackAgent(ag.getAddressIfSet());
125
126
127
      // Agent is now officially ours, let's track it. We have the release the 
      // lock to the agent before constructing the watchdog, which builds
      // its own agent objects (and need to lock the object store representation)
128
      std::string agentName = ag.getAddressIfSet();
129
130
131
      agLock.release();
      m_watchedAgents[agentName] =
        new AgentWatchdog(agentName, m_objectStore);
132
      m_watchedAgents[ag.getAddressIfSet()]->setTimeout(m_timeout);
133
    }
134
  }
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
  // Commit all the modifications to the agent register
  m_agentRegister.commit();
}
 
/**
 * Checks the heartbeat of every watched agent. For each agent whose watchdog
 * reports it as not alive, the agent is cleaned up (see cleanupDeadAgent()),
 * its reference is removed from our agent's ownership list and the watchdog
 * is deleted. Exceptions thrown by cleanupDeadAgent() propagate to the
 * caller; the agent then stays in the watched list.
 */
void GarbageCollector::checkHeartbeats() {
  // Check the heartbeats of the watched agents
  // We can still fail on many steps
  for (std::map<std::string, AgentWatchdog * >::iterator wa = m_watchedAgents.begin();
      wa != m_watchedAgents.end();) {
    // Get the heartbeat. Clean dead agents and remove references to them
    if (!wa->second->checkAlive()) {
      cleanupDeadAgent(wa->first);
      ScopedExclusiveLock oaLock(m_ourAgent);
      // Refresh our agent under the lock before modifying it, like every
      // other lock/modify/commit sequence in this class, so that we do not
      // commit a stale in-memory copy.
      m_ourAgent.fetch();
      m_ourAgent.removeFromOwnership(wa->first);
      m_ourAgent.commit();
      delete wa->second;
      // Post-increment: the iterator moves on before the element is erased.
      m_watchedAgents.erase(wa++);
    } else {
      wa++;
    }
  }
}

 void GarbageCollector::cleanupDeadAgent(const std::string & name) {
   // Check that we are still owners of the agent (sanity check).
   Agent agent(name, m_objectStore);
   ScopedExclusiveLock agLock(agent);
   agent.fetch();
163
   if (agent.getOwner() != m_ourAgent.getAddressIfSet()) {
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
     throw cta::exception::Exception("In GarbageCollector::cleanupDeadAgent: the ownership is not ours as expected");
   }
   // Return all objects owned by the agent to their respective backup owners
   std::list<std::string> ownedObjects = agent.getOwnershipList();
   for (std::list<std::string>::iterator obj = ownedObjects.begin();
       obj!= ownedObjects.end(); obj++) {
     // Find the object
     GenericObject go(*obj, m_objectStore);
     // If the objet does not exist, we're done.
     if (!go.exists())
       continue;
     ScopedExclusiveLock goLock(go);
     go.fetch();
     // if the object is already owned by someone else, this was a dangling pointer:
     // nothing to do.
     if (go.getOwner() != name)
       continue;
     // We reached a point where we have to actually move to ownership to somewhere
     // else. Find that somewhere else
     GenericObject gContainter(go.getBackupOwner(), m_objectStore);
     if (!go.exists()) {
       throw cta::exception::Exception("In GarbageCollector::cleanupDeadAgent: backup owner does not exist!");
     }
     ScopedSharedLock gContLock(gContainter);
     gContainter.fetch();
189
     serializers::ObjectType containerType = gContainter.type();
190
     gContLock.release();
191
     switch(containerType) {
192
193
194
195
196
197
198
199
200
201
202
203
204
//       case serializers::FIFO_t: {
//         FIFO fifo(go.getBackupOwner(), m_objectStore);
//         ScopedExclusiveLock ffLock(fifo);
//         fifo.fetch();
//         fifo.pushIfNotPresent(go.getNameIfSet());
//         fifo.commit();
//         // We now have a pointer to the object. Make the change official.
//         go.setOwner(go.getBackupOwner());
//         go.commit();
//         ffLock.release();
//         goLock.release();
//         break;
//       }
205
206
207
208
209
       default: {
         throw cta::exception::Exception("In GarbageCollector::cleanupDeadAgent: unexpected container type!");
       }
     }
   }
210
   // We now processed all the owned objects. We can delete the agent's entry
Eric Cano's avatar
Eric Cano committed
211
   agent.removeAndUnregisterSelf();
212
213
214
215
 }



216
}}