common/metrics/dashboard_components.go (349 lines of code) (raw):
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metrics
import (
"context"
"fmt"
"sync"
dashboard "cloud.google.com/go/monitoring/dashboard/apiv1"
"cloud.google.com/go/monitoring/dashboard/apiv1/dashboardpb"
"github.com/GoogleCloudPlatform/spanner-migration-tool/internal"
)
// Defines dimensions for Monitoring Dashboard Metrics
const (
// Default height of a tile in the monitoring dashboard
defaultMonitoringMetricHeight int32 = 16
// Default width of a tile in the monitoring dashboard
defaultMonitoringMetricWidth int32 = 16
// Default columns in the monitoring dashboard
defaultColumns int32 = 3
defaultMosaicColumns int32 = 48
)
var once sync.Once
var dashboardClient *dashboard.DashboardsClient
// MonitoringMetricsResources contains information required to create the monitoring dashboard
type MonitoringMetricsResources struct {
MigrationProjectId string
DataflowJobId string
DatastreamId string
JobMetadataGcsBucket string
PubsubSubscriptionId string
SpannerProjectId string
SpannerInstanceId string
SpannerDatabaseId string
ShardToShardResourcesMap map[string]internal.ShardResources
ShardId string
MigrationRequestId string
}
type TileInfo struct {
Title string
TimeSeriesQueries map[string]string // Map of legend template and their corresponding queries
TextContent string // string for text input
}
type MosaicGroup struct {
groupTitle string
groupCreateTileFunction func(resourceIds MonitoringMetricsResources) []*dashboardpb.MosaicLayout_Tile
}
func getDashboardClient(ctx context.Context) *dashboard.DashboardsClient {
if dashboardClient == nil {
once.Do(func() {
dashboardClient, _ = dashboard.NewDashboardsClient(ctx)
})
return dashboardClient
}
return dashboardClient
}
func createSpannerMetrics(resourceIds MonitoringMetricsResources) []*dashboardpb.MosaicLayout_Tile {
spannerTiles := []*dashboardpb.MosaicLayout_Tile{
TileInfo{Title: "Spanner CPU Utilisation", TimeSeriesQueries: map[string]string{"Database CPU Utilisation": fmt.Sprintf(spannerCpuUtilDbQuery, resourceIds.SpannerDatabaseId, resourceIds.SpannerInstanceId), "Instance CPU Utilisation": fmt.Sprintf(spannerCpuUtilInstanceQuery, resourceIds.SpannerInstanceId)}}.createXYChartTile(),
TileInfo{Title: "Spanner Storage", TimeSeriesQueries: map[string]string{"Database Storage": fmt.Sprintf(spannerStorageUtilDbQuery, resourceIds.SpannerDatabaseId, resourceIds.SpannerInstanceId), "Instance Storage": fmt.Sprintf(spannerStorageUtilInstanceQuery, resourceIds.SpannerInstanceId)}}.createXYChartTile(),
}
return spannerTiles
}
func createShardDataflowMetrics(resourceIds MonitoringMetricsResources) []*dashboardpb.MosaicLayout_Tile {
dataflowTiles := []*dashboardpb.MosaicLayout_Tile{
TileInfo{
Title: "Dataflow Workers CPU Utilization",
TimeSeriesQueries: map[string]string{
"p50 worker": fmt.Sprintf(dataflowCpuUtilPercentileQuery, resourceIds.DataflowJobId, "50"),
"p90 worker": fmt.Sprintf(dataflowCpuUtilPercentileQuery, resourceIds.DataflowJobId, "90"),
"Max worker": fmt.Sprintf(dataflowCpuUtilMaxQuery, resourceIds.DataflowJobId),
}}.createXYChartTile(),
TileInfo{
Title: "Dataflow Workers Memory Utilization",
TimeSeriesQueries: map[string]string{
"p50 worker": fmt.Sprintf(dataflowMemoryUtilPercentileQuery, resourceIds.DataflowJobId, "50"),
"p90 worker": fmt.Sprintf(dataflowMemoryUtilPercentileQuery, resourceIds.DataflowJobId, "90"),
"Max worker": fmt.Sprintf(dataflowMemoryUtilMaxQuery, resourceIds.DataflowJobId),
}}.createXYChartTile(),
TileInfo{Title: "Dataflow Workers Max Backlog Time Seconds", TimeSeriesQueries: map[string]string{"": fmt.Sprintf(dataflowBacklogTimeQuery, resourceIds.DataflowJobId)}}.createXYChartTile(),
}
return dataflowTiles
}
func createShardDatastreamMetrics(resourceIds MonitoringMetricsResources) []*dashboardpb.MosaicLayout_Tile {
datastreamTiles := []*dashboardpb.MosaicLayout_Tile{
TileInfo{
Title: "Datastream Total Latency",
TimeSeriesQueries: map[string]string{"p50 " + resourceIds.DatastreamId: fmt.Sprintf(datastreamTotalLatencyQuery, resourceIds.DatastreamId, "50"), "p90 " + resourceIds.DatastreamId: fmt.Sprintf(datastreamTotalLatencyQuery, resourceIds.DatastreamId, "90")}}.createXYChartTile(),
TileInfo{Title: "Datastream Throughput", TimeSeriesQueries: map[string]string{resourceIds.DatastreamId: fmt.Sprintf(datastreamThroughputQuery, resourceIds.DatastreamId)}}.createXYChartTile(),
TileInfo{Title: "Datastream Unsupported Events", TimeSeriesQueries: map[string]string{resourceIds.DatastreamId: fmt.Sprintf(datastreamUnsupportedEventsQuery, resourceIds.DatastreamId)}}.createXYChartTile(),
}
return datastreamTiles
}
func createShardGcsMetrics(resourceIds MonitoringMetricsResources) []*dashboardpb.MosaicLayout_Tile {
// If fetching gcs bucket failed, don't return any tiles
if resourceIds.JobMetadataGcsBucket == "" {
return []*dashboardpb.MosaicLayout_Tile{}
}
gcsBucketTiles := []*dashboardpb.MosaicLayout_Tile{
TileInfo{Title: "GCS Bucket Total Bytes", TimeSeriesQueries: map[string]string{resourceIds.JobMetadataGcsBucket: fmt.Sprintf(gcsTotalBytesQuery, resourceIds.JobMetadataGcsBucket)}}.createXYChartTile(),
}
return gcsBucketTiles
}
func createShardPubsubMetrics(resourceIds MonitoringMetricsResources) []*dashboardpb.MosaicLayout_Tile {
pubsubTiles := []*dashboardpb.MosaicLayout_Tile{
TileInfo{Title: "Pubsub Subscription Sent Message Count", TimeSeriesQueries: map[string]string{resourceIds.PubsubSubscriptionId: fmt.Sprintf(pubsubSubscriptionSentMessageCountQuery, resourceIds.PubsubSubscriptionId)}}.createXYChartTile(),
TileInfo{Title: "Pubsub Age of Oldest Unacknowledged Message", TimeSeriesQueries: map[string]string{resourceIds.PubsubSubscriptionId: fmt.Sprintf(pubsubOldestUnackedMessageAgeQuery, resourceIds.PubsubSubscriptionId)}}.createXYChartTile(),
}
return pubsubTiles
}
func createShardIndependentTopMetrics(resourceIds MonitoringMetricsResources) []*dashboardpb.MosaicLayout_Tile {
independentTopMetricsTiles := []*dashboardpb.MosaicLayout_Tile{
TileInfo{
Title: "Dataflow Workers CPU Utilization",
TimeSeriesQueries: map[string]string{
"p50 worker": fmt.Sprintf(dataflowCpuUtilPercentileQuery, resourceIds.DataflowJobId, "50"),
"p90 worker": fmt.Sprintf(dataflowCpuUtilPercentileQuery, resourceIds.DataflowJobId, "90"),
"Max worker": fmt.Sprintf(dataflowCpuUtilMaxQuery, resourceIds.DataflowJobId),
}}.createXYChartTile(),
TileInfo{Title: "Datastream Throughput", TimeSeriesQueries: map[string]string{resourceIds.DatastreamId: fmt.Sprintf(datastreamThroughputQuery, resourceIds.DatastreamId)}}.createXYChartTile(),
TileInfo{Title: "Datastream Unsupported Events", TimeSeriesQueries: map[string]string{resourceIds.DatastreamId: fmt.Sprintf(datastreamUnsupportedEventsQuery, resourceIds.DatastreamId)}}.createXYChartTile(),
TileInfo{Title: "Pubsub Age of Oldest Unacknowledged Message", TimeSeriesQueries: map[string]string{resourceIds.PubsubSubscriptionId: fmt.Sprintf(pubsubOldestUnackedMessageAgeQuery, resourceIds.PubsubSubscriptionId)}}.createXYChartTile(),
}
spannerMetrics := createSpannerMetrics(resourceIds)
independentTopMetricsTiles = append(independentTopMetricsTiles, spannerMetrics...)
return independentTopMetricsTiles
}
func createAggFilterCondition(resourceName string, resourceValues []string) string {
condition := ""
for _, id := range resourceValues {
if condition == "" {
condition = fmt.Sprintf("%s == '%s'", resourceName, id)
} else {
condition += fmt.Sprintf("|| %s == '%s'", resourceName, id)
}
}
return condition
}
func createAggDataflowMetrics(resourceIds MonitoringMetricsResources) []*dashboardpb.MosaicLayout_Tile {
var dataflowJobs []string
for _, value := range resourceIds.ShardToShardResourcesMap {
dataflowJobs = append(dataflowJobs, value.DataflowResources.JobId)
}
dataflowTiles := []*dashboardpb.MosaicLayout_Tile{
TileInfo{
Title: "Dataflow Workers CPU Utilization",
TimeSeriesQueries: map[string]string{
"p50 shard": fmt.Sprintf(dataflowAggCpuUtilPercentileQuery, createAggFilterCondition("metadata.user_labels.dataflow_job_id", dataflowJobs), "50"),
"p90 shard": fmt.Sprintf(dataflowAggCpuUtilPercentileQuery, createAggFilterCondition("metadata.user_labels.dataflow_job_id", dataflowJobs), "90"),
"Max shard": fmt.Sprintf(dataflowAggCpuUtilMaxQuery, createAggFilterCondition("metadata.user_labels.dataflow_job_id", dataflowJobs)),
}}.createXYChartTile(),
TileInfo{
Title: "Dataflow Workers Memory Utilization",
TimeSeriesQueries: map[string]string{
"p50 shard": fmt.Sprintf(dataflowAggMemoryUtilPercentileQuery, createAggFilterCondition("metadata.user_labels.dataflow_job_id", dataflowJobs), "50"),
"p90 shard": fmt.Sprintf(dataflowAggMemoryUtilPercentileQuery, createAggFilterCondition("metadata.user_labels.dataflow_job_id", dataflowJobs), "90"),
"Max shard": fmt.Sprintf(dataflowAggMemoryUtilMaxQuery, createAggFilterCondition("metadata.user_labels.dataflow_job_id", dataflowJobs)),
}}.createXYChartTile(),
TileInfo{Title: "Dataflow Workers Max Backlog Time Seconds", TimeSeriesQueries: map[string]string{"Dataflow Backlog Time Seconds": fmt.Sprintf(dataflowAggBacklogTimeQuery, createAggFilterCondition("metric.job_id", dataflowJobs))}}.createXYChartTile(),
TileInfo{Title: "Dataflow Per Shard Median CPU Utilization", TimeSeriesQueries: map[string]string{"": fmt.Sprintf(dataflowAggPerShardCpuUtil, createAggFilterCondition("metadata.user_labels.dataflow_job_id", dataflowJobs))}}.createXYChartTile(),
}
return dataflowTiles
}
func createAggDatastreamMetrics(resourceIds MonitoringMetricsResources) []*dashboardpb.MosaicLayout_Tile {
var datastreamJobs []string
for _, value := range resourceIds.ShardToShardResourcesMap {
datastreamJobs = append(datastreamJobs, value.DatastreamResources.DatastreamName)
}
datastreamTiles := []*dashboardpb.MosaicLayout_Tile{
TileInfo{
Title: "Datastream Total Latency",
TimeSeriesQueries: map[string]string{"p50 Datastream Latency": fmt.Sprintf(datastreamAggTotalLatencyQuery, createAggFilterCondition("resource.stream_id", datastreamJobs), "50", "50"), "p90 Datastream Latency": fmt.Sprintf(datastreamAggTotalLatencyQuery, createAggFilterCondition("resource.stream_id", datastreamJobs), "90", "90")}}.createXYChartTile(),
TileInfo{Title: "Total Datastream Throughput", TimeSeriesQueries: map[string]string{"Datastream Total Throughput": fmt.Sprintf(datastreamAggThroughputQuery, createAggFilterCondition("resource.stream_id", datastreamJobs))}}.createXYChartTile(),
TileInfo{Title: "Total Datastream Unsupported Events", TimeSeriesQueries: map[string]string{"Datastream Total Unsupported Events": fmt.Sprintf(datastreamAggUnsupportedEventsQuery, createAggFilterCondition("resource.stream_id", datastreamJobs))}}.createXYChartTile(),
}
return datastreamTiles
}
func createAggGcsMetrics(resourceIds MonitoringMetricsResources) []*dashboardpb.MosaicLayout_Tile {
var gcsBuckets []string
for _, value := range resourceIds.ShardToShardResourcesMap {
if value.GcsResources.BucketName != "" {
gcsBuckets = append(gcsBuckets, value.GcsResources.BucketName)
}
}
if len(gcsBuckets) == 0 {
return []*dashboardpb.MosaicLayout_Tile{}
}
// We fetch gcs buckets for dashboard creation it is possible due to an error we are not able to fetch gcs buckets for all the shards
gcsBucketTiles := []*dashboardpb.MosaicLayout_Tile{
TileInfo{Title: fmt.Sprintf("GCS Bucket Total Bytes for %v shards", len(gcsBuckets)), TimeSeriesQueries: map[string]string{resourceIds.JobMetadataGcsBucket: fmt.Sprintf(gcsAggTotalBytesQuery, createAggFilterCondition("resource.bucket_name", gcsBuckets))}}.createXYChartTile(),
}
return gcsBucketTiles
}
func createAggPubsubMetrics(resourceIds MonitoringMetricsResources) []*dashboardpb.MosaicLayout_Tile {
var pubsubSubs []string
for _, value := range resourceIds.ShardToShardResourcesMap {
pubsubSubs = append(pubsubSubs, value.PubsubResources.SubscriptionId)
}
pubsubTiles := []*dashboardpb.MosaicLayout_Tile{
TileInfo{Title: "Pubsub Subscription Sent Message Count", TimeSeriesQueries: map[string]string{"Pubsub Subscription Sent Message Count": fmt.Sprintf(pubsubAggSubscriptionSentMessageCountQuery, createAggFilterCondition("resource.subscription_id", pubsubSubs))}}.createXYChartTile(),
TileInfo{Title: "Pubsub Age of Oldest Unacknowledged Message", TimeSeriesQueries: map[string]string{"Pubsub Age of Oldest Unacknowledged Message": fmt.Sprintf(pubsubAggOldestUnackedMessageAgeQuery, createAggFilterCondition("resource.subscription_id", pubsubSubs))}}.createXYChartTile(),
}
return pubsubTiles
}
func createAggIndependentTopMetrics(resourceIds MonitoringMetricsResources) []*dashboardpb.MosaicLayout_Tile {
var dataflowJobs, datastreamJobs, pubsubSubs []string
for _, value := range resourceIds.ShardToShardResourcesMap {
dataflowJobs = append(dataflowJobs, value.DataflowResources.JobId)
datastreamJobs = append(datastreamJobs, value.DatastreamResources.DatastreamName)
pubsubSubs = append(pubsubSubs, value.PubsubResources.SubscriptionId)
}
independentTopMetricsTiles := []*dashboardpb.MosaicLayout_Tile{
TileInfo{
Title: "Dataflow Workers CPU Utilization",
TimeSeriesQueries: map[string]string{
"p50 shard": fmt.Sprintf(dataflowAggCpuUtilPercentileQuery, createAggFilterCondition("metadata.user_labels.dataflow_job_id", dataflowJobs), "50"),
"p90 shard": fmt.Sprintf(dataflowAggCpuUtilPercentileQuery, createAggFilterCondition("metadata.user_labels.dataflow_job_id", dataflowJobs), "90"),
"Max shard": fmt.Sprintf(dataflowAggCpuUtilMaxQuery, createAggFilterCondition("metadata.user_labels.dataflow_job_id", dataflowJobs)),
}}.createXYChartTile(),
TileInfo{Title: "Total Datastream Throughput", TimeSeriesQueries: map[string]string{"Datastream Throughput": fmt.Sprintf(datastreamAggThroughputQuery, createAggFilterCondition("resource.stream_id", datastreamJobs))}}.createXYChartTile(),
TileInfo{Title: "Total Datastream Unsupported Events", TimeSeriesQueries: map[string]string{"Datastream Unsupported Events": fmt.Sprintf(datastreamAggUnsupportedEventsQuery, createAggFilterCondition("resource.stream_id", datastreamJobs))}}.createXYChartTile(),
TileInfo{Title: "Pubsub Age of Oldest Unacknowledged Message", TimeSeriesQueries: map[string]string{"Pubsub Age of Oldest Unacknowledged Message": fmt.Sprintf(pubsubAggOldestUnackedMessageAgeQuery, createAggFilterCondition("resource.subscription_id", pubsubSubs))}}.createXYChartTile(),
}
spannerMetrics := createSpannerMetrics(resourceIds)
independentTopMetricsTiles = append(independentTopMetricsTiles, spannerMetrics...)
return independentTopMetricsTiles
}
func createAggIndependentBottomMetrics(resourceIds MonitoringMetricsResources) []*dashboardpb.MosaicLayout_Tile {
shardToDashboardMappingText := ""
for shardId, shardResource := range resourceIds.ShardToShardResourcesMap {
shardUrl := fmt.Sprintf("https://console.cloud.google.com/monitoring/dashboards/builder/%v?project=%v", shardResource.MonitoringResources.DashboardName, resourceIds.MigrationProjectId)
shardString := fmt.Sprintf("Shard [%s](%s)", shardId, shardUrl)
if shardToDashboardMappingText == "" {
shardToDashboardMappingText = shardString
} else {
shardToDashboardMappingText += " \\\n" + shardString
}
}
independentBottomMetricsTiles := []*dashboardpb.MosaicLayout_Tile{
TileInfo{
Title: "Shard Dashboards",
TextContent: shardToDashboardMappingText,
}.createTextTile(),
}
return independentBottomMetricsTiles
}
// createXYChartTile returns a single tile in a mosaic layout dashboard
func (tileInfo TileInfo) createXYChartTile() *dashboardpb.MosaicLayout_Tile {
var dataSets []*dashboardpb.XyChart_DataSet
for legendTemplate, query := range tileInfo.TimeSeriesQueries {
ds := &dashboardpb.XyChart_DataSet{
PlotType: dashboardpb.XyChart_DataSet_LINE,
TargetAxis: dashboardpb.XyChart_DataSet_Y1,
TimeSeriesQuery: &dashboardpb.TimeSeriesQuery{
Source: &dashboardpb.TimeSeriesQuery_TimeSeriesQueryLanguage{
TimeSeriesQueryLanguage: query,
},
},
}
if legendTemplate != "" {
ds.LegendTemplate = legendTemplate
}
dataSets = append(dataSets, ds)
}
tile := dashboardpb.MosaicLayout_Tile{
Widget: &dashboardpb.Widget{
Title: tileInfo.Title,
Content: &dashboardpb.Widget_XyChart{
XyChart: &dashboardpb.XyChart{
ChartOptions: &dashboardpb.ChartOptions{
Mode: dashboardpb.ChartOptions_COLOR,
},
DataSets: dataSets,
},
},
},
}
return &tile
}
// createCollapsibleGroupTile returns a collapsible group tile in a mosaic layout dashboard
func (tileInfo TileInfo) createCollapsibleGroupTile(tiles []*dashboardpb.MosaicLayout_Tile, heightOffset int32) (*dashboardpb.MosaicLayout_Tile, int32) {
groupTileHeight := setWidgetPositions(tiles, heightOffset)
groupTile := dashboardpb.MosaicLayout_Tile{
XPos: 0,
YPos: heightOffset,
Width: defaultMonitoringMetricWidth * defaultColumns,
Height: groupTileHeight,
Widget: &dashboardpb.Widget{
Title: tileInfo.Title,
Content: &dashboardpb.Widget_CollapsibleGroup{
CollapsibleGroup: &dashboardpb.CollapsibleGroup{
Collapsed: true,
},
},
},
}
return &groupTile, heightOffset + groupTileHeight
}
func (tileInfo TileInfo) createTextTile() *dashboardpb.MosaicLayout_Tile {
textTile := dashboardpb.MosaicLayout_Tile{
Widget: &dashboardpb.Widget{
Title: tileInfo.Title,
Content: &dashboardpb.Widget_Text{
Text: &dashboardpb.Text{
Content: tileInfo.TextContent,
Format: dashboardpb.Text_MARKDOWN,
},
},
},
}
return &textTile
}
// setWidgetPositions positions the tiles in the monitoring dashboard
func setWidgetPositions(tiles []*dashboardpb.MosaicLayout_Tile, heightOffset int32) int32 {
for tilePosition, tile := range tiles {
tile.XPos = (int32(tilePosition) % defaultColumns) * defaultMonitoringMetricWidth
tile.YPos = heightOffset + (int32(tilePosition)/defaultColumns)*defaultMonitoringMetricHeight
tile.Width = defaultMonitoringMetricWidth
tile.Height = defaultMonitoringMetricHeight
}
return ((int32(len(tiles)-1) / defaultColumns) + 1) * defaultMonitoringMetricHeight
}
// getCreateMonitoringDashboardRequest returns the request for generating the monitoring dashboard
func getCreateMonitoringDashboardRequest(
resourceIds MonitoringMetricsResources,
createIndependentTopMetric func(resourceIds MonitoringMetricsResources) []*dashboardpb.MosaicLayout_Tile,
mosaicGroups []MosaicGroup,
createAggIndependentBottomMetrics func(resourceIds MonitoringMetricsResources) []*dashboardpb.MosaicLayout_Tile,
displayName string) *dashboardpb.CreateDashboardRequest {
var mosaicLayoutTiles []*dashboardpb.MosaicLayout_Tile
var heightOffset int32 = 0
// create top independent metrics tiles
independentTopMetricsTiles := createIndependentTopMetric(resourceIds)
heightOffset += setWidgetPositions(independentTopMetricsTiles, heightOffset)
mosaicLayoutTiles = append(mosaicLayoutTiles, independentTopMetricsTiles...)
// add group tiles
for _, mosaicGroup := range mosaicGroups {
metricTiles := mosaicGroup.groupCreateTileFunction(resourceIds)
var groupTile *dashboardpb.MosaicLayout_Tile
groupTile, heightOffset = TileInfo{Title: mosaicGroup.groupTitle}.createCollapsibleGroupTile(metricTiles, heightOffset)
mosaicLayoutTiles = append(append(mosaicLayoutTiles, metricTiles...), groupTile)
}
// create bottom independent metrics tiles
if createAggIndependentBottomMetrics != nil {
independentBottomMetricsTiles := createAggIndependentBottomMetrics(resourceIds)
heightOffset += setWidgetPositions(independentBottomMetricsTiles, heightOffset)
mosaicLayoutTiles = append(mosaicLayoutTiles, independentBottomMetricsTiles...)
}
mosaicLayout := dashboardpb.MosaicLayout{
Columns: defaultMosaicColumns,
Tiles: mosaicLayoutTiles,
}
layout := dashboardpb.Dashboard_MosaicLayout{
MosaicLayout: &mosaicLayout,
}
dashboardDisplayName := displayName
db := dashboardpb.Dashboard{
DisplayName: dashboardDisplayName,
Layout: &layout,
}
req := &dashboardpb.CreateDashboardRequest{
Parent: "projects/" + resourceIds.MigrationProjectId,
Dashboard: &db,
}
return req
}