cloudflare_exporter.go
package main
import (
"context"
"fmt"
"net"
"net/http"
"os"
"strings"
"sync"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/machinebox/graphql"
"github.com/oklog/run"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/common/promlog"
"github.com/prometheus/common/version"
"gopkg.in/alecthomas/kingpin.v2"
)
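
// namespace prefixes all exported metric names; apiMaxLimit is the number of
// results requested per GraphQL query, and receiving exactly that many back is
// treated as a sign the result set was truncated.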
const (
	namespace   = "cloudflare"
	apiMaxLimit = 10000
)
var (
	// arguments
	listenAddress = kingpin.Flag("listen-address", "Metrics exporter listen address.").
			Short('l').Envar("CLOUDFLARE_EXPORTER_LISTEN_ADDRESS").Default(":9199").String()
	cfEmail = kingpin.Flag("cloudflare-api-email", "email address for analytics API authentication.").
		Envar("CLOUDFLARE_API_EMAIL").Required().String()
	cfAPIKey = kingpin.Flag("cloudflare-api-key", "API key for analytics API authentication.").
		Envar("CLOUDFLARE_API_KEY").Required().String()
	cfZones = kingpin.Flag("cloudflare-zones", "Comma-separated list of zones to scrape. Omit to scrape all zones in account.").
		Envar("CLOUDFLARE_ZONES").Default("").String()
	cfAPIBaseURL = kingpin.Flag("cloudflare-api-base-url", "Cloudflare regular (non-analytics) API base URL").
			Envar("CLOUDFLARE_API_BASE_URL").Default("https://api.cloudflare.com/client/v4").String()
	cfAnalyticsAPIBaseURL = kingpin.Flag("cloudflare-analytics-api-base-url", "Cloudflare analytics (graphql) API base URL").
				Envar("CLOUDFLARE_ANALYTICS_API_BASE_URL").Default("https://api.cloudflare.com/client/v4/graphql").String()
	cfScrapeDataDelay = kingpin.Flag("cloudflare-scrape-data-delay-minutes", "Delay, in minutes, between real time and the end of the scraped data window.").
				Envar("CLOUDFLARE_SCRAPE_DATA_DELAY_MINUTES").Default("5").Int()
	cfScrapeIntervalSeconds = kingpin.Flag("cloudflare-scrape-interval-seconds", "Interval, in seconds, at which to retrieve metrics from Cloudflare, independent of how often Prometheus scrapes the exporter.").
				Envar("CLOUDFLARE_SCRAPE_INTERVAL_SECONDS").Default("60").Int()
	scrapeTimeoutSeconds = kingpin.Flag("scrape-timeout-seconds", "Timeout, in seconds, for each Cloudflare scrape.").
				Envar("CLOUDFLARE_EXPORTER_SCRAPE_TIMEOUT_SECONDS").Default("30").Int()
	logLevel                 = kingpin.Flag("log-level", "Log level (debug, info, warn, error).").Envar("CLOUDFLARE_EXPORTER_LOG_LEVEL").Default("info").String()
	initialScrapeImmediately = kingpin.Flag("initial-scrape-immediately", "Scrape Cloudflare immediately at startup instead of waiting for the first scrape interval tick. For development only.").
					Hidden().Envar("CLOUDFLARE_EXPORTER_INITIAL_SCRAPE_IMMEDIATELY").Default("false").Bool()
)
func main() {
	kingpin.Version(version.Print("cloudflare_exporter"))
	kingpin.Parse()

	logger := newPromLogger(*logLevel)
	level.Info(logger).Log("msg", "starting cloudflare_exporter")

	cfExporter := &exporter{
		email: *cfEmail, apiKey: *cfAPIKey, apiBaseURL: *cfAPIBaseURL,
		graphqlClient:  graphql.NewClient(*cfAnalyticsAPIBaseURL),
		scrapeTimeout:  time.Duration(*scrapeTimeoutSeconds) * time.Second,
		scrapeInterval: time.Duration(*cfScrapeIntervalSeconds) * time.Second,
		logger:         logger,
		scrapeLock:     &sync.Mutex{},
		scrapeBucket: newTimeBucket(
			time.Now(),
			time.Duration(*cfScrapeIntervalSeconds)*time.Second,
			time.Duration(*cfScrapeDataDelay)*time.Minute,
		),
		extractors: map[string]extractFunc{
			"graphql:zones:trafficCached":                extractZoneHTTPCached,
			"graphql:zones:trafficCountry":               extractZoneHTTPCountry,
			"graphql:zones:trafficColo":                  extractZoneHTTPColo,
			"graphql:zones:trafficDetails":               extractZoneHTTPDetailed,
			"graphql:zones:networkErrorLogs":             extractZoneNetworkErrorLogs,
			"graphql:zones:firewallEventsAdaptiveGroups": extractZoneFirewallEvents,
			"graphql:zones:reputation":                   extractZoneIPReputation,
			"graphql:zones:healthCheckEventsGroups":      extractZoneHealthCheckEvents,
		},
	}

	prometheus.MustRegister(version.NewCollector("cloudflare_exporter"))
	registerMetrics(nil)

	router := http.NewServeMux()
	router.Handle("/metrics", promhttp.Handler())

	runGroup := run.Group{}

	level.Info(logger).Log("msg", "listening", "addr", *listenAddress)
	serverSocket, err := net.Listen("tcp", *listenAddress)
	if err != nil {
		level.Error(logger).Log("error", err)
		os.Exit(1)
	}
	runGroup.Add(func() error {
		return http.Serve(serverSocket, router)
	}, func(error) {
		level.Info(logger).Log("msg", "closing server socket")
		serverSocket.Close()
	})

	cfScrapeCtx, cancelCfScrape := context.WithCancel(context.Background())
	runGroup.Add(func() error {
		level.Info(logger).Log("msg", "starting Cloudflare scrape loop")
		return cfExporter.scrapeCloudflare(cfScrapeCtx)
	}, func(error) {
		level.Info(logger).Log("msg", "ending Cloudflare scrape loop")
		cancelCfScrape()
	})

	if err := runGroup.Run(); err != nil {
		level.Error(logger).Log("error", err)
		os.Exit(1)
	}
}
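
// exporter holds the configuration and state needed to periodically pull
// analytics from the Cloudflare APIs and expose them as Prometheus metrics.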
type exporter struct {
	email                    string
	apiKey                   string
	apiBaseURL               string
	graphqlClient            graphqlClient
	scrapeInterval           time.Duration
	scrapeTimeout            time.Duration
	logger                   log.Logger
	scrapeLock               *sync.Mutex
	scrapeBucket             *TimeBucket
	consecutiveRateLimitErrs int
	skipNextScrapes          int
	extractors               map[string]extractFunc
}
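
// graphqlClient is the subset of the machinebox/graphql client the exporter
// uses, declared as an interface so the client can be substituted (for
// example with a stub in tests).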
type graphqlClient interface {
	Run(context.Context, *graphql.Request, interface{}) error
}
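
// scrapeCloudflare runs the scrape loop until ctx is cancelled, ticking the
// scrape bucket at half the configured interval and backing off when
// Cloudflare rate limits are hit.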
func (e *exporter) scrapeCloudflare(ctx context.Context) error {
	if *initialScrapeImmediately {
		// Initial scrape, the ticker below won't fire straight away.
		// Risks double counting on restart. Only useful for development.
		if err := e.scrapeCloudflareOnce(ctx); err != nil {
			level.Error(e.logger).Log("error", err)
			cfScrapeErrs.Inc()
		}
	}

	ticker := time.Tick(e.scrapeInterval / 2)
	for {
		select {
		case <-ticker:
			// To make sure we are not indefinitely backlogged, we still need to tick the bucket.
			// If we were skipping scrapes, or they failed, we have effectively dropped the metrics
			// we would otherwise have scraped.
			// To counter this, we tick at twice the rate (every scrapeInterval/2), but make sure
			// that we only actually scrape once per interval when everything is fine.
			// If we hit an error, we untick and try again in scrapeInterval/2. This means we can
			// gradually recover from failures without backlogging indefinitely.
			//
			// tick only returns true if we are keeping pace (the bucket is at least one delay in
			// the past). If it returns false we are too early, so we just break.
			// If it returns true we are either exactly on time, or we are allowed to catch up on
			// missed scrapes.
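			//
			// For example, with the default 60s interval the ticker fires every 30s: on a healthy
			// run every other firing advances the bucket and scrapes, and after a failed or
			// skipped scrape the intermediate firings let the bucket catch up one missed
			// interval at a time.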
			if !e.scrapeBucket.tick() {
				break
			}

			if e.skipNextScrapes > 0 {
				e.logger.Log("msg", fmt.Sprintf("rate limited, will skip next %d scrapes", e.skipNextScrapes))
				e.skipNextScrapes--
				e.scrapeBucket.untick()
				break
			}

			if err := e.scrapeCloudflareOnce(ctx); err != nil {
				e.scrapeBucket.untick()
				// Returning an error here would cause the exporter to crash. If it
				// crashloops but prometheus manages to scrape it in between crashes, we
				// might never notice that we are not updating our cached metrics.
				// Instead, we should alert on the exporter_cloudflare_scrape_errors
				// metric.
				level.Error(e.logger).Log("error", err)
				cfScrapeErrs.Inc()

				// We've observed 2 error messages relating to rate limits in the wild:
				// - "rate limiter budget depleted, please try again later"
				// - "graphql: limit reached, please try again later"
				// We crudely check for the substring "limit", and err on the side of
				// applying backoff on errors containing it.
				if strings.Contains(err.Error(), "limit") {
					// Keep track of consecutive rate limit errors seen, and back off one
					// extra scrape per consecutive error.
					e.consecutiveRateLimitErrs++
					e.skipNextScrapes = e.consecutiveRateLimitErrs
				}
				break
			}

			e.skipNextScrapes = 0
			e.consecutiveRateLimitErrs = 0
		case <-ctx.Done():
			return nil
		}
	}
}
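
// scrapeCloudflareOnce performs a single scrape: it lists the zones, fetches
// their analytics for the current time bucket, and updates the exported
// metrics.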
func (e *exporter) scrapeCloudflareOnce(ctx context.Context) error {
	e.scrapeLock.Lock()
	defer e.scrapeLock.Unlock()

	logger := level.Info(log.With(e.logger, "event", "scraping cloudflare"))
	logger.Log("msg", "starting")
	cfScrapes.Inc()

	ctx, cancel := context.WithTimeout(ctx, e.scrapeTimeout)
	defer cancel()

	duration, err := timeOperation(func() error {
		var zones map[string]string
		zones, err := e.getZones(ctx)
		if err != nil {
			return err
		}
		zonesActive.Set(float64(len(zones)))
		return e.getZoneAnalytics(ctx, zones)
	})
	if err != nil {
		return err
	}

	cfLastSuccessTimestampSeconds.Set(float64(time.Now().UTC().Unix()))
	logger.Log("msg", "finished", "duration", duration.Seconds())
	return nil
}
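
// getZoneAnalytics queries the GraphQL analytics API for all zones in a single
// request, then runs each registered extractor over every zone in the
// response.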
func (e *exporter) getZoneAnalytics(ctx context.Context, zones map[string]string) error {
	var req = graphQL
	var zoneList []string
	for zoneID := range zones {
		zoneList = append(zoneList, zoneID)
	}

	logger := level.Debug(log.With(e.logger, "event", "get analytics"))
	logger.Log("msg", "starting", "bucket_start_time", e.scrapeBucket.getStartTime(), "bucket_end_time", e.scrapeBucket.getEndTime(), "zones", strings.Join(zoneList, ", "))

	req.Var("zone_in", zoneList)
	req.Var("start_time", e.scrapeBucket.getStartTime())
	req.Var("end_time", e.scrapeBucket.getEndTime())

	var gqlResp cloudflareResp
	if err := e.makeGraphqlRequest(ctx, e.logger, req, &gqlResp); err != nil {
		return err
	}
	if len(gqlResp.Viewer.Zones) < 1 {
		return fmt.Errorf("expected at least 1 zone, got %d", len(gqlResp.Viewer.Zones))
	}

	for _, zone := range gqlResp.Viewer.Zones {
		for name, extract := range e.extractors {
			results, err := extract(zone, zones, e.scrapeBucket)
			if err != nil {
				return err
			}
			logger.Log("msg", "finished", "bucket_start_time", e.scrapeBucket.getStartTime(), "bucket_end_time", e.scrapeBucket.getEndTime(), "results", results, "extractor", name)
			if results == apiMaxLimit {
				logger.Log("msg", "warning: max results reached, data may be truncated; reduce cardinality")
			}
		}
	}
	return nil
}
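
// makeGraphqlRequest adds the authentication headers and the result limit to
// the request, runs it against the analytics API, and logs how long it took.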
func (e *exporter) makeGraphqlRequest(ctx context.Context, logger log.Logger, req *graphql.Request, resp interface{}) error {
	req.Header.Set("X-AUTH-EMAIL", e.email)
	req.Header.Set("X-AUTH-KEY", e.apiKey)
	req.Var("limit", apiMaxLimit)

	duration, err := timeOperation(func() error {
		return e.graphqlClient.Run(ctx, req, &resp)
	})
	level.Debug(logger).Log("duration", duration.Seconds(), "msg", "finished request")
	return err
}
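
// getZones lists the account's zones via the regular (non-analytics) REST API
// and returns them as a map keyed by zone ID, optionally filtered by the
// --cloudflare-zones flag.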
func (e *exporter) getZones(ctx context.Context) (map[string]string, error) {
	// TODO handle >50 zones (the API maximum per page) by requesting successive
	// pages. For now, we don't anticipate having >50 zones any time soon.
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, e.apiBaseURL+"/zones?per_page=50", nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("X-AUTH-EMAIL", e.email)
	req.Header.Set("X-AUTH-KEY", e.apiKey)

	var zones map[string]string
	duration, err := timeOperation(func() error {
		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			return err
		}
		// Close the body on every path, including the non-200 one below.
		defer resp.Body.Close()
		if resp.StatusCode != http.StatusOK {
			return fmt.Errorf("expected status 200, got %d", resp.StatusCode)
		}

		var zonesFilter []string
		if *cfZones != "" {
			zonesFilter = strings.Split(*cfZones, ",")
		}
		zones, err = parseZoneIDs(resp.Body, zonesFilter)
		if err != nil {
			return err
		}
		return nil
	})
	level.Debug(e.logger).Log("request", "list zones", "duration", duration.Seconds(), "msg", "finished request")
	return zones, err
}
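
// newPromLogger builds a go-kit logger at the requested level using the
// Prometheus common promlog package, panicking on an invalid level.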
func newPromLogger(logLevel string) log.Logger {
	loggerLogLevel := &promlog.AllowedLevel{}
	if err := loggerLogLevel.Set(logLevel); err != nil {
		panic(err)
	}
	logConf := &promlog.Config{Level: loggerLogLevel, Format: &promlog.AllowedFormat{}}
	return promlog.New(logConf)
}