pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConnector.java [67:100]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    protected HdfsResources resetHDFSResources(HdfsSinkConfig hdfsSinkConfig) throws IOException {
        Configuration config = new ExtendedConfiguration();
        config.setClassLoader(Thread.currentThread().getContextClassLoader());

        getConfig(config, connectorConfig.getHdfsConfigResources());

        // first check for timeout on HDFS connection, because FileSystem has a hard coded 15 minute timeout
        checkHdfsUriForTimeout(config);

        /* Disable caching of Configuration and FileSystem objects, else we cannot reconfigure
         * the processor without a complete restart
         */
        String disableCacheName = String.format("fs.%s.impl.disable.cache",
                FileSystem.getDefaultUri(config).getScheme());
        config.set(disableCacheName, "true");

        // If kerberos is enabled, create the file system as the kerberos principal
        // -- use RESOURCE_LOCK to guarantee UserGroupInformation is accessed by only a single thread at at time
        FileSystem fs;
        UserGroupInformation ugi;
        synchronized (RESOURCES_LOCK) {
            if (SecurityUtil.isSecurityEnabled(config)) {
                ugi = SecurityUtil.loginKerberos(config,
                        connectorConfig.getKerberosUserPrincipal(), connectorConfig.getKeytab());
                fs = getFileSystemAsUser(config, ugi);
            } else {
                config.set("ipc.client.fallback-to-simple-auth-allowed", "true");
                config.set("hadoop.security.authentication", "simple");
                ugi = SecurityUtil.loginSimple(config);
                fs = getFileSystemAsUser(config, ugi);
            }
        }
        return new HdfsResources(config, fs, ugi);
    }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/AbstractHdfsConnector.java [67:100]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    protected HdfsResources resetHDFSResources(HdfsSinkConfig hdfsSinkConfig) throws IOException {
        Configuration config = new ExtendedConfiguration();
        config.setClassLoader(Thread.currentThread().getContextClassLoader());

        getConfig(config, connectorConfig.getHdfsConfigResources());

        // first check for timeout on HDFS connection, because FileSystem has a hard coded 15 minute timeout
        checkHdfsUriForTimeout(config);

        /* Disable caching of Configuration and FileSystem objects, else we cannot reconfigure
         * the processor without a complete restart
         */
        String disableCacheName = String.format("fs.%s.impl.disable.cache",
                FileSystem.getDefaultUri(config).getScheme());
        config.set(disableCacheName, "true");

        // If kerberos is enabled, create the file system as the kerberos principal
        // -- use RESOURCE_LOCK to guarantee UserGroupInformation is accessed by only a single thread at at time
        FileSystem fs;
        UserGroupInformation ugi;
        synchronized (RESOURCES_LOCK) {
            if (SecurityUtil.isSecurityEnabled(config)) {
                ugi = SecurityUtil.loginKerberos(config,
                        connectorConfig.getKerberosUserPrincipal(), connectorConfig.getKeytab());
                fs = getFileSystemAsUser(config, ugi);
            } else {
                config.set("ipc.client.fallback-to-simple-auth-allowed", "true");
                config.set("hadoop.security.authentication", "simple");
                ugi = SecurityUtil.loginSimple(config);
                fs = getFileSystemAsUser(config, ugi);
            }
        }
        return new HdfsResources(config, fs, ugi);
    }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



