Agent.cpp 5.72 KB
Newer Older
1
/*
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
 * The CERN Tape Archive (CTA) project
 * Copyright (C) 2015  CERN
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

19
20
21
#include "Agent.hpp"
#include "AgentRegister.hpp"
#include "RootEntry.hpp"
22
#include "common/exception/Errnum.hpp"
23
#include "ProtcolBuffersAlgorithms.hpp"
24
25
#include <string>
#include <sstream>
26
#include <iomanip>
27
28
#include <sys/syscall.h>
#include <ctime>
29
#include <cxxabi.h>
30
#include <unistd.h>
31

32
// Builds an agent bound to the given object store backend, without a name.
// The counter used by nextId() starts at zero.
cta::objectstore::Agent::Agent(Backend & os)
  : ObjectOps<serializers::Agent>(os),
    m_nextId(0)
{
}
34

35
36
37
// Builds an agent bound to the given object store backend, forwarding the
// supplied name to the ObjectOps base. The counter used by nextId() starts
// at zero.
cta::objectstore::Agent::Agent(const std::string & name, Backend & os)
  : ObjectOps<serializers::Agent>(os, name),
    m_nextId(0)
{
}

38
39
40
41
42
43
44
45
// Prepares a fresh agent payload: runs the base class initialization, then
// fills in the default heartbeat (0), timeout (60 seconds, expressed in
// microseconds) and an empty description, and finally flags the payload as
// interpreted.
void cta::objectstore::Agent::initialize() {
  // 60 s * 1000 ms/s * 1000 us/ms
  const int defaultTimeoutUs = 60 * 1000 * 1000;

  ObjectOps<serializers::Agent>::initialize();

  m_payload.set_heartbeat(0);
  m_payload.set_timeout_us(defaultTimeoutUs);
  m_payload.set_description("");

  m_payloadInterpreted = true;
}

46
void cta::objectstore::Agent::generateName(const std::string & typeName) {
47
48
49
50
51
52
53
54
55
56
57
58
59
60
  std::stringstream aid;
  // Get time
  time_t now = time(0);
  struct tm localNow;
  localtime_r(&now, &localNow);
  // Get hostname
  char host[200];
  cta::exception::Errnum::throwOnMinusOne(gethostname(host, sizeof(host)),
    "In AgentId::AgentId:  failed to gethostname");
  // gettid is a safe system call (never fails)
  aid << typeName << "-" << host << "-" << syscall(SYS_gettid) << "-"
    << 1900 + localNow.tm_year
    << std::setfill('0') << std::setw(2) 
    << 1 + localNow.tm_mon
61
62
63
64
    << std::setw(2) << localNow.tm_mday << "-"
    << std::setw(2) << localNow.tm_hour << ":"
    << std::setw(2) << localNow.tm_min << ":"
    << std::setw(2) << localNow.tm_sec;
65
  setAddress(aid.str());
66
67
}

68
69
70
71
72
73
74
void cta::objectstore::Agent::insertAndRegisterSelf() {
  // We suppose initialize was already called, and that the agent name
  // is set.
  // We need to get hold of the agent register, which we suppose is available
  RootEntry re(m_objectStore);
  ScopedSharedLock reLock(re);
  re.fetch();
75
  AgentRegister ar(re.getAgentRegisterAddress(), m_objectStore);
76
77
78
79
  reLock.release();
  // Then we should first create a pointer to our agent
  ScopedExclusiveLock arLock(ar);
  ar.fetch();
80
  ar.addAgent(getAddressIfSet());
81
82
  ar.commit();
  // Set the agent register as owner and backup owner
83
84
  setBackupOwner(ar.getAddressIfSet());
  setOwner(ar.getAddressIfSet());
85
86
87
88
89
90
  // Create the agent
  insert();
  // And release the agent register's lock
  arLock.release();
}

Eric Cano's avatar
Eric Cano committed
91
92
93
94
95
96
97
void cta::objectstore::Agent::removeAndUnregisterSelf() {
  // Check that we own the proper lock
  checkPayloadWritable();
  // Check that we are not empty
  if (!isEmpty()) {
    throw AgentStillOwnsObjects("In Agent::deleteAndUnregisterSelf: agent still owns objects");
  }
98
99
100
101
102
103
104
  // First delete ourselves
  remove();
  // Then we remove the dangling pointer about ourselves in the agent register.
  // We need to get hold of the agent register, which we suppose is available
  RootEntry re(m_objectStore);
  ScopedSharedLock reLock(re);
  re.fetch();
105
  AgentRegister ar(re.getAgentRegisterAddress(), m_objectStore);
106
107
108
109
  reLock.release();
  // Then we should first create a pointer to our agent
  ScopedExclusiveLock arLock(ar);
  ar.fetch();
110
  ar.removeAgent(getAddressIfSet());
111
112
113
114
  ar.commit();
  arLock.release();
}

Eric Cano's avatar
Eric Cano committed
115
116
117
118
119
120
121
122
// Tells whether the agent's ownership list is empty.
// The payload must be readable (enforced by checkPayloadReadable()).
// Returns true when the list holds no entry, false otherwise.
bool cta::objectstore::Agent::isEmpty() {
  checkPayloadReadable();
  return m_payload.ownedobjects_size() == 0;
}


123
/*void cta::objectstore::Agent::create() {
124
125
  if (!m_setupDone)
    throw SetupNotDone("In Agent::create(): setup() not yet done");
126
127
  RootEntry re(m_objectStore);
  AgentRegister ar(re.allocateOrGetAgentRegister(*this), m_objectStore);
128
  ar.addIntendedElement(selfName(), *this);
129
  serializers::Agent as;
130
  as.set_heartbeat(0);
131
132
133
  writeChild(selfName(), as);
  ar.upgradeIntendedElementToActual(selfName(), *this);
  m_creationDone = true;
134
}*/
135

