app/story_packages/metrics/CloudWatch.scala (68 lines of code) (raw):

package story_packages.metrics

import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
import com.amazonaws.handlers.AsyncHandler
import com.amazonaws.services.cloudwatch.{AmazonCloudWatchAsync, AmazonCloudWatchAsyncClientBuilder}
import com.amazonaws.services.cloudwatch.model._
import conf.ApplicationConfiguration
import story_packages.services.Logging

import scala.jdk.CollectionConverters._

/** Publishes application metrics to AWS CloudWatch via the asynchronous SDK client.
  *
  * When `config.aws.credentials` is empty no client is built and every `putMetrics`
  * call still drains the metrics' data points but sends nothing.
  *
  * @param config application configuration supplying AWS credentials, region and the
  *               monitoring endpoint
  */
class CloudWatch(config: ApplicationConfiguration) extends Logging {

  /** Number of metrics bundled into a single `PutMetricDataRequest`.
    * NOTE(review): presumably mirrors a CloudWatch per-request datum limit - confirm
    * against the current PutMetricData quota before changing.
    */
  private val MaxMetricsPerRequest = 20

  /** Async CloudWatch client, built on first use; `None` when no credentials are configured. */
  lazy val cloudwatch: Option[AmazonCloudWatchAsync] = config.aws.credentials.map { credentials =>
    AmazonCloudWatchAsyncClientBuilder
      .standard()
      .withEndpointConfiguration(
        new EndpointConfiguration(config.aws.endpoints.monitoring, config.aws.region)
      )
      .withCredentials(credentials)
      .build()
  }

  /** Base completion handler that logs the outcome of a `PutMetricDataRequest`. */
  trait LoggingAsyncHandler extends AsyncHandler[PutMetricDataRequest, PutMetricDataResult] {

    /** Logs the failure message of a rejected request. */
    def onError(exception: Exception): Unit = {
      // The original message carried a stray trailing '}' after the interpolation.
      Logger.info(s"CloudWatch PutMetricDataRequest error: ${exception.getMessage}")
    }

    /** Logs that the request completed successfully. */
    def onSuccess(request: PutMetricDataRequest, result: PutMetricDataResult): Unit = {
      Logger.info("CloudWatch PutMetricDataRequest - success")
    }
  }

  /** Completion handler bound to the batch of statistic sets sent in one request.
    *
    * @param frontendStatisticSets the statistic sets that were included in the request
    */
  case class AsyncHandlerForMetric(frontendStatisticSets: List[FrontendStatisticSet])
      extends LoggingAsyncHandler {

    /** Logs which metrics failed, calls `reset()` on each statistic set, then delegates
      * to the base handler.
      */
    override def onError(exception: Exception): Unit = {
      Logger.warn(s"Failed to put ${frontendStatisticSets.size} metrics: $exception")
      Logger.warn(s"Failed to put ${frontendStatisticSets.map(_.metric.name).mkString(",")}")
      // NOTE(review): reset() is defined elsewhere; presumably it hands the drained data
      // points back to the metric so they are retried on the next upload - confirm.
      frontendStatisticSets.foreach { _.reset() }
      super.onError(exception)
    }

    /** Logs which metrics were sent, then delegates to the base handler. */
    override def onSuccess(request: PutMetricDataRequest, result: PutMetricDataResult): Unit = {
      Logger.info(s"Successfully put ${frontendStatisticSets.size} metrics")
      Logger.info(s"Successfully put ${frontendStatisticSets.map(_.metric.name).mkString(",")}")
      super.onSuccess(request, result)
    }
  }

  /** Publishes `metrics` under the "Application" namespace, tagged with the stage and
    * application dimensions (in that order).
    *
    * @param metrics              metrics to publish
    * @param applicationDimension dimension identifying the application
    * @param stageDimension       dimension identifying the deployment stage
    */
  def putMetricsWithStage(
      metrics: List[FrontendMetric],
      applicationDimension: Dimension,
      stageDimension: Dimension
  ): Unit =
    putMetrics("Application", metrics, List(stageDimension, applicationDimension))

  /** Publishes all non-empty `metrics` to CloudWatch in batches of `MaxMetricsPerRequest`.
    *
    * Each metric's data points are taken with `getAndResetDataPoints` before the request
    * is built, regardless of whether a client is configured.
    *
    * @param metricNamespace CloudWatch namespace for the data
    * @param metrics         metrics to publish; those reporting `isEmpty` are skipped
    * @param dimensions      dimensions attached to every datum
    */
  def putMetrics(
      metricNamespace: String,
      metrics: List[FrontendMetric],
      dimensions: List[Dimension]
  ): Unit = {
    for {
      metricGroup <- metrics.filterNot(_.isEmpty).grouped(MaxMetricsPerRequest)
    } {
      val metricsAsStatistics: List[FrontendStatisticSet] =
        metricGroup.map(metric => FrontendStatisticSet(metric, metric.getAndResetDataPoints))

      val request = new PutMetricDataRequest()
        .withNamespace(metricNamespace)
        .withMetricData {
          val metricData = for (metricStatistic <- metricsAsStatistics) yield {
            new MetricDatum()
              .withStatisticValues(frontendMetricToStatisticSet(metricStatistic))
              .withUnit(metricStatistic.metric.metricUnit)
              .withMetricName(metricStatistic.metric.name)
              .withDimensions(dimensions.asJava)
          }
          metricData.asJava
        }

      // The handler keeps the batch so it can log and reset exactly these sets on failure.
      cloudwatch.foreach(_.putMetricDataAsync(request, AsyncHandlerForMetric(metricsAsStatistics)))
    }
  }

  /** Copies maximum, minimum, sample count and sum into an AWS `StatisticSet`. */
  private def frontendMetricToStatisticSet(metricStatistics: FrontendStatisticSet): StatisticSet =
    new StatisticSet()
      .withMaximum(metricStatistics.maximum)
      .withMinimum(metricStatistics.minimum)
      .withSampleCount(metricStatistics.sampleCount)
      .withSum(metricStatistics.sum)
}