thrall/app/lib/kinesis/ThrallEventConsumer.scala (117 lines of code) (raw):
package lib.kinesis
import org.apache.pekko.actor.ActorSystem
import com.gu.mediaservice.lib.aws.UpdateMessage
import com.gu.mediaservice.lib.json.{JsonByteArrayUtil, PlayJsonHelpers}
import com.gu.mediaservice.lib.logging._
import com.gu.mediaservice.model.{ExternalThrallMessage, ThrallMessage}
import lib._
import lib.elasticsearch._
import java.time.Instant
import java.util.concurrent.Executors
import scala.concurrent.duration.{FiniteDuration, MILLISECONDS, SECONDS}
import scala.concurrent.{ExecutionContext, Future, TimeoutException}
import scala.util.{Failure, Success, Try}
class ThrallEventConsumer(es: ElasticSearch,
thrallMetrics: ThrallMetrics,
store: ThrallStore,
metadataEditorNotifications: MetadataEditorNotifications,
actorSystem: ActorSystem
) extends PlayJsonHelpers with GridLogging {
private val attemptTimeout = FiniteDuration(20, SECONDS)
private val delay = FiniteDuration(1, MILLISECONDS)
private val attempts = 2
private val timeout = attemptTimeout * attempts + delay * (attempts - 1)
private val messageProcessor = new MessageProcessor(es, store, metadataEditorNotifications)
private implicit val implicitActorSystem: ActorSystem = actorSystem
private implicit val executionContext: ExecutionContext =
ExecutionContext.fromExecutor(Executors.newCachedThreadPool)
def processMessage(message: ThrallMessage): Future[ThrallMessage] = {
val marker = message
val stopwatch = Stopwatch.start
//Try to process the update message twice, and give them both 30 seconds to run.
RetryHandler.handleWithRetryAndTimeout(
/*
* Brief note on retry strategy:
* Trying a second time might be dangerous, hopefully waiting a reasonable length of time should mitigate this.
* From the logs, trying again after 30 seconds should only affect 1/300,000 messages.
*
*/
(marker) => {
messageProcessor.process(message, marker)
}, attempts, attemptTimeout, delay, marker
).transform {
case Success(_) => {
logger.info(
combineMarkers(marker, stopwatch.elapsed),
s"Completed processing of ${message.subject} message"
)
Success(message)
}
case Failure(processorNotFoundException: ProcessorNotFoundException) => {
logger.error(
s"Could not find processor for ${processorNotFoundException.unknownSubject} message; message will be ignored"
)
Failure(processorNotFoundException)
}
case Failure(timeoutException: TimeoutException) => {
logger.error(
combineMarkers(marker, stopwatch.elapsed),
s"Timeout of $timeout reached while processing ${message.subject} message; message will be ignored:",
timeoutException
)
Failure(timeoutException)
}
case Failure(e: Throwable) => {
logger.error(
combineMarkers(marker, stopwatch.elapsed),
s"Failed to process ${message.subject} message; message will be ignored:", e
)
Failure(e)
}
}
}
}
object ThrallEventConsumer extends GridLogging with PlayJsonHelpers {
def parseRecordAsUpdateMessage(r: Array[Byte], timestamp: Instant):Either[Throwable,ExternalThrallMessage] = {
Try(JsonByteArrayUtil.fromByteArray[UpdateMessage](r)) match {
case Success(Right(updateMessage: UpdateMessage)) => {
MessageTranslator.translate(updateMessage)
}
case Success(Left(cause)) => {
val message = new String(r)
logParseErrors(cause)
Left(NoMessageException(timestamp, message)) //No message received
}
case Failure(e) => {
Left(e)
}
}
}
def parseRecordAsExternalThrallMessage(r: Array[Byte], timestamp: Instant):Either[Throwable,ExternalThrallMessage] = {
Try(JsonByteArrayUtil.fromByteArray[ExternalThrallMessage](r)) match {
case Success(Right(message: ExternalThrallMessage)) => {
Right(message)
}
case Success(Left(_)) => {
// We expect this to fail validation until `UpdateMessage`s are phased out completely,
// so we do not log the cause of this Left.
val message = new String(r)
Left(NoMessageException(timestamp, message)) //No message received
}
case Failure(e) => {
Left(e)
}
}
}
def parseRecord(r: Array[Byte], timestamp: Instant):Either[Throwable,ExternalThrallMessage] = {
(parseRecordAsExternalThrallMessage(r, timestamp) match {
case Right(message) => Right(message)
case _ => parseRecordAsUpdateMessage(r, timestamp)
}) match {
case Right(message) => {
logger.info(message.toLogMarker, s"Received ${message.subject} message at $timestamp")
Right(message)
}
case left@Left(NoMessageException(timestamp, message)) => {
logger.warn(s"No message present in record at $timestamp", message)
left
}
case left@Left(e) => {
logger.error(s"Exception during process record block at $timestamp", e)
left
}
}
}
}
case class NoMessageException(timestamp: Instant, message: String) extends Exception(s"No message present in record at $timestamp ${message}")