pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsAbstractSink.java [67:88]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
       connectToHdfs();
       createWriter();
       launchSyncThread();
    }

    /**
     * Shuts down the background sync thread, if one exists.
     *
     * <p>Calls {@code halt()} on the sync thread and then {@code join(0)} on it.
     * Assuming the sync thread is a {@link Thread}, a timeout of {@code 0} means
     * this call blocks until that thread has terminated.
     *
     * <p>Calling this method when no sync thread has been created is a no-op.
     *
     * @throws Exception if halting or joining the sync thread fails, e.g. the
     *                   calling thread is interrupted while waiting
     */
    @Override
    public void close() throws Exception {
        // The sync thread may never have been created, e.g. when start-up failed
        // while connecting to HDFS or creating the writer. Dereferencing it then
        // would raise a NullPointerException that hides the original failure.
        if (syncThread == null) {
            return;
        }

        syncThread.halt();
        // join(0) waits without a timeout.
        syncThread.join(0);
    }

    protected final void connectToHdfs() throws IOException {
       try {
           HdfsResources resources = hdfsResources.get();

           if (resources.getConfiguration() == null) {
               resources = this.resetHDFSResources(hdfsSinkConfig);
               hdfsResources.set(resources);
           }
       } catch (IOException ex) {
          hdfsResources.set(new HdfsResources(null, null, null));
          throw ex;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/HdfsAbstractSink.java [62:83]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
       connectToHdfs();
       createWriter();
       launchSyncThread();
    }

    /**
     * Shuts down the background sync thread, if one exists.
     *
     * <p>Calls {@code halt()} on the sync thread and then {@code join(0)} on it.
     * Assuming the sync thread is a {@link Thread}, a timeout of {@code 0} means
     * this call blocks until that thread has terminated.
     *
     * <p>Calling this method when no sync thread has been created is a no-op.
     *
     * @throws Exception if halting or joining the sync thread fails, e.g. the
     *                   calling thread is interrupted while waiting
     */
    @Override
    public void close() throws Exception {
        // The sync thread may never have been created, e.g. when start-up failed
        // while connecting to HDFS or creating the writer. Dereferencing it then
        // would raise a NullPointerException that hides the original failure.
        if (syncThread == null) {
            return;
        }

        syncThread.halt();
        // join(0) waits without a timeout.
        syncThread.join(0);
    }

    protected final void connectToHdfs() throws IOException {
       try {
           HdfsResources resources = hdfsResources.get();

           if (resources.getConfiguration() == null) {
               resources = this.resetHDFSResources(hdfsSinkConfig);
               hdfsResources.set(resources);
           }
       } catch (IOException ex) {
          hdfsResources.set(new HdfsResources(null, null, null));
          throw ex;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



