in alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/kafka/KafkaSagaEventConsumer.java [55:89]
public KafkaSagaEventConsumer(ActorSystem actorSystem, ActorRef sagaShardRegionActor,
MetricsService metricsService, String bootstrap_servers, String topic, Map<String,String> consumerConfigMap) {
super(actorSystem, sagaShardRegionActor, metricsService);
// init consumer
final Materializer materializer = ActorMaterializer.create(actorSystem);
final Config consumerConfig = actorSystem.settings().config().getConfig("akka.kafka.consumer");
final ConsumerSettings<String, String> consumerSettings =
ConsumerSettings
.create(consumerConfig, new StringDeserializer(), new StringDeserializer())
.withBootstrapServers(bootstrap_servers)
.withGroupId(groupId)
.withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
.withProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "StringDeserializer.class")
.withProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "StringDeserializer.class");
consumerConfigMap.forEach((k,v) -> consumerSettings.withProperty(k,v));
Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
.mapAsync(20, event -> {
BaseEvent bean = jsonMapper.readValue(event.record().value(), BaseEvent.class);
if (LOG.isDebugEnabled()) {
LOG.debug("receive [{}] {} {}", bean.getGlobalTxId(), bean.getType(), bean.getLocalTxId());
}
return sendSagaActor(bean).thenApply(done -> event.committableOffset());
})
.batch(
100,
ConsumerMessage::createCommittableOffsetBatch,
ConsumerMessage.CommittableOffsetBatch::updated
)
.mapAsync(20, offset -> offset.commitJavadsl())
.to(Sink.ignore())
.run(materializer);
}