collector/collector.go (434 lines of code) (raw):
// Copyright (c) 2016 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package collector
import (
"fmt"
"strconv"
"strings"
"sync"
"time"
"github.com/uber/arachne/config"
"github.com/uber/arachne/defines"
"github.com/uber/arachne/internal/ip"
"github.com/uber/arachne/internal/log"
"github.com/uber/arachne/internal/tcp"
"github.com/uber/arachne/metrics"
"github.com/fatih/color"
"github.com/google/gopacket/layers"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
// Character widths used when drawing the results table.
const (
	// hostWidth is the number of dashes printTableHeader draws under the host column.
	hostWidth = 51
	// tableWidth is the length of the dashed rule emitted by printTableFooter. It
	// equals the header separator: hostWidth + 26 + 26 + 13 dashes plus three '|'.
	tableWidth = 119
)
// report holds the metrics measured for one echo request/reply pair.
type report struct {
// latency2Way is the reply's Ts.Run minus the request's Ts.Run (see processResults).
latency2Way time.Duration
// latency1Way is the reply's Ts.Payload minus the request's Ts.Unix; processResults
// sets it to -1 when the reply comes from an external target.
latency1Way time.Duration
// timedOut is true when latency2Way exceeded the configured timeout, and for the
// zero-latency placeholder reports written by zeroOutResults.
timedOut bool
}
// resultStore holds the reports of a batch cycle:
// map[target address string] => *[QoS DSCP index] => map[source port] => report.
type resultStore map[string]*[defines.NumQOSDCSPValues]map[layers.TCPPort]report
// messageStore holds the messages of a batch cycle:
// map[target address string] => *[QoS DSCP index] => sent and received messages by port.
type messageStore map[string]*[defines.NumQOSDCSPValues]srcPortScopedMessageStore
// srcPortScopedMessageStore groups the messages exchanged with one target at one
// QoS DSCP index, split by direction.
type srcPortScopedMessageStore struct {
// sent is filled by messageStore.sentAdd.
sent srcPortScopedMessages
// rcvd is filled by messageStore.rcvdAdd.
rcvd srcPortScopedMessages
}
// srcPortScopedMessages maps a TCP port to the most recent message stored under it.
type srcPortScopedMessages map[layers.TCPPort]tcp.Message
// target returns the message bucket for the given target address and QoS DSCP
// index. The per-target array and the bucket's sent and rcvd maps are allocated
// on first use, so the returned bucket is always ready to be written to.
func (ms messageStore) target(target string, QosDSCPIndex uint8) *srcPortScopedMessageStore {
	// TODO: validate dscp is in range or create a dscp type alias
	perDSCP, known := ms[target]
	if !known {
		perDSCP = new([defines.NumQOSDCSPValues]srcPortScopedMessageStore)
		ms[target] = perDSCP
	}

	bucket := &perDSCP[QosDSCPIndex]
	if bucket.sent == nil {
		bucket.sent = make(srcPortScopedMessages)
	}
	if bucket.rcvd == nil {
		bucket.rcvd = make(srcPortScopedMessages)
	}
	return bucket
}
// add stores message under srcPort, replacing any message already stored for that port.
func (spsm srcPortScopedMessages) add(srcPort layers.TCPPort, message tcp.Message) {
spsm[srcPort] = message
}
// sentAdd records message as sent to target at the given QoS DSCP index, keyed
// by srcPort. Missing store entries are created on demand.
func (ms messageStore) sentAdd(target string, QosDSCPIndex uint8, srcPort layers.TCPPort, message tcp.Message) {
	bucket := ms.target(target, QosDSCPIndex)
	bucket.sent.add(srcPort, message)
}
// rcvdAdd records message as received from target at the given QoS DSCP index,
// keyed by srcPort. Missing store entries are created on demand.
func (ms messageStore) rcvdAdd(target string, QosDSCPIndex uint8, srcPort layers.TCPPort, message tcp.Message) {
	bucket := ms.target(target, QosDSCPIndex)
	bucket.rcvd.add(srcPort, message)
}
// existsRcvd looks up the message received from target at the given QoS DSCP
// index under srcPort. It returns the message and true when one is stored, or a
// zero tcp.Message and false otherwise. The store is never modified.
func (ms messageStore) existsRcvd(target string, QosDSCPIndex uint8, srcPort layers.TCPPort) (tcp.Message, bool) {
	perDSCP, known := ms[target]
	if !known {
		return tcp.Message{}, false
	}
	// Indexing a nil map reports "not found", which covers a bucket that was never written.
	if msg, found := perDSCP[QosDSCPIndex].rcvd[srcPort]; found {
		return msg, true
	}
	return tcp.Message{}, false
}
// existsSent looks up the message sent to target at the given QoS DSCP index
// under srcPort. It returns the message and true when one is stored, or a zero
// tcp.Message and false otherwise. The store is never modified.
func (ms messageStore) existsSent(target string, QosDSCPIndex uint8, srcPort layers.TCPPort) (tcp.Message, bool) {
	perDSCP, known := ms[target]
	if !known {
		return tcp.Message{}, false
	}
	// Indexing a nil map reports "not found", which covers a bucket that was never written.
	if msg, found := perDSCP[QosDSCPIndex].sent[srcPort]; found {
		return msg, true
	}
	return tcp.Message{}, false
}
// add stores r for the given target, QoS DSCP index and source port, allocating
// the per-target array and the per-DSCP map on first use. A report already
// stored under the same key is overwritten.
func (rs resultStore) add(target string, QosDSCPIndex uint8, srcPort layers.TCPPort, r report) {
	perDSCP := rs[target]
	if perDSCP == nil {
		perDSCP = new([defines.NumQOSDCSPValues]map[layers.TCPPort]report)
		rs[target] = perDSCP
	}

	reports := perDSCP[QosDSCPIndex]
	if reports == nil {
		reports = make(map[layers.TCPPort]report)
		perDSCP[QosDSCPIndex] = reports
	}
	reports[srcPort] = r
}
// resultWalker is the callback invoked by walkResults for each stored report. Its
// arguments are, in order: the report, the remote's hostname, the remote's
// location, the source port, the foreground flag and the logger.
type resultWalker func(report, string, string, layers.TCPPort, bool, *log.Logger)
// walkResults calls the result walker once for every report stored under the
// QoS DSCP value in effect for each target: *currentDSCP for regular targets,
// ip.DSCPBeLow for targets marked External in remotes.
//
// Exactly one walker is expected. With none, an error is logged and nothing is
// walked (previously this indexed an empty slice and panicked). With more than
// one, an error is logged once and only the first walker is used.
//
// A target missing from remotes is reported as an error but still walked, using
// the zero-value remote entry returned by the map lookup.
func (rs resultStore) walkResults(
	remotes config.RemoteStore,
	currentDSCP *ip.DSCPValue,
	foreground bool,
	logger *log.Logger,
	walkerF ...resultWalker) {

	if len(walkerF) == 0 {
		logger.Error("no result walker function provided")
		return
	}
	if len(walkerF) > 1 {
		logger.Error("only one result walker function expected currently")
	}
	walk := walkerF[0]

	for target, r := range rs {
		remote, existsTarget := remotes[target]
		if !existsTarget {
			logger.Error("host exists in resultStore, but not in remoteStore",
				zap.String("host", target))
		}

		qos := *currentDSCP
		if remote.External {
			qos = ip.DSCPBeLow
		}

		for srcPort, rep := range r[(ip.GetDSCP).Pos(qos, logger)] {
			walk(rep, remote.Hostname, remote.Location, srcPort, foreground, logger)
		}
	}
}
// processResults derives the report for a matched request/reply pair and, unless
// running in sender-only mode, records it in the result store for later printing.
//
// The two-way latency is rep.Ts.Run minus req.Ts.Run, and the pair is flagged as
// timed out when that exceeds gl.RemoteConfig.Timeout. The one-way latency is
// rep.Ts.Payload minus req.Ts.Unix, or -1 when the reply comes from an external
// target. The remotes argument is currently unused. The computed report is
// returned in every case.
func (rs resultStore) processResults(
	gl *config.Global,
	remotes config.RemoteStore,
	target string,
	req tcp.Message,
	rep tcp.Message,
	logger *log.Logger,
) report {
	roundTrip := rep.Ts.Run.Sub(req.Ts.Run)
	res := report{
		latency2Way: roundTrip,
		timedOut:    roundTrip > gl.RemoteConfig.Timeout,
	}
	if res.timedOut {
		logger.Debug("Received timed-out echo response from", zap.String("target", target))
	}

	res.latency1Way = rep.Ts.Payload.Sub(req.Ts.Unix)
	if rep.FromExternalTarget(gl.RemoteConfig.TargetTCPPort) {
		res.latency1Way = -1
	}

	// Sender-only mode never prints a table, so nothing needs to be stored.
	if *(gl.CLI.SenderOnlyMode) {
		return res
	}
	rs.add(target, (ip.GetDSCP).Pos(req.QosDSCP, logger), req.SrcPort, res)
	return res
}
// printResults emits the results table for the current QoS DSCP value: the
// header, one entry per stored report (via walkResults and printTableEntry),
// then the footer.
func (rs resultStore) printResults(
	gl *config.Global,
	remotes config.RemoteStore,
	currentDSCP *ip.DSCPValue,
	logger *log.Logger,
) {
	inForeground := *gl.CLI.Foreground
	dscpText := (*currentDSCP).Text(logger)

	printTableHeader(gl, dscpText, logger)
	rs.walkResults(remotes, currentDSCP, inForeground, logger, printTableEntry)
	printTableFooter(inForeground, logger)
}
// Run processes the echoes sent and received to compute and report all the metrics desired.
//
// It starts a goroutine and returns immediately. The goroutine runs batchWorker
// in a loop, handing each batch cycle a fresh messageStore and resultStore, and
// exits once kill is readable after a batch returns.
func Run(
	gl *config.Global,
	sentC chan tcp.Message,
	rcvdC chan tcp.Message,
	remotes config.RemoteStore,
	currentDSCP *ip.DSCPValue,
	sr metrics.Reporter,
	completeCycleUpload chan bool,
	wg *sync.WaitGroup,
	kill chan struct{},
	logger *log.Logger,
) {
	// killed polls the kill channel without blocking.
	killed := func() bool {
		select {
		case <-kill:
			return true
		default:
			return false
		}
	}

	go func() {
		for {
			logger.Debug("Entering new batch cycle collection.")

			// New stores for every batch cycle; the previous ones become
			// unreachable and are left to the garbage collector.
			messages := make(messageStore)
			results := make(resultStore)
			batchWorker(gl, sentC, rcvdC, remotes, messages, results, currentDSCP, statsUpload, sr,
				completeCycleUpload, kill, wg, logger)

			logger.Debug("Removing all state from current batch cycle collection.")
			if killed() {
				logger.Debug("Collector goroutine returning.")
				return
			}
		}
	}()
}
// batchWorker collects the echo traffic of a single batch cycle.
//
// It loops over four event sources until the cycle ends:
//   - sentC: an echo request that was sent; it is stored and, if a matching
//     reply was already received, the pair is processed and uploaded via sfn.
//   - rcvdC: an echo reply that was received; it is stored and, if the matching
//     request is known and the ACK equals the request's SEQ+1, the pair is
//     processed and uploaded via sfn.
//   - completeCycleUpload: the cycle is over; both stores are dumped at debug
//     level, unanswered probes are zero-filled and the table is printed (unless
//     in sender-only mode), wg.Done() is called and the function returns.
//   - kill: the function returns immediately, without uploading and without
//     calling wg.Done().
//
// Messages of an unexpected type on sentC or rcvdC are logged and dropped.
func batchWorker(
gl *config.Global,
sentC chan tcp.Message,
rcvdC chan tcp.Message,
remotes config.RemoteStore,
ms messageStore,
rs resultStore,
currentDSCP *ip.DSCPValue,
sfn statsUploader,
sr metrics.Reporter,
completeCycleUpload chan bool,
kill chan struct{},
wg *sync.WaitGroup,
logger *log.Logger,
) {
for {
select {
case out := <-sentC:
// Only echo requests are expected on the sent channel.
if out.Type != tcp.EchoRequest {
logger.Error("unexpected 'echo' type received in 'out' by collector.",
zap.Any("type", out.Type))
continue
}
QosDSCPIndex := (ip.GetDSCP).Pos(out.QosDSCP, logger)
// SYN sent
// Sent messages are keyed by destination address and source port.
targetKey := out.DstAddr.String()
ms.sentAdd(targetKey, QosDSCPIndex, out.SrcPort, out)
// Matching SYN ACK already received?
// The reply may have been read from rcvdC before the request was read from sentC.
matchedMsg, existsMatch := ms.existsRcvd(targetKey, QosDSCPIndex, out.SrcPort)
if existsMatch && matchedMsg.Type == tcp.EchoReply && matchedMsg.Ack == out.Seq+1 {
logger.Debug("response already exists for same target",
zap.Any("message", matchedMsg))
report := rs.processResults(gl, remotes, targetKey, out, matchedMsg, logger)
sfn(gl.RemoteConfig, sr, targetKey, remotes, out.QosDSCP, out.SrcPort, &report, logger)
}
case in := <-rcvdC:
// Only echo replies are expected on the received channel.
if in.Type != tcp.EchoReply {
logger.Error("unexpected 'echo' type received in 'in' by collector.",
zap.Any("type", in.Type))
continue
}
QosDSCPIndex := (ip.GetDSCP).Pos(in.QosDSCP, logger)
// SYN+ACK received
// Received messages are keyed by source address and the reply's source port.
targetKey := in.SrcAddr.String()
ms.rcvdAdd(targetKey, QosDSCPIndex, in.SrcPort, in)
// SYN+ACK received from internal target/agent
// SrcPort = source port of pkt received by external target/server
// DstPort = to the well-defined arachne port
portKey := in.SrcPort
if in.FromExternalTarget(gl.RemoteConfig.TargetTCPPort) {
// SYN+ACK received from external target/server
// SrcPort = not well-defined arachne port (e.g. 80)
// DstPort = source port of pkt received by external target/server
portKey = in.DstPort
}
// Matching SYN probe exists in sent and intended targets (remotes)?
probe, existsMatch := ms.existsSent(targetKey, QosDSCPIndex, portKey)
if !existsMatch {
// u names what is missing: the probe when the sender is a known remote,
// otherwise the target itself.
u := "target"
if _, existsTarget := remotes[targetKey]; existsTarget {
u = "probe"
}
logger.Debug("received following response",
zap.String("non-existing", u),
zap.Any("response", ms[targetKey][QosDSCPIndex].rcvd[in.SrcPort]),
zap.String("source_address", targetKey))
continue
}
// The reply must acknowledge the probe's sequence number plus one.
if in.Ack != probe.Seq+1 {
logger.Warn("unmatched ACK",
zap.Uint32("in_ACK", in.Ack),
zap.Uint32("out_SEQ", probe.Seq),
zap.String("source_address", in.SrcAddr.String()))
continue
}
report := rs.processResults(gl, remotes, targetKey, probe, in, logger)
sfn(gl.RemoteConfig, sr, targetKey, remotes, in.QosDSCP, portKey, &report, logger)
case <-completeCycleUpload:
// End of the batch cycle: dump both stores at debug level before finishing.
for key, value := range ms {
logger.Debug("At end of batch cycle, sent and received 'messages' of",
zap.String("host", key),
zap.Any("messages", value))
}
for key, value := range rs {
logger.Debug("At end of batch cycle, 'result' of",
zap.String("host", key),
zap.Any("result", value))
}
if !*gl.CLI.SenderOnlyMode {
// Probes that never got a reply are recorded and uploaded as timed out.
zeroOutResults(gl.RemoteConfig, ms, rs, remotes, sfn, sr, logger)
//TODO print only tcp.DSCPBeLow when only external targets exist in remotes?
rs.printResults(gl, remotes, currentDSCP, logger)
}
// Signal the wait group that this cycle's collection is complete.
wg.Done()
return
case <-kill:
// Exit path: wg.Done() is not called here.
logger.Info("Collector asked to exit without uploading.")
return
}
}
}
// statsUploader is the signature of the function used to upload the report of one
// probe, identified by target, QoS DSCP value and source port. Run passes
// statsUpload as the implementation; batchWorker and zeroOutResults receive it as
// a parameter.
type statsUploader func(
glr *config.RemoteConfig,
sr metrics.Reporter,
target string,
remotes config.RemoteStore,
QOSDSCP ip.DSCPValue,
srcPort layers.TCPPort,
r *report,
logger *log.Logger,
)
// statsUpload reports the two latencies of r as gauges through sr.
//
// Each gauge is tagged with the local host and location, the target's hostname
// and location (looked up in remotes), the DSCP value, the source port and
// whether the probe timed out. If target is not present in remotes, an error is
// logged and nothing is reported.
func statsUpload(
	glr *config.RemoteConfig,
	sr metrics.Reporter,
	target string,
	remotes config.RemoteStore,
	QOSDSCP ip.DSCPValue,
	srcPort layers.TCPPort,
	r *report,
	logger *log.Logger,
) {
	remote, known := remotes[target]
	if !known {
		logger.Error("host exists in resultStore, but not in remoteStore",
			zap.String("host", target))
		return
	}

	tags := make(map[string]string, 7)
	tags["host"] = glr.HostName
	tags["host_location"] = glr.Location
	tags["target"] = remote.Hostname
	tags["target_location"] = remote.Location
	tags["dscp"] = strconv.Itoa(int(QOSDSCP))
	tags["source_port"] = strconv.Itoa(int(srcPort))
	tags["timed_out"] = strconv.FormatBool(r.timedOut)

	// Both following in nanoseconds
	sr.ReportGauge("latency_2way", tags, r.latency2Way.Nanoseconds())
	sr.ReportGauge("latency_1way", tags, r.latency1Way.Nanoseconds())
}
// zeroOutResults records a timed-out report with zero latencies for every sent
// probe that has no entry in the result store, and uploads each such report via sfn.
//
// Every target present in ms gets a result-store entry with a map for each QoS
// DSCP index, even when nothing has to be filled in. Existing reports are left
// untouched. The function sleeps 1ms after each upload.
func zeroOutResults(
	glr *config.RemoteConfig,
	ms messageStore,
	rs resultStore,
	remotes config.RemoteStore,
	sfn statsUploader,
	sr metrics.Reporter,
	logger *log.Logger,
) {
	timedOutReport := report{timedOut: true}

	for targetKey, messages := range ms {
		perDSCP, known := rs[targetKey]
		if !known {
			perDSCP = new([defines.NumQOSDCSPValues]map[layers.TCPPort]report)
			rs[targetKey] = perDSCP
		}

		for qosDSCP := 0; qosDSCP < defines.NumQOSDCSPValues; qosDSCP++ {
			reports := perDSCP[qosDSCP]
			if reports == nil {
				reports = make(map[layers.TCPPort]report)
				perDSCP[qosDSCP] = reports
			}

			for srcPort := range messages[qosDSCP].sent {
				if _, answered := reports[srcPort]; !answered {
					reports[srcPort] = timedOutReport

					// Upload timed out results
					sfn(glr, sr, targetKey, remotes, ip.GetDSCP[qosDSCP], srcPort, &timedOutReport, logger)
					// NOTE(review): presumably paces the uploads — confirm the intent.
					time.Sleep(1 * time.Millisecond)
				}
			}
		}
	}
}
// printTableHeader announces a new table of results.
//
// In foreground mode it prints to stdout: a title line with the Arachne version,
// the local host, target TCP port, QoS DSCP text and current time, the location
// (when set to something other than "" or " "), the column headings and a dashed
// separator. Otherwise the same identifying information is logged as a single
// Info entry. currentDSCP is the textual form of the DSCP value in use.
func printTableHeader(gl *config.Global, currentDSCP string, logger *log.Logger) {
	color.Set(color.FgHiYellow, color.Bold)
	defer color.Unset()

	if !*gl.CLI.Foreground {
		logger.Info("Arachne -- Table of Results",
			zap.String("version", defines.ArachneVersion),
			zap.String("host", gl.RemoteConfig.HostName),
			zap.String("host_location", gl.RemoteConfig.Location),
			zap.Any("target_TCP_port", gl.RemoteConfig.TargetTCPPort),
			zap.String("QoS_DSCP", currentDSCP),
		)
		return
	}

	fmt.Printf("%74s\n", "Arachne ["+defines.ArachneVersion+"]")
	fmt.Printf("%-55s%64s\n",
		gl.RemoteConfig.HostName+":"+strconv.Itoa(int(gl.RemoteConfig.TargetTCPPort))+
			" with QoS DSCP '"+currentDSCP+"'", time.Now().Format(time.RFC850))
	if gl.RemoteConfig.Location != "" && gl.RemoteConfig.Location != " " {
		fmt.Printf("Location: %s\n", gl.RemoteConfig.Location)
	}

	fmt.Printf("\n%51s|%26s|%8s%s%8s|\n", "", "", "", "RTT (msec)", "")
	fmt.Printf("Host%47s|%8s%s%10s|%4s%s%7s%s%5s|%2s%s\n", "",
		"", "Location", "",
		"", "2-way", "", "1-way", "",
		"", "Timed Out?")

	// The separator is drawn without the bold attribute.
	color.Set(color.FgHiYellow)
	// Println rather than Printf: the separator is data, not a format string.
	fmt.Println(strings.Repeat("-", hostWidth) + "|" +
		strings.Repeat("-", 26) + "|" +
		strings.Repeat("-", 26) + "|" +
		strings.Repeat("-", 13))
}
// printTableFooter closes the results table with a dashed rule of tableWidth
// characters. In foreground mode the rule is written to stdout; otherwise it is
// logged at Info level.
func printTableFooter(foreground bool, logger *log.Logger) {
	color.Set(color.FgHiYellow)
	defer color.Unset()

	rule := strings.Repeat("-", tableWidth)
	if foreground {
		// Println rather than Printf: the rule is data, not a format string.
		fmt.Println(rule)
		return
	}
	logger.Info(rule)
}
// printTableEntry emits one row of the results table for a single report.
//
// In foreground mode the row is printed to stdout; otherwise it is logged as an
// Info entry named "Result". Latencies are shown in milliseconds with a
// resolution of 0.1ms: the duration is first truncated to tenths of a
// millisecond (integer division by 1e5 nanoseconds) and then divided by 10.
//
// A report with a zero 2-way latency shows "-" for both latencies. The 1-way
// latency is shown as "N/A" when it is -1 or smaller than 10ns.
func printTableEntry(
	r report,
	targetHost string,
	targetLocation string,
	srcPort layers.TCPPort,
	foreground bool,
	logger *log.Logger,
) {
	var twoWay, oneWay zapcore.Field

	color.Set(color.FgHiYellow)
	defer color.Unset()

	timedOut := "no"
	if r.timedOut {
		timedOut = "yes"
	}

	if foreground {
		fmt.Printf("%-51s|", targetHost+"("+strconv.Itoa(int(srcPort))+")")
		fmt.Printf("%-26s|%3s", " "+targetLocation, "")
	}

	if r.latency2Way == 0 {
		twoWay = zap.String("2-way", "-")
		oneWay = zap.String("1-way", "-")
		if foreground {
			fmt.Printf("%4s %11s%8s%5s%s\n", "-", "-", "|", "", timedOut)
		}
	} else {
		twoWay = zap.Float64("2-way", float64(r.latency2Way/1e5)/10.0)
		// Ignore 1-way when echoing an external server or when estimated value is smaller than a threshold
		if r.latency1Way == -1 || r.latency1Way < 10*time.Nanosecond {
			if foreground {
				fmt.Printf("%5.1f %11s%7s%5s%s\n", float32(r.latency2Way/1e5)/10.0,
					"N/A", "|", "", timedOut)
			}
			oneWay = zap.String("1-way", "N/A")
		} else {
			if foreground {
				fmt.Printf("%5.1f %11.1f%7s%5s%s\n", float32(r.latency2Way/1e5)/10.0,
					float32(r.latency1Way/1e5)/10.0, "|", "", timedOut)
			}
			// Same conversion as the foreground output and the 2-way field, so the
			// logged value is in milliseconds as well.
			oneWay = zap.Float64("1-way", float64(r.latency1Way/1e5)/10.0)
		}
	}

	if !foreground {
		logger.Info("Result",
			zap.String("target", targetHost),
			zap.String("target_location", targetLocation),
			zap.Any("source_port", srcPort),
			twoWay,
			oneWay,
			zap.String("timed_out", timedOut))
	}
}