def actively_watch_logs_for_error()

in ccmlib/cluster.py


    def actively_watch_logs_for_error(self, on_error_call, interval=1):
        """
        Begins a thread that repeatedly scans system.log for new errors, every interval seconds.
        (The first pass covers the entire log contents written at that point,
        subsequent scans cover newly appended log messages).

        Reports new errors, by calling the provided callback with an OrderedDictionary
        mapping node name to a list of error lines.

        Returns the thread itself, which should be .join()'ed to wrap up execution,
        otherwise will run until the main thread exits.
        """
        class LogWatchingThread(threading.Thread):
            """
            This class is embedded here for now because it is only used from
            within Cluster and depends on cluster.nodelist().
            """

            def __init__(self, cluster):
                super(LogWatchingThread, self).__init__()
                self.cluster = cluster
                self.daemon = True  # set so that thread will exit when main thread exits
                self.req_stop_event = threading.Event()
                self.done_event = threading.Event()
                self.log_positions = defaultdict(int)

            def scan(self):
                errordata = OrderedDict()

                try:
                    for node in self.cluster.nodelist():
                        scan_from_mark = self.log_positions[node.name]
                        next_time_scan_from_mark = node.mark_log()
                        if next_time_scan_from_mark == scan_from_mark:
                            # log hasn't advanced, nothing to do for this node
                            continue
                        errors = node.grep_log_for_errors_from(seek_start=scan_from_mark)
                        self.log_positions[node.name] = next_time_scan_from_mark
                        if errors:
                            errordata[node.name] = errors
                except IOError as e:
                    if 'No such file or directory' in str(e.strerror):
                        # most likely the log file hasn't been written yet
                        pass
                    else:
                        # unexpected error: report the scanner's own failure to the callback
                        errordata['log_scanner'] = [[str(e)]]

                return errordata

            def scan_and_report(self):
                errordata = self.scan()

                if errordata:
                    on_error_call(errordata)

            def run(self):
                common.debug("Log-watching thread starting.")

                # run until stop gets requested by .join()
                while not self.req_stop_event.is_set():
                    self.scan_and_report()
                    time.sleep(interval)

                try:
                    # do a final scan to make sure we got to the very end of the files
                    self.scan_and_report()
                finally:
                    common.debug("Log-watching thread exiting.")
                    # done_event signals that the scan completed a final pass
                    self.done_event.set()

            def join(self, timeout=None):
                # signals to the main run() loop that a stop is requested
                self.req_stop_event.set()
                # now wait for the main loop to get through a final log scan, and signal that it's done
                self.done_event.wait(timeout=interval * 2)  # need to wait at least interval seconds before expecting thread to finish. 2x for safety.
                super(LogWatchingThread, self).join(timeout)

        log_watcher = LogWatchingThread(self)
        log_watcher.start()
        return log_watcher
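
A minimal usage sketch, assuming an already built and started Cluster instance
named cluster; the report_errors callback and errors_seen list below are
illustrative, not part of ccmlib:

    import time

    errors_seen = []

    def report_errors(errordata):
        # errordata is an OrderedDict mapping node name -> list of error lines
        for node_name, errors in errordata.items():
            errors_seen.append((node_name, errors))
            print("Errors on %s: %s" % (node_name, errors))

    # start watching; scan the logs every 2 seconds
    watcher = cluster.actively_watch_logs_for_error(report_errors, interval=2)

    # ... exercise the cluster here ...
    time.sleep(10)

    # request a final scan and stop the watcher thread
    watcher.join()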