collector/client.go (255 lines of code) (raw):
package collector
import (
"bytes"
"compress/gzip"
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"os"
"sync"
"time"
"github.com/Azure/adx-mon/pkg/logger"
"github.com/Azure/adx-mon/pkg/prompb"
prom_model "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
corev1 "k8s.io/api/core/v1"
)
// caPath is the service account CA bundle location; NewMetricsClient adds it
// to the transport's root CAs when the file exists.
const caPath = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"

// tokenPath is the service account bearer token location; when the file
// exists its contents are sent as "Authorization: Bearer <token>".
const tokenPath = "/var/run/secrets/kubernetes.io/serviceaccount/token"
// MetricsClient issues HTTP(S) GETs for Prometheus text-format metrics and
// for the kubelet pod list, attaching the service account bearer token read
// from tokenPath when that file is present.
type MetricsClient struct {
	// opts holds the effective options (NewMetricsClient applies WithDefaults).
	opts ClientOpts
	// transport carries the dial/TLS-handshake timeouts and, when the CA
	// bundle exists, the TLS configuration.
	transport *http.Transport
	// client wraps transport with an overall timeout of opts.ScrapeTimeOut.
	client *http.Client
	// closing is closed by Close to stop the refreshToken goroutine.
	closing chan struct{}
	// mu guards token, which refreshToken replaces while requests read it.
	mu    sync.RWMutex
	token string
	// NodeName is the node whose kubelet Pods queries.
	NodeName string
}
// ClientOpts configures a MetricsClient. Timeouts that are zero or negative
// are replaced by defaults in WithDefaults.
type ClientOpts struct {
	// DialTimeout is used as the net.Dialer timeout (default 5s).
	DialTimeout time.Duration
	// TLSHandshakeTimeout is used as the transport's TLS handshake timeout (default 5s).
	TLSHandshakeTimeout time.Duration
	// ScrapeTimeOut is used as the http.Client timeout and as the per-scrape
	// context deadline in FetchMetrics (default 10s).
	ScrapeTimeOut time.Duration
	// NodeName is the node whose kubelet MetricsClient.Pods queries.
	NodeName string
}

