pylib/flake8/flake8/engine.py (205 lines of code) (raw):

# -*- coding: utf-8 -*- import errno import io import platform import re import sys import warnings import pycodestyle as pep8 from flake8 import __version__ from flake8 import callbacks from flake8.reporter import (multiprocessing, BaseQReport, FileQReport, QueueReport) from flake8 import util _flake8_noqa = re.compile(r'\s*# flake8[:=]\s*noqa', re.I).search EXTRA_EXCLUDE = ['.tox', '.eggs', '*.egg'] pep8.PROJECT_CONFIG = ('.flake8',) + pep8.PROJECT_CONFIG def _load_entry_point(entry_point, verify_requirements): """Based on the version of setuptools load an entry-point correctly. setuptools 11.3 deprecated `require=False` in the call to EntryPoint.load. To load entry points correctly after that without requiring all dependencies be present, the proper way is to call EntryPoint.resolve. This function will provide backwards compatibility for older versions of setuptools while also ensuring we do the right thing for the future. """ if hasattr(entry_point, 'resolve') and hasattr(entry_point, 'require'): if verify_requirements: entry_point.require() plugin = entry_point.resolve() else: plugin = entry_point.load(require=verify_requirements) return plugin def _register_extensions(): """Register all the extensions.""" extensions = util.OrderedSet() extensions.add(('pycodestyle', pep8.__version__)) parser_hooks = [] options_hooks = [] ignored_hooks = [] try: from pkg_resources import iter_entry_points except ImportError: pass else: for entry in iter_entry_points('flake8.extension'): # Do not verify that the requirements versions are valid checker = _load_entry_point(entry, verify_requirements=False) pep8.register_check(checker, codes=[entry.name]) extensions.add((checker.name, checker.version)) if hasattr(checker, 'add_options'): parser_hooks.append(checker.add_options) if hasattr(checker, 'parse_options'): options_hooks.append(checker.parse_options) if getattr(checker, 'off_by_default', False) is True: ignored_hooks.append(entry.name) return extensions, parser_hooks, options_hooks, ignored_hooks def get_parser(): """This returns an instance of optparse.OptionParser with all the extensions registered and options set. This wraps ``pep8.get_parser``. """ (extensions, parser_hooks, options_hooks, ignored) = _register_extensions() details = ', '.join('%s: %s' % ext for ext in extensions) python_version = get_python_version() parser = pep8.get_parser('flake8', '%s (%s) %s' % ( __version__, details, python_version )) for opt in ('--repeat', '--testsuite', '--doctest'): try: parser.remove_option(opt) except ValueError: pass if multiprocessing: parser.config_options.append('jobs') parser.add_option('-j', '--jobs', type='string', default='auto', help="number of jobs to run simultaneously, " "or 'auto'. This is ignored on Windows.") parser.add_option('--exit-zero', action='store_true', help="exit with code 0 even if there are errors") for parser_hook in parser_hooks: parser_hook(parser) # See comment above regarding why this has to be a callback. parser.add_option('--install-hook', default=False, dest='install_hook', help='Install the appropriate hook for this ' 'repository.', action='callback', callback=callbacks.install_vcs_hook) parser.add_option('--output-file', default=None, help='Redirect report to a file.', type='string', nargs=1, action='callback', callback=callbacks.redirect_stdout) parser.add_option('--enable-extensions', default='', dest='enable_extensions', help='Enable plugins and extensions that are disabled ' 'by default', type='string') parser.config_options.extend(['output-file', 'enable-extensions']) parser.ignored_extensions = ignored return parser, options_hooks class NoQAStyleGuide(pep8.StyleGuide): def input_file(self, filename, lines=None, expected=None, line_offset=0): """Run all checks on a Python source file.""" if self.options.verbose: print('checking %s' % filename) fchecker = self.checker_class( filename, lines=lines, options=self.options) # Any "flake8: noqa" comments to ignore the entire file? if any(_flake8_noqa(line) for line in fchecker.lines): return 0 return fchecker.check_all(expected=expected, line_offset=line_offset) class StyleGuide(object): """A wrapper StyleGuide object for Flake8 usage. This allows for OSErrors to be caught in the styleguide and special logic to be used to handle those errors. """ # Reasoning for error numbers is in-line below serial_retry_errors = set([ # ENOSPC: Added by sigmavirus24 # > On some operating systems (OSX), multiprocessing may cause an # > ENOSPC error while trying to trying to create a Semaphore. # > In those cases, we should replace the customized Queue Report # > class with pep8's StandardReport class to ensure users don't run # > into this problem. # > (See also: https://gitlab.com/pycqa/flake8/issues/74) errno.ENOSPC, # NOTE(sigmavirus24): When adding to this list, include the reasoning # on the lines before the error code and always append your error # code. Further, please always add a trailing `,` to reduce the visual # noise in diffs. ]) def __init__(self, **kwargs): # This allows us to inject a mocked StyleGuide in the tests. self._styleguide = kwargs.pop('styleguide', NoQAStyleGuide(**kwargs)) @property def options(self): return self._styleguide.options @property def paths(self): return self._styleguide.paths def _retry_serial(self, func, *args, **kwargs): """This will retry the passed function in serial if necessary. In the event that we encounter an OSError with an errno in :attr:`serial_retry_errors`, this function will retry this function using pep8's default Report class which operates in serial. """ try: return func(*args, **kwargs) except OSError as oserr: if oserr.errno in self.serial_retry_errors: self.init_report(pep8.StandardReport) else: raise return func(*args, **kwargs) def check_files(self, paths=None): return self._retry_serial(self._styleguide.check_files, paths=paths) def excluded(self, filename, parent=None): return self._styleguide.excluded(filename, parent=parent) def init_report(self, reporter=None): return self._styleguide.init_report(reporter) def input_file(self, filename, lines=None, expected=None, line_offset=0): return self._retry_serial( self._styleguide.input_file, filename=filename, lines=lines, expected=expected, line_offset=line_offset, ) def _parse_multi_options(options, split_token=','): r"""Split and strip and discard empties. Turns the following: A, B, into ["A", "B"]. Credit: Kristian Glass as contributed to pep8 """ if options: return [o.strip() for o in options.split(split_token) if o.strip()] else: return options def _disable_extensions(parser, options): ignored_extensions = set(getattr(parser, 'ignored_extensions', [])) enabled = set(_parse_multi_options(options.enable_extensions)) # Remove any of the selected extensions from the extensions ignored by # default. ignored_extensions -= enabled # Whatever is left afterwards should be unioned with options.ignore and # options.ignore should be updated with that. options.ignore = tuple(ignored_extensions.union(options.ignore)) def get_style_guide(**kwargs): """Parse the options and configure the checker. This returns a sub-class of ``pep8.StyleGuide``.""" kwargs['parser'], options_hooks = get_parser() styleguide = StyleGuide(**kwargs) options = styleguide.options _disable_extensions(kwargs['parser'], options) if options.exclude and not isinstance(options.exclude, list): options.exclude = pep8.normalize_paths(options.exclude) elif not options.exclude: options.exclude = [] # Add patterns in EXTRA_EXCLUDE to the list of excluded patterns options.exclude.extend(pep8.normalize_paths(EXTRA_EXCLUDE)) for options_hook in options_hooks: options_hook(options) if util.warn_when_using_jobs(options): if not multiprocessing: warnings.warn("The multiprocessing module is not available. " "Ignoring --jobs arguments.") if util.is_windows(): warnings.warn("The --jobs option is not available on Windows. " "Ignoring --jobs arguments.") if util.is_using_stdin(styleguide.paths): warnings.warn("The --jobs option is not compatible with supplying " "input using - . Ignoring --jobs arguments.") if options.diff: warnings.warn("The --diff option was specified with --jobs but " "they are not compatible. Ignoring --jobs arguments." ) if options.diff: options.jobs = None force_disable_jobs = util.force_disable_jobs(styleguide) if multiprocessing and options.jobs and not force_disable_jobs: if options.jobs.isdigit(): n_jobs = int(options.jobs) else: try: n_jobs = multiprocessing.cpu_count() except NotImplementedError: n_jobs = 1 if n_jobs > 1: options.jobs = n_jobs reporter = QueueReport if options.quiet: reporter = BaseQReport if options.quiet == 1: reporter = FileQReport report = styleguide.init_report(reporter) report.input_file = styleguide.input_file styleguide.runner = report.task_queue.put return styleguide def get_python_version(): # The implementation isn't all that important. try: impl = platform.python_implementation() + " " except AttributeError: # Python 2.5 impl = '' return '%s%s on %s' % (impl, platform.python_version(), platform.system()) def make_stdin_get_value(original): def stdin_get_value(): if not hasattr(stdin_get_value, 'cached_stdin'): value = original() if sys.version_info < (3, 0): stdin = io.BytesIO(value) else: stdin = io.StringIO(value) stdin_get_value.cached_stdin = stdin else: stdin = stdin_get_value.cached_stdin return stdin.getvalue() return stdin_get_value pep8.stdin_get_value = make_stdin_get_value(pep8.stdin_get_value)