Skip to content
Snippets Groups Projects
Commit f271176c authored by Benjamin Fiorini's avatar Benjamin Fiorini
Browse files

monitoring : improve hbase-consumer performances

parent 335fa544
Branches
Tags
No related merge requests found
......@@ -89,7 +89,6 @@ def main():
tt.start()
# Start polling on the dirq
_history = list() # we keep history of already seen msg
msg_c = 0
log_line_c = 0
start_time = time.time()
......@@ -109,30 +108,24 @@ def main():
continue
except Exception:
raise
# look if we already sent the msg
if name in _history:
# if yes, just drop it, it's too old
_history.remove(name)
try:
message_queue.remove(name)
except OSError, exc:
logging.warning(str(exc))
else:
# if no, put it in history list, and send it
_history.append(name)
# get list of messages
data_list = json.loads(message.body)['data']
# push to buffer
BUFFER.appendleft(data_list)
# accounting/debug
msg_c += 1
log_line_c += len(data_list)
# get list of messages
data_list = json.loads(message.body)['data']
# push to buffer
BUFFER.appendleft(data_list)
# accounting/debug
msg_c += 1
log_line_c += len(data_list)
# Then remove the message
try:
message_queue.remove(name)
except OSError, exc:
logging.warning(str(exc))
except Exception:
logging.critical("Found critical error : exiting")
logging.critical(str(traceback.format_exc()))
exit_handler()
time.sleep(0.5)
time.sleep(0.1)
try:
# purge old messages
message_queue.purge(maxtemp=config_dic['message_max_temp'],
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment