pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java [28:80]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public class PulsarKafkaSchema<T> implements Schema<T> {

    // Kafka serializer that encode() delegates to. May be null: the
    // deserializer-only constructor leaves it unset.
    private final Serializer<T> kafkaSerializer;

    // Kafka deserializer that decode() delegates to. May be null: the
    // serializer-only constructor leaves it unset.
    private final Deserializer<T> kafkaDeserializer;

    // Topic name handed to the Kafka (de)serializer on every call.
    // Remains null until setTopic() is invoked.
    private String topic;

    /**
     * Creates a schema that can only encode: the deserializer is left {@code null}.
     *
     * @param serializer Kafka serializer used by {@link #encode(Object)}
     */
    public PulsarKafkaSchema(Serializer<T> serializer) {
        this.kafkaSerializer = serializer;
        this.kafkaDeserializer = null;
    }

    /**
     * Creates a schema that can only decode: the serializer is left {@code null}.
     *
     * @param deserializer Kafka deserializer used by {@link #decode(byte[])}
     */
    public PulsarKafkaSchema(Deserializer<T> deserializer) {
        this.kafkaSerializer = null;
        this.kafkaDeserializer = deserializer;
    }

    /**
     * Creates a schema backed by the given Kafka serializer and deserializer.
     * Neither argument is validated here; a {@code null} value is stored as-is.
     *
     * @param serializer   Kafka serializer used by {@link #encode(Object)}
     * @param deserializer Kafka deserializer used by {@link #decode(byte[])}
     */
    public PulsarKafkaSchema(Serializer<T> serializer, Deserializer<T> deserializer) {
        kafkaDeserializer = deserializer;
        kafkaSerializer = serializer;
    }

    /**
     * @return the Kafka serializer given at construction time, or {@code null} if none was supplied
     */
    public Serializer<T> getKafkaSerializer() {
        return this.kafkaSerializer;
    }

    /**
     * @return the Kafka deserializer given at construction time, or {@code null} if none was supplied
     */
    public Deserializer<T> getKafkaDeserializer() {
        return this.kafkaDeserializer;
    }

    /**
     * Sets the topic name that subsequent {@code encode}/{@code decode} calls pass
     * to the Kafka serializer/deserializer.
     *
     * @param topic topic name to forward; stored without validation
     */
    public void setTopic(String topic) {
        this.topic = topic;
    }

    /**
     * Serializes {@code message} by delegating to the Kafka serializer, passing
     * the currently configured topic.
     *
     * @param message value to serialize
     * @return the bytes produced by the Kafka serializer
     * @throws IllegalArgumentException if no serializer was supplied (assuming
     *         {@code checkArgument} is Guava's {@code Preconditions.checkArgument};
     *         the import is not visible in this excerpt)
     */
    @Override
    public byte[] encode(T message) {
        final Serializer<T> serializer = kafkaSerializer;
        checkArgument(serializer != null, "Kafka serializer is not initialized yet");
        final byte[] payload = serializer.serialize(topic, message);
        return payload;
    }

    /**
     * Deserializes {@code message} by delegating to the Kafka deserializer,
     * passing the currently configured topic.
     *
     * @param message raw bytes to deserialize
     * @return the value produced by the Kafka deserializer
     * @throws IllegalArgumentException if no deserializer was supplied (assuming
     *         {@code checkArgument} is Guava's {@code Preconditions.checkArgument};
     *         the import is not visible in this excerpt)
     */
    @Override
    public T decode(byte[] message) {
        final Deserializer<T> deserializer = kafkaDeserializer;
        checkArgument(deserializer != null, "Kafka deserializer is not initialized yet");
        final T value = deserializer.deserialize(topic, message);
        return value;
    }

    /**
     * Reports this schema using the schema info of {@link Schema#BYTES}; no
     * type-specific schema info is built for {@code T}.
     *
     * @return the {@link SchemaInfo} obtained from {@code Schema.BYTES}
     */
    @Override
    public SchemaInfo getSchemaInfo() {
        final Schema<?> bytesSchema = Schema.BYTES;
        return bytesSchema.getSchemaInfo();
    }

    @Override
    public Schema<T> clone() {
        return this;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java [28:80]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public class PulsarKafkaSchema<T> implements Schema<T> {

    // Kafka serializer that encode() delegates to. May be null: the
    // deserializer-only constructor leaves it unset.
    private final Serializer<T> kafkaSerializer;

    // Kafka deserializer that decode() delegates to. May be null: the
    // serializer-only constructor leaves it unset.
    private final Deserializer<T> kafkaDeserializer;

    // Topic name handed to the Kafka (de)serializer on every call.
    // Remains null until setTopic() is invoked.
    private String topic;

    /**
     * Creates a schema that can only encode: the deserializer is left {@code null}.
     *
     * @param serializer Kafka serializer used by {@link #encode(Object)}
     */
    public PulsarKafkaSchema(Serializer<T> serializer) {
        this.kafkaSerializer = serializer;
        this.kafkaDeserializer = null;
    }

    /**
     * Creates a schema that can only decode: the serializer is left {@code null}.
     *
     * @param deserializer Kafka deserializer used by {@link #decode(byte[])}
     */
    public PulsarKafkaSchema(Deserializer<T> deserializer) {
        this.kafkaSerializer = null;
        this.kafkaDeserializer = deserializer;
    }

    /**
     * Creates a schema backed by the given Kafka serializer and deserializer.
     * Neither argument is validated here; a {@code null} value is stored as-is.
     *
     * @param serializer   Kafka serializer used by {@link #encode(Object)}
     * @param deserializer Kafka deserializer used by {@link #decode(byte[])}
     */
    public PulsarKafkaSchema(Serializer<T> serializer, Deserializer<T> deserializer) {
        kafkaDeserializer = deserializer;
        kafkaSerializer = serializer;
    }

    /**
     * @return the Kafka serializer given at construction time, or {@code null} if none was supplied
     */
    public Serializer<T> getKafkaSerializer() {
        return this.kafkaSerializer;
    }

    /**
     * @return the Kafka deserializer given at construction time, or {@code null} if none was supplied
     */
    public Deserializer<T> getKafkaDeserializer() {
        return this.kafkaDeserializer;
    }

    /**
     * Sets the topic name that subsequent {@code encode}/{@code decode} calls pass
     * to the Kafka serializer/deserializer.
     *
     * @param topic topic name to forward; stored without validation
     */
    public void setTopic(String topic) {
        this.topic = topic;
    }

    /**
     * Serializes {@code message} by delegating to the Kafka serializer, passing
     * the currently configured topic.
     *
     * @param message value to serialize
     * @return the bytes produced by the Kafka serializer
     * @throws IllegalArgumentException if no serializer was supplied (assuming
     *         {@code checkArgument} is Guava's {@code Preconditions.checkArgument};
     *         the import is not visible in this excerpt)
     */
    @Override
    public byte[] encode(T message) {
        final Serializer<T> serializer = kafkaSerializer;
        checkArgument(serializer != null, "Kafka serializer is not initialized yet");
        final byte[] payload = serializer.serialize(topic, message);
        return payload;
    }

    /**
     * Deserializes {@code message} by delegating to the Kafka deserializer,
     * passing the currently configured topic.
     *
     * @param message raw bytes to deserialize
     * @return the value produced by the Kafka deserializer
     * @throws IllegalArgumentException if no deserializer was supplied (assuming
     *         {@code checkArgument} is Guava's {@code Preconditions.checkArgument};
     *         the import is not visible in this excerpt)
     */
    @Override
    public T decode(byte[] message) {
        final Deserializer<T> deserializer = kafkaDeserializer;
        checkArgument(deserializer != null, "Kafka deserializer is not initialized yet");
        final T value = deserializer.deserialize(topic, message);
        return value;
    }

    /**
     * Reports this schema using the schema info of {@link Schema#BYTES}; no
     * type-specific schema info is built for {@code T}.
     *
     * @return the {@link SchemaInfo} obtained from {@code Schema.BYTES}
     */
    @Override
    public SchemaInfo getSchemaInfo() {
        final Schema<?> bytesSchema = Schema.BYTES;
        return bytesSchema.getSchemaInfo();
    }

    @Override
    public Schema<T> clone() {
        return this;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



