pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConnector.java [51:248]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public abstract class AbstractHdfsConnector {

    private static final Object RESOURCES_LOCK = new Object();

    // Hadoop Configuration, Filesystem, and UserGroupInformation (optional)
    protected final AtomicReference<HdfsResources> hdfsResources = new AtomicReference<>();
    protected AbstractHdfsConfig connectorConfig;
    protected CompressionCodecFactory compressionCodecFactory;

    public AbstractHdfsConnector() {
        hdfsResources.set(new HdfsResources(null, null, null));
    }

    /*
     * Reset Hadoop Configuration and FileSystem based on the supplied configuration resources.
     */
    protected HdfsResources resetHDFSResources(HdfsSinkConfig hdfsSinkConfig) throws IOException {
        Configuration config = new ExtendedConfiguration();
        config.setClassLoader(Thread.currentThread().getContextClassLoader());

        getConfig(config, connectorConfig.getHdfsConfigResources());

        // First check for a timeout on the HDFS connection, because FileSystem has a hard-coded 15-minute timeout
        checkHdfsUriForTimeout(config);

        /* Disable caching of Configuration and FileSystem objects; otherwise the connector
         * cannot be reconfigured without a complete restart.
         */
        String disableCacheName = String.format("fs.%s.impl.disable.cache",
                FileSystem.getDefaultUri(config).getScheme());
        config.set(disableCacheName, "true");
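        // e.g. for an hdfs:// default FS, the property set above is fs.hdfs.impl.disable.cache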

        // If Kerberos is enabled, create the file system as the Kerberos principal
        // -- use RESOURCES_LOCK to guarantee UserGroupInformation is accessed by only a single thread at a time
        FileSystem fs;
        UserGroupInformation ugi;
        synchronized (RESOURCES_LOCK) {
            if (SecurityUtil.isSecurityEnabled(config)) {
                ugi = SecurityUtil.loginKerberos(config,
                        connectorConfig.getKerberosUserPrincipal(), connectorConfig.getKeytab());
                fs = getFileSystemAsUser(config, ugi);
            } else {
                config.set("ipc.client.fallback-to-simple-auth-allowed", "true");
                config.set("hadoop.security.authentication", "simple");
                ugi = SecurityUtil.loginSimple(config);
                fs = getFileSystemAsUser(config, ugi);
            }
        }
        return new HdfsResources(config, fs, ugi);
    }

    private static Configuration getConfig(final Configuration config, String res) throws IOException {
        boolean foundResources = false;
        if (null != res) {
            String[] resources = res.split(",");
            for (String resource : resources) {
                config.addResource(new Path(resource.trim()));
                foundResources = true;
            }
        }

        if (!foundResources) {
            // check that at least 1 non-default resource is available on the classpath
            String configStr = config.toString();
            for (String resource : configStr.substring(configStr.indexOf(":") + 1).split(",")) {
                if (!resource.contains("default") && config.getResource(resource.trim()) != null) {
                    foundResources = true;
                    break;
                }
            }
        }

        if (!foundResources) {
            throw new IOException("Could not find any of the configuration resources [" + res + "] on the classpath");
        }
        return config;
    }

    /*
     * Fail fast (with a 1 second connect timeout) if the NameNode is unreachable, rather than
     * waiting for the much longer default timeout used by FileSystem.get().
     */
    protected void checkHdfsUriForTimeout(Configuration config) throws IOException {
        URI hdfsUri = FileSystem.getDefaultUri(config);
        String address = hdfsUri.getAuthority();
        int port = hdfsUri.getPort();
        if (address == null || address.isEmpty() || port < 0) {
            return;
        }
        InetSocketAddress namenode = NetUtils.createSocketAddr(address, port);
        SocketFactory socketFactory = NetUtils.getDefaultSocketFactory(config);
        Socket socket = null;
        try {
            socket = socketFactory.createSocket();
            NetUtils.connect(socket, namenode, 1000); // 1 second timeout
        } finally {
            IOUtils.closeQuietly(socket);
        }
    }

    /**
     * This exists to allow unit tests to override it, so that they don't spend several
     * minutes waiting for connection attempts to time out.
     *
     * @param config
     *            the configuration to use
     * @return the FileSystem that is created for the given Configuration
     * @throws IOException
     *             if unable to create the FileSystem
     */
    protected FileSystem getFileSystem(final Configuration config) throws IOException {
        return FileSystem.get(config);
    }

    protected FileSystem getFileSystemAsUser(final Configuration config, UserGroupInformation ugi) throws IOException {
        try {
            return ugi.doAs((PrivilegedExceptionAction<FileSystem>) () -> FileSystem.get(config));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Unable to create file system: " + e.getMessage(), e);
        }
    }

    protected Configuration getConfiguration() {
        return hdfsResources.get().getConfiguration();
    }

    protected FileSystem getFileSystem() {
        return hdfsResources.get().getFileSystem();
    }

    protected UserGroupInformation getUserGroupInformation() {
        return hdfsResources.get().getUserGroupInformation();
    }

    protected String getEncoding() {
        return StringUtils.isNotBlank(connectorConfig.getEncoding())
                   ? connectorConfig.getEncoding() : Charset.defaultCharset().name();
    }

    protected CompressionCodec getCompressionCodec() {
        if (connectorConfig.getCompression() == null) {
            return null;
        }

        CompressionCodec codec = getCompressionCodecFactory()
                .getCodecByName(connectorConfig.getCompression().name());

        return (codec != null) ? codec : new DefaultCodec();
    }

    protected CompressionCodecFactory getCompressionCodecFactory() {
        if (compressionCodecFactory == null) {
            compressionCodecFactory = new CompressionCodecFactory(getConfiguration());
        }

        return compressionCodecFactory;
    }

    /**
     * Extends Hadoop Configuration to prevent it from caching classes that can't be found. Since users may be
     * adding additional JARs to the classpath, we don't want them to have to restart the JVM to load something
     * that was previously not found but might now be available.
     * Based on the original getClassByNameOrNull from Configuration.
     */
    static class ExtendedConfiguration extends Configuration {

        private final Map<ClassLoader, Map<String, WeakReference<Class<?>>>> cacheClasses = new WeakHashMap<>();

        @Override
        public Class<?> getClassByNameOrNull(String name) {
            final ClassLoader classLoader = getClassLoader();

            Map<String, WeakReference<Class<?>>> map;
            synchronized (cacheClasses) {
                map = cacheClasses.get(classLoader);
                if (map == null) {
                    map = Collections.synchronizedMap(new WeakHashMap<>());
                    cacheClasses.put(classLoader, map);
                }
            }

            Class<?> clazz = null;
            WeakReference<Class<?>> ref = map.get(name);
            if (ref != null) {
                clazz = ref.get();
            }

            if (clazz == null) {
                try {
                    clazz = Class.forName(name, true, classLoader);
                } catch (ClassNotFoundException | NoClassDefFoundError e) {
                    return null;
                }
                // two putters can race here, but they'll put the same class
                map.put(name, new WeakReference<>(clazz));
                return clazz;
            } else {
                // cache hit
                return clazz;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
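
For orientation, here is a minimal sketch of how a concrete sink could drive the protected helpers above. The DemoHdfsSink class, its method names, and the assumption that HdfsSinkConfig extends AbstractHdfsConfig are illustrative only and are not taken from either file:

import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;

// Illustrative sketch only -- assumes it lives in the same package as AbstractHdfsConnector.
public class DemoHdfsSink extends AbstractHdfsConnector {

    // Hypothetical lifecycle hook: store the config and (re)build the HDFS resources.
    public void open(HdfsSinkConfig sinkConfig) throws IOException {
        this.connectorConfig = sinkConfig; // assumes HdfsSinkConfig extends AbstractHdfsConfig
        this.hdfsResources.set(resetHDFSResources(sinkConfig));
    }

    // Hypothetical write path: create (or overwrite) a file via the cached FileSystem.
    public void writeBytes(String pathName, byte[] payload) throws IOException {
        Path path = new Path(pathName);
        try (FSDataOutputStream out = getFileSystem().create(path, true)) {
            out.write(payload);
        }
    }
}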



pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/AbstractHdfsConnector.java [51:248]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public abstract class AbstractHdfsConnector {

    private static final Object RESOURCES_LOCK = new Object();

    // Hadoop Configuration, Filesystem, and UserGroupInformation (optional)
    protected final AtomicReference<HdfsResources> hdfsResources = new AtomicReference<>();
    protected AbstractHdfsConfig connectorConfig;
    protected CompressionCodecFactory compressionCodecFactory;

    public AbstractHdfsConnector() {
        hdfsResources.set(new HdfsResources(null, null, null));
    }

    /*
     * Reset Hadoop Configuration and FileSystem based on the supplied configuration resources.
     */
    protected HdfsResources resetHDFSResources(HdfsSinkConfig hdfsSinkConfig) throws IOException {
        Configuration config = new ExtendedConfiguration();
        config.setClassLoader(Thread.currentThread().getContextClassLoader());

        getConfig(config, connectorConfig.getHdfsConfigResources());

        // First check for a timeout on the HDFS connection, because FileSystem has a hard-coded 15-minute timeout
        checkHdfsUriForTimeout(config);

        /* Disable caching of Configuration and FileSystem objects; otherwise the connector
         * cannot be reconfigured without a complete restart.
         */
        String disableCacheName = String.format("fs.%s.impl.disable.cache",
                FileSystem.getDefaultUri(config).getScheme());
        config.set(disableCacheName, "true");
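        // e.g. for an hdfs:// default FS, the property set above is fs.hdfs.impl.disable.cache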

        // If Kerberos is enabled, create the file system as the Kerberos principal
        // -- use RESOURCES_LOCK to guarantee UserGroupInformation is accessed by only a single thread at a time
        FileSystem fs;
        UserGroupInformation ugi;
        synchronized (RESOURCES_LOCK) {
            if (SecurityUtil.isSecurityEnabled(config)) {
                ugi = SecurityUtil.loginKerberos(config,
                        connectorConfig.getKerberosUserPrincipal(), connectorConfig.getKeytab());
                fs = getFileSystemAsUser(config, ugi);
            } else {
                config.set("ipc.client.fallback-to-simple-auth-allowed", "true");
                config.set("hadoop.security.authentication", "simple");
                ugi = SecurityUtil.loginSimple(config);
                fs = getFileSystemAsUser(config, ugi);
            }
        }
        return new HdfsResources(config, fs, ugi);
    }

    private static Configuration getConfig(final Configuration config, String res) throws IOException {
        boolean foundResources = false;
        if (null != res) {
            String[] resources = res.split(",");
            for (String resource : resources) {
                config.addResource(new Path(resource.trim()));
                foundResources = true;
            }
        }

        if (!foundResources) {
            // check that at least 1 non-default resource is available on the classpath
            String configStr = config.toString();
            for (String resource : configStr.substring(configStr.indexOf(":") + 1).split(",")) {
                if (!resource.contains("default") && config.getResource(resource.trim()) != null) {
                    foundResources = true;
                    break;
                }
            }
        }

        if (!foundResources) {
            throw new IOException("Could not find any of the configuration resources [" + res + "] on the classpath");
        }
        return config;
    }

    /*
     * Fail fast (with a 1 second connect timeout) if the NameNode is unreachable, rather than
     * waiting for the much longer default timeout used by FileSystem.get().
     */
    protected void checkHdfsUriForTimeout(Configuration config) throws IOException {
        URI hdfsUri = FileSystem.getDefaultUri(config);
        String address = hdfsUri.getAuthority();
        int port = hdfsUri.getPort();
        if (address == null || address.isEmpty() || port < 0) {
            return;
        }
        InetSocketAddress namenode = NetUtils.createSocketAddr(address, port);
        SocketFactory socketFactory = NetUtils.getDefaultSocketFactory(config);
        Socket socket = null;
        try {
            socket = socketFactory.createSocket();
            NetUtils.connect(socket, namenode, 1000); // 1 second timeout
        } finally {
            IOUtils.closeQuietly(socket);
        }
    }

    /**
     * This exists to allow unit tests to override it, so that they don't spend several
     * minutes waiting for connection attempts to time out.
     *
     * @param config
     *            the configuration to use
     * @return the FileSystem that is created for the given Configuration
     * @throws IOException
     *             if unable to create the FileSystem
     */
    protected FileSystem getFileSystem(final Configuration config) throws IOException {
        return FileSystem.get(config);
    }

    protected FileSystem getFileSystemAsUser(final Configuration config, UserGroupInformation ugi) throws IOException {
        try {
            return ugi.doAs((PrivilegedExceptionAction<FileSystem>) () -> FileSystem.get(config));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Unable to create file system: " + e.getMessage(), e);
        }
    }

    protected Configuration getConfiguration() {
        return hdfsResources.get().getConfiguration();
    }

    protected FileSystem getFileSystem() {
        return hdfsResources.get().getFileSystem();
    }

    protected UserGroupInformation getUserGroupInformation() {
        return hdfsResources.get().getUserGroupInformation();
    }

    protected String getEncoding() {
        return StringUtils.isNotBlank(connectorConfig.getEncoding())
                   ? connectorConfig.getEncoding() : Charset.defaultCharset().name();
    }

    protected CompressionCodec getCompressionCodec() {
        if (connectorConfig.getCompression() == null) {
            return null;
        }

        CompressionCodec codec = getCompressionCodecFactory()
                .getCodecByName(connectorConfig.getCompression().name());

        return (codec != null) ? codec : new DefaultCodec();
    }

    protected CompressionCodecFactory getCompressionCodecFactory() {
        if (compressionCodecFactory == null) {
            compressionCodecFactory = new CompressionCodecFactory(getConfiguration());
        }

        return compressionCodecFactory;
    }

    /**
     * Extends Hadoop Configuration to prevent it from caching classes that can't be found. Since users may be
     * adding additional JARs to the classpath, we don't want them to have to restart the JVM to load something
     * that was previously not found but might now be available.
     * Based on the original getClassByNameOrNull from Configuration.
     */
    static class ExtendedConfiguration extends Configuration {

        private final Map<ClassLoader, Map<String, WeakReference<Class<?>>>> cacheClasses = new WeakHashMap<>();

        @Override
        public Class<?> getClassByNameOrNull(String name) {
            final ClassLoader classLoader = getClassLoader();

            Map<String, WeakReference<Class<?>>> map;
            synchronized (cacheClasses) {
                map = cacheClasses.get(classLoader);
                if (map == null) {
                    map = Collections.synchronizedMap(new WeakHashMap<>());
                    cacheClasses.put(classLoader, map);
                }
            }

            Class<?> clazz = null;
            WeakReference<Class<?>> ref = map.get(name);
            if (ref != null) {
                clazz = ref.get();
            }

            if (clazz == null) {
                try {
                    clazz = Class.forName(name, true, classLoader);
                } catch (ClassNotFoundException | NoClassDefFoundError e) {
                    return null;
                }
                // two putters can race here, but they'll put the same class
                map.put(name, new WeakReference<>(clazz));
                return clazz;
            } else {
                // cache hit
                return clazz;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
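
On the Kerberos path, getFileSystemAsUser() opens the FileSystem inside ugi.doAs(...), and later I/O against a secured cluster is typically wrapped the same way. Below is a short sketch of that pattern using only standard Hadoop UserGroupInformation calls; the HdfsUgiSketch class and deleteAsUser method are hypothetical:

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

// Illustrative sketch only -- not part of either file above.
public final class HdfsUgiSketch {

    // Run a delete as the principal held in HdfsResources (the UGI returned by loginKerberos/loginSimple).
    public static boolean deleteAsUser(FileSystem fs, UserGroupInformation ugi, String pathName)
            throws IOException {
        try {
            // Re-login from the keytab if the TGT is close to expiring; effectively a no-op for simple auth.
            ugi.checkTGTAndReloginFromKeytab();
            return ugi.doAs((PrivilegedExceptionAction<Boolean>) () ->
                    fs.delete(new Path(pathName), false));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while deleting " + pathName, e);
        }
    }
}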



