Semaphores.cpp 6.94 KB
Newer Older
1
2
3
#include "castor/server/MutexLocker.hpp"
#include "castor/server/Semaphores.hpp"
#include "castor/server/Threading.hpp"
4
#include "common/exception/Errnum.hpp"
5
#include "common/exception/Exception.hpp"
6
7
8
9
10
11
12
13
#include <errno.h>
#include <sys/time.h>

//------------------------------------------------------------------------------
//PosixSemaphore constructor
//------------------------------------------------------------------------------
castor::server::PosixSemaphore::PosixSemaphore(int initial)
 {
14
  cta::exception::Errnum::throwOnReturnedErrno(
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
    sem_init(&m_sem, 0, initial),
    "Error from sem_init in castor::server::PosixSemaphore::PosixSemaphore()");
}
//------------------------------------------------------------------------------
//PosixSemaphore destructor
//------------------------------------------------------------------------------
castor::server::PosixSemaphore::~PosixSemaphore() {
  /* There is a danger of destroying the semaphore in the consumer
     while the producer is still referring to the object.
     This mutex prevents this from happening. (The release method locks it). */
  MutexLocker ml(&m_mutexPosterProtection);
  sem_destroy(&m_sem);
}
//------------------------------------------------------------------------------
//acquire
//------------------------------------------------------------------------------
void castor::server::PosixSemaphore::acquire()
 {
  int ret;
  /* If we receive EINTR, we should just keep trying (signal interruption) */
  while((ret = sem_wait(&m_sem)) && EINTR == errno) {}
  /* If it was not EINTR, it's a failure */
37
  cta::exception::Errnum::throwOnNonZero(ret,
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
    "Error from sem_wait in castor::server::PosixSemaphore::acquire()");
}
//------------------------------------------------------------------------------
//acquire
//------------------------------------------------------------------------------
void castor::server::PosixSemaphore::acquireWithTimeout(uint64_t timeout_us)
 {
  int ret;
  struct timeval tv;
  gettimeofday(&tv, NULL);
  struct timespec ts;
  // Add microseconds
  ts.tv_nsec = (tv.tv_usec + (timeout_us % 1000000)) * 1000;
  // Forward carry and add seconds
  ts.tv_sec = tv.tv_sec + timeout_us / 1000000 + ts.tv_nsec / 1000000000;
  // Clip what we carried
  ts.tv_nsec %= 1000000000;
  /* If we receive EINTR, we should just keep trying (signal interruption) */
  while((ret = sem_timedwait(&m_sem, &ts)) && EINTR == errno) {}
  /* If we got a timeout, throw a special exception */
  if (ret && ETIMEDOUT == errno) { throw Timeout(); }
  /* If it was not EINTR, it's a failure */
60
  cta::exception::Errnum::throwOnNonZero(ret,
61
62
63
64
65
66
67
68
69
70
71
    "Error from sem_wait in castor::server::PosixSemaphore::acquireWithTimeout()");
}

//------------------------------------------------------------------------------
//tryAcquire
//------------------------------------------------------------------------------
bool castor::server::PosixSemaphore::tryAcquire()
 {
  int ret = sem_trywait(&m_sem);
  if (!ret) return true;
  if (ret && EAGAIN == errno) return false;
72
  cta::exception::Errnum::throwOnNonZero(ret,
73
74
75
76
77
78
79
80
81
82
83
    "Error from sem_trywait in castor::server::PosixSemaphore::tryAcquire()");
  /* unreacheable, just for compiler happiness */
  return false;
}
//------------------------------------------------------------------------------
//release
//------------------------------------------------------------------------------
void castor::server::PosixSemaphore::release(int n)
 {
  for (int i=0; i<n; i++) {
    MutexLocker ml(&m_mutexPosterProtection);
84
    cta::exception::Errnum::throwOnNonZero(sem_post(&m_sem),
85
86
87
88
89
90
91
92
      "Error from sem_post in castor::server::PosixSemaphore::release()");
  }
}
//------------------------------------------------------------------------------
//CondVarSemaphore constructor
//------------------------------------------------------------------------------
castor::server::CondVarSemaphore::CondVarSemaphore(int initial)
:m_value(initial) {
93
      cta::exception::Errnum::throwOnReturnedErrno(
94
95
        pthread_cond_init(&m_cond, NULL),
        "Error from pthread_cond_init in castor::server::CondVarSemaphore::CondVarSemaphore()");
96
      cta::exception::Errnum::throwOnReturnedErrno(
97
98
99
100
101
102
103
104
105
106
107
108
        pthread_mutex_init(&m_mutex, NULL),
        "Error from pthread_mutex_init in castor::server::CondVarSemaphore::CondVarSemaphore()");
    }

