cloudflare_exporter.go

package main

import (
	"context"
	"fmt"
	"net"
	"net/http"
	"os"
	"strings"
	"sync"
	"time"

	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
	"github.com/machinebox/graphql"
	"github.com/oklog/run"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/prometheus/common/promlog"
	"github.com/prometheus/common/version"
	"gopkg.in/alecthomas/kingpin.v2"
)

const (
	namespace   = "cloudflare"
	apiMaxLimit = 10000
)

var (
	// arguments
	listenAddress = kingpin.Flag("listen-address", "Metrics exporter listen address.").
			Short('l').Envar("CLOUDFLARE_EXPORTER_LISTEN_ADDRESS").Default(":9199").String()
	cfEmail = kingpin.Flag("cloudflare-api-email", "Email address for analytics API authentication.").
		Envar("CLOUDFLARE_API_EMAIL").Required().String()
	cfAPIKey = kingpin.Flag("cloudflare-api-key", "API key for analytics API authentication.").
			Envar("CLOUDFLARE_API_KEY").Required().String()
	cfZones = kingpin.Flag("cloudflare-zones", "Comma-separated list of zones to scrape. Omit to scrape all zones in the account.").
		Envar("CLOUDFLARE_ZONES").Default("").String()
	cfAPIBaseURL = kingpin.Flag("cloudflare-api-base-url", "Cloudflare regular (non-analytics) API base URL.").
			Envar("CLOUDFLARE_API_BASE_URL").Default("https://api.cloudflare.com/client/v4").String()
	cfAnalyticsAPIBaseURL = kingpin.Flag("cloudflare-analytics-api-base-url", "Cloudflare analytics (GraphQL) API base URL.").
				Envar("CLOUDFLARE_ANALYTICS_API_BASE_URL").Default("https://api.cloudflare.com/client/v4/graphql").String()
	cfScrapeDataDelay = kingpin.Flag("cloudflare-scape-data-delay-minutes", "Delay, in minutes, between the scraped data and realtime.").
				Envar("CLOUDFLARE_SCRAPE_DATA_DELAY_MINUTES").Default("5").Int()
	cfScrapeIntervalSeconds = kingpin.Flag("cloudflare-scrape-interval-seconds", "Interval at which to retrieve metrics from Cloudflare, independent of how often Prometheus scrapes this exporter.").
				Envar("CLOUDFLARE_SCRAPE_INTERVAL_SECONDS").Default("60").Int()
	scrapeTimeoutSeconds = kingpin.Flag("scrape-timeout-seconds", "Scrape timeout in seconds.").
				Envar("CLOUDFLARE_EXPORTER_SCRAPE_TIMEOUT_SECONDS").Default("30").Int()
	logLevel = kingpin.Flag("log-level", "Log level.").
			Envar("CLOUDFLARE_EXPORTER_LOG_LEVEL").Default("info").String()
	initialScrapeImmediately = kingpin.Flag("initial-scrape-immediately", "Scrape Cloudflare immediately at startup rather than waiting scrape-timeout-seconds. For development only.").
					Hidden().Envar("CLOUDFLARE_EXPORTER_INITIAL_SCRAPE_IMMEDIATELY").Default("false").Bool()
)
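// main wires the exporter together: it parses the flags above, registers the
// Prometheus collectors, serves /metrics over HTTP, and runs the background
// Cloudflare scrape loop. The HTTP server and the scrape loop are managed as
// an oklog/run group so that either one stopping shuts the process down
// cleanly.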
func main() {
	kingpin.Version(version.Print("cloudflare_exporter"))
	kingpin.Parse()

	logger := newPromLogger(*logLevel)
	level.Info(logger).Log("msg", "starting cloudflare_exporter")

	cfExporter := &exporter{
		email:          *cfEmail,
		apiKey:         *cfAPIKey,
		apiBaseURL:     *cfAPIBaseURL,
		graphqlClient:  graphql.NewClient(*cfAnalyticsAPIBaseURL),
		scrapeTimeout:  time.Duration(*scrapeTimeoutSeconds) * time.Second,
		scrapeInterval: time.Duration(*cfScrapeIntervalSeconds) * time.Second,
		logger:         logger,
		scrapeLock:     &sync.Mutex{},
		scrapeBucket: newTimeBucket(
			time.Now(),
			time.Duration(*cfScrapeIntervalSeconds)*time.Second,
			time.Duration(*cfScrapeDataDelay)*time.Minute,
		),
		extractors: map[string]extractFunc{
			"graphql:zones:trafficCached":                extractZoneHTTPCached,
			"graphql:zones:trafficCountry":               extractZoneHTTPCountry,
			"graphql:zones:trafficColo":                  extractZoneHTTPColo,
			"graphql:zones:trafficDetails":               extractZoneHTTPDetailed,
			"graphql:zones:networkErrorLogs":             extractZoneNetworkErrorLogs,
			"graphql:zones:firewallEventsAdaptiveGroups": extractZoneFirewallEvents,
			"graphql:zones:reputation":                   extractZoneIPReputation,
			"graphql:zones:healthCheckEventsGroups":      extractZoneHealthCheckEvents,
		},
	}

	prometheus.MustRegister(version.NewCollector("cloudflare_exporter"))
	registerMetrics(nil)

	router := http.NewServeMux()
	router.Handle("/metrics", promhttp.Handler())

	runGroup := run.Group{}

	level.Info(logger).Log("msg", "listening", "addr", *listenAddress)
	serverSocket, err := net.Listen("tcp", *listenAddress)
	if err != nil {
		level.Error(logger).Log("error", err)
		os.Exit(1)
	}
	runGroup.Add(func() error {
		return http.Serve(serverSocket, router)
	}, func(error) {
		level.Info(logger).Log("msg", "closing server socket")
		serverSocket.Close()
	})

	cfScrapeCtx, cancelCfScrape := context.WithCancel(context.Background())
	runGroup.Add(func() error {
		level.Info(logger).Log("msg", "starting Cloudflare scrape loop")
		return cfExporter.scrapeCloudflare(cfScrapeCtx)
	}, func(error) {
		level.Info(logger).Log("msg", "ending Cloudflare scrape loop")
		cancelCfScrape()
	})

	if err := runGroup.Run(); err != nil {
		level.Error(logger).Log("error", err)
		os.Exit(1)
	}
}

type exporter struct {
	email      string
	apiKey     string
	apiBaseURL string

	graphqlClient  graphqlClient
	scrapeInterval time.Duration
	scrapeTimeout  time.Duration
	logger         log.Logger
	scrapeLock     *sync.Mutex
	scrapeBucket   *TimeBucket

	consecutiveRateLimitErrs int
	skipNextScrapes          int

	extractors map[string]extractFunc
}

type graphqlClient interface {
	Run(context.Context, *graphql.Request, interface{}) error
}
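// scrapeCloudflare runs the periodic scrape loop until ctx is cancelled. It
// ticks at half the configured scrape interval and uses the scrape bucket's
// tick/untick accounting to scrape roughly once per interval, backing off
// after rate-limit errors and catching up after skipped or failed scrapes.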
func (e *exporter) scrapeCloudflare(ctx context.Context) error {
	if *initialScrapeImmediately {
		// Initial scrape; the ticker below won't fire straight away.
		// Risks double counting on restart. Only useful for development.
		if err := e.scrapeCloudflareOnce(ctx); err != nil {
			level.Error(e.logger).Log("error", err)
			cfScrapeErrs.Inc()
		}
	}

	ticker := time.Tick(e.scrapeInterval / 2)
	for {
		select {
		case <-ticker:
			// To make sure we are not indefinitely backlogged, we still need to tick the
			// bucket. If we were skipping scrapes, or they failed, we are effectively
			// skipping the metrics we would otherwise have scraped.
			// To counter this, we tick at half the scrape interval, but make sure that we
			// actually only scrape once per interval when everything is fine.
			// If we hit an error, we untick and try again in scrapeInterval/2. This means
			// we can gradually recover from failures without backlogging indefinitely.
			//
			// tick only returns true if we are keeping pace (at least one delay in the
			// past). So if it returns false, we are too early and just break.
			// If tick returns true, we are either exactly on time, or we are allowed to
			// catch up on missed scrapes.
			if !e.scrapeBucket.tick() {
				break
			}
			if e.skipNextScrapes > 0 {
				e.logger.Log("msg", fmt.Sprintf("rate limited, will skip next %d scrapes", e.skipNextScrapes))
				e.skipNextScrapes--
				e.scrapeBucket.untick()
				break
			}
			if err := e.scrapeCloudflareOnce(ctx); err != nil {
				e.scrapeBucket.untick()
				// Returning an error here would cause the exporter to crash. If it
				// crashloops but Prometheus manages to scrape it in between crashes, we
				// might never notice that we are not updating our cached metrics.
				// Instead, we should alert on the exporter_cloudflare_scrape_errors
				// metric.
				level.Error(e.logger).Log("error", err)
				cfScrapeErrs.Inc()
				// We've observed 2 error messages relating to rate limits in the wild:
				// - "rate limiter budget depleted, please try again later"
				// - "graphql: limit reached, please try again later"
				// We crudely check for the substring "limit", and err on the side of
				// applying backoff on errors containing it.
				if strings.Contains(err.Error(), "limit") {
					// Keep track of consecutive rate limit errors seen, and back off one
					// extra scrape per consecutive error.
					e.consecutiveRateLimitErrs++
					e.skipNextScrapes = e.consecutiveRateLimitErrs
				}
				break
			}
			e.skipNextScrapes = 0
			e.consecutiveRateLimitErrs = 0
		case <-ctx.Done():
			return nil
		}
	}
}

func (e *exporter) scrapeCloudflareOnce(ctx context.Context) error {
	e.scrapeLock.Lock()
	defer e.scrapeLock.Unlock()

	logger := level.Info(log.With(e.logger, "event", "scraping cloudflare"))
	logger.Log("msg", "starting")
	cfScrapes.Inc()

	ctx, cancel := context.WithTimeout(ctx, e.scrapeTimeout)
	defer cancel()

	duration, err := timeOperation(func() error {
		var zones map[string]string
		zones, err := e.getZones(ctx)
		if err != nil {
			return err
		}
		zonesActive.Set(float64(len(zones)))
		return e.getZoneAnalytics(ctx, zones)
	})
	if err != nil {
		return err
	}

	cfLastSuccessTimestampSeconds.Set(float64(time.Now().UTC().Unix()))
	logger.Log("msg", "finished", "duration", duration.Seconds())
	return nil
}

func (e *exporter) getZoneAnalytics(ctx context.Context, zones map[string]string) error {
	// graphQL is the prepared analytics query request, defined elsewhere in this package.
	var req = graphQL

	var zoneList []string
	for zoneID := range zones {
		zoneList = append(zoneList, zoneID)
	}

	logger := level.Debug(log.With(e.logger, "event", "get analytics"))
	logger.Log("msg", "starting",
		"bucket_start_time", e.scrapeBucket.getStartTime(),
		"bucket_end_time", e.scrapeBucket.getEndTime(),
		"zones", strings.Join(zoneList, ", "))

	req.Var("zone_in", zoneList)
	req.Var("start_time", e.scrapeBucket.getStartTime())
	req.Var("end_time", e.scrapeBucket.getEndTime())

	var gqlResp cloudflareResp
	if err := e.makeGraphqlRequest(ctx, log.With(e.logger), req, &gqlResp); err != nil {
		return err
	}
	if len(gqlResp.Viewer.Zones) < 1 {
		return fmt.Errorf("expected at least 1 zone, got %d", len(gqlResp.Viewer.Zones))
	}

	for _, zone := range gqlResp.Viewer.Zones {
		for name, extract := range e.extractors {
			results, err := extract(zone, zones, e.scrapeBucket)
			if err != nil {
				return err
			}
			logger.Log("msg", "finished",
				"bucket_start_time", e.scrapeBucket.getStartTime(),
				"bucket_end_time", e.scrapeBucket.getEndTime(),
				"results", results, "extractor", name)
			if results == apiMaxLimit {
				logger.Log("msg", "Warning. Max results reached. Reduce cardinality.")
			}
		}
	}
	return nil
}
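// makeGraphqlRequest attaches the API credentials and the result limit to the
// request before running it against the analytics GraphQL endpoint, timing
// the call for debug logging.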
Reduce cardinality") } } } return nil } func (e *exporter) makeGraphqlRequest(ctx context.Context, logger log.Logger, req *graphql.Request, resp interface{}) error { req.Header.Set("X-AUTH-EMAIL", e.email) req.Header.Set("X-AUTH-KEY", e.apiKey) req.Var("limit", apiMaxLimit) duration, err := timeOperation(func() error { return e.graphqlClient.Run(ctx, req, &resp) }) level.Debug(logger).Log("duration", duration.Seconds(), "msg", "finished request") return err } func (e *exporter) getZones(ctx context.Context) (map[string]string, error) { // TODO handle >50 zones (the API maximum per page) by requesting successive // pages. For now, we don't anticipate having >50 zones any time soon. req, err := http.NewRequestWithContext(ctx, http.MethodGet, e.apiBaseURL+"/zones?per_page=50", nil) if err != nil { return nil, err } req.Header.Set("X-AUTH-EMAIL", e.email) req.Header.Set("X-AUTH-KEY", e.apiKey) var zones map[string]string duration, err := timeOperation(func() error { resp, err := http.DefaultClient.Do(req) if err != nil { return err } if resp.StatusCode != http.StatusOK { err = fmt.Errorf("expected status 200, got %d", resp.StatusCode) return err } var zonesFilter []string if *cfZones != "" { zonesFilter = strings.Split(*cfZones, ",") } defer resp.Body.Close() zones, err = parseZoneIDs(resp.Body, zonesFilter) if err != nil { return err } return nil }) level.Debug(e.logger).Log("request", "list zones", "duration", duration.Seconds(), "msg", "finished request") return zones, err } func newPromLogger(logLevel string) log.Logger { loggerLogLevel := &promlog.AllowedLevel{} if err := loggerLogLevel.Set(logLevel); err != nil { panic(err) } logConf := &promlog.Config{Level: loggerLogLevel, Format: &promlog.AllowedFormat{}} return promlog.New(logConf) }