app/services/KinesisConsumer.scala (66 lines of code) (raw):

package services import java.nio.ByteBuffer import com.amazonaws.auth.DefaultAWSCredentialsProviderChain import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.{IRecordProcessor, IRecordProcessorFactory} import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{InitialPositionInStream, KinesisClientLibConfiguration, ShutdownReason, Worker} import com.amazonaws.services.kinesis.clientlibrary.types.{InitializationInput, ProcessRecordsInput, ShutdownInput} import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel import com.amazonaws.services.kinesis.model.Record import com.fasterxml.jackson.databind.util.ByteBufferBackedInputStream import org.apache.thrift.protocol.{TCompactProtocol, TProtocol} import org.apache.thrift.transport.TIOStreamTransport import play.api.Logging import scala.jdk.CollectionConverters._ import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future import scala.language.implicitConversions class KinesisConsumer(streamName: String, appName: String, processor: KinesisStreamRecordProcessor) extends Logging { logger.info(s"Creating Kinesis Consumer for (streamName: $streamName, appName: $appName)") val kinesisClientLibConfiguration = new KinesisClientLibConfiguration(appName, streamName, new DefaultAWSCredentialsProviderChain, s"$appName-worker") kinesisClientLibConfiguration .withRegionName(AWS.region.getName) .withMetricsLevel(MetricsLevel.NONE) .withInitialPositionInStream(InitialPositionInStream.LATEST) val worker = new Worker.Builder() .recordProcessorFactory(new KinesisProcessorConsumerFactory(appName, processor)) .config(kinesisClientLibConfiguration) .build() def start(): Unit = { Future{ worker.run() } } def stop(): Unit = { worker.shutdown() } } class KinesisProcessorConsumerFactory(appName: String, processor: KinesisStreamRecordProcessor) extends IRecordProcessorFactory { override def createProcessor(): IRecordProcessor = new KinesisProcessorConsumer(appName, processor) } class KinesisProcessorConsumer(appName: String, processor: KinesisStreamRecordProcessor) extends IRecordProcessor with Logging { override def shutdown(shutdownInput: ShutdownInput): Unit = { shutdownInput.getShutdownReason match { case ShutdownReason.TERMINATE => { logger.info(s"terminating $appName consumer") //shutdownInput.getCheckpointer.checkpoint } case _ => logger.info(s"shutting down $appName consumer reason = ${shutdownInput.getShutdownReason}") } } override def initialize(initializationInput: InitializationInput): Unit = { logger.info(s"$appName consumer started for shard ${initializationInput.getShardId}") } override def processRecords(processRecordsInput: ProcessRecordsInput): Unit = { processRecordsInput.getRecords.asScala foreach { record => processor.process(record) } } } trait KinesisStreamRecordProcessor { def process(record: Record): Unit } object KinesisRecordPayloadConversions { def kinesisRecordAsThriftCompactProtocol(rec: Record, stripCompressionByte: Boolean = false): TProtocol = { val data: ByteBuffer = rec.getData val bytes = if (stripCompressionByte) ByteBuffer.wrap(data.array().tail) else data val bbis = new ByteBufferBackedInputStream(bytes) val transport = new TIOStreamTransport(bbis) new TCompactProtocol(transport) } }