public KafkaActorEventChannel()

in alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/kafka/KafkaActorEventChannel.java [45:78]


  /**
   * Creates the channel: makes sure the configured Kafka topic exists and then
   * builds the {@link KafkaMessagePublisher} used to publish to it.
   *
   * <p>The topic is created through an {@link AdminClient} using the topic name,
   * partition count and replication factor from the Kafka channel properties. A
   * {@link TopicExistsException} reported by the broker is treated as success.
   *
   * @param specSagaAkkaProperties configuration holding the Kafka channel settings
   *     (bootstrap servers, topic, partitions, replication factor, producer settings)
   * @param metricsService metrics service handed to the parent constructor
   * @throws RuntimeException if topic creation fails for any reason other than the
   *     topic already existing, or if the calling thread is interrupted while
   *     waiting for the creation result (the interrupt flag is restored first)
   */
  public KafkaActorEventChannel(SpecSagaAkkaProperties specSagaAkkaProperties, MetricsService metricsService) {
    super(metricsService);

    final String topic = specSagaAkkaProperties.getChannel().getKafka().getTopic();

    // init topic
    final Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
        specSagaAkkaProperties.getChannel().getKafka().getBootstrapServers());
    // NOTE(review): max.block.ms is a producer setting; confirm the admin client honours it.
    props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 50000);
    try (final AdminClient adminClient = KafkaAdminClient.create(props)) {
      final NewTopic newTopic = new NewTopic(
          topic,
          specSagaAkkaProperties.getChannel().getKafka().getNumPartitions(),
          specSagaAkkaProperties.getChannel().getKafka().getReplicationFactor());
      final CreateTopicsResult createTopicsResult = adminClient
          .createTopics(Collections.singleton(newTopic));
      // Block until the broker has answered for this topic.
      createTopicsResult.values().get(topic).get();
    } catch (InterruptedException e) {
      // The wait itself was interrupted: restore the flag so callers further up
      // the stack can still observe the interruption, then fail construction.
      Thread.currentThread().interrupt();
      throw new RuntimeException(e.getMessage(), e);
    } catch (ExecutionException e) {
      if (e.getCause() instanceof InterruptedException) {
        Thread.currentThread().interrupt();
      }
      // An already existing topic is fine; everything else is a real failure.
      if (!(e.getCause() instanceof TopicExistsException)) {
        throw new RuntimeException(e.getMessage(), e);
      }
    }

    // create producer
    this.kafkaMessagePublisher = new KafkaMessagePublisher(
        specSagaAkkaProperties.getChannel().getKafka().getBootstrapServers(),
        topic,
        specSagaAkkaProperties.getChannel().getKafka().getProducer());
    LOG.info("Kafka Channel Init");
  }