pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsTextSink.java [33:52]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public class HdfsTextSink extends
     HdfsAbstractSequenceFileSink<String, String, Text, Text> {

    /**
     * Builds the writer options for this sink by taking the options supplied
     * by the superclass and appending the key and value class options, both
     * set to {@link Text}.
     *
     * @return the superclass option list with the key-class and value-class
     *         options appended, in that order
     * @throws IllegalArgumentException propagated from the superclass
     * @throws IOException propagated from the superclass
     */
    @Override
    protected List<Option> getOptions() throws IllegalArgumentException, IOException {
        final List<Option> writerOptions = super.getOptions();

        // Keys and values are both written as Text.
        final Option keyType = Writer.keyClass(Text.class);
        writerOptions.add(keyType);

        final Option valueType = Writer.valueClass(Text.class);
        writerOptions.add(valueType);

        return writerOptions;
    }

    /**
     * Derives the key/value pair to be written for the given record.
     *
     * <p>The record's own key is used when present; otherwise the record's
     * value doubles as the key. The value of the pair is always the record's
     * value.
     *
     * @param record the incoming record
     * @return a pair holding the resolved key and the record's value
     */
    @Override
    public KeyValue<String, String> extractKeyValue(Record<String> record) {
        // Fall back to the value only when the record carries no key.
        final String resolvedKey = record.getKey().orElseGet(record::getValue);
        return new KeyValue<>(resolvedKey, record.getValue());
    }

    @Override
    public KeyValue<Text, Text> convert(KeyValue<String, String> kv) {
       return new KeyValue<>(new Text(kv.getKey()), new Text(kv.getValue()));
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/seq/HdfsTextSink.java [33:52]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public class HdfsTextSink extends
     HdfsAbstractSequenceFileSink<String, String, Text, Text> {

    /**
     * Builds the writer options for this sink by taking the options supplied
     * by the superclass and appending the key and value class options, both
     * set to {@link Text}.
     *
     * @return the superclass option list with the key-class and value-class
     *         options appended, in that order
     * @throws IllegalArgumentException propagated from the superclass
     * @throws IOException propagated from the superclass
     */
    @Override
    protected List<Option> getOptions() throws IllegalArgumentException, IOException {
        final List<Option> writerOptions = super.getOptions();

        // Keys and values are both written as Text.
        final Option keyType = Writer.keyClass(Text.class);
        writerOptions.add(keyType);

        final Option valueType = Writer.valueClass(Text.class);
        writerOptions.add(valueType);

        return writerOptions;
    }

    /**
     * Derives the key/value pair to be written for the given record.
     *
     * <p>The record's own key is used when present; otherwise the record's
     * value doubles as the key. The value of the pair is always the record's
     * value.
     *
     * @param record the incoming record
     * @return a pair holding the resolved key and the record's value
     */
    @Override
    public KeyValue<String, String> extractKeyValue(Record<String> record) {
        // Fall back to the value only when the record carries no key.
        final String resolvedKey = record.getKey().orElseGet(record::getValue);
        return new KeyValue<>(resolvedKey, record.getValue());
    }

    @Override
    public KeyValue<Text, Text> convert(KeyValue<String, String> kv) {
       return new KeyValue<>(new Text(kv.getKey()), new Text(kv.getValue()));
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



