oss/lib/monitor.go (571 lines of code) (raw):
package lib
import (
"fmt"
"strings"
"sync/atomic"
"time"
)
// Exit statuses passed as exitStat to the monitors' progressBar methods.
const (
	// normalExit selects the regular finish summary.
	normalExit = iota
	// errExit selects the "when error happens" summary.
	errExit
)
var (
	// processTickInterval is the number of seconds CPMonitor.init converts
	// into the minimum gap between two CPMonitor progress lines.
	processTickInterval int64 = 5

	// clearStrLen is the byte length of the longest line getClearStr has
	// been asked to print so far.
	clearStrLen = 0

	// clearStr is the run of spaces getClearStr last used to blank a line.
	clearStr = strings.Repeat(" ", clearStrLen)
)
// getClearStr returns str prefixed with a carriage return so that it
// overwrites the current terminal line. When str is shorter (in bytes) than
// the longest line seen so far, the line is first blanked with that many
// spaces. The package-level clearStrLen and clearStr are updated as a side
// effect.
func getClearStr(str string) string {
	if n := len(str); n >= clearStrLen {
		clearStrLen = n
		return "\r" + str
	}
	clearStr = strings.Repeat(" ", clearStrLen)
	return "\r" + clearStr + "\r" + str
}
// Monitorer is the scan-side view of a progress monitor: it receives the
// scanned counts, the end-of-scan signal and any scan error. Monitor,
// RMMonitor and CPMonitor in this file all provide these methods.
type Monitorer interface {
	// setScanError records a scan error.
	setScanError(err error)
	// updateScanNum adds num to the scanned count.
	updateScanNum(num int64)
	// setScanEnd marks the scan as ended.
	setScanEnd()
}
// MonitorSnap is a copy of a Monitor's counters, produced by
// Monitor.getSnapshot. Used for normal object operations.
type MonitorSnap struct {
	// okNum is the copied Monitor.okNum.
	okNum int64
	// errNum is the copied Monitor.errNum.
	errNum int64
	// skipNum is the copied Monitor.skipNum.
	skipNum int64
	// dealNum is okNum + errNum.
	dealNum int64
}
/*
 * Monitor tracks the progress of a normal object operation.
 *
 * Put same type variables together to keep them 64-bit aligned and avoid
 * an atomic.AddInt64() panic.
 * Please preserve the alignment if you add a new field.
 */
