gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/AvroJobSpecDeserializer.java [37:68]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
@Slf4j
/**
 * A deserializer that converts a byte array into an {@link AvroJobSpec}
 */
public class AvroJobSpecDeserializer implements Deserializer<AvroJobSpec> {
  private BinaryDecoder _decoder;
  private SpecificDatumReader<AvroJobSpec> _reader;
  private SchemaVersionWriter<?> _versionWriter;

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    InputStream dummyInputStream = new ByteArrayInputStream(new byte[0]);
    _decoder = DecoderFactory.get().binaryDecoder(dummyInputStream, null);
    _reader = new SpecificDatumReader<AvroJobSpec>(AvroJobSpec.SCHEMA$);
    _versionWriter = new FixedSchemaVersionWriter();
  }

  @Override
  public AvroJobSpec deserialize(String topic, byte[] data) {
    try (InputStream is = new ByteArrayInputStream(data)) {
      _versionWriter.readSchemaVersioningInformation(new DataInputStream(is));

      Decoder decoder = DecoderFactory.get().binaryDecoder(is, _decoder);

      return _reader.read(null, decoder);
    } catch (IOException e) {
      throw new RuntimeException("Could not decode message");
    }
  }

  @Override
  public void close() {
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/service/AvroJobSpecDeserializer.java [37:68]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
@Slf4j
/**
 * A deserializer that converts a byte array into an {@link AvroJobSpec}
 */
public class AvroJobSpecDeserializer implements Deserializer<AvroJobSpec> {
  private BinaryDecoder _decoder;
  private SpecificDatumReader<AvroJobSpec> _reader;
  private SchemaVersionWriter<?> _versionWriter;

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    InputStream dummyInputStream = new ByteArrayInputStream(new byte[0]);
    _decoder = DecoderFactory.get().binaryDecoder(dummyInputStream, null);
    _reader = new SpecificDatumReader<AvroJobSpec>(AvroJobSpec.SCHEMA$);
    _versionWriter = new FixedSchemaVersionWriter();
  }

  @Override
  public AvroJobSpec deserialize(String topic, byte[] data) {
    try (InputStream is = new ByteArrayInputStream(data)) {
      _versionWriter.readSchemaVersioningInformation(new DataInputStream(is));

      Decoder decoder = DecoderFactory.get().binaryDecoder(is, _decoder);

      return _reader.read(null, decoder);
    } catch (IOException e) {
      throw new RuntimeException("Could not decode message");
    }
  }

  @Override
  public void close() {
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



