tools/cli/admin_timers.go (238 lines of code) (raw):
// The MIT License (MIT)
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
package cli
import (
"context"
"encoding/json"
"fmt"
"io"
"os"
"github.com/urfave/cli"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/persistence"
)
// LoadCloser loads timer task information
type LoadCloser interface {
Load() []*persistence.TimerTaskInfo
Close()
}
// Printer prints timer task information
type Printer interface {
Print(timers []*persistence.TimerTaskInfo) error
}
// Reporter wraps LoadCloser, Printer and a filter on time task type and domainID
type Reporter struct {
domainID string
timerTypes []int
loader LoadCloser
printer Printer
}
type dbLoadCloser struct {
ctx *cli.Context
executionManager persistence.ExecutionManager
}
type fileLoadCloser struct {
file *os.File
}
type histogramPrinter struct {
ctx *cli.Context
timeFormat string
}
type jsonPrinter struct {
ctx *cli.Context
}
// NewDBLoadCloser creates a new LoadCloser to load timer task information from database
func NewDBLoadCloser(c *cli.Context) LoadCloser {
shardID := getRequiredIntOption(c, FlagShardID)
executionManager := initializeExecutionStore(c, shardID)
return &dbLoadCloser{
ctx: c,
executionManager: executionManager,
}
}
// NewFileLoadCloser creates a new LoadCloser to load timer task information from file
func NewFileLoadCloser(c *cli.Context) LoadCloser {
file, err := os.Open(c.String(FlagInputFile))
if err != nil {
ErrorAndExit("cannot open file", err)
}
return &fileLoadCloser{
file: file,
}
}
// NewReporter creates a new Reporter
func NewReporter(domain string, timerTypes []int, loader LoadCloser, printer Printer) *Reporter {
return &Reporter{
timerTypes: timerTypes,
domainID: domain,
loader: loader,
printer: printer,
}
}
// NewHistogramPrinter creates a new Printer to display timer task information in a histogram
func NewHistogramPrinter(c *cli.Context, timeFormat string) Printer {
return &histogramPrinter{
ctx: c,
timeFormat: timeFormat,
}
}
// NewJSONPrinter creates a new Printer to display timer task information in a JSON format
func NewJSONPrinter(c *cli.Context) Printer {
return &jsonPrinter{
ctx: c,
}
}
func (r *Reporter) filter(timers []*persistence.TimerTaskInfo) []*persistence.TimerTaskInfo {
taskTypes := intSliceToSet(r.timerTypes)
for i, t := range timers {
if len(r.domainID) > 0 && t.DomainID != r.domainID {
timers[i] = nil
continue
}
if _, ok := taskTypes[t.TaskType]; !ok {
timers[i] = nil
continue
}
}
return timers
}
// Report loads, filters and prints timer tasks
func (r *Reporter) Report() error {
return r.printer.Print(r.filter(r.loader.Load()))
}
// AdminTimers is used to list scheduled timers.
func AdminTimers(c *cli.Context) {
timerTypes := c.IntSlice(FlagTimerType)
if !c.IsSet(FlagTimerType) || (len(timerTypes) == 1 && timerTypes[0] == -1) {
timerTypes = []int{
persistence.TaskTypeDecisionTimeout,
persistence.TaskTypeActivityTimeout,
persistence.TaskTypeUserTimer,
persistence.TaskTypeWorkflowTimeout,
persistence.TaskTypeDeleteHistoryEvent,
persistence.TaskTypeActivityRetryTimer,
persistence.TaskTypeWorkflowBackoffTimer,
}
}
// setup loader
var loader LoadCloser
if !c.IsSet(FlagInputFile) {
loader = NewDBLoadCloser(c)
} else {
loader = NewFileLoadCloser(c)
}
defer loader.Close()
// setup printer
var printer Printer
if !c.Bool(FlagPrintJSON) {
var timerFormat string
if c.IsSet(FlagDateFormat) {
timerFormat = c.String(FlagDateFormat)
} else {
switch c.String(FlagBucketSize) {
case "day":
timerFormat = "2006-01-02"
case "hour":
timerFormat = "2006-01-02T15"
case "minute":
timerFormat = "2006-01-02T15:04"
case "second":
timerFormat = "2006-01-02T15:04:05"
default:
ErrorAndExit("unknown bucket size: "+c.String(FlagBucketSize), nil)
}
}
printer = NewHistogramPrinter(c, timerFormat)
} else {
printer = NewJSONPrinter(c)
}
reporter := NewReporter(c.String(FlagDomainID), timerTypes, loader, printer)
if err := reporter.Report(); err != nil {
ErrorAndExit("Reporter failed", err)
}
}
func (jp *jsonPrinter) Print(timers []*persistence.TimerTaskInfo) error {
for _, t := range timers {
if t == nil {
continue
}
data, err := json.Marshal(t)
if err != nil {
if !jp.ctx.Bool(FlagSkipErrorMode) {
ErrorAndExit("cannot marshal timer to json", err)
}
fmt.Println(err.Error())
} else {
fmt.Println(string(data))
}
}
return nil
}
func (cl *dbLoadCloser) Load() []*persistence.TimerTaskInfo {
batchSize := cl.ctx.Int(FlagBatchSize)
startDate := cl.ctx.String(FlagStartDate)
endDate := cl.ctx.String(FlagEndDate)
st, err := parseSingleTs(startDate)
if err != nil {
ErrorAndExit("wrong date format for "+FlagStartDate, err)
}
et, err := parseSingleTs(endDate)
if err != nil {
ErrorAndExit("wrong date format for "+FlagEndDate, err)
}
var timers []*persistence.TimerTaskInfo
isRetryable := func(err error) bool {
return persistence.IsTransientError(err) || common.IsContextTimeoutError(err)
}
throttleRetry := backoff.NewThrottleRetry(
backoff.WithRetryPolicy(common.CreatePersistenceRetryPolicy()),
backoff.WithRetryableError(isRetryable),
)
var token []byte
isFirstIteration := true
for isFirstIteration || len(token) != 0 {
isFirstIteration = false
req := persistence.GetTimerIndexTasksRequest{
MinTimestamp: st,
MaxTimestamp: et,
BatchSize: batchSize,
NextPageToken: token,
}
resp := &persistence.GetTimerIndexTasksResponse{}
op := func() error {
ctx, cancel := newContext(cl.ctx)
defer cancel()
var err error
resp, err = cl.executionManager.GetTimerIndexTasks(ctx, &req)
return err
}
err = throttleRetry.Do(context.Background(), op)
if err != nil {
ErrorAndExit("cannot get timer tasks for shard", err)
}
token = resp.NextPageToken
timers = append(timers, resp.Timers...)
}
return timers
}
func (cl *dbLoadCloser) Close() {
if cl.executionManager != nil {
cl.executionManager.Close()
}
}
func (fl *fileLoadCloser) Load() []*persistence.TimerTaskInfo {
var data []*persistence.TimerTaskInfo
dec := json.NewDecoder(fl.file)
for {
var timer persistence.TimerTaskInfo
if err := dec.Decode(&timer); err != nil {
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
}
}
data = append(data, &timer)
}
return data
}
func (fl *fileLoadCloser) Close() {
if fl.file != nil {
fl.file.Close()
}
}
func (hp *histogramPrinter) Print(timers []*persistence.TimerTaskInfo) error {
h := NewHistogram()
for _, t := range timers {
if t == nil {
continue
}
h.Add(t.VisibilityTimestamp.Format(hp.timeFormat))
}
return h.Print(hp.ctx.Int(FlagShardMultiplier))
}