in src/frauddetectionservice/src/main/kotlin/frauddetectionservice/main.kt [31:70]
/**
 * Service entry point.
 *
 * Registers a flagd-backed OpenFeature provider, then consumes records from the Kafka
 * topic named by `topic` in an endless poll loop, decoding each record value as an
 * [OrderResult] and logging its order id together with a running record count.
 *
 * The Kafka bootstrap address is read from the `KAFKA_SERVICE_ADDR` environment
 * variable. If it is unset, empty, or whitespace-only, a message is printed and the
 * process exits with status 1.
 */
fun main() {
    val options = FlagdOptions.builder()
        .withGlobalTelemetry(true)
        .build()
    OpenFeatureAPI.getInstance().setProvider(FlagdProvider(options))

    // Treat an empty/blank value the same as a missing one so a misconfigured
    // deployment gets the explicit message below instead of a client-side failure.
    val bootstrapServers = System.getenv("KAFKA_SERVICE_ADDR")
    if (bootstrapServers.isNullOrBlank()) {
        println("KAFKA_SERVICE_ADDR is not supplied")
        exitProcess(1)
    }

    val props = Properties().apply {
        put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
        put(VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java.name)
        put(GROUP_ID_CONFIG, groupID)
        put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    }

    // Subscribe inside `use` so the consumer is closed even if subscribing throws.
    KafkaConsumer<String, ByteArray>(props).use { consumer ->
        consumer.subscribe(listOf(topic))

        var totalCount = 0L
        while (true) {
            for (record in consumer.poll(ofMillis(100))) {
                totalCount += 1

                // While the flag value is positive, every record is delayed by one second.
                if (getFeatureFlagValue("kafkaQueueProblems") > 0) {
                    logger.info("FeatureFlag 'kafkaQueueProblems' is enabled, sleeping 1 second")
                    Thread.sleep(1000)
                }

                // NOTE(review): a record that fails to parse throws here and ends the loop
                // (and the process) — confirm fail-fast is the intended behavior.
                val orders = OrderResult.parseFrom(record.value())
                logger.info("Consumed record with orderId: ${orders.orderId}, and updated total count to: $totalCount")
            }
        }
    }
}