app/metrics/CloudWatch.scala (109 lines of code) (raw):
package metrics
import com.amazonaws.client.builder.AwsClientBuilder
import com.amazonaws.handlers.AsyncHandler
import com.amazonaws.services.cloudwatch.model._
import com.amazonaws.services.cloudwatch.{
AmazonCloudWatchAsync,
AmazonCloudWatchAsyncClientBuilder
}
import conf.ApplicationConfiguration
import logging.Logging
import services.AwsEndpoints
import scala.jdk.CollectionConverters._
/** Publishes application metrics to AWS CloudWatch through the asynchronous
  * SDK client.
  *
  * The client is only built when `config.aws.credentials` is defined; without
  * credentials every put request is silently skipped.
  *
  * @param config       application configuration supplying AWS credentials and region
  * @param awsEndpoints supplies the CloudWatch ("monitoring") endpoint
  */
class CloudWatch(
    val config: ApplicationConfiguration,
    val awsEndpoints: AwsEndpoints
) extends Logging {

  // Number of metrics sent in a single PutMetricData request.
  // NOTE(review): presumably chosen to match CloudWatch's historical
  // per-request datum limit — confirm against the service quota before raising.
  private val MaxMetricsPerRequest = 20

  /** Async CloudWatch client, created on first use. `None` when no AWS
    * credentials are configured.
    */
  lazy val cloudwatch: Option[AmazonCloudWatchAsync] =
    config.aws.credentials.map { credentials =>
      val endpoint = new AwsClientBuilder.EndpointConfiguration(
        awsEndpoints.monitoring,
        config.aws.region
      )
      AmazonCloudWatchAsyncClientBuilder
        .standard()
        .withCredentials(credentials)
        .withEndpointConfiguration(endpoint)
        .build()
    }

  /** Base callback for `putMetricDataAsync`: logs failures at warn level and
    * ignores successes.
    */
  trait LoggingAsyncHandler
      extends AsyncHandler[PutMetricDataRequest, PutMetricDataResult] {

    def onError(exception: Exception): Unit =
      logger.warn(
        s"CloudWatch PutMetricDataRequest error: ${exception.getMessage}"
      )

    def onSuccess(
        request: PutMetricDataRequest,
        result: PutMetricDataResult
    ): Unit = {}
  }

  /** Callback bound to the batch of statistic sets sent in one request.
    *
    * On failure it logs how many and which metrics failed, calls `reset()` on
    * each statistic set, then delegates to [[LoggingAsyncHandler.onError]].
    * Success handling is inherited unchanged (a no-op).
    *
    * @param frontendStatisticSets the statistic sets included in the request
    */
  case class AsyncHandlerForMetric(
      frontendStatisticSets: List[FrontendStatisticSet]
  ) extends LoggingAsyncHandler {

    override def onError(exception: Exception): Unit = {
      logger.warn(
        s"Failed to put ${frontendStatisticSets.size} metrics: $exception"
      )
      logger.warn(
        s"Failed to put ${frontendStatisticSets.map(_.metric.name).mkString(",")}"
      )
      // NOTE(review): presumably hands the drained data points back to their
      // metrics so they are retried on the next put — confirm against
      // FrontendStatisticSet.reset().
      frontendStatisticSets.foreach(_.reset())
      super.onError(exception)
    }
  }

  /** Puts `metrics` into the "Application" namespace, tagged with the stage
    * and application dimensions (in that order).
    *
    * @param metrics              metrics to publish
    * @param applicationDimension dimension identifying the application
    * @param stageDimension       dimension identifying the stage
    */
  def putMetricsWithStage(
      metrics: List[FrontendMetric],
      applicationDimension: Dimension,
      stageDimension: Dimension
  ): Unit =
    putMetrics(
      "Application",
      metrics,
      List(stageDimension, applicationDimension)
    )

  /** Sends all non-empty `metrics` to CloudWatch in batches.
    *
    * Each batch is converted to statistic sets via `getAndResetDataPoints`
    * and submitted asynchronously; failures are handled by
    * [[AsyncHandlerForMetric]]. `getAndResetDataPoints` is invoked even when
    * no client is configured, in which case the built request is discarded.
    *
    * @param metricNamespace CloudWatch namespace for the request
    * @param metrics         metrics to publish; empty ones are skipped
    * @param dimensions      dimensions attached to every datum
    */
  def putMetrics(
      metricNamespace: String,
      metrics: List[FrontendMetric],
      dimensions: List[Dimension]
  ): Unit =
    metrics
      .filterNot(_.isEmpty)
      .grouped(MaxMetricsPerRequest)
      .foreach { metricGroup =>
        val metricsAsStatistics: List[FrontendStatisticSet] =
          metricGroup.map(metric =>
            FrontendStatisticSet(metric, metric.getAndResetDataPoints)
          )

        val metricsAsDatums = metricsAsStatistics.map(metricStatistic =>
          new MetricDatum()
            .withStatisticValues(frontendMetricToStatisticSet(metricStatistic))
            .withUnit(metricStatistic.metric.metricUnit)
            .withMetricName(metricStatistic.metric.name)
            .withDimensions(dimensions.asJavaCollection)
        )

        val request = new PutMetricDataRequest()
          .withNamespace(metricNamespace)
          .withMetricData(metricsAsDatums.asJavaCollection)

        cloudwatch.foreach(
          _.putMetricDataAsync(
            request,
            AsyncHandlerForMetric(metricsAsStatistics)
          )
        )
      }

  /** Copies maximum, minimum, sample count and sum from a
    * [[FrontendStatisticSet]] into an SDK `StatisticSet`.
    */
  private def frontendMetricToStatisticSet(
      metricStatistics: FrontendStatisticSet
  ): StatisticSet =
    new StatisticSet()
      .withMaximum(metricStatistics.maximum)
      .withMinimum(metricStatistics.minimum)
      .withSampleCount(metricStatistics.sampleCount)
      .withSum(metricStatistics.sum)
}