app/story_packages/updates/KinesisEventSender.scala (127 lines of code) (raw):
package story_packages.updates
import java.nio.ByteBuffer
import com.amazonaws.handlers.AsyncHandler
import com.amazonaws.services.kinesis.AmazonKinesisAsyncClientBuilder
import com.amazonaws.services.kinesis.model.{PutRecordsRequest, PutRecordsRequestEntry, PutRecordsResult}
import com.gu.facia.client.models.CollectionJson
import com.gu.storypackage.model.v1._
import com.gu.thrift.serializer.{GzipType, ThriftSerializer}
import org.joda.time.DateTime
import conf.ApplicationConfiguration
import story_packages.services.Logging
/**
  * Serialises story-package [[Event]]s to thrift and puts them on Kinesis streams.
  *
  * The target stream for each call is picked from `config.updates` according to the kind of
  * update (capi vs reindex) and the `isHidden` flag passed by the caller.
  *
  * @param config application configuration providing AWS credentials/region, the stream names
  *               and the maximum allowed record size
  */
class KinesisEventSender(config: ApplicationConfiguration) extends Logging {

  /** Default stream name (`config.updates.capi`); used by [[eventHandler]] when no stream is given. */
  val streamName: String = config.updates.capi

  /**
    * Builds the callback that logs the outcome of an asynchronous `PutRecords` call.
    *
    * @param collectionId id of the collection/package the update belongs to (used in log lines)
    * @param stream       name of the stream the request was sent to (used in log lines);
    *                     defaults to [[streamName]] so existing single-argument callers keep working
    */
  def eventHandler(
    collectionId: String,
    stream: String = streamName
  ): AsyncHandler[PutRecordsRequest, PutRecordsResult] =
    new AsyncHandler[PutRecordsRequest, PutRecordsResult] {
      def onError(exception: Exception): Unit = {
        Logger.error(
          s"$stream - Error when sending thrift update to kinesis stream (collection $collectionId)",
          exception)
      }

      def onSuccess(request: PutRecordsRequest, result: PutRecordsResult): Unit = {
        // PutRecords is not all-or-nothing: the call itself can succeed while individual
        // records are rejected, which is reported through the failed record count.
        val failedCount = Option(result.getFailedRecordCount).fold(0)(_.intValue)
        if (failedCount > 0) {
          Logger.error(
            s"$stream - Kinesis rejected $failedCount record(s) of thrift update for collection $collectionId: ${result.getRecords}")
        } else {
          Logger.info(s"$stream - Kinesis thrift update for collection $collectionId sent correctly")
        }
      }
    }

  // Built lazily so no AWS client is created until the first update is actually sent.
  private lazy val client = {
    AmazonKinesisAsyncClientBuilder.standard
      .withCredentials(config.aws.mandatoryCredentials)
      .withRegion(config.aws.region)
      .build
  }

  /**
    * Converts the live trails of a collection into thrift [[Article]]s.
    *
    * Trails without metadata become plain linked articles. For trails with metadata:
    *  - group "1" maps to `Group.Included`, anything else to `Group.Linked`;
    *  - `imageSrc` is taken from the replacement image when `imageReplace` is true, otherwise
    *    from the cutout image when `imageCutoutReplace` is true, otherwise it is empty;
    *  - `byline` is only forwarded when `showByline` is true.
    *
    * @param collectionJson collection whose `live` entries are converted
    * @return one article per live entry, in the same order
    */
  def createUpdatePayload(collectionJson: CollectionJson): List[Article] = {
    collectionJson.live.map { article =>
      article.meta match {
        case Some(trailMetaData) =>
          val group = trailMetaData.group match {
            case Some("1") => Group.Included
            case _ => Group.Linked
          }
          // The replacement image wins over the cutout when both flags are set.
          val imageSrc =
            if (trailMetaData.imageReplace.exists(identity)) {
              trailMetaData.imageSrc
            } else if (trailMetaData.imageCutoutReplace.exists(identity)) {
              trailMetaData.imageCutoutSrc
            } else {
              None
            }
          val byline = trailMetaData.showByline.flatMap { enabled =>
            if (enabled) trailMetaData.byline
            else None
          }
          Article(
            id = article.id,
            articleType = ArticleType.Article,
            group = group,
            headline = trailMetaData.headline,
            href = trailMetaData.href,
            trailText = trailMetaData.trailText,
            imageSrc = imageSrc,
            isBoosted = trailMetaData.isBoosted,
            imageHide = trailMetaData.imageHide,
            showMainVideo = trailMetaData.showMainVideo,
            showKickerTag = trailMetaData.showKickerTag,
            showKickerSection = trailMetaData.showKickerSection,
            showBoostedHeadline = trailMetaData.showBoostedHeadline,
            byline = byline,
            customKicker = trailMetaData.customKicker
          )
        case None =>
          Article(
            id = article.id,
            group = Group.Linked,
            articleType = ArticleType.Article
          )
      }
    }
  }

  /** Stream used for reindex events: the preview variant when the package is hidden. */
  private def reindexStream(isHidden: Boolean): String =
    if (isHidden) config.updates.reindexPreview else config.updates.reindex

  /** Stream used for regular capi events: the preview variant when the package is hidden. */
  private def capiStream(isHidden: Boolean): String =
    if (isHidden) config.updates.preview else config.updates.capi

  /** Builds an event carrying the collection's articles and its last-updated timestamp. */
  private def collectionEvent(
    eventType: EventType,
    packageId: String,
    displayName: String,
    collectionJson: CollectionJson
  ): Event =
    Event(
      eventType = eventType,
      packageId = packageId,
      packageName = displayName,
      lastModified = collectionJson.lastUpdated.toString(),
      articles = createUpdatePayload(collectionJson))

  /**
    * Sends a `Delete` event for the package to the reindex stream (reindex-preview when hidden).
    * The event still carries the collection's articles and last-updated timestamp.
    */
  def putReindexDelete(packageId: String, displayName: String, collectionJson: CollectionJson, isHidden: Boolean): Unit = {
    sendUpdate(
      reindexStream(isHidden),
      packageId,
      collectionEvent(EventType.Delete, packageId, displayName, collectionJson))
  }

  /** Sends an `Update` event for the package to the reindex stream (reindex-preview when hidden). */
  def putReindexUpdate(packageId: String, displayName: String, collectionJson: CollectionJson, isHidden: Boolean): Unit = {
    sendUpdate(
      reindexStream(isHidden),
      packageId,
      collectionEvent(EventType.Update, packageId, displayName, collectionJson))
  }

  /**
    * Sends a `Delete` event for the package to the capi stream (preview when hidden).
    * No collection is available here, so the event has an empty name, no articles and
    * the current time as its last-modified timestamp.
    */
  def putCapiDelete(packageId: String, isHidden: Boolean): Unit = {
    sendUpdate(
      capiStream(isHidden),
      packageId,
      Event(
        eventType = EventType.Delete,
        packageId = packageId,
        packageName = "",
        lastModified = DateTime.now().toString(),
        articles = Nil))
  }

  /** Sends an `Update` event for the package to the capi stream (preview when hidden). */
  def putCapiUpdate(packageId: String, displayName: String, collectionJson: CollectionJson, isHidden: Boolean): Unit = {
    sendUpdate(
      capiStream(isHidden),
      packageId,
      collectionEvent(EventType.Update, packageId, displayName, collectionJson))
  }

  /**
    * Serialises the event (gzip-compressed thrift) and puts it on the given stream asynchronously.
    *
    * Events whose serialised size exceeds `config.updates.maxDataSize` are NOT sent; an error is
    * logged instead. The outcome of the asynchronous call is only logged, via [[eventHandler]].
    *
    * @param streamName   target Kinesis stream (shadows the class-level default on purpose)
    * @param collectionId id used in the completion log lines
    * @param event        event to send; its `packageId` is used as the partition key
    */
  def sendUpdate(streamName: String, collectionId: String, event: Event): Unit = {
    val bytes = ThriftSerializer.serializeToBytes(event, Some(GzipType), Some(128))

    if (bytes.length > config.updates.maxDataSize) {
      Logger.error(s"$streamName - NOT sending because size (${bytes.length} bytes) is larger than max size (${config.updates.maxDataSize})")
    } else {
      Logger.info(s"$streamName - sending thrift update with size of ${bytes.length} bytes")
      val record = new PutRecordsRequestEntry()
        .withPartitionKey(event.packageId)
        .withData(ByteBuffer.wrap(bytes))
      val request = new PutRecordsRequest()
        .withStreamName(streamName)
        .withRecords(record)

      // Pass the actual target stream so completion logs name the stream that was written to.
      client.putRecordsAsync(request, eventHandler(collectionId, streamName))
    }
  }
}