// WithDefaults returns a copy of c in which every timeout that is not
// strictly positive is replaced by its default; NodeName is copied verbatim.
func (c ClientOpts) WithDefaults() ClientOpts {
	orDefault := func(v, fallback time.Duration) time.Duration {
		if v > 0 {
			return v
		}
		return fallback
	}
	return ClientOpts{
		DialTimeout:         orDefault(c.DialTimeout, 5*time.Second),
		TLSHandshakeTimeout: orDefault(c.TLSHandshakeTimeout, 5*time.Second),
		ScrapeTimeOut:       orDefault(c.ScrapeTimeOut, 10*time.Second),
		NodeName:            c.NodeName,
	}
}
// NewMetricsClient builds a MetricsClient from opts after applying
// WithDefaults.
//
// If the service account CA bundle (caPath) exists, it is appended to the
// system root pool and installed on the transport. Certificate verification is
// nevertheless disabled (InsecureSkipVerify), matching Kubernetes' own HTTP
// probes. If the token file (tokenPath) exists, its contents become the bearer
// token and a background goroutine re-reads it until Close is called.
// Read errors on either file, or a failure to load the system pool, are
// returned unwrapped.
func NewMetricsClient(opts ClientOpts) (*MetricsClient, error) {
	opts = opts.WithDefaults()

	transport := &http.Transport{
		DialContext:         (&net.Dialer{Timeout: opts.DialTimeout}).DialContext,
		TLSHandshakeTimeout: opts.TLSHandshakeTimeout,
	}

	if _, statErr := os.Stat(caPath); statErr == nil {
		caPEM, err := os.ReadFile(caPath)
		if err != nil {
			return nil, err
		}
		roots, err := x509.SystemCertPool()
		if err != nil {
			return nil, err
		}
		roots.AppendCertsFromPEM(caPEM)
		transport.TLSClientConfig = &tls.Config{
			RootCAs: roots,
			// Match how K8s handles probing with self-signed certs
			// https://github.com/kubernetes/kubernetes/blob/master/pkg/probe/http/http.go#L41
			InsecureSkipVerify: true,
		}
	}

	token := ""
	if _, statErr := os.Stat(tokenPath); statErr == nil {
		raw, err := os.ReadFile(tokenPath)
		if err != nil {
			return nil, err
		}
		token = string(raw)
	}

	mc := &MetricsClient{
		opts:      opts,
		NodeName:  opts.NodeName,
		transport: transport,
		client: &http.Client{
			Timeout:   opts.ScrapeTimeOut,
			Transport: transport,
		},
		token:   token,
		closing: make(chan struct{}),
	}
	// Only start the refresher when a token was found at startup.
	if len(token) > 0 {
		go mc.refreshToken()
	}
	return mc, nil
}
// FetchMetrics GETs target and parses the Prometheus text exposition format
// in the response into metric families keyed by family name.
//
// The request advertises gzip and carries the current bearer token, if any.
// A gzip-encoded body is decompressed before parsing. A non-200 status is
// returned as an error. The whole call is bounded by opts.ScrapeTimeOut.
func (c *MetricsClient) FetchMetrics(target string) (map[string]*prom_model.MetricFamily, error) {
	ctx, cancel := context.WithTimeout(context.Background(), c.opts.ScrapeTimeOut)
	defer cancel()

	req, err := http.NewRequestWithContext(ctx, "GET", target, nil)
	if err != nil {
		return nil, fmt.Errorf("create request for %s: %w", target, err)
	}
	// Setting Accept-Encoding explicitly disables the transport's transparent
	// decompression, so gzip is handled manually below.
	req.Header.Set("Accept-Encoding", "gzip")

	c.mu.RLock()
	bearer := c.token
	c.mu.RUnlock()
	if bearer != "" {
		req.Header.Set("Authorization", "Bearer "+bearer)
	}

	resp, err := c.client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("collect node metrics for %s: %w", target, err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("collect node metrics for %s: %s", target, resp.Status)
	}

	var body io.Reader = resp.Body
	if resp.Header.Get("Content-Encoding") == "gzip" {
		zr, err := gzip.NewReader(resp.Body)
		if err != nil {
			return nil, fmt.Errorf("collect node metrics for %s: %w", target, err)
		}
		defer zr.Close()
		body = zr
	}

	parser := new(expfmt.TextParser)
	families, err := parser.TextToMetricFamilies(body)
	if err != nil {
		return nil, fmt.Errorf("decode metrics for %s: %w", target, err)
	}
	return families, nil
}
// FetchMetricsIterator GETs target and returns a streaming iterator over the
// response body instead of parsing it eagerly.
//
// The request advertises gzip and carries the current bearer token, if any.
// A gzip-encoded body is wrapped so that closing the resulting reader closes
// both the decompressor and the HTTP body.
//
// On success, the body is handed to the returned iterator and left open.
// NOTE(review): presumably the caller must close the iterator to release the
// connection; confirm against prompb.Iterator.
//
// On every error path, the body is closed here. The request is bounded only
// by the http.Client timeout (opts.ScrapeTimeOut).
func (c *MetricsClient) FetchMetricsIterator(target string) (*prompb.Iterator, error) {
	req, err := http.NewRequest("GET", target, nil)
	if err != nil {
		return nil, fmt.Errorf("create request for %s: %w", target, err)
	}
	req.Header.Set("Accept-Encoding", "gzip")

	c.mu.RLock()
	token := c.token
	c.mu.RUnlock()
	if token != "" {
		req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token))
	}

	resp, err := c.client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("collect node metrics for %s: %w", target, err)
	}
	if resp.StatusCode != http.StatusOK {
		resp.Body.Close()
		return nil, fmt.Errorf("collect node metrics for %s: %s", target, resp.Status)
	}

	var br io.ReadCloser = resp.Body
	if resp.Header.Get("Content-Encoding") == "gzip" {
		// Keep the result in a concrete-typed variable until err is checked.
		// On failure NewGzipReaderWithClose returns a nil pointer. Storing that
		// in an io.ReadCloser yields a non-nil interface, and calling Close on
		// it would dereference nil and panic.
		zr, err := NewGzipReaderWithClose(resp.Body)
		if err != nil {
			// No wrapper exists, so release the body directly.
			resp.Body.Close()
			return nil, fmt.Errorf("collect node metrics for %s: %w", target, err)
		}
		br = zr
	}
	return prompb.NewIterator(br), nil
}
// Pods returns a list of pods running on the node. This uses the kubelet API
// (https://<NodeName>:10250/pods) instead of the kubernetes API server.
//
// The current bearer token, if any, is sent for authentication. A non-200
// response is returned as an error that includes the status and the start of
// the body, rather than being decoded. This prevents an auth or server
// failure from being mistaken for an empty pod list. Errors wrap the
// underlying cause with %w.
func (c *MetricsClient) Pods() (corev1.PodList, error) {
	// JoinHostPort brackets IPv6 literals; for host names the URL is unchanged.
	endpoint := "https://" + net.JoinHostPort(c.NodeName, "10250") + "/pods"
	req, err := http.NewRequest("GET", endpoint, nil)
	if err != nil {
		return corev1.PodList{}, fmt.Errorf("create kubelet pods request for %s: %w", c.NodeName, err)
	}

	c.mu.RLock()
	token := c.token
	c.mu.RUnlock()
	if token != "" {
		req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token))
	}

	resp, err := c.client.Do(req)
	if err != nil {
		return corev1.PodList{}, fmt.Errorf("list kubelet pods on %s: %w", c.NodeName, err)
	}
	defer resp.Body.Close()

	var buf bytes.Buffer
	if _, err := buf.ReadFrom(resp.Body); err != nil {
		return corev1.PodList{}, fmt.Errorf("read kubelet pods response from %s: %w", c.NodeName, err)
	}

	if resp.StatusCode != http.StatusOK {
		// Include a bounded excerpt of the body; the kubelet usually explains
		// auth failures there.
		snippet := bytes.TrimSpace(buf.Bytes())
		if len(snippet) > 256 {
			snippet = snippet[:256]
		}
		return corev1.PodList{}, fmt.Errorf("list kubelet pods on %s: %s: %s", c.NodeName, resp.Status, snippet)
	}

	var pl corev1.PodList
	if err := json.Unmarshal(buf.Bytes(), &pl); err != nil {
		return corev1.PodList{}, fmt.Errorf("decode kubelet pods from %s: %w", c.NodeName, err)
	}
	return pl, nil
}
// Close stops the background token refresher and closes idle connections held
// by the HTTP client. It always returns nil.
//
// Close is safe to call more than once. Later calls do not re-close the
// signal channel, which would panic.
func (c *MetricsClient) Close() error {
	// mu serialises concurrent Close calls so closing is closed exactly once.
	// refreshToken only takes mu briefly, so this cannot deadlock with it.
	c.mu.Lock()
	select {
	case <-c.closing:
		// Already closed by an earlier call.
	default:
		close(c.closing)
	}
	c.mu.Unlock()

	c.client.CloseIdleConnections()
	return nil
}
// readToken returns the current contents of the token file at tokenPath,
// verbatim (no whitespace trimming), or the read error.
func (c *MetricsClient) readToken() (string, error) {
	contents, readErr := os.ReadFile(tokenPath)
	if readErr != nil {
		return "", readErr
	}
	token := string(contents)
	return token, nil
}
// refreshToken re-reads the token file every 30 minutes and swaps the new
// value in under mu (presumably so a rotated token is picked up).
// A failed read is logged and the previous token is kept. The loop exits once
// c.closing is closed by Close.
func (c *MetricsClient) refreshToken() {
	ticker := time.NewTicker(30 * time.Minute)
	defer ticker.Stop()

	for {
		select {
		case <-c.closing:
			return
		case <-ticker.C:
		}

		fresh, err := c.readToken()
		if err != nil {
			logger.Errorf("Failed to read token: %s", err)
			continue
		}

		c.mu.Lock()
		c.token = fresh
		c.mu.Unlock()
	}
}
// gzipReaderWithClose wraps gzip.Reader and the underlying reader. Its Close
// releases both, whereas gzip.Reader.Close does not close its source.
type gzipReaderWithClose struct {
	*gzip.Reader
	underlying io.ReadCloser
}

// Close closes both the gzip.Reader and the underlying reader.
//
// The underlying reader is closed even when closing the gzip.Reader reports an
// error (e.g. after a corrupt stream), so the source is never leaked. The
// gzip.Reader's error takes precedence; otherwise the underlying reader's
// error is returned.
func (gr *gzipReaderWithClose) Close() error {
	gzErr := gr.Reader.Close()
	srcErr := gr.underlying.Close()
	if gzErr != nil {
		return gzErr
	}
	return srcErr
}

// NewGzipReaderWithClose reads the gzip header from r and returns a reader
// that decompresses r and closes it on Close.
//
// If the header cannot be read, the returned pointer is nil and r is NOT
// closed; the caller still owns it.
func NewGzipReaderWithClose(r io.ReadCloser) (*gzipReaderWithClose, error) {
	zr, err := gzip.NewReader(r)
	if err != nil {
		return nil, err
	}
	return &gzipReaderWithClose{Reader: zr, underlying: r}, nil
}