pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsAbstractSequenceFileSink.java [45:94]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public abstract class HdfsAbstractSequenceFileSink<K, V, HdfsK, HdfsV>
    extends HdfsAbstractSink<K, V> implements Sink<V> {

    private static final Logger LOG = LoggerFactory.getLogger(HdfsAbstractSequenceFileSink.class);

    protected AtomicLong counter;
    protected FSDataOutputStream hdfsStream;
    protected Writer writer = null;

    /**
     * Converts the Pulsar key/value pair into the HDFS key and value
     * types that are appended to the underlying SequenceFile.
     */
    public abstract KeyValue<HdfsK, HdfsV> convert(KeyValue<K, V> kv);

    @Override
    public void close() throws Exception {
        if (writer != null) {
            writer.close();
        }
        super.close();
    }

    @Override
    protected void createWriter() throws IOException {
        writer = getWriter();
    }

    @Override
    public void write(Record<V> record) {
        try {
            KeyValue<K, V> kv = extractKeyValue(record);
            KeyValue<HdfsK, HdfsV> keyValue = convert(kv);
            writer.append(keyValue.getKey(), keyValue.getValue());
            unackedRecords.put(record);
        } catch (IOException | InterruptedException e) {
            LOG.error("Unable to write to file {}", getPath(), e);
            record.fail();
            if (e instanceof InterruptedException) {
                // Restore the interrupt status swallowed by the catch.
                Thread.currentThread().interrupt();
            }
        }
    }

    protected Writer getWriter() throws IOException {
        counter = new AtomicLong(0);
        List<Option> options = getOptions();
        return SequenceFile.createWriter(getConfiguration(),
                options.toArray(new Option[0]));
    }

    protected List<Option> getOptions() throws IllegalArgumentException, IOException {
        List<Option> list = new ArrayList<>();
        list.add(Writer.stream(getHdfsStream()));

        if (getCompressionCodec() != null) {
            list.add(Writer.compression(SequenceFile.CompressionType.RECORD, getCompressionCodec()));
        }
        return list;
    }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
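This class leaves one piece to subclasses: convert(...) maps the Pulsar key/value pair onto the Hadoop writable types the SequenceFile writer serializes. A minimal sketch of a concrete subclass, assuming String keys and values and Hadoop's Text writable; the class name TextSequenceFileSink is hypothetical, and the org.apache.pulsar.io.core.KeyValue import is an assumption about the connector's KeyValue type:

import org.apache.hadoop.io.Text;
import org.apache.pulsar.io.core.KeyValue;

// Hypothetical example, not part of the connector: writes String records as Text/Text pairs.
public class TextSequenceFileSink extends HdfsAbstractSequenceFileSink<String, String, Text, Text> {

    @Override
    public KeyValue<Text, Text> convert(KeyValue<String, String> kv) {
        // Assumes non-null keys and values; wrap them in Hadoop's Text writable
        // so the base class can append them to the SequenceFile.
        return new KeyValue<>(new Text(kv.getKey()), new Text(kv.getValue()));
    }
}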



pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/seq/HdfsAbstractSequenceFileSink.java [45:94]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public abstract class HdfsAbstractSequenceFileSink<K, V, HdfsK, HdfsV>
    extends HdfsAbstractSink<K, V> implements Sink<V> {

    private static final Logger LOG = LoggerFactory.getLogger(HdfsAbstractSequenceFileSink.class);

    protected AtomicLong counter;
    protected FSDataOutputStream hdfsStream;
    protected Writer writer = null;

    /**
     * Converts the Pulsar key/value pair into the HDFS key and value
     * types that are appended to the underlying SequenceFile.
     */
    public abstract KeyValue<HdfsK, HdfsV> convert(KeyValue<K, V> kv);

    @Override
    public void close() throws Exception {
        if (writer != null) {
            writer.close();
        }
        super.close();
    }

    @Override
    protected void createWriter() throws IOException {
        writer = getWriter();
    }

    @Override
    public void write(Record<V> record) {
        try {
            KeyValue<K, V> kv = extractKeyValue(record);
            KeyValue<HdfsK, HdfsV> keyValue = convert(kv);
            writer.append(keyValue.getKey(), keyValue.getValue());
            unackedRecords.put(record);
        } catch (IOException | InterruptedException e) {
            LOG.error("Unable to write to file {}", getPath(), e);
            record.fail();
            if (e instanceof InterruptedException) {
                // Restore the interrupt status swallowed by the catch.
                Thread.currentThread().interrupt();
            }
        }
    }

    protected Writer getWriter() throws IOException {
        counter = new AtomicLong(0);
        List<Option> options = getOptions();
        return SequenceFile.createWriter(getConfiguration(),
                options.toArray(new Option[0]));
    }

    protected List<Option> getOptions() throws IllegalArgumentException, IOException {
        List<Option> list = new ArrayList<>();
        list.add(Writer.stream(getHdfsStream()));

        if (getCompressionCodec() != null) {
            list.add(Writer.compression(SequenceFile.CompressionType.RECORD, getCompressionCodec()));
        }
        return list;
    }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
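For reference, the options assembled in getOptions() are the varargs accepted by Hadoop's SequenceFile.createWriter(Configuration, Writer.Option...). A standalone sketch of that API outside the sink, assuming Text keys and values; the /tmp/records.seq output path and the class name SequenceFileWriteExample are illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.DefaultCodec;

public class SequenceFileWriteExample {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // These Writer options play the same role as the list built in getOptions():
        // a destination (a file path here, an open stream in the sink), key/value
        // classes, and optional record-level compression.
        try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                SequenceFile.Writer.file(new Path("/tmp/records.seq")), // illustrative path
                SequenceFile.Writer.keyClass(Text.class),
                SequenceFile.Writer.valueClass(Text.class),
                SequenceFile.Writer.compression(SequenceFile.CompressionType.RECORD, new DefaultCodec()))) {
            writer.append(new Text("key-1"), new Text("value-1"));
        }
    }
}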



