func()

in internal/oraclemetrics/oraclemetrics.go [299:349]


func (c *MetricCollector) SendDefaultMetricsToCloudMonitoring(ctx context.Context) []*mrpb.TimeSeries {
	queryNamesMap := queryMap(c.Config.GetOracleConfiguration().GetOracleMetrics().GetQueries())
	var queryNames []string
	for qn := range queryNamesMap {
		queryNames = append(queryNames, qn)
	}

	timeseries := []*mrpb.TimeSeries{}

	maxExecutionThreads := int(c.Config.GetOracleConfiguration().GetOracleMetrics().GetMaxExecutionThreads())
	wp := workerpool.New(maxExecutionThreads)

	for serviceName, db := range c.connections {
		dbInfo, err := fetchDatabaseInfo(ctx, db)
		if err != nil {
			log.CtxLogger(ctx).Errorw("Failed to fetch database information from v$database view", "error", err, "service_name", serviceName)
			continue
		}

		// Submit a task for each query to the workerpool.
		for _, qn := range queryNames {
			query, ok := queryNamesMap[qn]
			if !ok {
				log.CtxLogger(ctx).Warnw("Query not found", "query_name", qn)
				continue
			}
			if c.shouldSkipQuery(ctx, serviceName, qn) {
				continue
			}
			if !isQueryAllowedForRole(query, dbInfo.DatabaseRole) {
				continue
			}
			wp.Submit(func() {
				ts := executeQueryAndSendMetrics(ctx, queryOptions{
					db:            db,
					query:         query,
					timeout:       c.Config.GetOracleConfiguration().GetOracleMetrics().GetQueryTimeout().GetSeconds(),
					collector:     c,
					runningSum:    make(map[timeSeriesKey]prevVal),
					serviceName:   serviceName,
					defaultLabels: map[string]string{"dbid": dbInfo.DBID, "db_unique_name": dbInfo.DBUniqueName, "pdb_name": dbInfo.PdbName},
				})
				c.mu.Lock()
				timeseries = append(timeseries, ts...)
				c.mu.Unlock()
			})
		}
	}
	wp.StopWait()
	return timeseries
}