in ec2/logs.go [132:448]
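// fetchLogs connects to each instance in every ASG over SSH and downloads,
// in parallel, the outputs of the given commands, the journals of active
// systemd units, and all files under /var/log into the local logs directory.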
func (ts *Tester) fetchLogs(qps float32, burst int, commandToFileName map[string]string) error {
logsDir := ts.cfg.ASGsLogsDir
sshOpt := ssh.WithVerbose(ts.cfg.LogLevel == "debug")
rateLimiter := rate.NewLimiter(rate.Limit(qps), burst)
rch, waits := make(chan instanceLogs, 10), 0
for name, cur := range ts.cfg.ASGs {
ts.lg.Info("fetching logs",
zap.String("asg-name", name),
zap.Int("instances", len(cur.Instances)),
)
waits += len(cur.Instances)
for instID, iv := range cur.Instances {
pfx := instID + "-"
go func(asgName, instID, logsDir, pfx string, iv ec2config.Instance) {
select {
case <-ts.stopCreationCh:
ts.lg.Warn("exiting fetch logger", zap.String("prefix", pfx))
return
default:
}
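// wait for a rate limiter token before opening a new SSH connection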
if !rateLimiter.Allow() {
ts.lg.Debug("waiting for rate limiter before SSH into the machine",
zap.Float32("qps", qps),
zap.Int("burst", burst),
zap.String("instance-id", instID),
)
werr := rateLimiter.Wait(context.Background())
ts.lg.Debug("waited for rate limiter",
zap.Float32("qps", qps),
zap.Int("burst", burst),
zap.Error(werr),
)
}
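// open an SSH session to the instance using the configured remote access key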
sh, err := ssh.New(ssh.Config{
Logger: ts.lg,
KeyPath: ts.cfg.RemoteAccessPrivateKeyPath,
PublicIP: iv.PublicIP,
PublicDNSName: iv.PublicDNSName,
UserName: iv.RemoteAccessUserName,
})
if err != nil {
rch <- instanceLogs{asgName: asgName, instanceID: instID, errs: []string{err.Error()}}
return
}
defer sh.Close()
if err = sh.Connect(); err != nil {
rch <- instanceLogs{asgName: asgName, instanceID: instID, errs: []string{err.Error()}}
return
}
data := instanceLogs{asgName: asgName, instanceID: instID}
// fetch default logs
for cmd, fileName := range commandToFileName {
if !rateLimiter.Allow() {
ts.lg.Debug("waiting for rate limiter before fetching file")
werr := rateLimiter.Wait(context.Background())
ts.lg.Debug("waited for rate limiter", zap.Error(werr))
}
out, oerr := sh.Run(cmd, sshOpt)
if oerr != nil {
data.errs = append(data.errs, fmt.Sprintf("failed to run command %q for %q (error %v)", cmd, instID, oerr))
continue
}
fpath := filepath.Join(logsDir, shorten(ts.lg, pfx+fileName))
f, err := os.Create(fpath)
if err != nil {
data.errs = append(data.errs, fmt.Sprintf(
"failed to create a file %q for %q (error %v)",
fpath,
instID,
err,
))
continue
}
if _, err = f.Write(out); err != nil {
data.errs = append(data.errs, fmt.Sprintf(
"failed to write to a file %q for %q (error %v)",
fpath,
instID,
err,
))
f.Close()
continue
}
f.Close()
ts.lg.Debug("wrote", zap.String("file-path", fpath))
data.paths = append(data.paths, fpath)
}
if !rateLimiter.Allow() {
ts.lg.Debug("waiting for rate limiter before fetching file")
werr := rateLimiter.Wait(context.Background())
ts.lg.Debug("waited for rate limiter", zap.Error(werr))
}
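// enumerate systemd service units on the instance and capture each unit's journal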
ts.lg.Info("listing systemd service units", zap.String("instance-id", instID))
listCmd := "sudo systemctl list-units -t service --no-pager --no-legend --all"
out, oerr := sh.Run(listCmd, sshOpt)
if oerr != nil {
data.errs = append(data.errs, fmt.Sprintf(
"failed to run command %q for %q (error %v)",
listCmd,
instID,
oerr,
))
} else {
/*
example "systemctl list-units" output:

auditd.service loaded active running Security Auditing Service
auth-rpcgss-module.service loaded inactive dead Kernel Module supporting RPCSEC_GSS
*/
svcCmdToFileName := make(map[string]string)
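// build a journalctl command for each unit that is loaded and not inactive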
for _, line := range strings.Split(string(out), "\n") {
fields := strings.Fields(line)
// expect at least UNIT, LOAD, ACTIVE, SUB, DESCRIPTION columns
if len(fields) < 5 {
continue
}
if fields[1] == "not-found" {
continue
}
if fields[2] == "inactive" {
continue
}
svc := fields[0]
svcCmd := "sudo journalctl --no-pager --output=cat -u " + svc
svcFileName := svc + ".out.log"
svcCmdToFileName[svcCmd] = svcFileName
}
for cmd, fileName := range svcCmdToFileName {
if !rateLimiter.Allow() {
ts.lg.Debug("waiting for rate limiter before fetching file")
werr := rateLimiter.Wait(context.Background())
ts.lg.Debug("waited for rate limiter", zap.Error(werr))
}
out, oerr := sh.Run(cmd, sshOpt)
if oerr != nil {
data.errs = append(data.errs, fmt.Sprintf(
"failed to run command %q for %q (error %v)",
cmd,
instID,
oerr,
))
continue
}
fpath := filepath.Join(logsDir, shorten(ts.lg, pfx+fileName))
f, err := os.Create(fpath)
if err != nil {
data.errs = append(data.errs, fmt.Sprintf(
"failed to create a file %q for %q (error %v)",
fpath,
instID,
err,
))
continue
}
if _, err = f.Write(out); err != nil {
data.errs = append(data.errs, fmt.Sprintf(
"failed to write to a file %q for %q (error %v)",
fpath,
instID,
err,
))
f.Close()
continue
}
f.Close()
ts.lg.Debug("wrote", zap.String("file-path", fpath))
data.paths = append(data.paths, fpath)
}
}
if !rateLimiter.Allow() {
ts.lg.Debug("waiting for rate limiter before fetching file")
werr := rateLimiter.Wait(context.Background())
ts.lg.Debug("waited for rate limiter", zap.Error(werr))
}
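// download every non-directory entry under /var/log via "sudo cat"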
ts.lg.Info("listing /var/log", zap.String("instance-id", instID))
findCmd := "sudo find /var/log ! -type d"
out, oerr = sh.Run(findCmd, sshOpt)
if oerr != nil {
data.errs = append(data.errs, fmt.Sprintf(
"failed to run command %q for %q (error %v)",
findCmd,
instID,
oerr,
))
} else {
varLogCmdToFileName := make(map[string]string)
for _, line := range strings.Split(string(out), "\n") {
if len(line) == 0 {
// skip the trailing empty element produced by the final newline
continue
}
logCmd := "sudo cat " + line
logName := filepath.Base(line)
varLogCmdToFileName[logCmd] = logName
}
for cmd, fileName := range varLogCmdToFileName {
if !rateLimiter.Allow() {
ts.lg.Debug("waiting for rate limiter before fetching file")
werr := rateLimiter.Wait(context.Background())
ts.lg.Debug("waited for rate limiter", zap.Error(werr))
}
out, oerr := sh.Run(cmd, sshOpt)
if oerr != nil {
data.errs = append(data.errs, fmt.Sprintf(
"failed to run command %q for %q (error %v)",
cmd,
instID,
oerr,
))
continue
}
fpath := filepath.Join(logsDir, shorten(ts.lg, pfx+fileName))
f, err := os.Create(fpath)
if err != nil {
data.errs = append(data.errs, fmt.Sprintf(
"failed to create a file %q for %q (error %v)",
fpath,
instID,
err,
))
continue
}
if _, err = f.Write(out); err != nil {
data.errs = append(data.errs, fmt.Sprintf(
"failed to write to a file %q for %q (error %v)",
fpath,
instID,
err,
))
f.Close()
continue
}
f.Close()
ts.lg.Debug("wrote", zap.String("file-path", fpath))
data.paths = append(data.paths, fpath)
}
}
rch <- data
}(name, instID, logsDir, pfx, iv)
}
}
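// collect results from all fetch goroutines and record the downloaded file
// paths, deduplicated and sorted, per instance in the ASG configuration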
ts.lg.Info("waiting for log fetcher goroutines", zap.Int("waits", waits))
total := 0
for i := 0; i < waits; i++ {
var data instanceLogs
select {
case data = <-rch:
case <-ts.stopCreationCh:
ts.lg.Warn("exiting fetch logger")
return ts.cfg.Sync()
}
if len(data.errs) > 0 {
ts.lg.Warn("failed to fetch logs",
zap.String("asg-name", data.asgName),
zap.String("instance-id", data.instanceID),
zap.Strings("errors", data.errs),
)
continue
}
cur, ok := ts.cfg.ASGs[data.asgName]
if !ok {
return fmt.Errorf("ASG name %q is unknown", data.asgName)
}
if cur.Logs == nil {
cur.Logs = make(map[string][]string)
}
// existing logs are already written out to disk, merge/list them all
var logs []string
logs, ok = cur.Logs[data.instanceID]
if ok {
ts.lg.Warn("ASG already has existing logs; merging",
zap.String("asg-name", data.asgName),
zap.String("instance-id", data.instanceID),
)
}
all := make(map[string]struct{})
for _, v := range logs {
all[v] = struct{}{}
}
for _, v := range data.paths {
all[v] = struct{}{}
}
logs = make([]string, 0, len(all))
for k := range all {
logs = append(logs, k)
}
sort.Strings(logs)
cur.Logs[data.instanceID] = logs
files := len(logs)
ts.cfg.ASGs[data.asgName] = cur
ts.cfg.Sync()
total += files
ts.lg.Info("wrote log files",
zap.String("instance-id", data.instanceID),
zap.Int("files", files),
zap.Int("total-downloaded-files", total),
)
}
ts.lg.Info("wrote all log files",
zap.String("log-dir", logsDir),
zap.Int("total-downloaded-files", total),
)
return ts.cfg.Sync()
}