pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaMessageRouter.java [28:41]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public class KafkaMessageRouter extends RoundRobinPartitionMessageRouterImpl {

    /**
     * Name of the message property that carries an explicit partition choice.
     * When a message has this property, {@code choosePartition} parses its value
     * as an integer and returns it instead of delegating to the parent router.
     */
    public static final String PARTITION_ID = "pulsar.partition.id";

    /**
     * Creates a router by forwarding a fixed configuration to the parent
     * round-robin router: {@code HashingScheme.JavaStringHash}, a random int
     * drawn from {@link ThreadLocalRandom}, the literal {@code true}, and the
     * caller-supplied delay.
     *
     * <p>NOTE(review): the parent constructor is not visible from here. The random
     * int is presumably the starting partition index and {@code true} presumably
     * enables batching-aware partition switching; confirm against
     * {@code RoundRobinPartitionMessageRouterImpl}.
     *
     * @param maxBatchingDelayMs passed unchanged as the last argument of the parent
     *                           constructor; by its name, a batching delay in
     *                           milliseconds
     */
    public KafkaMessageRouter(long maxBatchingDelayMs) {
        super(HashingScheme.JavaStringHash, ThreadLocalRandom.current().nextInt(), true, maxBatchingDelayMs);
    }

    @Override
    public int choosePartition(Message<?> msg, TopicMetadata metadata) {
        if (msg.hasProperty(PARTITION_ID)) {
            return Integer.parseInt(msg.getProperty(PARTITION_ID));
        } else {
            return super.choosePartition(msg, metadata);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaMessageRouter.java [28:41]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public class KafkaMessageRouter extends RoundRobinPartitionMessageRouterImpl {

    /**
     * Name of the message property that carries an explicit partition choice.
     * When a message has this property, {@code choosePartition} parses its value
     * as an integer and returns it instead of delegating to the parent router.
     */
    public static final String PARTITION_ID = "pulsar.partition.id";

    /**
     * Creates a router by forwarding a fixed configuration to the parent
     * round-robin router: {@code HashingScheme.JavaStringHash}, a random int
     * drawn from {@link ThreadLocalRandom}, the literal {@code true}, and the
     * caller-supplied delay.
     *
     * <p>NOTE(review): the parent constructor is not visible from here. The random
     * int is presumably the starting partition index and {@code true} presumably
     * enables batching-aware partition switching; confirm against
     * {@code RoundRobinPartitionMessageRouterImpl}.
     *
     * @param maxBatchingDelayMs passed unchanged as the last argument of the parent
     *                           constructor; by its name, a batching delay in
     *                           milliseconds
     */
    public KafkaMessageRouter(long maxBatchingDelayMs) {
        super(HashingScheme.JavaStringHash, ThreadLocalRandom.current().nextInt(), true, maxBatchingDelayMs);
    }

    @Override
    public int choosePartition(Message<?> msg, TopicMetadata metadata) {
        if (msg.hasProperty(PARTITION_ID)) {
            return Integer.parseInt(msg.getProperty(PARTITION_ID));
        } else {
            return super.choosePartition(msg, metadata);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