//------------------------------------------------------------------------------
//CondVarSemaphore destructor
//------------------------------------------------------------------------------
castor::server::CondVarSemaphore::~CondVarSemaphore() {
      /* Barrier protecting the last user */
      pthread_mutex_lock(&m_mutex);
      pthread_mutex_unlock(&m_mutex);
      /* Cleanup */
109
110
      int rc=pthread_cond_destroy(&m_cond);
      rc=rc;
111
112
113
114
115
116
117
      pthread_mutex_destroy(&m_mutex);
    }
//------------------------------------------------------------------------------
//acquire
//------------------------------------------------------------------------------
void castor::server::CondVarSemaphore::acquire()
 {
118
  cta::exception::Errnum::throwOnReturnedErrno(
119
120
121
    pthread_mutex_lock(&m_mutex),
    "Error from pthread_mutex_lock in castor::server::CondVarSemaphore::acquire()");
  while (m_value <= 0) {
122
    cta::exception::Errnum::throwOnReturnedErrno(
123
124
125
126
      pthread_cond_wait(&m_cond, &m_mutex),
      "Error from pthread_cond_wait in castor::server::CondVarSemaphore::acquire()");
  }
  m_value--;
127
  cta::exception::Errnum::throwOnReturnedErrno(
128
129
130
131
132
133
134
135
136
    pthread_mutex_unlock(&m_mutex),
    "Error from pthread_mutex_unlock in castor::server::CondVarSemaphore::acquire()");
}
//------------------------------------------------------------------------------
//tryAcquire
//------------------------------------------------------------------------------
bool castor::server::CondVarSemaphore::tryAcquire()
 {
  bool ret;
137
  cta::exception::Errnum::throwOnReturnedErrno(
138
139
140
141
142
143
144
145
    pthread_mutex_lock(&m_mutex),
      "Error from pthread_mutex_lock in castor::server::CondVarSemaphore::tryAcquire()");
  if (m_value > 0) {
    ret = true;
    m_value--;
  } else {
    ret = false;
  }
146
  cta::exception::Errnum::throwOnReturnedErrno(
147
148
149
150
151
152
153
154
155
156
    pthread_mutex_unlock(&m_mutex),
      "Error from pthread_mutex_unlock in castor::server::CondVarSemaphore::tryAcquire()");
  return ret;
}
//------------------------------------------------------------------------------
//release
//------------------------------------------------------------------------------
void castor::server::CondVarSemaphore::release(int n)
 {
  for (int i=0; i<n; i++) {
157
  cta::exception::Errnum::throwOnReturnedErrno(
158
159
160
    pthread_mutex_lock(&m_mutex),
      "Error from pthread_mutex_unlock in castor::server::CondVarSemaphore::release()");
    m_value++;
161
  cta::exception::Errnum::throwOnReturnedErrno(
162
163
    pthread_cond_signal(&m_cond),
      "Error from pthread_cond_signal in castor::server::CondVarSemaphore::release()");
164
  cta::exception::Errnum::throwOnReturnedErrno(
165
166
167
168
    pthread_mutex_unlock(&m_mutex),
      "Error from pthread_mutex_unlock in castor::server::CondVarSemaphore::release()");
  }
}