type Monitor struct {
	// opStr is the verb printed in front of the processed-object count.
	opStr string
	// totalNum is the scanned object count accumulated by updateScanNum.
	totalNum int64
	// okNum is increased atomically by updateOKNum.
	okNum int64
	// errNum is increased atomically by updateErrNum.
	errNum int64
	// skipNum is printed as the skip count in finish lines; no writer is
	// visible in this part of the file.
	skipNum int64
	// seekAheadError is the error handed to setScanError, nil otherwise.
	seekAheadError error
	// seekAheadEnd is set by setScanEnd and setScanError.
	seekAheadEnd bool
	// finish is set once progressBar has been called with finish == true.
	finish bool
	_ uint32 // Padding so that the data that follows stays 64-bit aligned.
}
// init resets every field of the monitor to its zero value and stores
// opStr, the verb printed in front of the processed-object count.
func (m *Monitor) init(opStr string) {
	*m = Monitor{opStr: opStr}
}
// setScanError stores err as the scan error and marks the scan as ended.
func (m *Monitor) setScanError(err error) {
	m.seekAheadEnd, m.seekAheadError = true, err
}
// updateScanNum adds num to the scanned object total. The addition is a
// plain (non-atomic) update.
func (m *Monitor) updateScanNum(num int64) {
	m.totalNum += num
}
// setScanEnd marks the scan as ended; any stored scan error is left as is.
func (m *Monitor) setScanEnd() {
	m.seekAheadEnd = true
}
// updateOKNum atomically adds num to okNum.
func (m *Monitor) updateOKNum(num int64) {
	atomic.AddInt64(&m.okNum, num)
}
// updateErrNum atomically adds num to errNum.
func (m *Monitor) updateErrNum(num int64) {
	atomic.AddInt64(&m.errNum, num)
}
// getSnapshot returns a copy of the monitor's counters together with
// dealNum, the sum of okNum and errNum.
//
// okNum and errNum are written with atomic.AddInt64, so they are read here
// with atomic.LoadInt64; a plain read would be a data race with concurrent
// updaters.
func (m *Monitor) getSnapshot() *MonitorSnap {
	snap := &MonitorSnap{
		okNum:   atomic.LoadInt64(&m.okNum),
		errNum:  atomic.LoadInt64(&m.errNum),
		skipNum: atomic.LoadInt64(&m.skipNum),
	}
	snap.dealNum = snap.okNum + snap.errNum
	return snap
}
// progressBar returns the line to print for the current state: the running
// progress line while finish is false, or the final summary selected by
// exitStat when finish is true. Once a final summary has been produced,
// every later call returns "".
func (m *Monitor) progressBar(finish bool, exitStat int) string {
	if m.finish {
		return ""
	}
	if finish {
		m.finish = true
		return m.getFinishBar(exitStat)
	}
	return m.getProgressBar()
}
// getProgressBar formats the running progress line from a fresh snapshot.
// When the scan has ended without error the line reports the known total
// and a percentage; otherwise it reports how many objects have been scanned
// so far. An error count is appended only when it is non-zero. The literal
// "%%" is emitted as two characters.
func (m *Monitor) getProgressBar() string {
	snap := m.getSnapshot()
	scanDone := m.seekAheadEnd && m.seekAheadError == nil
	hasErr := snap.errNum != 0

	var line string
	switch {
	case scanDone && !hasErr:
		line = fmt.Sprintf("Total %d objects. %s %d objects, Progress: %d%s", m.totalNum, m.opStr, snap.okNum, m.getPrecent(snap), "%%")
	case scanDone:
		line = fmt.Sprintf("Total %d objects. %s %d objects, Error %d objects, Progress: %d%s", m.totalNum, m.opStr, snap.okNum, snap.errNum, m.getPrecent(snap), "%%")
	case !hasErr:
		line = fmt.Sprintf("Scanned %d objects. %s %d objects.", max(m.totalNum, snap.dealNum), m.opStr, snap.okNum)
	default:
		line = fmt.Sprintf("Scanned %d objects. %s %d objects, Error %d objects.", max(m.totalNum, snap.dealNum), m.opStr, snap.okNum, snap.errNum)
	}
	return getClearStr(line)
}
// getPrecent returns the processed share of the total as a whole-number
// percentage (truncated). It is 0 until the scan has ended without error,
// and 100 when the scan ended with nothing to process.
func (m *Monitor) getPrecent(snap *MonitorSnap) int {
	if !m.seekAheadEnd || m.seekAheadError != nil {
		return 0
	}
	if m.totalNum == 0 {
		return 100
	}
	processed := float64(snap.dealNum * 100.0)
	return int(processed / float64(m.totalNum))
}
// getFinishBar returns the final summary: the regular one for normalExit,
// the "when error happens" one for any other exit status.
func (m *Monitor) getFinishBar(exitStat int) string {
	if exitStat != normalExit {
		return m.getDefeatBar()
	}
	return m.getWholeFinishBar()
}
// getWholeFinishBar formats the summary for a normal exit. The reported
// total is the scanned total when the scan ended without error, otherwise
// the larger of the scanned total and the number of objects dealt with.
// With no errors the line starts with "Succeed:", otherwise with
// "FinishWithError:".
func (m *Monitor) getWholeFinishBar() string {
	snap := m.getSnapshot()
	scanDone := m.seekAheadEnd && m.seekAheadError == nil

	num := m.totalNum
	if !scanDone {
		num = max(m.totalNum, snap.dealNum)
	}

	if snap.errNum == 0 {
		return getClearStr(fmt.Sprintf("Succeed: Total %d objects. %s %d objects(skip %d objects).\n", num, m.opStr, snap.okNum, snap.skipNum))
	}
	if scanDone {
		return getClearStr(fmt.Sprintf("FinishWithError: Total %d objects. %s %d objects(skip %d objects), Error %d objects.\n", num, m.opStr, snap.okNum, snap.skipNum, snap.errNum))
	}
	return getClearStr(fmt.Sprintf("FinishWithError: Scanned %d objects. %s %d objects(skip %d objects), Error %d objects.\n", num, m.opStr, snap.okNum, snap.skipNum, snap.errNum))
}
// getDefeatBar formats the summary printed when the operation stops with an
// error exit status. It reports "Total" when the scan ended without error
// and "Scanned" otherwise.
func (m *Monitor) getDefeatBar() string {
	snap := m.getSnapshot()
	if !m.seekAheadEnd || m.seekAheadError != nil {
		scanned := max(m.totalNum, snap.dealNum)
		return getClearStr(fmt.Sprintf("Scanned %d objects. %s %d objects(skip %d objects), when error happens.\n", scanned, m.opStr, snap.okNum, snap.skipNum))
	}
	return getClearStr(fmt.Sprintf("Total %d objects. %s %d objects(skip %d objects), when error happens.\n", m.totalNum, m.opStr, snap.okNum, snap.skipNum))
}
// RMMonitorSnap is a copy of an RMMonitor's counters, produced by
// RMMonitor.getSnapshot. Used for rm.
type RMMonitorSnap struct {
	// objectNum is the copied RMMonitor.objectNum.
	objectNum int64
	// uploadIdNum is the copied RMMonitor.uploadIdNum.
	uploadIdNum int64
	// errObjectNum is the copied RMMonitor.errObjectNum.
	errObjectNum int64
	// errUploadIdNum is the copied RMMonitor.errUploadIdNum.
	errUploadIdNum int64
	// dealNum is the sum of the four counters above.
	dealNum int64
	// errNum is errObjectNum + errUploadIdNum.
	errNum int64
	// removedBucket is the copied RMMonitor.removedBucket.
	removedBucket string
}
/*
 * RMMonitor tracks the progress of an rm operation.
 *
 * Put same type variables together to keep them 64-bit aligned and avoid
 * an atomic.AddInt64() panic.
 * Please preserve the alignment if you add a new field.
 */
