internal/common/worker_progress_counter.go (80 lines of code) (raw):
/*
* Copyright (c) 2023 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package common
import "go.uber.org/atomic"
type WorkerProgressCounter struct {
WorkerAddr string `json:"workerAddr"`
Total *atomic.Int64 `json:"total"`
Pulled *atomic.Int64 `json:"pulled"`
Running *atomic.Int64 `json:"running"`
Success *atomic.Int64 `json:"success"`
Failed *atomic.Int64 `json:"failed"`
}
func NewWorkerProgressCounter(workerAddr string) (c *WorkerProgressCounter) {
return &WorkerProgressCounter{
Total: atomic.NewInt64(0),
Pulled: atomic.NewInt64(0),
Running: atomic.NewInt64(0),
Success: atomic.NewInt64(0),
Failed: atomic.NewInt64(0),
WorkerAddr: workerAddr,
}
}
func (c *WorkerProgressCounter) DecrementFailed() {
if c.Failed.Load() > 0 {
c.Failed.Sub(1)
}
}
func (c *WorkerProgressCounter) DecrementRunning(delta int64) {
if c.Running.Load() >= delta {
c.Running.Sub(delta)
}
if c.Total.Load() >= delta {
c.Total.Sub(delta)
}
}
func (c *WorkerProgressCounter) DecrementSuccess() {
if c.Success.Load() > 0 {
c.Success.Sub(1)
}
}
func (c *WorkerProgressCounter) GetFailed() int64 {
return c.Failed.Load()
}
func (c *WorkerProgressCounter) GetRunning() int64 {
return c.Running.Load()
}
func (c *WorkerProgressCounter) GetSuccess() int64 {
return c.Success.Load()
}
func (c *WorkerProgressCounter) GetTotal() int64 {
return c.Total.Load()
}
func (c *WorkerProgressCounter) GetWorkerAddr() string {
return c.WorkerAddr
}
func (c *WorkerProgressCounter) IncrementFailed(delta int64) {
if c.Running.Load() >= delta {
c.Running.Sub(delta)
}
c.Failed.Add(delta)
}
func (c *WorkerProgressCounter) IncrementOneFailed() {
c.IncrementFailed(1)
}
func (c *WorkerProgressCounter) IncrementPulled() {
c.Pulled.Add(1)
}
func (c *WorkerProgressCounter) IncrementRunning() {
if c.Pulled.Load() > 0 {
c.Pulled.Sub(1)
}
c.Running.Add(1)
}
func (c *WorkerProgressCounter) IncrementSuccess() {
if c.Running.Load() > 0 {
c.Running.Sub(1)
}
c.Success.Add(1)
}
func (c *WorkerProgressCounter) IncrementTotal() {
c.Total.Add(1)
}