internal/scanners/diagnostics_settings.go (159 lines of code) (raw):
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
package scanners
import (
"context"
"fmt"
"math"
"net/http"
"strings"
"sync"
"time"
"github.com/Azure/azqr/internal/models"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/arm"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor"
"github.com/rs/zerolog/log"
)
// DiagnosticSettingsScanner - scanner for diagnostic settings
type DiagnosticSettingsScanner struct {
ctx context.Context
client *arm.Client
}
// Init - Initializes the DiagnosticSettingsScanner
func (d *DiagnosticSettingsScanner) Init(ctx context.Context, cred azcore.TokenCredential, options *arm.ClientOptions) error {
client, err := arm.NewClient(moduleName+".DiagnosticSettingsBatch", moduleVersion, cred, options)
if err != nil {
return err
}
d.client = client
d.ctx = ctx
return nil
}
// ListResourcesWithDiagnosticSettings - Lists all resources with diagnostic settings
func (d *DiagnosticSettingsScanner) ListResourcesWithDiagnosticSettings(resources []*string) (map[string]bool, error) {
res := map[string]bool{}
if len(resources) > 5000 {
log.Warn().Msg(fmt.Sprintf("%d resources detected. Scan will take longer than usual", len(resources)))
}
batches := int(math.Ceil(float64(len(resources)) / 20))
models.LogResourceTypeScan("Diagnostic Settings")
if batches == 0 {
return res, nil
}
log.Debug().Msgf("Number of diagnostic setting batches: %d", batches)
jobs := make(chan []*string, batches)
ch := make(chan map[string]bool, batches)
var wg sync.WaitGroup
// Start workers
// Based on: https://medium.com/insiderengineering/concurrent-http-requests-in-golang-best-practices-and-techniques-f667e5a19dea
numWorkers := 100 // Define the number of workers in the pool
for w := 0; w < numWorkers; w++ {
go d.worker(jobs, ch, &wg)
}
wg.Add(batches)
// Split resources into batches of 20 items.
batchSize := 20
batchCount := 0
for i := 0; i < len(resources); i += batchSize {
j := i + batchSize
if j > len(resources) {
j = len(resources)
}
jobs <- resources[i:j]
batchCount++
if batchCount == numWorkers {
log.Debug().Msgf("all %d workers are running. Sleeping for 4 seconds to avoid throttling", numWorkers)
batchCount = 0
// there are more batches to process
// Staggering queries to avoid throttling. Max 15 queries each 5 seconds.
// https://learn.microsoft.com/en-us/azure/governance/resource-graph/concepts/guidance-for-throttled-requests#staggering-queries
time.Sleep(4 * time.Second)
}
}
// Wait for all workers to finish
close(jobs)
wg.Wait()
for i := 0; i < batches; i++ {
for k, v := range <-ch {
res[k] = v
}
}
return res, nil
}
func (d *DiagnosticSettingsScanner) worker(jobs <-chan []*string, results chan<- map[string]bool, wg *sync.WaitGroup) {
for ids := range jobs {
resp, err := d.restCall(d.ctx, ids)
if err != nil {
log.Fatal().Err(err).Msg("Failed to get diagnostic settings")
}
asyncRes := map[string]bool{}
for _, response := range resp.Responses {
for _, diagnosticSetting := range response.Content.Value {
id := parseResourceId(diagnosticSetting.ID)
asyncRes[id] = true
}
}
results <- asyncRes
wg.Done()
}
}
const (
moduleName = "armresources"
moduleVersion = "v1.1.1"
)
func (d *DiagnosticSettingsScanner) restCall(ctx context.Context, resourceIds []*string) (*ArmBatchResponse, error) {
req, err := runtime.NewRequest(ctx, http.MethodPost, runtime.JoinPaths(d.client.Endpoint(), "batch"))
if err != nil {
return nil, err
}
reqQP := req.Raw().URL.Query()
reqQP.Set("api-version", "2020-06-01")
req.Raw().URL.RawQuery = reqQP.Encode()
req.Raw().Header["Accept"] = []string{"application/json"}
batch := ArmBatchRequest{
Requests: []ArmBatchRequestItem{},
}
for _, resourceId := range resourceIds {
batch.Requests = append(batch.Requests, ArmBatchRequestItem{
HttpMethod: http.MethodGet,
RelativeUrl: *resourceId + "/providers/microsoft.insights/diagnosticSettings?api-version=2021-05-01-preview",
})
}
// set request body
err = runtime.MarshalAsJSON(req, batch)
if err != nil {
return nil, err
}
resp, err := d.client.Pipeline().Do(req)
if err != nil {
return nil, err
}
if !runtime.HasStatusCode(resp, http.StatusOK, http.StatusAccepted) {
return nil, runtime.NewResponseError(resp)
}
result := ArmBatchResponse{}
if err := runtime.UnmarshalAsJSON(resp, &result); err != nil {
return nil, err
}
return &result, nil
}
func parseResourceId(diagnosticSettingID *string) string {
id := *diagnosticSettingID
i := strings.Index(id, "/providers/microsoft.insights/diagnosticSettings/")
return strings.ToLower(id[:i])
}
type (
ArmBatchRequest struct {
Requests []ArmBatchRequestItem `json:"requests"`
}
ArmBatchRequestItem struct {
HttpMethod string `json:"httpMethod"`
RelativeUrl string `json:"relativeUrl"`
}
ArmBatchResponse struct {
Responses []ArmBatchResponseItem `json:"responses"`
}
ArmBatchResponseItem struct {
Content armmonitor.DiagnosticSettingsResourceCollection `json:"content"`
}
)
func (d *DiagnosticSettingsScanner) Scan(resources []*string) map[string]bool {
diagResults, err := d.ListResourcesWithDiagnosticSettings(resources)
if err != nil {
if models.ShouldSkipError(err) {
diagResults = map[string]bool{}
} else {
log.Fatal().Err(err).Msg("Failed to list resources with Diagnostic Settings")
}
}
return diagResults
}