report/reporter.go (110 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package report import ( "fmt" "sync" "sync/atomic" "time" ) // Reporter impl type Reporter struct { // atomic counter byteSends sendBytes int64 reportDuration time.Duration closeCh chan struct{} wg sync.WaitGroup fileSize int64 TotalWorkers uint64 FinishedWorkers uint64 FailedWorkers map[int]int Lock sync.Mutex } func NewReporter(reportDuration int, size int64, totalWorkers uint64) *Reporter { return &Reporter{ reportDuration: time.Duration(reportDuration), closeCh: make(chan struct{}), fileSize: size, TotalWorkers: totalWorkers, FinishedWorkers: 0, FailedWorkers: make(map[int]int), Lock: sync.Mutex{}, } } // incr sent bytes func (r *Reporter) IncrSendBytes(bytes int64) { atomic.AddInt64(&r.sendBytes, bytes) } // convert bytes to human readable func humanReadableBytes(bytes int64) string { if bytes < 1024 { return fmt.Sprintf("%d B", bytes) } if bytes < 1024*1024 { return fmt.Sprintf("%.2f KB", float64(bytes)/1024) } if bytes < 1024*1024*1024 { return fmt.Sprintf("%.2f MB", float64(bytes)/1024/1024) } return fmt.Sprintf("%.2f GB", float64(bytes)/1024/1024/1024) } // ticker to print send bytes, duration func (r *Reporter) Report() { r.wg.Add(1) go r.report() } func getProgressBar(progress, total int, fill *int) string { barSize := 60 // length of the progress bar percentage := float64(progress) / float64(total) numberOfBars := int(percentage * float64(barSize)) bars := "" for i := 0; i < numberOfBars; i++ { bars += "=" } *fill = numberOfBars return bars } // fill the rest of the progress bar func getEmptyBar(emptySize int) string { emptyBars := "" for i := 0; i < emptySize; i++ { emptyBars += " " } return emptyBars } func (r *Reporter) report() { ticker := time.NewTicker(r.reportDuration * time.Second) beg := time.Now() last_sent_time := beg last_sent := atomic.LoadInt64(&r.sendBytes) for { select { case <-ticker.C: now := time.Now() beg_duration := now.Sub(beg).Seconds() duration := now.Sub(last_sent_time).Seconds() last_sent_time = now sent := atomic.LoadInt64(&r.sendBytes) rate := int64(float64(sent-last_sent) / duration) total := r.fileSize progress := int(float64(float64(sent)/float64(total)) * 100) if progress >= 100 { progress = 100 } last_sent = sent fill := 0 fmt.Print("\r") fmt.Printf("Progress: [%s%s] %d%% Elapsed: %.2fs Rate: %s/s", getProgressBar(progress, 100, &fill), getEmptyBar(60-fill), progress, beg_duration, humanReadableBytes(rate)) fmt.Print("\033[0K") // some worker may be failed. if progress == 100 || atomic.LoadUint64(&r.FinishedWorkers) == atomic.LoadUint64(&r.TotalWorkers) { fmt.Print("\nFinishing and publishing data ...\n") r.wg.Done() return } case <-r.closeCh: r.wg.Done() return } } } // close and wait for all report done func (r *Reporter) CloseWait() { close(r.closeCh) r.wg.Wait() }