pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsSequentialTextSink.java [39:69]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public class HdfsSequentialTextSink extends HdfsAbstractSequenceFileSink<Long, String, LongWritable, Text> {

    private AtomicLong counter;

    /**
     * Creates the {@link SequenceFile} writer used by this sink and restarts the
     * fallback sequence counter at zero.
     *
     * @return a writer built from this sink's configuration and options
     * @throws IOException if the options cannot be assembled or the writer cannot be created
     */
    @Override
    public Writer getWriter() throws IOException {
        counter = new AtomicLong(0);

        // Build the option list once so getOptions() is not re-run merely to size the array.
        List<Option> options = getOptions();
        return SequenceFile.createWriter(getConfiguration(), options.toArray(new Option[0]));
    }

    /**
     * Extends the options supplied by the parent class with the key and value
     * classes written by this sink.
     *
     * @return the parent's options followed by the {@link LongWritable} key class
     *         option and the {@link Text} value class option
     * @throws IllegalArgumentException propagated from the parent implementation
     * @throws IOException propagated from the parent implementation
     */
    @Override
    protected List<Option> getOptions() throws IllegalArgumentException, IOException {
        final List<Option> writerOptions = super.getOptions();

        // Keys are written as LongWritable, values as Text.
        writerOptions.add(Writer.keyClass(LongWritable.class));
        writerOptions.add(Writer.valueClass(Text.class));

        return writerOptions;
    }

    /**
     * Derives the key/value pair to write for a record.
     *
     * <p>The key is the record's own sequence when one is present; otherwise the
     * next value of the local counter (which is reset in {@code getWriter()}) is
     * used. The value is the record's payload.
     *
     * @param record the incoming record
     * @return the sequence key paired with the record's value
     */
    @Override
    public KeyValue<Long, String> extractKeyValue(Record<String> record) {
        // Long.valueOf replaces the deprecated new Long(long) boxing constructor.
        Long sequence = record.getRecordSequence()
                .orElseGet(() -> Long.valueOf(counter.incrementAndGet()));
        return new KeyValue<>(sequence, record.getValue());
    }

    @Override
    public KeyValue<LongWritable, Text> convert(KeyValue<Long, String> kv) {
       return new KeyValue<>(new LongWritable(kv.getKey()), new Text(kv.getValue()));
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/seq/HdfsSequentialTextSink.java [39:69]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public class HdfsSequentialTextSink extends HdfsAbstractSequenceFileSink<Long, String, LongWritable, Text> {

    private AtomicLong counter;

    /**
     * Creates the {@link SequenceFile} writer used by this sink and restarts the
     * fallback sequence counter at zero.
     *
     * @return a writer built from this sink's configuration and options
     * @throws IOException if the options cannot be assembled or the writer cannot be created
     */
    @Override
    public Writer getWriter() throws IOException {
        counter = new AtomicLong(0);

        // Build the option list once so getOptions() is not re-run merely to size the array.
        List<Option> options = getOptions();
        return SequenceFile.createWriter(getConfiguration(), options.toArray(new Option[0]));
    }

    /**
     * Extends the options supplied by the parent class with the key and value
     * classes written by this sink.
     *
     * @return the parent's options followed by the {@link LongWritable} key class
     *         option and the {@link Text} value class option
     * @throws IllegalArgumentException propagated from the parent implementation
     * @throws IOException propagated from the parent implementation
     */
    @Override
    protected List<Option> getOptions() throws IllegalArgumentException, IOException {
        final List<Option> writerOptions = super.getOptions();

        // Keys are written as LongWritable, values as Text.
        writerOptions.add(Writer.keyClass(LongWritable.class));
        writerOptions.add(Writer.valueClass(Text.class));

        return writerOptions;
    }

    /**
     * Derives the key/value pair to write for a record.
     *
     * <p>The key is the record's own sequence when one is present; otherwise the
     * next value of the local counter (which is reset in {@code getWriter()}) is
     * used. The value is the record's payload.
     *
     * @param record the incoming record
     * @return the sequence key paired with the record's value
     */
    @Override
    public KeyValue<Long, String> extractKeyValue(Record<String> record) {
        // Long.valueOf replaces the deprecated new Long(long) boxing constructor.
        Long sequence = record.getRecordSequence()
                .orElseGet(() -> Long.valueOf(counter.incrementAndGet()));
        return new KeyValue<>(sequence, record.getValue());
    }

    @Override
    public KeyValue<LongWritable, Text> convert(KeyValue<Long, String> kv) {
       return new KeyValue<>(new LongWritable(kv.getKey()), new Text(kv.getValue()));
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



