in alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/kafka/KafkaActorEventChannel.java [45:78]
public KafkaActorEventChannel(SpecSagaAkkaProperties specSagaAkkaProperties, MetricsService metricsService) {
super(metricsService);
// init topic
Map props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
specSagaAkkaProperties.getChannel().getKafka().getBootstrapServers());
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 50000);
try (final AdminClient adminClient = KafkaAdminClient.create(props)) {
try {
final NewTopic newTopic = new NewTopic(
specSagaAkkaProperties.getChannel().getKafka().getTopic(),
specSagaAkkaProperties.getChannel().getKafka().getNumPartitions(),
specSagaAkkaProperties.getChannel().getKafka().getReplicationFactor());
final CreateTopicsResult createTopicsResult = adminClient
.createTopics(Collections.singleton(newTopic));
createTopicsResult.values().get(specSagaAkkaProperties.getChannel().getKafka().getTopic())
.get();
} catch (InterruptedException | ExecutionException e) {
if (e.getCause() instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
if (!(e.getCause() instanceof TopicExistsException)) {
throw new RuntimeException(e.getMessage(), e);
}
}
}
// create producer
this.kafkaMessagePublisher = new KafkaMessagePublisher(
specSagaAkkaProperties.getChannel().getKafka().getBootstrapServers(),
specSagaAkkaProperties.getChannel().getKafka().getTopic(),
specSagaAkkaProperties.getChannel().getKafka().getProducer());
LOG.info("Kafka Channel Init");
}