pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/text/HdfsAbstractTextFileSink.java [39:76]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public abstract class HdfsAbstractTextFileSink<K, V> extends HdfsAbstractSink<K, V> implements Sink<V> {

    /** Logger for this class. */
    private static final Logger LOG = LoggerFactory.getLogger(HdfsAbstractTextFileSink.class);

    /**
     * Character writer that records are written to. It is assigned by
     * {@link #createWriter()} and is {@code null} until that method has run.
     */
    protected OutputStreamWriter writer;

    /**
     * Builds the text writer: opens the HDFS output stream, wraps it in a buffer, and
     * layers a character encoder on top using the encoding returned by {@code getEncoding()}.
     *
     * @throws IOException if the stream cannot be opened or the writer cannot be created
     */
    @Override
    protected void createWriter() throws IOException {
        final OutputStream hdfsOut = openHdfsStream();
        final OutputStream bufferedOut = new BufferedOutputStream(hdfsOut);
        this.writer = new OutputStreamWriter(bufferedOut, getEncoding());
    }

    /**
     * Closes the text writer (if one was created) and then closes the parent sink.
     *
     * <p>The parent's {@code close()} is invoked from a {@code finally} block so that it
     * still runs when closing the writer throws; in that case the writer's exception is
     * the one propagated to the caller.
     *
     * @throws Exception if closing the writer or the parent sink fails
     */
    @Override
    public void close() throws Exception {
        try {
            // writer stays null when createWriter() was never reached.
            if (writer != null) {
                writer.close();
            }
        } finally {
            super.close();
        }
    }

    /**
     * Writes the value of the given record to the text writer, followed by the configured
     * separator when one is set, and then queues the record in {@code unackedRecords}.
     *
     * <p>I/O errors and interrupts are not propagated: the failure is logged and the
     * record is marked failed via {@code record.fail()}. On an interrupt the thread's
     * interrupt status is restored so callers further up the stack can observe it.
     *
     * @param record the record whose value is written
     */
    @Override
    public void write(Record<V> record) {
        try {
            KeyValue<K, V> kv = extractKeyValue(record);
            // NOTE(review): a null value raises NullPointerException here, which the
            // catch clauses below do not handle - confirm callers never pass one.
            writer.write(kv.getValue().toString());

            // A separator of '\u0000' means "write no separator".
            if (hdfsSinkConfig.getSeparator() != '\u0000') {
                writer.write(hdfsSinkConfig.getSeparator());
            }
            unackedRecords.put(record);
        } catch (IOException e) {
            LOG.error("Unable to write to file " + getPath(), e);
            record.fail();
        } catch (InterruptedException e) {
            LOG.error("Unable to write to file " + getPath(), e);
            record.fail();
            // Catching InterruptedException clears the interrupt flag; set it again so
            // the interruption is not silently lost.
            Thread.currentThread().interrupt();
        }
    }

    private OutputStream openHdfsStream() throws IOException {
       if (hdfsSinkConfig.getCompression() != null) {
          return getCompressionCodec().createOutputStream(getHdfsStream());
       } else {
          return getHdfsStream();
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/text/HdfsAbstractTextFileSink.java [39:76]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public abstract class HdfsAbstractTextFileSink<K, V> extends HdfsAbstractSink<K, V> implements Sink<V> {

    /** Logger for this class. */
    private static final Logger LOG = LoggerFactory.getLogger(HdfsAbstractTextFileSink.class);

    /**
     * Character writer that records are written to. It is assigned by
     * {@link #createWriter()} and is {@code null} until that method has run.
     */
    protected OutputStreamWriter writer;

    /**
     * Builds the text writer: opens the HDFS output stream, wraps it in a buffer, and
     * layers a character encoder on top using the encoding returned by {@code getEncoding()}.
     *
     * @throws IOException if the stream cannot be opened or the writer cannot be created
     */
    @Override
    protected void createWriter() throws IOException {
        final OutputStream hdfsOut = openHdfsStream();
        final OutputStream bufferedOut = new BufferedOutputStream(hdfsOut);
        this.writer = new OutputStreamWriter(bufferedOut, getEncoding());
    }

    /**
     * Closes the text writer (if one was created) and then closes the parent sink.
     *
     * <p>The parent's {@code close()} is invoked from a {@code finally} block so that it
     * still runs when closing the writer throws; in that case the writer's exception is
     * the one propagated to the caller.
     *
     * @throws Exception if closing the writer or the parent sink fails
     */
    @Override
    public void close() throws Exception {
        try {
            // writer stays null when createWriter() was never reached.
            if (writer != null) {
                writer.close();
            }
        } finally {
            super.close();
        }
    }

    /**
     * Writes the value of the given record to the text writer, followed by the configured
     * separator when one is set, and then queues the record in {@code unackedRecords}.
     *
     * <p>I/O errors and interrupts are not propagated: the failure is logged and the
     * record is marked failed via {@code record.fail()}. On an interrupt the thread's
     * interrupt status is restored so callers further up the stack can observe it.
     *
     * @param record the record whose value is written
     */
    @Override
    public void write(Record<V> record) {
        try {
            KeyValue<K, V> kv = extractKeyValue(record);
            // NOTE(review): a null value raises NullPointerException here, which the
            // catch clauses below do not handle - confirm callers never pass one.
            writer.write(kv.getValue().toString());

            // A separator of '\u0000' means "write no separator".
            if (hdfsSinkConfig.getSeparator() != '\u0000') {
                writer.write(hdfsSinkConfig.getSeparator());
            }
            unackedRecords.put(record);
        } catch (IOException e) {
            LOG.error("Unable to write to file " + getPath(), e);
            record.fail();
        } catch (InterruptedException e) {
            LOG.error("Unable to write to file " + getPath(), e);
            record.fail();
            // Catching InterruptedException clears the interrupt flag; set it again so
            // the interruption is not silently lost.
            Thread.currentThread().interrupt();
        }
    }

    private OutputStream openHdfsStream() throws IOException {
       if (hdfsSinkConfig.getCompression() != null) {
          return getCompressionCodec().createOutputStream(getHdfsStream());
       } else {
          return getHdfsStream();
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