type RMMonitor struct {
	// op is a bit set tested against objectType, multipartType, allType and
	// bucketType to decide what the lines report.
	op int64
	// totalObjectNum is the scanned object count (updateScanNum).
	totalObjectNum int64
	// totalUploadIdNum is the scanned uploadId count (updateScanUploadIdNum).
	totalUploadIdNum int64
	// objectNum is increased atomically by updateObjectNum.
	objectNum int64
	// uploadIdNum is increased atomically by updateUploadIdNum.
	uploadIdNum int64
	// errObjectNum is increased atomically by updateErrObjectNum.
	errObjectNum int64
	// errUploadIdNum is increased atomically by updateErrUploadIdNum.
	errUploadIdNum int64
	// removedBucket is the name stored by updateRemovedBucket; "" if none.
	removedBucket string
	// seekAheadError is the error handed to setScanError, nil otherwise.
	seekAheadError error
	// seekAheadEnd is set by setScanEnd and setScanError.
	seekAheadEnd bool
	// finish is set once progressBar has been called with finish == true.
	finish bool
	_ uint32 // Padding so that the data that follows stays 64-bit aligned.
}
// init resets every field of the monitor to its zero value.
func (m *RMMonitor) init() {
	*m = RMMonitor{}
}
// updateOP turns on the bits of op in the monitor's operation bit set,
// keeping the bits that are already set.
func (m *RMMonitor) updateOP(op int64) {
	m.op |= op
}
// setOP replaces the monitor's operation bit set with op.
func (m *RMMonitor) setOP(op int64) {
	m.op = op
}
// setScanError stores err as the scan error and marks the scan as ended.
func (m *RMMonitor) setScanError(err error) {
	m.seekAheadEnd, m.seekAheadError = true, err
}
// updateScanNum adds num to the scanned object total. The addition is a
// plain (non-atomic) update.
func (m *RMMonitor) updateScanNum(num int64) {
	m.totalObjectNum += num
}
// updateScanUploadIdNum adds num to the scanned uploadId total. The
// addition is a plain (non-atomic) update.
func (m *RMMonitor) updateScanUploadIdNum(num int64) {
	m.totalUploadIdNum += num
}
// setScanEnd marks the scan as ended; any stored scan error is left as is.
func (m *RMMonitor) setScanEnd() {
	m.seekAheadEnd = true
}
// updateObjectNum atomically adds num to objectNum.
func (m *RMMonitor) updateObjectNum(num int64) {
	atomic.AddInt64(&m.objectNum, num)
}
// updateUploadIdNum atomically adds num to uploadIdNum.
func (m *RMMonitor) updateUploadIdNum(num int64) {
	atomic.AddInt64(&m.uploadIdNum, num)
}
// updateErrObjectNum atomically adds num to errObjectNum.
func (m *RMMonitor) updateErrObjectNum(num int64) {
	atomic.AddInt64(&m.errObjectNum, num)
}
// updateErrUploadIdNum atomically adds num to errUploadIdNum.
func (m *RMMonitor) updateErrUploadIdNum(num int64) {
	atomic.AddInt64(&m.errUploadIdNum, num)
}
// updateRemovedBucket stores bucket as the removed bucket name. A non-empty
// name is what makes getBucketFinishBar print the "Removed Bucket" line.
func (m *RMMonitor) updateRemovedBucket(bucket string) {
	m.removedBucket = bucket
}
// getSnapshot returns a copy of the monitor's counters and removed bucket
// name, plus two derived values: dealNum (all four counters added up) and
// errNum (the two error counters added up).
//
// The counters are written with atomic.AddInt64, so they are read here with
// atomic.LoadInt64; a plain read would be a data race with concurrent
// updaters.
func (m *RMMonitor) getSnapshot() *RMMonitorSnap {
	snap := &RMMonitorSnap{
		objectNum:      atomic.LoadInt64(&m.objectNum),
		uploadIdNum:    atomic.LoadInt64(&m.uploadIdNum),
		errObjectNum:   atomic.LoadInt64(&m.errObjectNum),
		errUploadIdNum: atomic.LoadInt64(&m.errUploadIdNum),
		removedBucket:  m.removedBucket,
	}
	snap.dealNum = snap.objectNum + snap.uploadIdNum + snap.errObjectNum + snap.errUploadIdNum
	snap.errNum = snap.errObjectNum + snap.errUploadIdNum
	return snap
}
// progressBar returns the line to print for the current state: the running
// progress line while finish is false, or the final summary selected by
// exitStat when finish is true. Once a final summary has been produced,
// every later call returns "".
func (m *RMMonitor) progressBar(finish bool, exitStat int) string {
	if m.finish {
		return ""
	}
	if finish {
		m.finish = true
		return m.getFinishBar(exitStat)
	}
	return m.getProgressBar()
}
// getProgressBar formats the running progress line. Nothing is reported
// unless the operation bit set includes an allType bit. While the scan has
// not ended cleanly, the stored totals are first raised to at least the
// number of items already handled, so the "Scanned" figures never fall
// behind the processed counts.
func (m *RMMonitor) getProgressBar() string {
	if m.op&allType == 0 {
		return getClearStr("")
	}

	snap := m.getSnapshot()
	if !m.seekAheadEnd || m.seekAheadError != nil {
		m.totalObjectNum = max(m.totalObjectNum, snap.objectNum+snap.errObjectNum)
		m.totalUploadIdNum = max(m.totalUploadIdNum, snap.uploadIdNum+snap.errUploadIdNum)
		return getClearStr(fmt.Sprintf("Scanned %s. %s%s", m.getTotalInfo(), m.getOKInfo(snap), m.getErrInfo(snap)))
	}
	return getClearStr(fmt.Sprintf("Total %s. %s%s Progress: %d%s", m.getTotalInfo(), m.getOKInfo(snap), m.getErrInfo(snap), m.getPrecent(snap), "%%"))
}
// getTotalInfo describes the scanned totals, e.g. "5 objects, 2 uploadIds".
// Each part appears only when its bit (objectType, multipartType) is set in
// the operation bit set; with neither bit set the result is "".
func (m *RMMonitor) getTotalInfo() string {
	parts := make([]string, 0, 2)
	if m.op&objectType != 0 {
		parts = append(parts, fmt.Sprintf("%d objects", m.totalObjectNum))
	}
	if m.op&multipartType != 0 {
		parts = append(parts, fmt.Sprintf("%d uploadIds", m.totalUploadIdNum))
	}
	return strings.Join(parts, ", ")
}
// getOKInfo describes what has been removed so far, e.g.
// "Removed 4 objects, 1 uploadIds.". It returns "" when the operation bit
// set has no allType bit.
func (m *RMMonitor) getOKInfo(snap *RMMonitorSnap) string {
	if m.op&allType == 0 {
		return ""
	}

	parts := make([]string, 0, 2)
	if m.op&objectType != 0 {
		parts = append(parts, fmt.Sprintf("%d objects", snap.objectNum))
	}
	if m.op&multipartType != 0 {
		parts = append(parts, fmt.Sprintf("%d uploadIds", snap.uploadIdNum))
	}
	removed := strings.Join(parts, ", ")
	return fmt.Sprintf("Removed %s.", removed)
}
// getErrInfo describes the failures in snap, e.g.
// " Error 2 objects, 3 uploadIds." (note the leading space). It returns ""
// when snap.errNum is zero; only non-zero error counters are listed.
func (m *RMMonitor) getErrInfo(snap *RMMonitorSnap) string {
	if snap.errNum == 0 {
		return ""
	}

	parts := make([]string, 0, 2)
	if snap.errObjectNum != 0 {
		parts = append(parts, fmt.Sprintf("%d objects", snap.errObjectNum))
	}
	if snap.errUploadIdNum != 0 {
		parts = append(parts, fmt.Sprintf("%d uploadIds", snap.errUploadIdNum))
	}
	failed := strings.Join(parts, ", ")
	return fmt.Sprintf(" Error %s.", failed)
}
// getPrecent returns the handled share of all scanned objects and uploadIds
// as a whole-number percentage (truncated). It is 0 until the scan has
// ended without error, and 100 when the scan ended with nothing to handle.
func (m *RMMonitor) getPrecent(snap *RMMonitorSnap) int {
	if !m.seekAheadEnd || m.seekAheadError != nil {
		return 0
	}
	if m.totalObjectNum+m.totalUploadIdNum == 0 {
		return 100
	}
	handled := float64(snap.dealNum * 100.0)
	return int(handled / float64(m.totalObjectNum+m.totalUploadIdNum))
}
// getFinishBar builds the final summary from one snapshot: the object /
// uploadId line first, followed by the removed-bucket line.
func (m *RMMonitor) getFinishBar(exitStat int) string {
	snap := m.getSnapshot()
	objectLine := m.getObjectFinishBar(snap, exitStat)
	bucketLine := m.getBucketFinishBar(snap)
	return objectLine + bucketLine
}
// getObjectFinishBar formats the object / uploadId part of the final
// summary. Nothing is reported unless the operation bit set includes an
// allType bit. When the scan did not end cleanly, the stored totals are
// first raised to at least the number of items already handled. The line
// starts with "Succeed:" unless getExitStat reports errExit.
func (m *RMMonitor) getObjectFinishBar(snap *RMMonitorSnap, exitStat int) string {
	if m.op&allType == 0 {
		return getClearStr("")
	}

	scanDone := m.seekAheadEnd && m.seekAheadError == nil
	if !scanDone {
		m.totalObjectNum = max(m.totalObjectNum, snap.objectNum+snap.errObjectNum)
		m.totalUploadIdNum = max(m.totalUploadIdNum, snap.uploadIdNum+snap.errUploadIdNum)
	}

	failed := m.getExitStat(snap, exitStat) == errExit
	switch {
	case !failed:
		return getClearStr(fmt.Sprintf("Succeed: Total %s. %s\n", m.getTotalInfo(), m.getOKInfo(snap)))
	case scanDone:
		return getClearStr(fmt.Sprintf("Total %s. %s when error happens.\n", m.getTotalInfo(), m.getOKInfo(snap)))
	default:
		return getClearStr(fmt.Sprintf("Scanned %s. %s when error happens.\n", m.getTotalInfo(), m.getOKInfo(snap)))
	}
}
// getExitStat returns errExit when the caller's exit status is not
// normalExit, when snap records any error, or when the bucketType bit is
// set but no removed bucket is recorded. Otherwise it returns normalExit.
func (m *RMMonitor) getExitStat(snap *RMMonitorSnap, exitStat int) int {
	if exitStat != normalExit || snap.errNum != 0 {
		return errExit
	}
	if m.op&bucketType != 0 && snap.removedBucket == "" {
		return errExit
	}
	return normalExit
}
// getBucketFinishBar returns the "Removed Bucket: <name>" line when the
// bucketType bit is set and snap records a removed bucket; otherwise it
// returns the cleared empty line.
func (m *RMMonitor) getBucketFinishBar(snap *RMMonitorSnap) string {
	if m.op&bucketType == 0 || snap.removedBucket == "" {
		return getClearStr("")
	}
	return getClearStr(fmt.Sprintf("Removed Bucket: %s\n", snap.removedBucket))
}
// CPMonitorSnap is a copy of a CPMonitor's counters, produced by
// CPMonitor.getSnapshot. Used for cp.
type CPMonitorSnap struct {
	// transferSize is the copied CPMonitor.transferSize.
	transferSize int64
	// skipSize is the copied CPMonitor.skipSize.
	skipSize int64
	// dealSize is CPMonitor.dealSize plus skipSize.
	dealSize int64
	// fileNum is the copied CPMonitor.fileNum.
	fileNum int64
	// dirNum is the copied CPMonitor.dirNum.
	dirNum int64
	// skipNum is the copied CPMonitor.skipNum.
	skipNum int64
	// skipNumDir is the copied CPMonitor.skipNumDir.
	skipNumDir int64
	// errNum is the copied CPMonitor.errNum.
	errNum int64
	// okNum is fileNum + dirNum + skipNum.
	okNum int64
	// dealNum is okNum + errNum.
	dealNum int64
	// duration is the time in nanoseconds since CPMonitor.lastSnapTime.
	duration int64
	// incrementSize is filled in by CPMonitor.getProgressBar with the
	// transfer size gained since the previous progress line.
	incrementSize int64
}
/*
 * CPMonitor tracks the progress of a cp operation.
 *
 * Put same type variables together to keep them 64-bit aligned and avoid
 * an atomic.AddInt64() panic.
 * Please preserve the alignment if you add a new field.
 */
