in ccmlib/cluster.py [0:0]
def actively_watch_logs_for_error(self, on_error_call, interval=1):
    """
    Begin a daemon thread that repeatedly scans each node's system.log for
    new errors, every `interval` seconds.

    The first pass covers the entire log contents written at that point;
    subsequent scans cover only newly appended log messages. New errors are
    reported by calling `on_error_call` with an OrderedDict mapping node
    name to a list of error lines.

    Returns the watcher thread itself, which should be .join()'ed to wrap up
    execution (a final scan is performed), otherwise it will run until the
    main thread exits.
    """

    class LogWatchingThread(threading.Thread):
        """
        This class is embedded here for now, because it is used only from
        within Cluster, and depends on cluster.nodelist().
        """

        def __init__(self, cluster):
            super(LogWatchingThread, self).__init__()
            self.cluster = cluster
            self.daemon = True  # set so that thread will exit when main thread exits
            self.req_stop_event = threading.Event()
            self.done_event = threading.Event()
            # per-node position (byte offset) up to which the log has been scanned
            self.log_positions = defaultdict(int)

        def scan(self):
            """Scan each node's log from its last mark; return OrderedDict of new errors."""
            errordata = OrderedDict()
            try:
                for node in self.cluster.nodelist():
                    scan_from_mark = self.log_positions[node.name]
                    next_time_scan_from_mark = node.mark_log()
                    if next_time_scan_from_mark == scan_from_mark:
                        # log hasn't advanced, nothing to do for this node
                        continue
                    errors = node.grep_log_for_errors_from(seek_start=scan_from_mark)
                    self.log_positions[node.name] = next_time_scan_from_mark
                    if errors:
                        errordata[node.name] = errors
            except IOError as e:
                # IOError.strerror can be None, so guard before the substring test
                if e.strerror and 'No such file or directory' in e.strerror:
                    pass  # most likely log file isn't yet written
                else:
                    # in the case of unexpected error, report this thread to the callback
                    errordata['log_scanner'] = [[str(e)]]
            return errordata

        def scan_and_report(self):
            """Run one scan pass and invoke the callback if any errors were found."""
            errordata = self.scan()
            if errordata:
                on_error_call(errordata)

        def run(self):
            common.debug("Log-watching thread starting.")
            try:
                # run until stop gets requested by .join()
                while not self.req_stop_event.is_set():
                    self.scan_and_report()
                    # wait() instead of time.sleep() so a stop request
                    # interrupts the pause immediately instead of waiting
                    # out the full interval
                    self.req_stop_event.wait(interval)
                # do a final scan to make sure we got to the very end of the files
                self.scan_and_report()
            finally:
                common.debug("Log-watching thread exiting.")
                # done_event signals that the final pass completed; set it even
                # if a scan raised mid-loop, so join() never blocks needlessly
                self.done_event.set()

        def join(self, timeout=None):
            # signals to the main run() loop that a stop is requested
            self.req_stop_event.set()
            # now wait for the main loop to get through a final log scan, and signal that it's done
            self.done_event.wait(timeout=interval * 2)  # need to wait at least interval seconds before expecting thread to finish. 2x for safety.
            super(LogWatchingThread, self).join(timeout)

    log_watcher = LogWatchingThread(self)
    log_watcher.start()
    return log_watcher