in eks/ng/logs.go [84:461]
func (ts *tester) fetchLogs(qps float32, burst int) error {
logsDir := ts.cfg.EKSConfig.AddOnNodeGroups.LogsDir
sshOptLog := ssh.WithVerbose(ts.cfg.EKSConfig.LogLevel == "debug")
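// a single rate limiter shared by all fetch goroutines throttles how fast SSH commands are issued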
rateLimiter := rate.NewLimiter(rate.Limit(qps), burst)
rch, waits := make(chan instanceLogs, 10), 0
for name, nodeGroup := range ts.cfg.EKSConfig.AddOnNodeGroups.ASGs {
ts.cfg.Logger.Info("fetching logs from node group",
zap.String("asg-name", name),
zap.Int("nodes", len(nodeGroup.Instances)),
)
waits += len(nodeGroup.Instances)
for instID, cur := range nodeGroup.Instances {
pfx := instID + "-"
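// launch one goroutine per instance; name and cur are passed as arguments so each goroutine
// works on its own copies, and the result is reported back on rch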
go func(name, instID, logsDir, pfx string, cur ec2config.Instance) {
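// non-blocking check: bail out early if the tester has been signaled to stop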
select {
case <-ts.cfg.Stopc:
ts.cfg.Logger.Warn("exiting fetch logger", zap.String("prefix", pfx))
return
default:
}
if !rateLimiter.Allow() {
ts.cfg.Logger.Debug("waiting for rate limiter before SSH into the machine",
zap.Float32("qps", qps),
zap.Int("burst", burst),
zap.String("instance-id", instID),
)
werr := rateLimiter.Wait(context.Background())
ts.cfg.Logger.Debug("waited for rate limiter",
zap.Float32("qps", qps),
zap.Int("burst", burst),
zap.Error(werr),
)
}
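// open an SSH session to the node using the node group's remote-access key and user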
sh, err := ssh.New(ssh.Config{
Logger: ts.cfg.Logger,
KeyPath: ts.cfg.EKSConfig.RemoteAccessPrivateKeyPath,
PublicIP: cur.PublicIP,
PublicDNSName: cur.PublicDNSName,
UserName: cur.RemoteAccessUserName,
})
if err != nil {
rch <- instanceLogs{asgName: name, errs: []string{err.Error()}}
return
}
defer sh.Close()
if err = sh.Connect(); err != nil {
rch <- instanceLogs{asgName: name, errs: []string{err.Error()}}
return
}
data := instanceLogs{asgName: name, instanceID: instID}
// fetch default logs
for cmd, fileName := range defaultLogs {
if !rateLimiter.Allow() {
ts.cfg.Logger.Debug("waiting for rate limiter before fetching file")
werr := rateLimiter.Wait(context.Background())
ts.cfg.Logger.Debug("waited for rate limiter", zap.Error(werr))
}
out, oerr := sh.Run(cmd, sshOptLog)
if oerr != nil {
data.errs = append(data.errs, fmt.Sprintf("failed to run command %q for %q (error %v)", cmd, instID, oerr))
continue
}
fpath := filepath.Join(logsDir, shorten(ts.cfg.Logger, pfx+fileName))
f, err := os.Create(fpath)
if err != nil {
data.errs = append(data.errs, fmt.Sprintf(
"failed to create a file %q for %q (error %v)",
fpath,
instID,
err,
))
continue
}
if _, err = f.Write(out); err != nil {
data.errs = append(data.errs, fmt.Sprintf(
"failed to write to a file %q for %q (error %v)",
fpath,
instID,
err,
))
f.Close()
continue
}
f.Close()
ts.cfg.Logger.Debug("wrote", zap.String("file-path", fpath))
data.paths = append(data.paths, fpath)
}
if !rateLimiter.Allow() {
ts.cfg.Logger.Debug("waiting for rate limiter before fetching file")
werr := rateLimiter.Wait(context.Background())
ts.cfg.Logger.Debug("waited for rate limiter", zap.Error(werr))
}
ts.cfg.Logger.Info("listing systemd service units", zap.String("instance-id", instID))
listCmd := "sudo systemctl list-units -t service --no-pager --no-legend --all"
out, oerr := sh.Run(listCmd, sshOptLog)
if oerr != nil {
data.errs = append(data.errs, fmt.Sprintf(
"failed to run command %q for %q (error %v)",
listCmd,
instID,
oerr,
))
} else {
/*
auditd.service loaded active running Security Auditing Service
auth-rpcgss-module.service loaded inactive dead Kernel Module supporting RPCSEC_GSS
*/
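// each line has at least five columns (UNIT LOAD ACTIVE SUB DESCRIPTION); skip units
// that were not found or are inactive, and build a journalctl command plus an output
// file name for each remaining service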
svcCmdToFileName := make(map[string]string)
for _, line := range strings.Split(string(out), "\n") {
fields := strings.Fields(line)
if len(fields) < 5 {
continue
}
if fields[1] == "not-found" {
continue
}
if fields[2] == "inactive" {
continue
}
svc := fields[0]
svcCmd := "sudo journalctl --no-pager --output=cat -u " + svc
svcFileName := svc + ".out.log"
svcCmdToFileName[svcCmd] = svcFileName
}
for cmd, fileName := range svcCmdToFileName {
if !rateLimiter.Allow() {
ts.cfg.Logger.Debug("waiting for rate limiter before fetching file")
werr := rateLimiter.Wait(context.Background())
ts.cfg.Logger.Debug("waited for rate limiter", zap.Error(werr))
}
out, oerr := sh.Run(cmd, sshOptLog)
if oerr != nil {
data.errs = append(data.errs, fmt.Sprintf(
"failed to run command %q for %q (error %v)",
cmd,
instID,
oerr,
))
continue
}
fpath := filepath.Join(logsDir, shorten(ts.cfg.Logger, pfx+fileName))
f, err := os.Create(fpath)
if err != nil {
data.errs = append(data.errs, fmt.Sprintf(
"failed to create a file %q for %q (error %v)",
fpath,
instID,
err,
))
continue
}
if _, err = f.Write(out); err != nil {
data.errs = append(data.errs, fmt.Sprintf(
"failed to write to a file %q for %q (error %v)",
fpath,
instID,
err,
))
f.Close()
continue
}
f.Close()
ts.cfg.Logger.Debug("wrote", zap.String("file-path", fpath))
data.paths = append(data.paths, fpath)
}
}
if !rateLimiter.Allow() {
ts.cfg.Logger.Debug("waiting for rate limiter before fetching file")
werr := rateLimiter.Wait(context.Background())
ts.cfg.Logger.Debug("waited for rate limiter", zap.Error(werr))
}
// https://github.com/aws/amazon-vpc-cni-k8s/blob/master/docs/troubleshooting.md#ipamd-debugging-commands
// https://github.com/aws/amazon-vpc-cni-k8s/blob/master/scripts/aws-cni-support.sh
ts.cfg.Logger.Info("fetching ENI information", zap.String("instance-id", instID))
eniCmd := "curl -s http://localhost:61679/v1/enis"
out, oerr = sh.Run(eniCmd, sshOptLog)
if oerr != nil {
data.errs = append(data.errs, fmt.Sprintf(
"failed to run command %q for %q (error %v)",
eniCmd,
instID,
oerr,
))
} else {
v1ENIOutputPath := filepath.Join(logsDir, shorten(ts.cfg.Logger, pfx+"v1-enis.out.log"))
f, err := os.Create(v1ENIOutputPath)
if err != nil {
data.errs = append(data.errs, fmt.Sprintf(
"failed to create a file %q for %q (error %v)",
v1ENIOutputPath,
instID,
err,
))
} else {
if _, err = f.Write(out); err != nil {
data.errs = append(data.errs, fmt.Sprintf(
"failed to write to a file %q for %q (error %v)",
v1ENIOutputPath,
instID,
err,
))
} else {
ts.cfg.Logger.Debug("wrote", zap.String("file-path", v1ENIOutputPath))
data.paths = append(data.paths, v1ENIOutputPath)
}
f.Close()
}
}
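// aws-cni-support.sh collects CNI/ipamd diagnostics on the node; "|| true" keeps a
// non-zero exit status from failing the SSH command, and only the script's console
// output is captured here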
ts.cfg.Logger.Info("running /opt/cni/bin/aws-cni-support.sh", zap.String("instance-id", instID))
cniCmd := "sudo /opt/cni/bin/aws-cni-support.sh || true"
out, oerr = sh.Run(cniCmd, sshOptLog)
if oerr != nil {
data.errs = append(data.errs, fmt.Sprintf(
"failed to run command %q for %q (error %v)",
cniCmd,
instID,
oerr,
))
} else {
ts.cfg.Logger.Info("ran /opt/cni/bin/aws-cni-support.sh", zap.String("instance-id", instID), zap.String("output", string(out)))
}
if !rateLimiter.Allow() {
ts.cfg.Logger.Debug("waiting for rate limiter before fetching file")
werr := rateLimiter.Wait(context.Background())
ts.cfg.Logger.Debug("waited for rate limiter", zap.Error(werr))
}
ts.cfg.Logger.Info("listing /var/log", zap.String("instance-id", instID))
findCmd := "sudo find /var/log ! -type d"
out, oerr = sh.Run(findCmd, sshOptLog, ssh.WithRetry(5, 3*time.Second))
if oerr != nil {
data.errs = append(data.errs, fmt.Sprintf(
"failed to run command %q for %q (error %v)",
findCmd,
instID,
oerr,
))
} else {
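// map every regular file under /var/log to a "cat" command; local output files are keyed
// by base name, so same-named files in different directories overwrite each other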
varLogPaths := make(map[string]string)
for _, line := range strings.Split(string(out), "\n") {
if len(line) == 0 {
// skip the trailing empty entry after the final newline
continue
}
logCmd := "sudo cat " + line
logPath := filepath.Base(line)
varLogPaths[logCmd] = logPath
}
for cmd, logPath := range varLogPaths {
if !rateLimiter.Allow() {
ts.cfg.Logger.Debug("waiting for rate limiter before fetching file")
werr := rateLimiter.Wait(context.Background())
ts.cfg.Logger.Debug("waited for rate limiter", zap.Error(werr))
}
// retry to tolerate transient SSH read failures, e.g. "read tcp 10.119.223.210:58688->54.184.39.156:22: read: connection timed out"
out, oerr := sh.Run(cmd, sshOptLog, ssh.WithRetry(2, 3*time.Second))
if oerr != nil {
data.errs = append(data.errs, fmt.Sprintf(
"failed to run command %q for %q (error %v)",
cmd,
instID,
oerr,
))
continue
}
fpath := filepath.Join(logsDir, shorten(ts.cfg.Logger, pfx+logPath))
f, err := os.Create(fpath)
if err != nil {
data.errs = append(data.errs, fmt.Sprintf(
"failed to create a file %q for %q (error %v)",
fpath,
instID,
err,
))
continue
}
if _, err = f.Write(out); err != nil {
data.errs = append(data.errs, fmt.Sprintf(
"failed to write to a file %q for %q (error %v)",
fpath,
instID,
err,
))
f.Close()
continue
}
f.Close()
ts.cfg.Logger.Debug("wrote", zap.String("file-path", fpath))
data.paths = append(data.paths, fpath)
}
}
rch <- data
}(name, instID, logsDir, pfx, cur)
}
}
ts.cfg.Logger.Info("waiting for log fetcher goroutines", zap.Int("waits", waits))
total := 0
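// drain one result per spawned goroutine and merge the fetched file paths into the node-group config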
for i := 0; i < waits; i++ {
var data instanceLogs
select {
case data = <-rch:
case <-ts.cfg.Stopc:
ts.cfg.Logger.Warn("exiting fetch logger")
ts.cfg.EKSConfig.Sync()
return nil
}
if len(data.errs) > 0 {
ts.cfg.Logger.Warn("failed to fetch logs, but keeping whatever available",
zap.String("mng-name", data.asgName),
zap.String("instance-id", data.instanceID),
zap.Strings("errors", data.errs),
)
}
cur, ok := ts.cfg.EKSConfig.AddOnNodeGroups.ASGs[data.asgName]
if !ok {
return fmt.Errorf("EKS Node Group name %q is unknown", data.asgName)
}
if cur.Logs == nil {
cur.Logs = make(map[string][]string)
}
// existing logs are already written out to disk, merge/list them all
var logs []string
logs, ok = cur.Logs[data.instanceID]
if ok {
ts.cfg.Logger.Warn("node group already has existing logs; merging",
zap.String("asg-name", data.asgName),
zap.String("instance-id", data.instanceID),
)
}
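// union the existing and newly fetched paths through a set, then sort for deterministic output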
all := make(map[string]struct{})
for _, v := range logs {
all[v] = struct{}{}
}
for _, v := range data.paths {
all[v] = struct{}{}
}
logs = make([]string, 0, len(all))
for k := range all {
logs = append(logs, k)
}
sort.Strings(logs)
cur.Logs[data.instanceID] = logs
files := len(logs)
ts.cfg.EKSConfig.AddOnNodeGroups.ASGs[data.asgName] = cur
ts.cfg.EKSConfig.Sync()
total += files
ts.cfg.Logger.Info("wrote log files",
zap.String("instance-id", data.instanceID),
zap.Int("files", files),
zap.Int("total-downloaded-files", total),
zap.Int("total-goroutines-to-wait", waits),
zap.Int("current-waited-goroutines", i),
)
}
ts.cfg.Logger.Info("wrote all log files",
zap.String("log-dir", logsDir),
zap.Int("total-downloaded-files", total),
)
ts.cfg.EKSConfig.Sync()
return nil
}