136
std::string cta::objectstore::Agent::nextId(const std::string & childType) {
137
  std::stringstream id;
138
  id << childType << "-" << getAddressIfSet() << "-" << m_nextId++;
139
140
141
  return id.str();
}

142
void cta::objectstore::Agent::addToOwnership(std::string name) {
143
144
  checkPayloadWritable();
  std::string * owned = m_payload.mutable_ownedobjects()->Add();
145
  *owned = name;
146
147
}

148
void cta::objectstore::Agent::removeFromOwnership(std::string name) {
149
150
151
  checkPayloadWritable();
  serializers::removeString(m_payload.mutable_ownedobjects(), name);
  /*
152
153
154
  bool found;
  do {
    found = false;
155
156
    for (int i=0; i<m_payload.mutable_ownedobjects()->size(); i++) {
      if (name == *m_payload.mutable_ownedobjects(i)) {
157
        found = true;
158
159
        m_payload.mutable_ownedobjects()->SwapElements(i, m_payload.mutable_ownedobjects()->size()-1);
        m_payload.mutable_ownedobjects()->RemoveLast();
160
161
162
        break;
      }
    }
163
  } while (found);*/
164
165
}

166
std::list<std::string> 
167
168
  cta::objectstore::Agent::getOwnershipList() {
  checkPayloadReadable();
169
  std::list<std::string> ret;
170
171
  for (int i=0; i<m_payload.ownedobjects_size(); i++) {
    ret.push_back(m_payload.ownedobjects(i));
172
173
174
175
  }
  return ret;
}

176
177
178
// Increments the heartbeat counter stored in the payload by one.
// The payload must be writable (enforced by checkPayloadWritable()).
void cta::objectstore::Agent::bumpHeartbeat() {
  checkPayloadWritable();
  const uint64_t nextHeartbeat = m_payload.heartbeat() + 1;
  m_payload.set_heartbeat(nextHeartbeat);
}

181
182
183
// Returns the heartbeat counter currently stored in the payload.
// The payload must be readable (enforced by checkPayloadReadable()).
uint64_t cta::objectstore::Agent::getHeartbeatCount() {
  checkPayloadReadable();
  const uint64_t heartbeatCount = m_payload.heartbeat();
  return heartbeatCount;
}