type CPMonitor struct {
	// totalSize is the scanned size accumulated by updateScanSizeNum.
	totalSize int64
	// totalNum is the scanned count accumulated by updateScanNum and
	// updateScanSizeNum.
	totalNum int64
	// transferSize is increased atomically by updateTransferSize,
	// updateFile, updateDir and updateErr.
	transferSize int64
	// skipSize is increased atomically by updateSkip.
	skipSize int64
	// dealSize is increased atomically by updateDealSize, updateFile and
	// updateDir.
	dealSize int64
	// fileNum is increased atomically by updateFile.
	fileNum int64
	// dirNum is increased atomically by updateDir.
	dirNum int64
	// skipNum is increased atomically by updateSkip.
	skipNum int64
	// skipNumDir is increased atomically by updateSkipDir.
	skipNumDir int64
	// errNum is increased atomically by updateErr.
	errNum int64
	// lastSnapSize is the transfer size recorded when the last progress
	// line was produced.
	lastSnapSize int64
	// tickDuration is the minimum number of nanoseconds between two
	// progress lines.
	tickDuration int64
	// seekAheadError is the error handed to setScanError, nil otherwise.
	seekAheadError error
	// op selects the wording used by getSubject and getOPStr.
	op operationType
	// seekAheadEnd is set by setScanEnd and setScanError.
	seekAheadEnd bool
	// finish is set once progressBar has been called with finish == true.
	finish bool
	_ uint32 // Padding so that the data that follows stays 64-bit aligned.
	// lastSnapTime is when the last progress line was produced (or when
	// init ran).
	lastSnapTime time.Time
}
// init resets the monitor for a new cp operation of kind op: all counters
// and scan state are cleared, the progress clock is restarted at the
// current time, and the tick length is derived from processTickInterval
// (seconds converted to nanoseconds).
//
// skipNumDir is cleared together with the other counters so that a
// re-initialised monitor does not report skipped directories left over from
// a previous run.
func (m *CPMonitor) init(op operationType) {
	m.op = op
	m.totalSize = 0
	m.totalNum = 0
	m.seekAheadEnd = false
	m.seekAheadError = nil
	m.transferSize = 0
	m.skipSize = 0
	m.dealSize = 0
	m.fileNum = 0
	m.dirNum = 0
	m.skipNum = 0
	m.skipNumDir = 0
	m.errNum = 0
	m.finish = false
	m.lastSnapSize = 0
	m.lastSnapTime = time.Now()
	m.tickDuration = processTickInterval * int64(time.Second)
}
// setScanError stores err as the scan error and marks the scan as ended.
func (m *CPMonitor) setScanError(err error) {
	m.seekAheadEnd, m.seekAheadError = true, err
}
// updateScanNum adds num to the scanned count. The addition is a plain
// (non-atomic) update.
func (m *CPMonitor) updateScanNum(num int64) {
	m.totalNum += num
}
// updateScanSizeNum adds size to the scanned size and num to the scanned
// count. Both additions are plain (non-atomic) updates.
func (m *CPMonitor) updateScanSizeNum(size, num int64) {
	m.totalSize += size
	m.totalNum += num
}
// setScanEnd marks the scan as ended; any stored scan error is left as is.
func (m *CPMonitor) setScanEnd() {
	m.seekAheadEnd = true
}
// updateTransferSize atomically adds size to transferSize.
func (m *CPMonitor) updateTransferSize(size int64) {
	atomic.AddInt64(&m.transferSize, size)
}
// updateDealSize atomically adds size to dealSize.
func (m *CPMonitor) updateDealSize(size int64) {
	atomic.AddInt64(&m.dealSize, size)
}
// updateFile atomically adds num to fileNum and size to both transferSize
// and dealSize. Each addition is atomic on its own; the three together are
// not a single atomic step.
func (m *CPMonitor) updateFile(size, num int64) {
	atomic.AddInt64(&m.fileNum, num)
	atomic.AddInt64(&m.transferSize, size)
	atomic.AddInt64(&m.dealSize, size)
}
// updateDir atomically adds num to dirNum and size to both transferSize and
// dealSize. Each addition is atomic on its own; the three together are not
// a single atomic step.
func (m *CPMonitor) updateDir(size, num int64) {
	atomic.AddInt64(&m.dirNum, num)
	atomic.AddInt64(&m.transferSize, size)
	atomic.AddInt64(&m.dealSize, size)
}
// updateSkip atomically adds num to skipNum and size to skipSize.
func (m *CPMonitor) updateSkip(size, num int64) {
	atomic.AddInt64(&m.skipNum, num)
	atomic.AddInt64(&m.skipSize, size)
}
// updateSkipDir atomically adds num to skipNumDir.
func (m *CPMonitor) updateSkipDir(num int64) {
	atomic.AddInt64(&m.skipNumDir, num)
}
// updateErr atomically adds num to errNum and size to transferSize.
// dealSize is not changed.
func (m *CPMonitor) updateErr(size, num int64) {
	atomic.AddInt64(&m.errNum, num)
	atomic.AddInt64(&m.transferSize, size)
}
// getSnapshot returns a copy of the monitor's counters plus derived values:
//   - dealSize is the monitor's dealSize plus the skipped size,
//   - okNum is fileNum + dirNum + skipNum,
//   - dealNum is okNum + errNum,
//   - duration is the time in nanoseconds since lastSnapTime.
//
// incrementSize is left at zero; getProgressBar fills it in.
//
// The counters are written with atomic.AddInt64, so they are read here with
// atomic.LoadInt64; a plain read would be a data race with concurrent
// updaters.
func (m *CPMonitor) getSnapshot() *CPMonitorSnap {
	snap := &CPMonitorSnap{
		transferSize: atomic.LoadInt64(&m.transferSize),
		skipSize:     atomic.LoadInt64(&m.skipSize),
		fileNum:      atomic.LoadInt64(&m.fileNum),
		dirNum:       atomic.LoadInt64(&m.dirNum),
		skipNum:      atomic.LoadInt64(&m.skipNum),
		skipNumDir:   atomic.LoadInt64(&m.skipNumDir),
		errNum:       atomic.LoadInt64(&m.errNum),
	}
	snap.dealSize = atomic.LoadInt64(&m.dealSize) + snap.skipSize
	snap.okNum = snap.fileNum + snap.dirNum + snap.skipNum
	snap.dealNum = snap.okNum + snap.errNum
	snap.duration = time.Since(m.lastSnapTime).Nanoseconds()
	return snap
}
// progressBar returns the line to print for the current state: the running
// progress line while finish is false, or the final summary selected by
// exitStat when finish is true. Once a final summary has been produced,
// every later call returns "".
func (m *CPMonitor) progressBar(finish bool, exitStat int) string {
	if m.finish {
		return ""
	}
	if finish {
		m.finish = true
		return m.getFinishBar(exitStat)
	}
	return m.getProgressBar()
}
// getProgressBar formats the running progress line, at most once per
// tickDuration: it returns "" when less than tickDuration nanoseconds have
// passed since the last emitted line. When a line is produced, the progress
// clock and lastSnapSize are advanced so the next speed figure covers only
// the next interval.
//
// The size gained in this interval is computed from the snapshot's
// transferSize, the same value that is stored in lastSnapSize. Reading
// m.transferSize a second time would count bytes that arrive between the
// two reads in this interval and again in the next one.
//
// The package-level lock mu is held in read mode for the duration of the
// call.
func (m *CPMonitor) getProgressBar() string {
	mu.RLock()
	defer mu.RUnlock()

	snap := m.getSnapshot()
	if snap.duration < m.tickDuration {
		return ""
	}
	m.lastSnapTime = time.Now()
	snap.incrementSize = snap.transferSize - m.lastSnapSize
	m.lastSnapSize = snap.transferSize

	if m.seekAheadEnd && m.seekAheadError == nil {
		return getClearStr(fmt.Sprintf("Total num: %d, size: %s. Dealed num: %d%s%s, Progress: %.3f%s, Speed: %.2fKB/s", m.totalNum, getSizeString(m.totalSize), snap.dealNum, m.getDealNumDetail(snap), m.getDealSizeDetail(snap), m.getPrecent(snap), "%%", m.getSpeed(snap)))
	}
	// The scan is still running (or failed): report at least as much as has
	// already been dealt with.
	scanNum := max(m.totalNum, snap.dealNum)
	scanSize := max(m.totalSize, snap.dealSize)
	return getClearStr(fmt.Sprintf("Scanned num: %d, size: %s. Dealed num: %d%s%s, Speed: %.2fKB/s.", scanNum, getSizeString(scanSize), snap.dealNum, m.getDealNumDetail(snap), m.getDealSizeDetail(snap), m.getSpeed(snap)))
}
// getFinishBar returns the final summary: the regular one for normalExit,
// the "When error happens" one for any other exit status.
func (m *CPMonitor) getFinishBar(exitStat int) string {
	if exitStat != normalExit {
		return m.getDefeatBar()
	}
	return m.getWholeFinishBar()
}
// getWholeFinishBar formats the summary for a normal exit. When the scan
// ended without error the scanned totals are reported; otherwise the count
// is the larger of the scanned total and the number dealt with, and the
// size is the snapshot's dealt size. With no errors the line starts with
// "Succeed:", otherwise with "FinishWithError:".
func (m *CPMonitor) getWholeFinishBar() string {
	snap := m.getSnapshot()
	scanDone := m.seekAheadEnd && m.seekAheadError == nil

	num, size := m.totalNum, m.totalSize
	if !scanDone {
		num, size = max(m.totalNum, snap.dealNum), snap.dealSize
	}

	switch {
	case snap.errNum == 0:
		return getClearStr(fmt.Sprintf("Succeed: Total num: %d, size: %s. OK num: %d%s%s.\n", num, getSizeString(size), snap.okNum, m.getDealNumDetail(snap), m.getSkipSize(snap)))
	case scanDone:
		return getClearStr(fmt.Sprintf("FinishWithError: Total num: %d, size: %s. Error num: %d. OK num: %d%s%s.\n", num, getSizeString(size), snap.errNum, snap.okNum, m.getOKNumDetail(snap), m.getSizeDetail(snap)))
	default:
		return getClearStr(fmt.Sprintf("FinishWithError: Scanned %d %s. Error num: %d. OK num: %d%s%s.\n", num, m.getSubject(), snap.errNum, snap.okNum, m.getOKNumDetail(snap), m.getSizeDetail(snap)))
	}
}
// getDefeatBar formats the summary printed when the operation stops with an
// error exit status. It reports the scanned total and size when the scan
// ended without error, and a "Scanned" count otherwise.
func (m *CPMonitor) getDefeatBar() string {
	snap := m.getSnapshot()
	if !m.seekAheadEnd || m.seekAheadError != nil {
		scanned := max(m.totalNum, snap.dealNum)
		return getClearStr(fmt.Sprintf("Scanned %d %s. Dealed num: %d%s%s. When error happens.\n", scanned, m.getSubject(), snap.okNum, m.getOKNumDetail(snap), m.getSizeDetail(snap)))
	}
	return getClearStr(fmt.Sprintf("Total num: %d, size: %s. Dealed num: %d%s%s. When error happens.\n", m.totalNum, getSizeString(m.totalSize), snap.okNum, m.getOKNumDetail(snap), m.getSizeDetail(snap)))
}
// getSubject returns the noun used in messages: "files" for
// operationTypePut, "objects" for every other operation.
func (m *CPMonitor) getSubject() string {
	if m.op == operationTypePut {
		return "files"
	}
	return "objects"
}
// getDealNumDetail returns the parenthesised count breakdown including the
// error count (getNumDetail with hasErr == true).
func (m *CPMonitor) getDealNumDetail(snap *CPMonitorSnap) string {
	return m.getNumDetail(snap, true)
}
// getOKNumDetail returns the parenthesised count breakdown without the
// error count (getNumDetail with hasErr == false).
func (m *CPMonitor) getOKNumDetail(snap *CPMonitorSnap) string {
	return m.getNumDetail(snap, false)
}
// getNumDetail returns a parenthesised, comma-separated breakdown of the
// counts in snap, e.g. "(Error 1 objects, copy 2 objects, 3 directories)".
// Only non-zero counts are listed. The error count is included only when
// hasErr is true. The directory entry carries the operation verb only when
// no files were handled. The result is "" when there is nothing to list,
// and also when hasErr is false and snap.okNum is zero.
func (m *CPMonitor) getNumDetail(snap *CPMonitorSnap, hasErr bool) string {
	if !hasErr && snap.okNum == 0 {
		return ""
	}

	var parts []string
	if hasErr && snap.errNum != 0 {
		parts = append(parts, fmt.Sprintf("Error %d %s", snap.errNum, m.getSubject()))
	}
	if snap.fileNum != 0 {
		parts = append(parts, fmt.Sprintf("%s %d %s", m.getOPStr(), snap.fileNum, m.getSubject()))
	}
	if snap.dirNum != 0 {
		if snap.fileNum == 0 {
			parts = append(parts, fmt.Sprintf("%s %d directories", m.getOPStr(), snap.dirNum))
		} else {
			parts = append(parts, fmt.Sprintf("%d directories", snap.dirNum))
		}
	}
	if snap.skipNum != 0 {
		parts = append(parts, fmt.Sprintf("skip %d %s", snap.skipNum, m.getSubject()))
	}
	if snap.skipNumDir != 0 {
		parts = append(parts, fmt.Sprintf("skip %d directory", snap.skipNumDir))
	}

	if len(parts) == 0 {
		return ""
	}
	return "(" + strings.Join(parts, ", ") + ")"
}
// getSpeed returns the transfer speed over the snapshot's interval in KB/s:
// incrementSize bytes divided by 1024, over duration nanoseconds converted
// to seconds.
//
// A non-positive duration yields 0 rather than dividing by zero, which
// would otherwise put +Inf or NaN into the progress line.
func (m *CPMonitor) getSpeed(snap *CPMonitorSnap) float64 {
	if snap.duration <= 0 {
		return 0
	}
	return (float64(snap.incrementSize) / 1024) / (float64(snap.duration) * 1e-9)
}
// getOPStr returns the verb used in messages: "upload" for
// operationTypePut, "download" for operationTypeGet, and "copy" for every
// other operation.
func (m *CPMonitor) getOPStr() string {
	if m.op == operationTypePut {
		return "upload"
	}
	if m.op == operationTypeGet {
		return "download"
	}
	return "copy"
}
// getDealSizeDetail returns the ", OK size: <size>" fragment for the
// snapshot's dealSize, formatted by getSizeString.
func (m *CPMonitor) getDealSizeDetail(snap *CPMonitorSnap) string {
	return fmt.Sprintf(", OK size: %s", getSizeString(snap.dealSize))
}
// getSkipSize returns the ", Skip size: <size>" fragment, or "" when
// nothing was skipped.
func (m *CPMonitor) getSkipSize(snap *CPMonitorSnap) string {
	if snap.skipSize == 0 {
		return ""
	}
	return fmt.Sprintf(", Skip size: %s", getSizeString(snap.skipSize))
}
// getSizeDetail returns the size fragment of a finish line:
//   - only the transfer size when nothing was skipped,
//   - only the skip size when nothing was transferred,
//   - otherwise the combined size with both parts in parentheses.
func (m *CPMonitor) getSizeDetail(snap *CPMonitorSnap) string {
	switch {
	case snap.skipSize == 0:
		return fmt.Sprintf(", Transfer size: %s", getSizeString(snap.transferSize))
	case snap.transferSize == 0:
		return fmt.Sprintf(", Skip size: %s", getSizeString(snap.skipSize))
	default:
		return fmt.Sprintf(", OK size: %s(transfer: %s, skip: %s)", getSizeString(snap.transferSize+snap.skipSize), getSizeString(snap.transferSize), getSizeString(snap.skipSize))
	}
}
// getPrecent returns the progress as a percentage. It is 0 until the scan
// has ended without error. After that it is measured by size when the
// scanned size is non-zero, otherwise by count when the scanned count is
// non-zero, and is 100 when there was nothing to do.
func (m *CPMonitor) getPrecent(snap *CPMonitorSnap) float64 {
	if !m.seekAheadEnd || m.seekAheadError != nil {
		return 0
	}
	switch {
	case m.totalSize != 0:
		return float64(snap.dealSize*100.0) / float64(m.totalSize)
	case m.totalNum != 0:
		return float64(snap.dealNum*100.0) / float64(m.totalNum)
	default:
		return 100
	}
}