VMBackup/debughelper/run.go (490 lines of code) (raw):
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"os/exec"
"path"
"strconv"
"strings"
"sync"
"time"
)
const LF_RUN = "run.log"
const LF_STRACE = "strace.log"
const LF_CPU = "cpu.log"
const LF_MEM = "mem.log"
const LF_DISK = "disk.log"
const LF_DIAG = "diagnosis.log"
type Run struct {
wd string
opID string
log *log.Logger
strace bool
tracePID int64
logToMem bool
inMemDir string
}
func NewRun(workingDir, opID string, with_strace bool, trace_pid int64, logToMem bool) (*Run, *os.File, error) {
r := &Run{
wd: workingDir,
opID: opID,
strace: with_strace,
tracePID: trace_pid,
inMemDir: "/dev/shm/Microsoft.Azure.Snapshots.Diagnostics",
logToMem: logToMem,
}
if r.logToMem {
if err := os.MkdirAll(path.Join(r.inMemDir, r.opID), 0755); err != nil {
return nil, nil, wrapErr(err, "os.MkdirAll failed")
}
}
f, err := os.OpenFile(path.Join(r.workDir(), LF_RUN), os.O_CREATE, 0644)
if err != nil {
return nil, nil, wrapErr(err, "os.OpenFile failed")
}
r.log = log.New(f, "", log.Ldate|log.Ltime|log.LUTC)
return r, f, nil
}
func (r Run) workDir() string {
p := path.Join(r.wd, r.opID)
if r.logToMem {
p = path.Join(r.inMemDir, r.opID)
}
return p
}
func (r Run) startStrace(ctx context.Context) error {
if !r.strace {
return nil
}
if r.tracePID == 0 {
return fmt.Errorf("empty process ID")
}
command := exec.CommandContext(
ctx, "strace", "-t", "-p", fmt.Sprintf("%d", r.tracePID),
"-f", "-o", path.Join(r.workDir(), LF_STRACE),
)
_, err := command.CombinedOutput()
if err != nil {
r.log.Println(wrapErr(err, "CombinedOutput failed"))
}
return nil
}
func (r Run) diagnose() string {
avreport := diagnoseAvs()
dbreport := diagnoseDbs()
logFile := path.Join(r.workDir(), LF_DIAG)
f, err := os.OpenFile(logFile, os.O_CREATE, 0644)
if err != nil {
r.log.Println(wrapErr(err, "os.OpenFile failed"))
}
defer f.Close()
l := ""
if len(avreport) > 0 {
l = l + "========== ANTIVIRUS ============\n\n"
l = l + strings.Join(avreport, "\n\n")
}
f.WriteString(l)
if len(l) > 0 {
l = "\n\n\n"
}
if len(dbreport) > 0 {
l = l + "========== DATABSES ============\n\n"
l = l + strings.Join(dbreport, "\n\n")
}
f.WriteString(l)
f.WriteString("\n")
r.persistInMemDir()
return path.Join(r.wd, r.opID, LF_DIAG)
}
type LoadAvg struct {
TS int64 `json:"timestamp_millis"`
One string `json:"one"`
Five string `json:"five"`
Fifteen string `json:"fifteen"`
SchedRatio string `json:"scheduled_ratio"`
LP string `json:"last_pid"`
}
func (r Run) monitorCPU(ctx context.Context, cpuStream chan *LoadAvg) {
// log.Println("[monitorCPU] -> Fired")
ticker := time.NewTicker(time.Second)
ctx1, cancel := context.WithCancel(ctx)
outer:
for {
select {
case <-ctx.Done():
cancel()
ticker.Stop()
cpuStream <- nil
break outer
case <-ticker.C:
go func() {
command := exec.CommandContext(ctx1, "cat", "/proc/loadavg")
bs, err := command.CombinedOutput()
if err != nil {
r.log.Println(wrapErr(err, "CombinedOutput failed"))
} else {
fields := strings.Fields(strings.Trim(string(bs), " \n"))
if len(fields) != 5 {
r.log.Println(wrapErr(fmt.Errorf("/proc/loadavg returned invalid number of strings")))
} else {
la := LoadAvg{
One: fields[0],
Five: fields[1],
Fifteen: fields[2],
SchedRatio: fields[3],
LP: fields[4],
TS: time.Now().UnixMilli(),
}
log.Println("[monitorCPU] -> sending new metric")
cpuStream <- &la
}
}
}()
}
}
}
func (r Run) logCPU(ctx context.Context, cpuStream chan *LoadAvg) error {
f, err := os.Create(path.Join(r.workDir(), LF_CPU))
if err != nil {
return wrapErr(err, "os.Create failed")
}
// logger := log.New(f, "", log.Ldate|log.Ltime|log.LUTC)
defer f.Close()
outer:
for {
select {
case <-ctx.Done():
break outer
case lav := <-cpuStream:
// log.Println("[logCPU] -> new metric received")
bs, err := json.Marshal(lav)
if err != nil {
r.log.Println(wrapErr(err, "json.Marshal failed"))
} else {
// log.Println("[logCPU] -> writing to log file")
f.WriteString(fmt.Sprintf("%s\n", string(bs)))
}
}
}
return nil
}
type Mem struct {
TS int64 `json:"timestamp_millis"`
TotalKb int64 `json:"total_kb"`
AvailKb int64 `json:"avail_kb"`
FreeKb int64 `json:"free_kb"`
CachedKb int64 `json:"cached_kb"`
SwapCachedKb int64 `json:"swap_cached_kb"`
SwapTotalKb int64 `json:"swap_total_kb"`
SwapFreeKb int64 `json:"swap_free_kb"`
}
func (r Run) monitorMem(ctx context.Context, memStream chan *Mem) {
ticker := time.NewTicker(time.Second)
ctx1, cancel := context.WithCancel(context.TODO())
outer:
for {
select {
case <-ctx.Done():
cancel()
ticker.Stop()
memStream <- nil
break outer
case <-ticker.C:
command := exec.CommandContext(ctx1, "cat", "/proc/meminfo")
bs, err := command.CombinedOutput()
if err != nil {
log.Println(wrapErr(err, "CombinedOutput failed"))
} else {
m := Mem{
TS: time.Now().UnixMilli(),
}
flag := false
for _, line := range strings.Split(string(bs), "\n") {
if len(line) == 0 {
continue
}
fields := strings.Fields(line)
if len(fields) != 3 {
continue
}
switch fields[0] {
case "MemTotal:":
v, err := strconv.Atoi(fields[1])
if err != nil {
log.Println("MemTotal conversion to int failed: ", err)
} else {
m.TotalKb = int64(v)
flag = true
}
case "MemAvailable:":
v, err := strconv.Atoi(fields[1])
if err != nil {
log.Println("MemAvailable conversion to int failed: ", err)
} else {
m.AvailKb = int64(v)
flag = true
}
case "MemFree:":
v, err := strconv.Atoi(fields[1])
if err != nil {
log.Println("MemFree conversion to int failed: ", err)
} else {
m.FreeKb = int64(v)
flag = true
}
case "Cached:":
v, err := strconv.Atoi(fields[1])
if err != nil {
log.Println("Cached Mem conversion to int failed: ", err)
} else {
m.CachedKb = int64(v)
flag = true
}
case "SwapCached:":
v, err := strconv.Atoi(fields[1])
if err != nil {
log.Println("SwapCached conversion to int failed: ", err)
} else {
m.SwapCachedKb = int64(v)
flag = true
}
case "SwapTotal:":
v, err := strconv.Atoi(fields[1])
if err != nil {
log.Println("SwapTotal conversion to int failed: ", err)
} else {
m.SwapTotalKb = int64(v)
flag = true
}
case "SwapFree:":
v, err := strconv.Atoi(fields[1])
if err != nil {
log.Println("SwapFree conversion to int failed: ", err)
} else {
m.SwapFreeKb = int64(v)
flag = true
}
}
}
if flag {
// log.Println("[monitorMem] -> sending new metric")
memStream <- &m
}
}
}
}
}
func (r Run) logMem(ctx context.Context, memStream chan *Mem) error {
// log.Println("[logMem] -> Fired")
f, err := os.Create(path.Join(r.workDir(), LF_MEM))
if err != nil {
return wrapErr(err, "OpenFile failed")
}
defer f.Close()
outer:
for {
select {
case <-ctx.Done():
break outer
case lav := <-memStream:
if lav == nil {
break outer
}
log.Println("[logMem] -> received new metric")
bs, err := json.Marshal(lav)
if err != nil {
r.log.Println(wrapErr(err, "json.Marshal failed"))
} else {
log.Println("[logMem] -> writing to log file")
f.WriteString(fmt.Sprintf("%s\n", string(bs)))
}
}
}
return nil
}
type DiskLog struct {
TS int64 `json:"timestamp_millis"`
MajorNum string `json:"major_num"`
MinorNum string `json:"minor_num"`
DeviceName string `json:"device_name"`
ReadsCompleted string `json:"reads_completed_successfully"`
ReadsMerged string `json:"reads_merged"`
SectorsRead string `json:"sectors_read"`
TimeSpentReadingMs string `json:"time_spent_reading_ms"`
WritesCompleted string `json:"writes_completed"`
WriteMerged string `json:"writes_merged"`
SectorsWritten string `json:"sectors_written"`
TimeSpentWritingMs string `json:"time_spent_writing"`
IosInProgress string `json:"ios_currently_in_progress"`
TimeSpentIosMs string `json:"time_spent_doing_ios_ms"`
WeightedTimeSpentDoingIosMs string `json:"weighted_time_spent_doing_ios_ms"`
DiscardsCompleted string `json:"discards_completed_successfully"`
DiscardsMerged string `json:"discards_merged"`
SectorsDiscarded string `json:"sectors_discarded"`
TimeSpentDiscardingMs string `json:"time_sspent_discarding"`
FlushRequestsCompleted string `json:"flush_requests_completed_successfully"`
TimeSpentFlushingMs string `json:"time_spent_flushing"`
}
func (r Run) monitorDisk(ctx context.Context, diskChan chan *DiskLog) {
ticker := time.NewTicker(time.Second)
ctx1, cancel := context.WithCancel(context.TODO())
outer:
for {
select {
case <-ctx.Done():
cancel()
ticker.Stop()
diskChan <- nil
break outer
case <-ticker.C:
command := exec.CommandContext(ctx1, "cat", "/proc/diskstats")
bs, err := command.CombinedOutput()
if err != nil {
log.Println(wrapErr(err, "CombinedOutput failed"))
continue outer
}
for _, line := range strings.Split(string(bs), "\n") {
fields := strings.Fields(line)
// Get only sata or nvme disks
if len(fields) == 0 {
continue
}
if !strings.Contains(fields[2], "sd") && !strings.Contains(fields[2], "nvme") {
continue
}
dl := DiskLog{
TS: time.Now().UnixMilli(),
MajorNum: fields[0],
MinorNum: fields[1],
DeviceName: fields[2],
ReadsCompleted: fields[3],
ReadsMerged: fields[4],
SectorsRead: fields[5],
TimeSpentReadingMs: fields[6],
WritesCompleted: fields[7],
WriteMerged: fields[8],
SectorsWritten: fields[9],
TimeSpentWritingMs: fields[10],
IosInProgress: fields[11],
TimeSpentIosMs: fields[12],
WeightedTimeSpentDoingIosMs: fields[13],
}
lf := len(fields)
// Kernel 4.18+ will have the following fields
if lf >= 18 {
dl.DiscardsCompleted = fields[14]
dl.DiscardsMerged = fields[15]
dl.SectorsDiscarded = fields[16]
dl.TimeSpentDiscardingMs = fields[17]
}
// Kernel 5.5+ further have the following fields
if lf >= 20 {
dl.FlushRequestsCompleted = fields[18]
dl.TimeSpentFlushingMs = fields[19]
}
diskChan <- &dl
}
}
}
}
func (r Run) logDisk(ctx context.Context, diskChan chan *DiskLog) error {
f, err := os.Create(path.Join(r.workDir(), LF_DISK))
if err != nil {
return wrapErr(err, "os.Create failed")
}
defer f.Close()
outer:
for {
select {
case <-ctx.Done():
break outer
case lav := <-diskChan:
if lav == nil {
break outer
}
log.Println("[logDisk] -> received new metric")
bs, err := json.Marshal(lav)
if err != nil {
r.log.Println(wrapErr(err, "json.Marshal failed"))
} else {
log.Println("[logDisk] -> writing to log file")
f.WriteString(fmt.Sprintf("%s\n", string(bs)))
}
}
}
return nil
}
func (r Run) persistInMemDir() {
// log.Println("[persistInMemDir] -> Fired")
if !r.logToMem {
return
}
log.Printf("moving: \"%s\" to \"%s\"\n", r.workDir(), r.wd)
cmd := exec.Command("mv", r.workDir(), fmt.Sprintf("%s/", r.wd))
if _, err := cmd.CombinedOutput(); err != nil {
r.log.Println(wrapErr(err, fmt.Sprintf("moving from shared memory to path: \"%s\" failed", r.wd)))
}
}
func (r Run) monitor(ctx context.Context) {
// log.Println("[monitor] -> Fired")
wg := sync.WaitGroup{}
// save pid file
pf, err := os.Create(path.Join(r.workDir(), "monitor.pid"))
if err != nil {
r.log.Println("error creating pid file")
return
}
pf.WriteString(fmt.Sprintf("%d", os.Getpid()))
// strace
tctx, tcancel := context.WithCancel(ctx)
wg.Add(1)
go func() {
defer wg.Done()
r.startStrace(tctx)
}()
// CPU ==============================
cpuStream := make(chan *LoadAvg, 1)
cpuCtx, cpuCancel := context.WithCancel(ctx)
logCpuCtx, logCpuCancel := context.WithCancel(ctx)
wg.Add(1)
go func() {
defer wg.Done()
r.monitorCPU(cpuCtx, cpuStream)
}()
wg.Add(1)
go func() {
defer wg.Done()
if err := r.logCPU(logCpuCtx, cpuStream); err != nil {
r.log.Println(wrapErr(err))
}
}()
// RAM
memStream := make(chan *Mem, 1)
memCtx, memCancel := context.WithCancel(ctx)
logMemCtx, logMemCancel := context.WithCancel(ctx)
wg.Add(1)
go func() {
defer wg.Done()
r.monitorMem(memCtx, memStream)
}()
wg.Add(1)
go func() {
defer wg.Done()
if err := r.logMem(logMemCtx, memStream); err != nil {
r.log.Println(wrapErr(err))
}
}()
// Disk
diskChan := make(chan *DiskLog, 20)
diskCtx, diskCancel := context.WithCancel(ctx)
logDiskCtx, logDiskCancel := context.WithCancel(ctx)
wg.Add(1)
go func() {
defer wg.Done()
r.monitorDisk(diskCtx, diskChan)
}()
wg.Add(1)
go func() {
defer wg.Done()
if err := r.logDisk(logDiskCtx, diskChan); err != nil {
r.log.Println(wrapErr(err))
}
}()
<-ctx.Done()
tcancel()
cpuCancel()
logCpuCancel()
memCancel()
logMemCancel()
diskCancel()
logDiskCancel()
wg.Wait()
r.persistInMemDir()
}