private FileFilter createFileFilter()

in pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileListingThread.java [136:188]


    /**
     * Builds the {@link FileFilter} that decides which files are eligible for processing.
     *
     * <p>A file is accepted only when every one of the following holds, checked in this order:
     * <ol>
     *   <li>its length in bytes is at least the configured minimum size (default {@code 1}) and
     *       at most the configured maximum size (default {@link Double#MAX_VALUE});</li>
     *   <li>its age, computed as the current time minus its last-modified time in milliseconds,
     *       is at least the configured minimum age (default {@code 0}) and at most the configured
     *       maximum age (default {@link Long#MAX_VALUE});</li>
     *   <li>it is not hidden, unless ignoring hidden files has been disabled (default: hidden
     *       files are ignored);</li>
     *   <li>when {@code recurseDirs} is set and a path filter is configured, the file's parent
     *       directory, taken relative to the input directory, matches the path filter; files
     *       located directly in the input directory skip this check;</li>
     *   <li>the file is readable;</li>
     *   <li>when {@code keepOriginal} is not set, the file's parent directory is writable;</li>
     *   <li>the file name matches the configured file filter pattern (default
     *       {@code [^\.].*}).</li>
     * </ol>
     *
     * @param fileConfig the source configuration supplying the size, age, hidden-file and
     *                   pattern settings; unset (null) settings fall back to the defaults above
     * @return a filter applying all of the checks described above
     */
    private FileFilter createFileFilter(FileSourceConfig fileConfig) {
        final long minSize = Optional.ofNullable(fileConfig.getMinimumSize()).orElse(1);
        // orElse(...) guarantees a value, so the limits are held as primitives: no per-call
        // unboxing and no (unreachable) null checks inside the filter.
        final double maxSize = Optional.ofNullable(fileConfig.getMaximumSize()).orElse(Double.MAX_VALUE);
        final long minAge = Optional.ofNullable(fileConfig.getMinimumFileAge()).orElse(0);
        final long maxAge = Optional.ofNullable(fileConfig.getMaximumFileAge()).orElse(Long.MAX_VALUE);
        final boolean ignoreHidden = Optional.ofNullable(fileConfig.getIgnoreHiddenFiles()).orElse(true);
        final Pattern filePattern = Pattern.compile(Optional.ofNullable(fileConfig.getFileFilter()).orElse("[^\\.].*"));
        final String indir = fileConfig.getInputDirectory();
        final String pathPatternStr = fileConfig.getPathFilter();
        // The path filter is only meaningful when sub-directories are being traversed.
        final Pattern pathPattern = (!recurseDirs || pathPatternStr == null) ? null : Pattern.compile(pathPatternStr);

        return file -> {
            // Read the length once so both bounds are evaluated against the same value,
            // even if the file is still being written to.
            final long length = file.length();
            if (minSize > length || maxSize < length) {
                return false;
            }

            final long fileAge = System.currentTimeMillis() - file.lastModified();
            if (minAge > fileAge || maxAge < fileAge) {
                return false;
            }

            if (ignoreHidden && file.isHidden()) {
                return false;
            }

            final Path filePath = file.toPath();

            if (pathPattern != null) {
                final Path reldir = Paths.get(indir).relativize(filePath).getParent();
                // A null/empty relative parent means the file sits directly in the input
                // directory, in which case the path filter does not apply.
                if (reldir != null) {
                    final String reldirStr = reldir.toString();
                    if (!reldirStr.isEmpty() && !pathPattern.matcher(reldirStr).matches()) {
                        return false;
                    }
                }
            }

            // Verify that we have at least read permissions on the file we're considering grabbing
            if (!Files.isReadable(filePath)) {
                return false;
            }

            /* Verify that if we're not keeping original that we have write
             * permissions on the directory the file is in. A path without a parent
             * component cannot be verified, so it is rejected instead of letting
             * Files.isWritable(null) throw a NullPointerException.
             */
            if (!keepOriginal) {
                final Path parentDir = filePath.getParent();
                if (parentDir == null || !Files.isWritable(parentDir)) {
                    return false;
                }
            }

            return filePattern.matcher(file.getName()).matches();
        };
    }