func()

in eks/mng/logs.go [82:459]


// fetchLogs downloads logs from every instance of every managed node group
// over SSH and writes them into the configured logs directory, one goroutine
// per instance.
//
// For each instance it collects, in order: the outputs of the commands in
// defaultLogs, the journal of every systemd service unit that is neither
// "not-found" nor "inactive", the ENI information served on localhost:61679,
// and every non-directory file found under /var/log. It also runs
// aws-cni-support.sh and logs its output. Remote commands are throttled by a
// shared rate limiter built from qps and burst.
//
// Failures on an individual instance are logged as warnings and do not fail
// the call; whatever files were written are still recorded in the node group's
// Logs map and synced. The function returns early with nil when Stopc fires,
// and returns an error only when a result names an unknown node group.
func (ts *tester) fetchLogs(qps float32, burst int) error {
	logsDir := ts.cfg.EKSConfig.AddOnManagedNodeGroups.LogsDir
	sshOptLog := ssh.WithVerbose(ts.cfg.EKSConfig.LogLevel == "debug")
	rateLimiter := rate.NewLimiter(rate.Limit(qps), burst)

	// Count the workers up front so the result channel can hold one message per
	// goroutine. Each goroutine sends at most once, so no sender ever blocks
	// (and leaks) if the collector below returns early.
	waits := 0
	for _, nodeGroup := range ts.cfg.EKSConfig.AddOnManagedNodeGroups.MNGs {
		waits += len(nodeGroup.Instances)
	}
	rch := make(chan instanceLogs, waits)

	for name, nodeGroup := range ts.cfg.EKSConfig.AddOnManagedNodeGroups.MNGs {
		ts.cfg.Logger.Info("fetching logs from managed node group",
			zap.String("mng-name", name),
			zap.Int("nodes", len(nodeGroup.Instances)),
		)

		for instID, cur := range nodeGroup.Instances {
			pfx := instID + "-"

			// The node group name is passed as an argument, like the other
			// per-iteration values, so the goroutine never observes a later
			// iteration's value of the loop variable.
			go func(mngName, instID, logsDir, pfx string, cur ec2config.Instance) {
				select {
				case <-ts.cfg.Stopc:
					ts.cfg.Logger.Warn("exiting fetch logger", zap.String("prefix", pfx))
					return
				default:
				}

				if !rateLimiter.Allow() {
					ts.cfg.Logger.Debug("waiting for rate limiter before SSH into the machine",
						zap.Float32("qps", qps),
						zap.Int("burst", burst),
						zap.String("instance-id", instID),
					)
					werr := rateLimiter.Wait(context.Background())
					ts.cfg.Logger.Debug("waited for rate limiter",
						zap.Float32("qps", qps),
						zap.Int("burst", burst),
						zap.Error(werr),
					)
				}

				// The instance ID is set before connecting so that connection
				// failures are attributed to the right instance as well.
				data := instanceLogs{mngName: mngName, instanceID: instID}

				sh, err := ssh.New(ssh.Config{
					Logger:        ts.cfg.Logger,
					KeyPath:       ts.cfg.EKSConfig.RemoteAccessPrivateKeyPath,
					PublicIP:      cur.PublicIP,
					PublicDNSName: cur.PublicDNSName,
					UserName:      cur.RemoteAccessUserName,
				})
				if err != nil {
					data.errs = []string{err.Error()}
					rch <- data
					return
				}
				defer sh.Close()
				if err = sh.Connect(); err != nil {
					data.errs = []string{err.Error()}
					rch <- data
					return
				}

				// throttle blocks until the shared rate limiter admits one more
				// remote command.
				throttle := func() {
					if rateLimiter.Allow() {
						return
					}
					ts.cfg.Logger.Debug("waiting for rate limiter before fetching file")
					werr := rateLimiter.Wait(context.Background())
					ts.cfg.Logger.Debug("waited for rate limiter", zap.Error(werr))
				}

				// cmdFailed records a failed remote command for this instance.
				cmdFailed := func(cmd string, cerr error) {
					data.errs = append(data.errs, fmt.Sprintf(
						"failed to run command %q for %q (error %v)",
						cmd,
						instID,
						cerr,
					))
				}

				// save writes out to "<logsDir>/<pfx><fileName>" (shortened) and
				// records either the written path or the failure in data.
				save := func(fileName string, out []byte) {
					fpath := filepath.Join(logsDir, shorten(ts.cfg.Logger, pfx+fileName))
					f, ferr := os.Create(fpath)
					if ferr != nil {
						data.errs = append(data.errs, fmt.Sprintf(
							"failed to create a file %q for %q (error %v)",
							fpath,
							instID,
							ferr,
						))
						return
					}
					_, ferr = f.Write(out)
					f.Close()
					if ferr != nil {
						data.errs = append(data.errs, fmt.Sprintf(
							"failed to write to a file %q for %q (error %v)",
							fpath,
							instID,
							ferr,
						))
						return
					}
					ts.cfg.Logger.Debug("wrote", zap.String("file-path", fpath))
					data.paths = append(data.paths, fpath)
				}

				// fetch default logs
				for cmd, fileName := range defaultLogs {
					throttle()
					out, oerr := sh.Run(cmd, sshOptLog)
					if oerr != nil {
						data.errs = append(data.errs, fmt.Sprintf("failed to run command %q for %q (error %v)", cmd, instID, oerr))
						continue
					}
					save(fileName, out)
				}

				throttle()
				ts.cfg.Logger.Info("listing systemd service units", zap.String("instance-id", instID))
				listCmd := "sudo systemctl list-units -t service --no-pager --no-legend --all"
				out, oerr := sh.Run(listCmd, sshOptLog)
				if oerr != nil {
					cmdFailed(listCmd, oerr)
				} else {
					/*
						auditd.service                                        loaded    active   running Security Auditing Service
						auth-rpcgss-module.service                            loaded    inactive dead    Kernel Module supporting RPCSEC_GSS
					*/
					svcCmdToFileName := make(map[string]string)
					for _, line := range strings.Split(string(out), "\n") {
						// columns: unit, load state, active state, sub state, description...
						fields := strings.Fields(line)
						if len(fields) < 5 {
							continue
						}
						if fields[1] == "not-found" || fields[2] == "inactive" {
							continue
						}
						svc := fields[0]
						svcCmdToFileName["sudo journalctl --no-pager --output=cat -u "+svc] = svc + ".out.log"
					}
					for cmd, fileName := range svcCmdToFileName {
						throttle()
						svcOut, serr := sh.Run(cmd, sshOptLog)
						if serr != nil {
							// report the journalctl command that failed, not the listing command
							cmdFailed(cmd, serr)
							continue
						}
						save(fileName, svcOut)
					}
				}

				throttle()
				// https://github.com/aws/amazon-vpc-cni-k8s/blob/master/docs/troubleshooting.md#ipamd-debugging-commands
				// https://github.com/aws/amazon-vpc-cni-k8s/blob/master/scripts/aws-cni-support.sh
				ts.cfg.Logger.Info("fetching ENI information", zap.String("instance-id", instID))
				eniCmd := "curl -s http://localhost:61679/v1/enis"
				out, oerr = sh.Run(eniCmd, sshOptLog)
				if oerr != nil {
					cmdFailed(eniCmd, oerr)
				} else {
					save("v1-enis.out.log", out)
				}

				ts.cfg.Logger.Info("running /opt/cni/bin/aws-cni-support.sh", zap.String("instance-id", instID))
				// "|| true" keeps a failing support script from failing the SSH command
				cniCmd := "sudo /opt/cni/bin/aws-cni-support.sh || true"
				out, oerr = sh.Run(cniCmd, sshOptLog)
				if oerr != nil {
					cmdFailed(cniCmd, oerr)
				} else {
					ts.cfg.Logger.Info("ran /opt/cni/bin/aws-cni-support.sh", zap.String("instance-id", instID), zap.String("output", string(out)))
				}

				throttle()
				ts.cfg.Logger.Info("listing /var/log", zap.String("instance-id", instID))
				findCmd := "sudo find /var/log ! -type d"
				out, oerr = sh.Run(findCmd, sshOptLog, ssh.WithRetry(5, 3*time.Second))
				if oerr != nil {
					cmdFailed(findCmd, oerr)
				} else {
					varLogPaths := make(map[string]string)
					for _, line := range strings.Split(string(out), "\n") {
						if len(line) == 0 {
							// last value
							continue
						}
						// local file name is the remote base name only
						varLogPaths["sudo cat "+line] = filepath.Base(line)
					}
					for cmd, logPath := range varLogPaths {
						throttle()
						// e.g. "read tcp 10.119.223.210:58688->54.184.39.156:22: read: connection timed out"
						logOut, lerr := sh.Run(cmd, sshOptLog, ssh.WithRetry(2, 3*time.Second))
						if lerr != nil {
							cmdFailed(cmd, lerr)
							continue
						}
						save(logPath, logOut)
					}
				}
				rch <- data
			}(name, instID, logsDir, pfx, cur)
		}
	}

	ts.cfg.Logger.Info("waiting for log fetcher goroutines", zap.Int("waits", waits))
	total := 0
	for i := 0; i < waits; i++ {
		var data instanceLogs
		select {
		case data = <-rch:
		case <-ts.cfg.Stopc:
			ts.cfg.Logger.Warn("exiting fetch logger")
			ts.cfg.EKSConfig.Sync()
			return nil
		}
		if len(data.errs) > 0 {
			ts.cfg.Logger.Warn("failed to fetch logs, but keeping whatever available",
				zap.String("mng-name", data.mngName),
				zap.String("instance-id", data.instanceID),
				zap.Strings("errors", data.errs),
			)
		}
		cur, ok := ts.cfg.EKSConfig.AddOnManagedNodeGroups.MNGs[data.mngName]
		if !ok {
			return fmt.Errorf("EKS Managed Node Group name %q is unknown", data.mngName)
		}
		if cur.Logs == nil {
			cur.Logs = make(map[string][]string)
		}

		// existing logs are already written out to disk, merge/list them all
		var logs []string
		logs, ok = cur.Logs[data.instanceID]
		if ok {
			ts.cfg.Logger.Warn("managed node group already has existing logs; merging",
				zap.String("mng-name", data.mngName),
				zap.String("instance-id", data.instanceID),
			)
		}
		// de-duplicate previously recorded and newly written paths
		all := make(map[string]struct{}, len(logs)+len(data.paths))
		for _, v := range logs {
			all[v] = struct{}{}
		}
		for _, v := range data.paths {
			all[v] = struct{}{}
		}
		logs = make([]string, 0, len(all))
		for k := range all {
			logs = append(logs, k)
		}
		sort.Strings(logs)
		cur.Logs[data.instanceID] = logs
		files := len(logs)

		ts.cfg.EKSConfig.AddOnManagedNodeGroups.MNGs[data.mngName] = cur
		ts.cfg.EKSConfig.Sync()

		total += files
		ts.cfg.Logger.Info("wrote log files",
			zap.String("instance-id", data.instanceID),
			zap.Int("files", files),
			zap.Int("total-downloaded-files", total),
			zap.Int("total-goroutines-to-wait", waits),
			zap.Int("current-waited-goroutines", i),
		)
	}

	ts.cfg.Logger.Info("wrote all log files",
		zap.String("log-dir", logsDir),
		zap.Int("total-downloaded-files", total),
	)
	ts.cfg.EKSConfig.Sync()
	return nil
}