public KafkaSagaEventConsumer()

in alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/kafka/KafkaSagaEventConsumer.java [55:89]


  public KafkaSagaEventConsumer(ActorSystem actorSystem, ActorRef sagaShardRegionActor,
      MetricsService metricsService, String bootstrap_servers, String topic, Map<String,String> consumerConfigMap) {
    super(actorSystem, sagaShardRegionActor, metricsService);


    // init consumer
    final Materializer materializer = ActorMaterializer.create(actorSystem);
    final Config consumerConfig = actorSystem.settings().config().getConfig("akka.kafka.consumer");
    final ConsumerSettings<String, String> consumerSettings =
        ConsumerSettings
            .create(consumerConfig, new StringDeserializer(), new StringDeserializer())
            .withBootstrapServers(bootstrap_servers)
            .withGroupId(groupId)
            .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
            .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
            .withProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "StringDeserializer.class")
            .withProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "StringDeserializer.class");
    consumerConfigMap.forEach((k,v) -> consumerSettings.withProperty(k,v));
    Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
        .mapAsync(20, event -> {
          BaseEvent bean = jsonMapper.readValue(event.record().value(), BaseEvent.class);
          if (LOG.isDebugEnabled()) {
            LOG.debug("receive [{}] {} {}", bean.getGlobalTxId(), bean.getType(), bean.getLocalTxId());
          }
          return sendSagaActor(bean).thenApply(done -> event.committableOffset());
        })
        .batch(
            100,
            ConsumerMessage::createCommittableOffsetBatch,
            ConsumerMessage.CommittableOffsetBatch::updated
        )
        .mapAsync(20, offset -> offset.commitJavadsl())
        .to(Sink.ignore())
        .run(materializer);
  }