tools/cli/workflow_commands.go (1,858 lines of code) (raw):
// Copyright (c) 2017-2020 Uber Technologies Inc.
// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package cli
import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"math"
"math/rand"
"os"
"regexp"
"strconv"
"strings"
"sync"
"time"
"github.com/olekukonko/tablewriter"
"github.com/pborman/uuid"
"github.com/urfave/cli"
"github.com/uber/cadence/client/frontend"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/history/execution"
)
// RestartWorkflow restarts a workflow execution
func RestartWorkflow(c *cli.Context) {
wfClient := getWorkflowClient(c)
domain := getRequiredGlobalOption(c, FlagDomain)
wid := getRequiredOption(c, FlagWorkflowID)
rid := c.String(FlagRunID)
ctx, cancel := newContext(c)
defer cancel()
resp, err := wfClient.RestartWorkflowExecution(
ctx,
&types.RestartWorkflowExecutionRequest{
Domain: domain,
WorkflowExecution: &types.WorkflowExecution{
WorkflowID: wid,
RunID: rid,
}, Identity: getCliIdentity(),
},
)
if err != nil {
ErrorAndExit("Restart workflow failed.", err)
} else {
fmt.Printf("Restarted Workflow Id: %s, run Id: %s\n", wid, resp.GetRunID())
}
}
// ShowHistory shows the history of given workflow execution based on workflowID and runID.
func ShowHistory(c *cli.Context) {
wid := getRequiredOption(c, FlagWorkflowID)
rid := c.String(FlagRunID)
showHistoryHelper(c, wid, rid)
}
// ShowHistoryWithWID shows the history of given workflow with workflow_id
func ShowHistoryWithWID(c *cli.Context) {
if !c.Args().Present() {
ErrorAndExit("Argument workflow_id is required.", nil)
}
wid := c.Args().First()
rid := ""
if c.NArg() >= 2 {
rid = c.Args().Get(1)
}
showHistoryHelper(c, wid, rid)
}
func showHistoryHelper(c *cli.Context, wid, rid string) {
wfClient := getWorkflowClient(c)
domain := getRequiredGlobalOption(c, FlagDomain)
printDateTime := c.Bool(FlagPrintDateTime)
printRawTime := c.Bool(FlagPrintRawTime)
printFully := c.Bool(FlagPrintFullyDetail)
printVersion := c.Bool(FlagPrintEventVersion)
outputFileName := c.String(FlagOutputFilename)
var maxFieldLength int
if c.IsSet(FlagMaxFieldLength) || !printFully {
maxFieldLength = c.Int(FlagMaxFieldLength)
}
resetPointsOnly := c.Bool(FlagResetPointsOnly)
ctx, cancel := newContext(c)
defer cancel()
history, err := GetHistory(ctx, wfClient, domain, wid, rid)
if err != nil {
ErrorAndExit(fmt.Sprintf("Failed to get history on workflow id: %s, run id: %s.", wid, rid), err)
}
prevEvent := types.HistoryEvent{}
if printFully { // dump everything
for _, e := range history.Events {
if resetPointsOnly {
if prevEvent.GetEventType() != types.EventTypeDecisionTaskStarted {
prevEvent = *e
continue
}
prevEvent = *e
}
fmt.Println(anyToString(e, true, maxFieldLength))
}
} else if c.IsSet(FlagEventID) { // only dump that event
eventID := c.Int(FlagEventID)
if eventID <= 0 || eventID > len(history.Events) {
ErrorAndExit("EventId out of range.", fmt.Errorf("number should be 1 - %d inclusive", len(history.Events)))
}
e := history.Events[eventID-1]
fmt.Println(anyToString(e, true, 0))
} else { // use table to pretty output, will trim long text
table := tablewriter.NewWriter(os.Stdout)
table.SetBorder(false)
table.SetColumnSeparator("")
for _, e := range history.Events {
if resetPointsOnly {
if prevEvent.GetEventType() != types.EventTypeDecisionTaskStarted {
prevEvent = *e
continue
}
prevEvent = *e
}
columns := []string{}
columns = append(columns, strconv.FormatInt(e.ID, 10))
if printRawTime {
columns = append(columns, strconv.FormatInt(e.GetTimestamp(), 10))
} else if printDateTime {
columns = append(columns, convertTime(e.GetTimestamp(), false))
}
if printVersion {
columns = append(columns, fmt.Sprintf("(Version: %v)", e.Version))
}
columns = append(columns, ColorEvent(e), HistoryEventToString(e, false, maxFieldLength))
table.Append(columns)
}
table.Render()
}
if outputFileName != "" {
serializer := &JSONHistorySerializer{}
data, err := serializer.Serialize(history)
if err != nil {
ErrorAndExit("Failed to serialize history data.", err)
}
if err := ioutil.WriteFile(outputFileName, data, 0666); err != nil {
ErrorAndExit("Failed to export history data file.", err)
}
}
// finally append activities with retry
frontendClient := cFactory.ServerFrontendClient(c)
resp, err := frontendClient.DescribeWorkflowExecution(ctx, &types.DescribeWorkflowExecutionRequest{
Domain: domain,
Execution: &types.WorkflowExecution{
WorkflowID: wid,
RunID: rid,
},
})
if err != nil {
if _, ok := err.(*types.EntityNotExistsError); ok {
fmt.Printf("%s %s\n", colorRed("Error:"), err)
return
}
ErrorAndExit("Describe workflow execution failed, cannot get information of pending activities", err)
}
fmt.Println("History Source: Default Storage")
descOutput := convertDescribeWorkflowExecutionResponse(resp, frontendClient, c)
if len(descOutput.PendingActivities) > 0 {
fmt.Println("============Workflow Pending activities============")
prettyPrintJSONObject(descOutput.PendingActivities)
fmt.Println("NOTE: ActivityStartedEvent with retry policy will be written into history when the activity is finished.")
}
}
// StartWorkflow starts a new workflow execution
func StartWorkflow(c *cli.Context) {
startWorkflowHelper(c, false)
}
// RunWorkflow starts a new workflow execution and print workflow progress and result
func RunWorkflow(c *cli.Context) {
startWorkflowHelper(c, true)
}
func startWorkflowHelper(c *cli.Context, shouldPrintProgress bool) {
serviceClient := cFactory.ServerFrontendClient(c)
startRequest := constructStartWorkflowRequest(c)
domain := startRequest.GetDomain()
wid := startRequest.GetWorkflowID()
workflowType := startRequest.WorkflowType.GetName()
taskList := startRequest.TaskList.GetName()
input := string(startRequest.Input)
startFn := func() {
tcCtx, cancel := newContext(c)
defer cancel()
resp, err := serviceClient.StartWorkflowExecution(tcCtx, startRequest)
if err != nil {
ErrorAndExit("Failed to create workflow.", err)
} else {
fmt.Printf("Started Workflow Id: %s, run Id: %s\n", wid, resp.GetRunID())
}
}
runFn := func() {
tcCtx, cancel := newContextForLongPoll(c)
defer cancel()
resp, err := serviceClient.StartWorkflowExecution(tcCtx, startRequest)
if err != nil {
ErrorAndExit("Failed to run workflow.", err)
}
// print execution summary
fmt.Println(colorMagenta("Running execution:"))
table := tablewriter.NewWriter(os.Stdout)
executionData := [][]string{
{"Workflow Id", wid},
{"Run Id", resp.GetRunID()},
{"Type", workflowType},
{"Domain", domain},
{"Task List", taskList},
{"Args", truncate(input)}, // in case of large input
}
table.SetBorder(false)
table.SetColumnSeparator(":")
table.AppendBulk(executionData) // Add Bulk Data
table.Render()
printWorkflowProgress(c, domain, wid, resp.GetRunID())
}
if shouldPrintProgress {
runFn()
} else {
startFn()
}
}
func constructStartWorkflowRequest(c *cli.Context) *types.StartWorkflowExecutionRequest {
domain := getRequiredGlobalOption(c, FlagDomain)
taskList := getRequiredOption(c, FlagTaskList)
workflowType := getRequiredOption(c, FlagWorkflowType)
et := c.Int(FlagExecutionTimeout)
if et == 0 {
ErrorAndExit(fmt.Sprintf("Option %s format is invalid.", FlagExecutionTimeout), nil)
}
dt := c.Int(FlagDecisionTimeout)
wid := c.String(FlagWorkflowID)
if len(wid) == 0 {
wid = uuid.New()
}
reusePolicy := defaultWorkflowIDReusePolicy.Ptr()
if c.IsSet(FlagWorkflowIDReusePolicy) {
reusePolicy = getWorkflowIDReusePolicy(c.Int(FlagWorkflowIDReusePolicy))
}
input := processJSONInput(c)
startRequest := &types.StartWorkflowExecutionRequest{
RequestID: uuid.New(),
Domain: domain,
WorkflowID: wid,
WorkflowType: &types.WorkflowType{
Name: workflowType,
},
TaskList: &types.TaskList{
Name: taskList,
},
Input: []byte(input),
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(int32(et)),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(int32(dt)),
Identity: getCliIdentity(),
WorkflowIDReusePolicy: reusePolicy,
}
if c.IsSet(FlagCronSchedule) {
startRequest.CronSchedule = c.String(FlagCronSchedule)
}
if c.IsSet(FlagRetryAttempts) || c.IsSet(FlagRetryExpiration) {
startRequest.RetryPolicy = &types.RetryPolicy{
InitialIntervalInSeconds: int32(c.Int(FlagRetryInterval)),
BackoffCoefficient: c.Float64(FlagRetryBackoff),
}
if c.IsSet(FlagRetryAttempts) {
startRequest.RetryPolicy.MaximumAttempts = int32(c.Int(FlagRetryAttempts))
}
if c.IsSet(FlagRetryExpiration) {
startRequest.RetryPolicy.ExpirationIntervalInSeconds = int32(c.Int(FlagRetryExpiration))
}
if c.IsSet(FlagRetryMaxInterval) {
startRequest.RetryPolicy.MaximumIntervalInSeconds = int32(c.Int(FlagRetryMaxInterval))
}
}
if c.IsSet(DelayStartSeconds) {
startRequest.DelayStartSeconds = common.Int32Ptr(int32(c.Int(DelayStartSeconds)))
}
if c.IsSet(JitterStartSeconds) {
startRequest.JitterStartSeconds = common.Int32Ptr(int32(c.Int(JitterStartSeconds)))
}
headerFields := processHeader(c)
if len(headerFields) != 0 {
startRequest.Header = &types.Header{Fields: headerFields}
}
memoFields := processMemo(c)
if len(memoFields) != 0 {
startRequest.Memo = &types.Memo{Fields: memoFields}
}
searchAttrFields := processSearchAttr(c)
if len(searchAttrFields) != 0 {
startRequest.SearchAttributes = &types.SearchAttributes{IndexedFields: searchAttrFields}
}
return startRequest
}
func processSearchAttr(c *cli.Context) map[string][]byte {
rawSearchAttrKey := c.String(FlagSearchAttributesKey)
var searchAttrKeys []string
if strings.TrimSpace(rawSearchAttrKey) != "" {
searchAttrKeys = trimSpace(strings.Split(rawSearchAttrKey, searchAttrInputSeparator))
}
rawSearchAttrVal := c.String(FlagSearchAttributesVal)
var searchAttrVals []interface{}
if strings.TrimSpace(rawSearchAttrVal) != "" {
searchAttrValsStr := trimSpace(strings.Split(rawSearchAttrVal, searchAttrInputSeparator))
for _, v := range searchAttrValsStr {
searchAttrVals = append(searchAttrVals, convertStringToRealType(v))
}
}
if len(searchAttrKeys) != len(searchAttrVals) {
ErrorAndExit("Number of search attributes keys and values are not equal.", nil)
}
fields := map[string][]byte{}
for i, key := range searchAttrKeys {
val, err := json.Marshal(searchAttrVals[i])
if err != nil {
ErrorAndExit(fmt.Sprintf("Encode value %v error", val), err)
}
fields[key] = val
}
return fields
}
func processHeader(c *cli.Context) map[string][]byte {
headerKeys := processMultipleKeys(c.String(FlagHeaderKey), " ")
headerValues := processMultipleJSONValues(processJSONInputHelper(c, jsonTypeHeader))
if len(headerKeys) != len(headerValues) {
ErrorAndExit("Number of header keys and values are not equal.", nil)
}
return mapFromKeysValues(headerKeys, headerValues)
}
func processMemo(c *cli.Context) map[string][]byte {
memoKeys := processMultipleKeys(c.String(FlagMemoKey), " ")
memoValues := processMultipleJSONValues(processJSONInputHelper(c, jsonTypeMemo))
if len(memoKeys) != len(memoValues) {
ErrorAndExit("Number of memo keys and values are not equal.", nil)
}
return mapFromKeysValues(memoKeys, memoValues)
}
// helper function to print workflow progress with time refresh every second
func printWorkflowProgress(c *cli.Context, domain, wid, rid string) {
fmt.Println(colorMagenta("Progress:"))
wfClient := getWorkflowClient(c)
timeElapse := 1
isTimeElapseExist := false
doneChan := make(chan bool)
var lastEvent *types.HistoryEvent // used for print result of this run
ticker := time.NewTicker(time.Second).C
tcCtx, cancel := newIndefiniteContext(c)
defer cancel()
showDetails := c.Bool(FlagShowDetail)
var maxFieldLength int
if c.IsSet(FlagMaxFieldLength) {
maxFieldLength = c.Int(FlagMaxFieldLength)
}
go func() {
iterator, err := GetWorkflowHistoryIterator(tcCtx, wfClient, domain, wid, rid, true, types.HistoryEventFilterTypeAllEvent.Ptr())
if err != nil {
ErrorAndExit("Unable to get history events.", err)
}
for iterator.HasNext() {
entity, err := iterator.Next()
event := entity.(*types.HistoryEvent)
if err != nil {
ErrorAndExit("Unable to read event.", err)
}
if isTimeElapseExist {
removePrevious2LinesFromTerminal()
isTimeElapseExist = false
}
if showDetails {
fmt.Printf(" %d, %s, %s, %s\n", event.ID, convertTime(event.GetTimestamp(), false), ColorEvent(event), HistoryEventToString(event, true, maxFieldLength))
} else {
fmt.Printf(" %d, %s, %s\n", event.ID, convertTime(event.GetTimestamp(), false), ColorEvent(event))
}
lastEvent = event
}
doneChan <- true
}()
for {
select {
case <-ticker:
if isTimeElapseExist {
removePrevious2LinesFromTerminal()
}
fmt.Printf("\nTime elapse: %ds\n", timeElapse)
isTimeElapseExist = true
timeElapse++
case <-doneChan: // print result of this run
fmt.Println(colorMagenta("\nResult:"))
fmt.Printf(" Run Time: %d seconds\n", timeElapse)
printRunStatus(lastEvent)
return
}
}
}
// TerminateWorkflow terminates a workflow execution
func TerminateWorkflow(c *cli.Context) {
wfClient := getWorkflowClient(c)
domain := getRequiredGlobalOption(c, FlagDomain)
wid := getRequiredOption(c, FlagWorkflowID)
rid := c.String(FlagRunID)
reason := c.String(FlagReason)
ctx, cancel := newContext(c)
defer cancel()
err := wfClient.TerminateWorkflowExecution(
ctx,
&types.TerminateWorkflowExecutionRequest{
Domain: domain,
Reason: reason,
WorkflowExecution: &types.WorkflowExecution{
WorkflowID: wid,
RunID: rid,
}, Identity: getCliIdentity(),
},
)
if err != nil {
ErrorAndExit("Terminate workflow failed.", err)
} else {
fmt.Println("Terminate workflow succeeded.")
}
}
// CancelWorkflow cancels a workflow execution
func CancelWorkflow(c *cli.Context) {
wfClient := getWorkflowClient(c)
domain := getRequiredGlobalOption(c, FlagDomain)
wid := getRequiredOption(c, FlagWorkflowID)
rid := c.String(FlagRunID)
reason := c.String(FlagReason)
ctx, cancel := newContext(c)
defer cancel()
err := wfClient.RequestCancelWorkflowExecution(
ctx,
&types.RequestCancelWorkflowExecutionRequest{
Domain: domain,
WorkflowExecution: &types.WorkflowExecution{
WorkflowID: wid,
RunID: rid,
},
Identity: getCliIdentity(),
Cause: reason,
RequestID: uuid.New(),
},
)
if err != nil {
ErrorAndExit("Cancel workflow failed.", err)
} else {
fmt.Println("Cancel workflow succeeded.")
}
}
// SignalWorkflow signals a workflow execution
func SignalWorkflow(c *cli.Context) {
serviceClient := cFactory.ServerFrontendClient(c)
domain := getRequiredGlobalOption(c, FlagDomain)
wid := getRequiredOption(c, FlagWorkflowID)
rid := c.String(FlagRunID)
name := getRequiredOption(c, FlagName)
input := processJSONInput(c)
tcCtx, cancel := newContext(c)
defer cancel()
err := serviceClient.SignalWorkflowExecution(
tcCtx,
&types.SignalWorkflowExecutionRequest{
Domain: domain,
WorkflowExecution: &types.WorkflowExecution{
WorkflowID: wid,
RunID: rid,
},
SignalName: name,
Input: []byte(input),
Identity: getCliIdentity(),
RequestID: uuid.New(),
},
)
if err != nil {
ErrorAndExit("Signal workflow failed.", err)
} else {
fmt.Println("Signal workflow succeeded.")
}
}
// SignalWithStartWorkflowExecution starts a workflow execution if not already exists and signals it
func SignalWithStartWorkflowExecution(c *cli.Context) {
serviceClient := cFactory.ServerFrontendClient(c)
signalWithStartRequest := constructSignalWithStartWorkflowRequest(c)
tcCtx, cancel := newContext(c)
defer cancel()
resp, err := serviceClient.SignalWithStartWorkflowExecution(tcCtx, signalWithStartRequest)
if err != nil {
ErrorAndExit("SignalWithStart workflow failed.", err)
} else {
fmt.Printf("SignalWithStart workflow succeeded. Workflow Id: %s, run Id: %s\n", signalWithStartRequest.GetWorkflowID(), resp.GetRunID())
}
}
func constructSignalWithStartWorkflowRequest(c *cli.Context) *types.SignalWithStartWorkflowExecutionRequest {
startRequest := constructStartWorkflowRequest(c)
return &types.SignalWithStartWorkflowExecutionRequest{
Domain: startRequest.Domain,
WorkflowID: startRequest.WorkflowID,
WorkflowType: startRequest.WorkflowType,
TaskList: startRequest.TaskList,
Input: startRequest.Input,
ExecutionStartToCloseTimeoutSeconds: startRequest.ExecutionStartToCloseTimeoutSeconds,
TaskStartToCloseTimeoutSeconds: startRequest.TaskStartToCloseTimeoutSeconds,
Identity: startRequest.Identity,
RequestID: startRequest.RequestID,
WorkflowIDReusePolicy: startRequest.WorkflowIDReusePolicy,
RetryPolicy: startRequest.RetryPolicy,
CronSchedule: startRequest.CronSchedule,
Memo: startRequest.Memo,
SearchAttributes: startRequest.SearchAttributes,
Header: startRequest.Header,
SignalName: getRequiredOption(c, FlagName),
SignalInput: []byte(processJSONInputSignal(c)),
DelayStartSeconds: startRequest.DelayStartSeconds,
JitterStartSeconds: startRequest.JitterStartSeconds,
}
}
func processJSONInputSignal(c *cli.Context) string {
return processJSONInputHelper(c, jsonTypeSignal)
}
// QueryWorkflow query workflow execution
func QueryWorkflow(c *cli.Context) {
getRequiredGlobalOption(c, FlagDomain) // for pre-check and alert if not provided
getRequiredOption(c, FlagWorkflowID)
queryType := getRequiredOption(c, FlagQueryType)
queryWorkflowHelper(c, queryType)
}
// QueryWorkflowUsingStackTrace query workflow execution using __stack_trace as query type
func QueryWorkflowUsingStackTrace(c *cli.Context) {
queryWorkflowHelper(c, "__stack_trace")
}
// QueryWorkflowUsingQueryTypes list all query types of the workflow using __query_types as query type
func QueryWorkflowUsingQueryTypes(c *cli.Context) {
queryWorkflowHelper(c, "__query_types")
}
func queryWorkflowHelper(c *cli.Context, queryType string) {
serviceClient := cFactory.ServerFrontendClient(c)
domain := getRequiredGlobalOption(c, FlagDomain)
wid := getRequiredOption(c, FlagWorkflowID)
rid := c.String(FlagRunID)
input := processJSONInput(c)
tcCtx, cancel := newContext(c)
defer cancel()
queryRequest := &types.QueryWorkflowRequest{
Domain: domain,
Execution: &types.WorkflowExecution{
WorkflowID: wid,
RunID: rid,
},
Query: &types.WorkflowQuery{
QueryType: queryType,
},
}
if input != "" {
queryRequest.Query.QueryArgs = []byte(input)
}
if c.IsSet(FlagQueryRejectCondition) {
var rejectCondition types.QueryRejectCondition
switch c.String(FlagQueryRejectCondition) {
case "not_open":
rejectCondition = types.QueryRejectConditionNotOpen
case "not_completed_cleanly":
rejectCondition = types.QueryRejectConditionNotCompletedCleanly
default:
ErrorAndExit(fmt.Sprintf("invalid reject condition %v, valid values are \"not_open\" and \"not_completed_cleanly\"", c.String(FlagQueryRejectCondition)), nil)
}
queryRequest.QueryRejectCondition = &rejectCondition
}
if c.IsSet(FlagQueryConsistencyLevel) {
var consistencyLevel types.QueryConsistencyLevel
switch c.String(FlagQueryConsistencyLevel) {
case "eventual":
consistencyLevel = types.QueryConsistencyLevelEventual
case "strong":
consistencyLevel = types.QueryConsistencyLevelStrong
default:
ErrorAndExit(fmt.Sprintf("invalid query consistency level %v, valid values are \"eventual\" and \"strong\"", c.String(FlagQueryConsistencyLevel)), nil)
}
queryRequest.QueryConsistencyLevel = &consistencyLevel
}
queryResponse, err := serviceClient.QueryWorkflow(tcCtx, queryRequest)
if err != nil {
ErrorAndExit("Query workflow failed.", err)
return
}
if queryResponse.QueryRejected != nil {
fmt.Printf("Query was rejected, workflow is in state: %v\n", *queryResponse.QueryRejected.CloseStatus)
} else {
// assume it is json encoded
fmt.Print(string(queryResponse.QueryResult))
}
}
// ListWorkflow list workflow executions based on filters
func ListWorkflow(c *cli.Context) {
displayPagedWorkflows(c, filterExcludedWorkflows(c, listWorkflows(c)), !c.Bool(FlagMore))
}
// ListAllWorkflow list all workflow executions based on filters
func ListAllWorkflow(c *cli.Context) {
displayAllWorkflows(c, filterExcludedWorkflows(c, listWorkflows(c)))
}
// ScanAllWorkflow list all workflow executions using Scan API.
// It should be faster than ListAllWorkflow, but result are not sorted.
func ScanAllWorkflow(c *cli.Context) {
displayAllWorkflows(c, scanWorkflows(c))
}
func isQueryOpen(query string) bool {
var openWFPattern = regexp.MustCompile(`CloseTime[ ]*=[ ]*missing`)
return openWFPattern.MatchString(query)
}
// CountWorkflow count number of workflows
func CountWorkflow(c *cli.Context) {
wfClient := getWorkflowClient(c)
domain := getRequiredGlobalOption(c, FlagDomain)
query := c.String(FlagListQuery)
request := &types.CountWorkflowExecutionsRequest{
Domain: domain,
Query: query,
}
ctx, cancel := newContextForLongPoll(c)
defer cancel()
response, err := wfClient.CountWorkflowExecutions(ctx, request)
if err != nil {
ErrorAndExit("Failed to count workflow.", err)
}
fmt.Println(response.GetCount())
}
// ListArchivedWorkflow lists archived workflow executions based on filters
func ListArchivedWorkflow(c *cli.Context) {
printAll := c.Bool(FlagAll)
if printAll {
displayAllWorkflows(c, listArchivedWorkflows(c))
} else {
displayPagedWorkflows(c, listArchivedWorkflows(c), false)
}
}
// DescribeWorkflow show information about the specified workflow execution
func DescribeWorkflow(c *cli.Context) {
wid := getRequiredOption(c, FlagWorkflowID)
rid := c.String(FlagRunID)
describeWorkflowHelper(c, wid, rid)
}
// DescribeWorkflowWithID show information about the specified workflow execution
func DescribeWorkflowWithID(c *cli.Context) {
if !c.Args().Present() {
ErrorAndExit("Argument workflow_id is required.", nil)
}
wid := c.Args().First()
rid := ""
if c.NArg() >= 2 {
rid = c.Args().Get(1)
}
describeWorkflowHelper(c, wid, rid)
}
func describeWorkflowHelper(c *cli.Context, wid, rid string) {
frontendClient := cFactory.ServerFrontendClient(c)
domain := getRequiredGlobalOption(c, FlagDomain)
printRaw := c.Bool(FlagPrintRaw) // printRaw is false by default,
// and will show datetime and decoded search attributes instead of raw timestamp and byte arrays
printResetPointsOnly := c.Bool(FlagResetPointsOnly)
ctx, cancel := newContext(c)
defer cancel()
resp, err := frontendClient.DescribeWorkflowExecution(ctx, &types.DescribeWorkflowExecutionRequest{
Domain: domain,
Execution: &types.WorkflowExecution{
WorkflowID: wid,
RunID: rid,
},
})
if err != nil {
ErrorAndExit("Describe workflow execution failed", err)
}
if printResetPointsOnly {
printAutoResetPoints(resp)
return
}
var o interface{}
if printRaw {
o = resp
} else {
o = convertDescribeWorkflowExecutionResponse(resp, frontendClient, c)
}
prettyPrintJSONObject(o)
}
type AutoResetPointRow struct {
BinaryChecksum string `header:"Binary Checksum"`
CreateTime time.Time `header:"Create Time"`
RunID string `header:"RunID"`
EventID int64 `header:"EventID"`
}
func printAutoResetPoints(resp *types.DescribeWorkflowExecutionResponse) {
fmt.Println("Auto Reset Points:")
table := []AutoResetPointRow{}
if resp.WorkflowExecutionInfo.AutoResetPoints == nil || len(resp.WorkflowExecutionInfo.AutoResetPoints.Points) == 0 {
return
}
for _, pt := range resp.WorkflowExecutionInfo.AutoResetPoints.Points {
table = append(table, AutoResetPointRow{
BinaryChecksum: pt.GetBinaryChecksum(),
CreateTime: time.Unix(0, pt.GetCreatedTimeNano()),
RunID: pt.GetRunID(),
EventID: pt.GetFirstDecisionCompletedID(),
})
}
RenderTable(os.Stdout, table, RenderOptions{Color: true, Border: true, PrintDateTime: true})
}
// describeWorkflowExecutionResponse is used to print datetime instead of print raw time
type describeWorkflowExecutionResponse struct {
ExecutionConfiguration *types.WorkflowExecutionConfiguration
WorkflowExecutionInfo workflowExecutionInfo
PendingActivities []*pendingActivityInfo
PendingChildren []*types.PendingChildExecutionInfo
PendingDecision *pendingDecisionInfo
}
// workflowExecutionInfo has same fields as types.WorkflowExecutionInfo, but has datetime instead of raw time
type workflowExecutionInfo struct {
Execution *types.WorkflowExecution
Type *types.WorkflowType
StartTime *string // change from *int64
CloseTime *string // change from *int64
CloseStatus *types.WorkflowExecutionCloseStatus
HistoryLength int64
ParentDomainID *string
ParentExecution *types.WorkflowExecution
Memo *types.Memo
SearchAttributes map[string]interface{}
AutoResetPoints *types.ResetPoints
PartitionConfig map[string]string
}
// pendingActivityInfo has same fields as types.PendingActivityInfo, but different field type for better display
type pendingActivityInfo struct {
ActivityID string
ActivityType *types.ActivityType
State *types.PendingActivityState
ScheduledTimestamp *string `json:",omitempty"` // change from *int64
LastStartedTimestamp *string `json:",omitempty"` // change from *int64
HeartbeatDetails *string `json:",omitempty"` // change from []byte
LastHeartbeatTimestamp *string `json:",omitempty"` // change from *int64
Attempt int32 `json:",omitempty"`
MaximumAttempts int32 `json:",omitempty"`
ExpirationTimestamp *string `json:",omitempty"` // change from *int64
LastFailureReason *string `json:",omitempty"`
LastWorkerIdentity string `json:",omitempty"`
LastFailureDetails *string `json:",omitempty"` // change from []byte
}
type pendingDecisionInfo struct {
State *types.PendingDecisionState
OriginalScheduledTimestamp *string `json:",omitempty"` // change from *int64
ScheduledTimestamp *string `json:",omitempty"` // change from *int64
StartedTimestamp *string `json:",omitempty"` // change from *int64
Attempt int64 `json:",omitempty"`
}
func convertDescribeWorkflowExecutionResponse(resp *types.DescribeWorkflowExecutionResponse,
wfClient frontend.Client, c *cli.Context) *describeWorkflowExecutionResponse {
info := resp.WorkflowExecutionInfo
executionInfo := workflowExecutionInfo{
Execution: info.Execution,
Type: info.Type,
StartTime: common.StringPtr(convertTime(info.GetStartTime(), false)),
CloseTime: common.StringPtr(convertTime(info.GetCloseTime(), false)),
CloseStatus: info.CloseStatus,
HistoryLength: info.HistoryLength,
ParentDomainID: info.ParentDomainID,
ParentExecution: info.ParentExecution,
Memo: info.Memo,
SearchAttributes: convertSearchAttributesToMapOfInterface(info.SearchAttributes, wfClient, c),
AutoResetPoints: info.AutoResetPoints,
PartitionConfig: info.PartitionConfig,
}
var pendingActs []*pendingActivityInfo
var tmpAct *pendingActivityInfo
for _, pa := range resp.PendingActivities {
tmpAct = &pendingActivityInfo{
ActivityID: pa.ActivityID,
ActivityType: pa.ActivityType,
State: pa.State,
ScheduledTimestamp: timestampPtrToStringPtr(pa.ScheduledTimestamp, false),
LastStartedTimestamp: timestampPtrToStringPtr(pa.LastStartedTimestamp, false),
LastHeartbeatTimestamp: timestampPtrToStringPtr(pa.LastHeartbeatTimestamp, false),
Attempt: pa.Attempt,
MaximumAttempts: pa.MaximumAttempts,
ExpirationTimestamp: timestampPtrToStringPtr(pa.ExpirationTimestamp, false),
LastFailureReason: pa.LastFailureReason,
LastWorkerIdentity: pa.LastWorkerIdentity,
}
if pa.HeartbeatDetails != nil {
tmpAct.HeartbeatDetails = common.StringPtr(string(pa.HeartbeatDetails))
}
if pa.LastFailureDetails != nil {
tmpAct.LastFailureDetails = common.StringPtr(string(pa.LastFailureDetails))
}
pendingActs = append(pendingActs, tmpAct)
}
var pendingDecision *pendingDecisionInfo
if resp.PendingDecision != nil {
pendingDecision = &pendingDecisionInfo{
State: resp.PendingDecision.State,
ScheduledTimestamp: timestampPtrToStringPtr(resp.PendingDecision.ScheduledTimestamp, false),
StartedTimestamp: timestampPtrToStringPtr(resp.PendingDecision.StartedTimestamp, false),
Attempt: resp.PendingDecision.Attempt,
}
// TODO: Idea here is only display decision task original scheduled timestamp if user are
// using decision heartbeat. And we should be able to tell whether a decision task has heartbeat
// or not by comparing the original scheduled timestamp and scheduled timestamp.
// However, currently server may assign different value to original scheduled timestamp and
// scheduled time even if there's no decision heartbeat.
// if resp.PendingDecision.OriginalScheduledTimestamp != nil &&
// resp.PendingDecision.ScheduledTimestamp != nil &&
// *resp.PendingDecision.OriginalScheduledTimestamp != *resp.PendingDecision.ScheduledTimestamp {
// pendingDecision.OriginalScheduledTimestamp = timestampPtrToStringPtr(resp.PendingDecision.OriginalScheduledTimestamp, false)
// }
}
return &describeWorkflowExecutionResponse{
ExecutionConfiguration: resp.ExecutionConfiguration,
WorkflowExecutionInfo: executionInfo,
PendingActivities: pendingActs,
PendingChildren: resp.PendingChildren,
PendingDecision: pendingDecision,
}
}
func convertSearchAttributesToMapOfInterface(searchAttributes *types.SearchAttributes,
wfClient frontend.Client, c *cli.Context) map[string]interface{} {
if searchAttributes == nil || len(searchAttributes.GetIndexedFields()) == 0 {
return nil
}
result := make(map[string]interface{})
ctx, cancel := newContext(c)
defer cancel()
validSearchAttributes, err := wfClient.GetSearchAttributes(ctx)
if err != nil {
ErrorAndExit("Error when get search attributes", err)
}
validKeys := validSearchAttributes.GetKeys()
indexedFields := searchAttributes.GetIndexedFields()
for k, v := range indexedFields {
valueType := validKeys[k]
deserializedValue, err := common.DeserializeSearchAttributeValue(v, valueType)
if err != nil {
ErrorAndExit("Error deserializing search attribute value", err)
}
result[k] = deserializedValue
}
return result
}
func getAllWorkflowIDsByQuery(c *cli.Context, query string) map[string]bool {
wfClient := getWorkflowClient(c)
pageSize := 1000
var nextPageToken []byte
var info []*types.WorkflowExecutionInfo
result := map[string]bool{}
for {
info, nextPageToken = scanWorkflowExecutions(wfClient, pageSize, nextPageToken, query, c)
for _, we := range info {
wid := we.Execution.GetWorkflowID()
result[wid] = true
}
if nextPageToken == nil {
break
}
}
return result
}
func printRunStatus(event *types.HistoryEvent) {
switch event.GetEventType() {
case types.EventTypeWorkflowExecutionCompleted:
fmt.Printf(" Status: %s\n", colorGreen("COMPLETED"))
fmt.Printf(" Output: %s\n", string(event.WorkflowExecutionCompletedEventAttributes.Result))
case types.EventTypeWorkflowExecutionFailed:
fmt.Printf(" Status: %s\n", colorRed("FAILED"))
fmt.Printf(" Reason: %s\n", event.WorkflowExecutionFailedEventAttributes.GetReason())
fmt.Printf(" Detail: %s\n", string(event.WorkflowExecutionFailedEventAttributes.Details))
case types.EventTypeWorkflowExecutionTimedOut:
fmt.Printf(" Status: %s\n", colorRed("TIMEOUT"))
fmt.Printf(" Timeout Type: %s\n", event.WorkflowExecutionTimedOutEventAttributes.GetTimeoutType())
case types.EventTypeWorkflowExecutionCanceled:
fmt.Printf(" Status: %s\n", colorRed("CANCELED"))
fmt.Printf(" Detail: %s\n", string(event.WorkflowExecutionCanceledEventAttributes.Details))
}
}
// WorkflowRow is a presentation layer entity use to render a table of workflows
type WorkflowRow struct {
WorkflowType string `header:"Workflow Type" maxLength:"32"`
WorkflowID string `header:"Workflow ID"`
RunID string `header:"Run ID"`
TaskList string `header:"Task List"`
IsCron bool `header:"Is Cron"`
StartTime time.Time `header:"Start Time"`
ExecutionTime time.Time `header:"Execution Time"`
EndTime time.Time `header:"End Time"`
CloseStatus string `header:"Close Status"`
HistoryLength int64 `header:"History Length"`
UpdateTime time.Time `header:"Update Time"`
Memo map[string]string `header:"Memo"`
SearchAttributes map[string]interface{} `header:"Search Attributes"`
}
func newWorkflowRow(workflow *types.WorkflowExecutionInfo) WorkflowRow {
memo := map[string]string{}
for k, v := range workflow.Memo.GetFields() {
memo[k] = string(v)
}
sa := map[string]interface{}{}
for k, v := range workflow.SearchAttributes.GetIndexedFields() {
var decodedVal interface{}
json.Unmarshal(v, &decodedVal)
sa[k] = decodedVal
}
return WorkflowRow{
WorkflowType: workflow.Type.GetName(),
WorkflowID: workflow.Execution.GetWorkflowID(),
RunID: workflow.Execution.GetRunID(),
TaskList: workflow.TaskList,
IsCron: workflow.IsCron,
StartTime: time.Unix(0, workflow.GetStartTime()),
ExecutionTime: time.Unix(0, workflow.GetExecutionTime()),
EndTime: time.Unix(0, workflow.GetCloseTime()),
UpdateTime: time.Unix(0, workflow.GetUpdateTime()),
CloseStatus: workflow.GetCloseStatus().String(),
HistoryLength: workflow.HistoryLength,
Memo: memo,
SearchAttributes: sa,
}
}
func workflowTableOptions(c *cli.Context) RenderOptions {
isScanQueryOpen := isQueryOpen(c.String(FlagListQuery))
return RenderOptions{
DefaultTemplate: templateTable,
Color: true,
PrintDateTime: c.Bool(FlagPrintDateTime),
PrintRawTime: c.Bool(FlagPrintRawTime),
OptionalColumns: map[string]bool{
"End Time": !(c.Bool(FlagOpen) || isScanQueryOpen),
"Memo": c.Bool(FlagPrintMemo),
"Search Attributes": c.Bool(FlagPrintSearchAttr),
},
}
}
type getWorkflowPageFn func([]byte) ([]*types.WorkflowExecutionInfo, []byte)
func getAllWorkflows(getWorkflowPage getWorkflowPageFn) []*types.WorkflowExecutionInfo {
var all, page []*types.WorkflowExecutionInfo
var nextPageToken []byte
for {
page, nextPageToken = getWorkflowPage(nextPageToken)
all = append(all, page...)
if len(nextPageToken) == 0 {
break
}
}
return all
}
func filterExcludedWorkflows(c *cli.Context, getWorkflowPage getWorkflowPageFn) getWorkflowPageFn {
excludeWIDs := map[string]bool{}
if c.IsSet(FlagListQuery) && c.IsSet(FlagExcludeWorkflowIDByQuery) {
excludeQuery := c.String(FlagExcludeWorkflowIDByQuery)
excludeWIDs = getAllWorkflowIDsByQuery(c, excludeQuery)
fmt.Printf("found %d workflowIDs to exclude\n", len(excludeWIDs))
}
return func(nextPageToken []byte) ([]*types.WorkflowExecutionInfo, []byte) {
page, nextPageToken := getWorkflowPage(nextPageToken)
filtered := make([]*types.WorkflowExecutionInfo, 0, len(page))
for _, workflow := range page {
if excludeWIDs[workflow.GetExecution().GetWorkflowID()] {
continue
}
filtered = append(filtered, workflow)
}
return filtered, nextPageToken
}
}
func displayPagedWorkflows(c *cli.Context, getWorkflowPage getWorkflowPageFn, firstPageOnly bool) {
var page []*types.WorkflowExecutionInfo
var nextPageToken []byte
for {
page, nextPageToken = getWorkflowPage(nextPageToken)
displayWorkflows(c, page)
if firstPageOnly {
break
}
if len(nextPageToken) == 0 {
break
}
if !showNextPage() {
break
}
}
}
func displayAllWorkflows(c *cli.Context, getWorkflowsPage getWorkflowPageFn) {
displayWorkflows(c, getAllWorkflows(getWorkflowsPage))
}
func displayWorkflows(c *cli.Context, workflows []*types.WorkflowExecutionInfo) {
printJSON := c.Bool(FlagPrintJSON)
printDecodedRaw := c.Bool(FlagPrintFullyDetail)
if printJSON || printDecodedRaw {
fmt.Println("[")
printListResults(workflows, printJSON, false)
fmt.Println("]")
} else {
tableOptions := workflowTableOptions(c)
var table []WorkflowRow
for _, workflow := range workflows {
table = append(table, newWorkflowRow(workflow))
}
Render(c, table, tableOptions)
}
}
func listWorkflowExecutions(client frontend.Client, pageSize int, domain, query string, c *cli.Context) getWorkflowPageFn {
return func(nextPageToken []byte) ([]*types.WorkflowExecutionInfo, []byte) {
request := &types.ListWorkflowExecutionsRequest{
Domain: domain,
PageSize: int32(pageSize),
NextPageToken: nextPageToken,
Query: query,
}
ctx, cancel := newContextForLongPoll(c)
defer cancel()
response, err := client.ListWorkflowExecutions(ctx, request)
if err != nil {
ErrorAndExit("Failed to list workflow.", err)
}
return response.Executions, response.NextPageToken
}
}
func listOpenWorkflow(client frontend.Client, pageSize int, earliestTime, latestTime int64, domain, workflowID, workflowType string, c *cli.Context) getWorkflowPageFn {
return func(nextPageToken []byte) ([]*types.WorkflowExecutionInfo, []byte) {
request := &types.ListOpenWorkflowExecutionsRequest{
Domain: domain,
MaximumPageSize: int32(pageSize),
NextPageToken: nextPageToken,
StartTimeFilter: &types.StartTimeFilter{
EarliestTime: common.Int64Ptr(earliestTime),
LatestTime: common.Int64Ptr(latestTime),
},
}
if len(workflowID) > 0 {
request.ExecutionFilter = &types.WorkflowExecutionFilter{WorkflowID: workflowID}
}
if len(workflowType) > 0 {
request.TypeFilter = &types.WorkflowTypeFilter{Name: workflowType}
}
ctx, cancel := newContextForLongPoll(c)
defer cancel()
response, err := client.ListOpenWorkflowExecutions(ctx, request)
if err != nil {
ErrorAndExit("Failed to list open workflow.", err)
}
return response.Executions, response.NextPageToken
}
}
func listClosedWorkflow(client frontend.Client, pageSize int, earliestTime, latestTime int64, domain, workflowID, workflowType string, workflowStatus types.WorkflowExecutionCloseStatus, c *cli.Context) getWorkflowPageFn {
return func(nextPageToken []byte) ([]*types.WorkflowExecutionInfo, []byte) {
request := &types.ListClosedWorkflowExecutionsRequest{
Domain: domain,
MaximumPageSize: int32(pageSize),
NextPageToken: nextPageToken,
StartTimeFilter: &types.StartTimeFilter{
EarliestTime: common.Int64Ptr(earliestTime),
LatestTime: common.Int64Ptr(latestTime),
},
}
if len(workflowID) > 0 {
request.ExecutionFilter = &types.WorkflowExecutionFilter{WorkflowID: workflowID}
}
if len(workflowType) > 0 {
request.TypeFilter = &types.WorkflowTypeFilter{Name: workflowType}
}
if workflowStatus != workflowStatusNotSet {
request.StatusFilter = &workflowStatus
}
ctx, cancel := newContextForLongPoll(c)
defer cancel()
response, err := client.ListClosedWorkflowExecutions(ctx, request)
if err != nil {
ErrorAndExit("Failed to list closed workflow.", err)
}
return response.Executions, response.NextPageToken
}
}
func listWorkflows(c *cli.Context) getWorkflowPageFn {
wfClient := getWorkflowClient(c)
domain := getRequiredGlobalOption(c, FlagDomain)
earliestTime := parseTime(c.String(FlagEarliestTime), 0)
latestTime := parseTime(c.String(FlagLatestTime), time.Now().UnixNano())
workflowID := c.String(FlagWorkflowID)
workflowType := c.String(FlagWorkflowType)
queryOpen := c.Bool(FlagOpen)
pageSize := c.Int(FlagPageSize)
if pageSize <= 0 {
pageSize = defaultPageSizeForList
}
var workflowStatus types.WorkflowExecutionCloseStatus
if c.IsSet(FlagWorkflowStatus) {
if queryOpen {
ErrorAndExit(optionErr, errors.New("you can only filter on status for closed workflow, not open workflow"))
}
workflowStatus = getWorkflowStatus(c.String(FlagWorkflowStatus))
} else {
workflowStatus = workflowStatusNotSet
}
if len(workflowID) > 0 && len(workflowType) > 0 {
ErrorAndExit(optionErr, errors.New("you can filter on workflow_id or workflow_type, but not on both"))
}
ctx, cancel := newContextForLongPoll(c)
defer cancel()
resp, err := wfClient.CountWorkflowExecutions(
ctx,
&types.CountWorkflowExecutionsRequest{
Domain: domain,
Query: c.String(FlagListQuery),
},
)
if err == nil {
fmt.Printf("Fetching %v workflows...\n", resp.GetCount())
}
if c.IsSet(FlagListQuery) {
listQuery := c.String(FlagListQuery)
return listWorkflowExecutions(wfClient, pageSize, domain, listQuery, c)
} else if queryOpen {
return listOpenWorkflow(wfClient, pageSize, earliestTime, latestTime, domain, workflowID, workflowType, c)
} else {
return listClosedWorkflow(wfClient, pageSize, earliestTime, latestTime, domain, workflowID, workflowType, workflowStatus, c)
}
}
func listArchivedWorkflows(c *cli.Context) getWorkflowPageFn {
wfClient := getWorkflowClient(c)
domain := getRequiredGlobalOption(c, FlagDomain)
pageSize := c.Int(FlagPageSize)
listQuery := getRequiredOption(c, FlagListQuery)
if pageSize <= 0 {
pageSize = defaultPageSizeForList
}
contextTimeout := defaultContextTimeoutForListArchivedWorkflow
if c.GlobalIsSet(FlagContextTimeout) {
contextTimeout = time.Duration(c.GlobalInt(FlagContextTimeout)) * time.Second
}
return func(nextPageToken []byte) ([]*types.WorkflowExecutionInfo, []byte) {
request := &types.ListArchivedWorkflowExecutionsRequest{
Domain: domain,
PageSize: int32(pageSize),
Query: listQuery,
NextPageToken: nextPageToken,
}
ctx, cancel := context.WithTimeout(context.Background(), contextTimeout)
defer cancel()
result, err := wfClient.ListArchivedWorkflowExecutions(ctx, request)
if err != nil {
cancel()
ErrorAndExit("Failed to list archived workflow.", err)
}
return result.Executions, result.NextPageToken
}
}
func scanWorkflows(c *cli.Context) getWorkflowPageFn {
wfClient := getWorkflowClient(c)
listQuery := c.String(FlagListQuery)
pageSize := c.Int(FlagPageSize)
if pageSize <= 0 {
pageSize = defaultPageSizeForScan
}
return func(nextPageToken []byte) ([]*types.WorkflowExecutionInfo, []byte) {
return scanWorkflowExecutions(wfClient, pageSize, nextPageToken, listQuery, c)
}
}
func scanWorkflowExecutions(client frontend.Client, pageSize int, nextPageToken []byte, query string, c *cli.Context) ([]*types.WorkflowExecutionInfo, []byte) {
domain := getRequiredGlobalOption(c, FlagDomain)
request := &types.ListWorkflowExecutionsRequest{
Domain: domain,
PageSize: int32(pageSize),
NextPageToken: nextPageToken,
Query: query,
}
ctx, cancel := newContextForLongPoll(c)
defer cancel()
response, err := client.ScanWorkflowExecutions(ctx, request)
if err != nil {
ErrorAndExit("Failed to list workflow.", err)
}
return response.Executions, response.NextPageToken
}
func getWorkflowStatus(statusStr string) types.WorkflowExecutionCloseStatus {
if status, ok := workflowClosedStatusMap[strings.ToLower(statusStr)]; ok {
return status
}
ErrorAndExit(optionErr, errors.New("option status is not one of allowed values "+
"[completed, failed, canceled, terminated, continued_as_new, timed_out]"))
return 0
}
func getWorkflowIDReusePolicy(value int) *types.WorkflowIDReusePolicy {
if value >= 0 && types.WorkflowIDReusePolicy(value) <= types.WorkflowIDReusePolicyTerminateIfRunning {
return types.WorkflowIDReusePolicy(value).Ptr()
}
// At this point, the policy should return if the value is valid
ErrorAndExit(fmt.Sprintf("Option %v value is not in supported range.", FlagWorkflowIDReusePolicy), nil)
return nil
}
// default will print decoded raw
func printListResults(executions []*types.WorkflowExecutionInfo, inJSON bool, more bool) {
for i, execution := range executions {
if inJSON {
j, _ := json.Marshal(execution)
if more || i < len(executions)-1 {
fmt.Println(string(j) + ",")
} else {
fmt.Println(string(j))
}
} else {
if more || i < len(executions)-1 {
fmt.Println(anyToString(execution, true, 0) + ",")
} else {
fmt.Println(anyToString(execution, true, 0))
}
}
}
}
// ObserveHistory show the process of running workflow
func ObserveHistory(c *cli.Context) {
wid := getRequiredOption(c, FlagWorkflowID)
rid := c.String(FlagRunID)
domain := getRequiredGlobalOption(c, FlagDomain)
printWorkflowProgress(c, domain, wid, rid)
}
// ResetWorkflow reset workflow
func ResetWorkflow(c *cli.Context) {
domain := getRequiredGlobalOption(c, FlagDomain)
wid := getRequiredOption(c, FlagWorkflowID)
reason := getRequiredOption(c, FlagReason)
if len(reason) == 0 {
ErrorAndExit("wrong reason", fmt.Errorf("reason cannot be empty"))
}
eventID := c.Int64(FlagEventID)
resetType := c.String(FlagResetType)
decisionOffset := c.Int(FlagDecisionOffset)
if decisionOffset > 0 {
ErrorAndExit("Only decision offset <=0 is supported", nil)
}
extraForResetType, ok := resetTypesMap[resetType]
if !ok && eventID <= 0 {
ErrorAndExit("Must specify valid eventID or valid resetType", nil)
}
if ok && len(extraForResetType) > 0 {
getRequiredOption(c, extraForResetType)
}
ctx, cancel := newContext(c)
defer cancel()
frontendClient := cFactory.ServerFrontendClient(c)
rid := c.String(FlagRunID)
var err error
if rid == "" {
rid, err = getCurrentRunID(ctx, domain, wid, frontendClient)
if err != nil {
ErrorAndExit("Cannot get latest RunID as default", err)
}
}
resetBaseRunID := rid
decisionFinishID := eventID
if resetType != "" {
resetBaseRunID, decisionFinishID, err = getResetEventIDByType(ctx, c, resetType, decisionOffset, domain, wid, rid, frontendClient)
if err != nil {
ErrorAndExit("getResetEventIDByType failed", err)
}
}
resp, err := frontendClient.ResetWorkflowExecution(ctx, &types.ResetWorkflowExecutionRequest{
Domain: domain,
WorkflowExecution: &types.WorkflowExecution{
WorkflowID: wid,
RunID: resetBaseRunID,
},
Reason: fmt.Sprintf("%v:%v", getCurrentUserFromEnv(), reason),
DecisionFinishEventID: decisionFinishID,
RequestID: uuid.New(),
SkipSignalReapply: c.Bool(FlagSkipSignalReapply),
})
if err != nil {
ErrorAndExit("reset failed", err)
}
prettyPrintJSONObject(resp)
}
func processResets(c *cli.Context, domain string, wes chan types.WorkflowExecution, done chan bool, wg *sync.WaitGroup, params batchResetParamsType) {
for {
select {
case we := <-wes:
fmt.Println("received: ", we.GetWorkflowID(), we.GetRunID())
wid := we.GetWorkflowID()
rid := we.GetRunID()
var err error
for i := 0; i < 3; i++ {
err = doReset(c, domain, wid, rid, params)
if err == nil {
break
}
if _, ok := err.(*types.BadRequestError); ok {
break
}
fmt.Println("failed and retry...: ", wid, rid, err)
time.Sleep(time.Millisecond * time.Duration(rand.Intn(2000)))
}
time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
if err != nil {
fmt.Println("[ERROR] failed processing: ", wid, rid, err.Error())
}
case <-done:
wg.Done()
return
}
}
}
type batchResetParamsType struct {
reason string
skipCurrentOpen bool
skipCurrentCompleted bool
nonDeterministicOnly bool
skipBaseNotCurrent bool
dryRun bool
resetType string
decisionOffset int
skipSignalReapply bool
}
// ResetInBatch resets workflow in batch
func ResetInBatch(c *cli.Context) {
domain := getRequiredGlobalOption(c, FlagDomain)
resetType := getRequiredOption(c, FlagResetType)
decisionOffset := c.Int(FlagDecisionOffset)
if decisionOffset > 0 {
ErrorAndExit("Only decision offset <=0 is supported", nil)
}
inFileName := c.String(FlagInputFile)
query := c.String(FlagListQuery)
excludeFileName := c.String(FlagExcludeFile)
excludeQuery := c.String(FlagExcludeWorkflowIDByQuery)
separator := c.String(FlagInputSeparator)
parallel := c.Int(FlagParallismDeprecated)
if parallel == 1 {
parallel = c.Int(FlagParallelism)
}
extraForResetType, ok := resetTypesMap[resetType]
if !ok {
ErrorAndExit("Not supported reset type", nil)
} else if len(extraForResetType) > 0 {
getRequiredOption(c, extraForResetType)
}
if excludeFileName != "" && excludeQuery != "" {
ErrorAndExit("Only one of the excluding option is allowed", nil)
}
batchResetParams := batchResetParamsType{
reason: getRequiredOption(c, FlagReason),
skipCurrentOpen: c.Bool(FlagSkipCurrentOpen),
skipCurrentCompleted: c.Bool(FlagSkipCurrentCompleted),
nonDeterministicOnly: c.Bool(FlagNonDeterministicOnly),
skipBaseNotCurrent: c.Bool(FlagSkipBaseIsNotCurrent),
dryRun: c.Bool(FlagDryRun),
resetType: resetType,
decisionOffset: decisionOffset,
skipSignalReapply: c.Bool(FlagSkipSignalReapply),
}
if inFileName == "" && query == "" {
ErrorAndExit("Must provide input file or list query to get target workflows to reset", nil)
}
wg := &sync.WaitGroup{}
wes := make(chan types.WorkflowExecution)
done := make(chan bool)
for i := 0; i < parallel; i++ {
wg.Add(1)
go processResets(c, domain, wes, done, wg, batchResetParams)
}
// read excluded workflowIDs
excludeWIDs := map[string]bool{}
if excludeFileName != "" {
excludeWIDs = loadWorkflowIDsFromFile(excludeFileName, separator)
}
if excludeQuery != "" {
excludeWIDs = getAllWorkflowIDsByQuery(c, excludeQuery)
}
fmt.Println("num of excluded WorkflowIDs:", len(excludeWIDs))
if len(inFileName) > 0 {
inFile, err := os.Open(inFileName)
if err != nil {
ErrorAndExit("Open failed", err)
}
defer inFile.Close()
scanner := bufio.NewScanner(inFile)
idx := 0
for scanner.Scan() {
idx++
line := strings.TrimSpace(scanner.Text())
if len(line) == 0 {
fmt.Printf("line %v is empty, skipped\n", idx)
continue
}
cols := strings.Split(line, separator)
if len(cols) < 1 {
ErrorAndExit("Split failed", fmt.Errorf("line %v has less than 1 cols separated by comma, only %v ", idx, len(cols)))
}
fmt.Printf("Start processing line %v ...\n", idx)
wid := strings.TrimSpace(cols[0])
rid := ""
if len(cols) > 1 {
rid = strings.TrimSpace(cols[1])
}
if excludeWIDs[wid] {
fmt.Println("skip by exclude file: ", wid, rid)
continue
}
wes <- types.WorkflowExecution{
WorkflowID: wid,
RunID: rid,
}
}
} else {
wfClient := getWorkflowClient(c)
pageSize := 1000
var nextPageToken []byte
var result []*types.WorkflowExecutionInfo
for {
result, nextPageToken = scanWorkflowExecutions(wfClient, pageSize, nextPageToken, query, c)
for _, we := range result {
wid := we.Execution.GetWorkflowID()
rid := we.Execution.GetRunID()
if excludeWIDs[wid] {
fmt.Println("skip by exclude file: ", wid, rid)
continue
}
wes <- types.WorkflowExecution{
WorkflowID: wid,
RunID: rid,
}
}
if nextPageToken == nil {
break
}
}
}
close(done)
fmt.Println("wait for all goroutines...")
wg.Wait()
}
func loadWorkflowIDsFromFile(excludeFileName, separator string) map[string]bool {
excludeWIDs := map[string]bool{}
if len(excludeFileName) > 0 {
// This code is only used in the CLI. The input provided is from a trusted user.
// #nosec
excFile, err := os.Open(excludeFileName)
if err != nil {
ErrorAndExit("Open failed2", err)
}
defer excFile.Close()
scanner := bufio.NewScanner(excFile)
idx := 0
for scanner.Scan() {
idx++
line := strings.TrimSpace(scanner.Text())
if len(line) == 0 {
fmt.Printf("line %v is empty, skipped\n", idx)
continue
}
cols := strings.Split(line, separator)
if len(cols) < 1 {
ErrorAndExit("Split failed", fmt.Errorf("line %v has less than 1 cols separated by comma, only %v ", idx, len(cols)))
}
wid := strings.TrimSpace(cols[0])
excludeWIDs[wid] = true
}
}
return excludeWIDs
}
func printErrorAndReturn(msg string, err error) error {
fmt.Println(msg)
return err
}
func doReset(c *cli.Context, domain, wid, rid string, params batchResetParamsType) error {
ctx, cancel := newContext(c)
defer cancel()
frontendClient := cFactory.ServerFrontendClient(c)
resp, err := frontendClient.DescribeWorkflowExecution(ctx, &types.DescribeWorkflowExecutionRequest{
Domain: domain,
Execution: &types.WorkflowExecution{
WorkflowID: wid,
},
})
if err != nil {
return printErrorAndReturn("DescribeWorkflowExecution failed", err)
}
currentRunID := resp.WorkflowExecutionInfo.Execution.GetRunID()
if currentRunID != rid && params.skipBaseNotCurrent {
fmt.Println("skip because base run is different from current run: ", wid, rid, currentRunID)
return nil
}
if rid == "" {
rid = currentRunID
}
if resp.WorkflowExecutionInfo.CloseStatus == nil || resp.WorkflowExecutionInfo.CloseTime == nil {
if params.skipCurrentOpen {
fmt.Println("skip because current run is open: ", wid, rid, currentRunID)
return nil
}
}
if resp.WorkflowExecutionInfo.GetCloseStatus() == types.WorkflowExecutionCloseStatusCompleted {
if params.skipCurrentCompleted {
fmt.Println("skip because current run is completed: ", wid, rid, currentRunID)
return nil
}
}
if params.nonDeterministicOnly {
isLDN, err := isLastEventDecisionTaskFailedWithNonDeterminism(ctx, domain, wid, rid, frontendClient)
if err != nil {
return printErrorAndReturn("check isLastEventDecisionTaskFailedWithNonDeterminism failed", err)
}
if !isLDN {
fmt.Println("skip because last event is not DecisionTaskFailedWithNonDeterminism")
return nil
}
}
resetBaseRunID, decisionFinishID, err := getResetEventIDByType(ctx, c, params.resetType, params.decisionOffset, domain, wid, rid, frontendClient)
if err != nil {
return printErrorAndReturn("getResetEventIDByType failed", err)
}
fmt.Println("DecisionFinishEventId for reset:", wid, rid, resetBaseRunID, decisionFinishID)
if params.dryRun {
fmt.Printf("dry run to reset wid: %v, rid:%v to baseRunID:%v, eventID:%v \n", wid, rid, resetBaseRunID, decisionFinishID)
} else {
resp2, err := frontendClient.ResetWorkflowExecution(ctx, &types.ResetWorkflowExecutionRequest{
Domain: domain,
WorkflowExecution: &types.WorkflowExecution{
WorkflowID: wid,
RunID: resetBaseRunID,
},
DecisionFinishEventID: decisionFinishID,
RequestID: uuid.New(),
Reason: fmt.Sprintf("%v:%v", getCurrentUserFromEnv(), params.reason),
SkipSignalReapply: params.skipSignalReapply,
})
if err != nil {
return printErrorAndReturn("ResetWorkflowExecution failed", err)
}
fmt.Println("new runID for wid/rid is ,", wid, rid, resp2.GetRunID())
}
return nil
}
func isLastEventDecisionTaskFailedWithNonDeterminism(ctx context.Context, domain, wid, rid string, frontendClient frontend.Client) (bool, error) {
req := &types.GetWorkflowExecutionHistoryRequest{
Domain: domain,
Execution: &types.WorkflowExecution{
WorkflowID: wid,
RunID: rid,
},
MaximumPageSize: 1000,
NextPageToken: nil,
}
var firstEvent, decisionFailed *types.HistoryEvent
for {
resp, err := frontendClient.GetWorkflowExecutionHistory(ctx, req)
if err != nil {
return false, printErrorAndReturn("GetWorkflowExecutionHistory failed", err)
}
for _, e := range resp.GetHistory().GetEvents() {
if firstEvent == nil {
firstEvent = e
}
if e.GetEventType() == types.EventTypeDecisionTaskFailed {
decisionFailed = e
} else if e.GetEventType() == types.EventTypeDecisionTaskCompleted {
decisionFailed = nil
}
}
if len(resp.NextPageToken) != 0 {
req.NextPageToken = resp.NextPageToken
} else {
break
}
}
if decisionFailed != nil {
attr := decisionFailed.GetDecisionTaskFailedEventAttributes()
if attr.GetCause() == types.DecisionTaskFailedCauseWorkflowWorkerUnhandledFailure ||
strings.Contains(string(attr.GetDetails()), "nondeterministic") {
fmt.Printf("found non-deterministic workflow wid:%v, rid:%v, originalStartTime:%v \n", wid, rid, time.Unix(0, firstEvent.GetTimestamp()))
return true, nil
}
}
return false, nil
}
func getResetEventIDByType(
ctx context.Context,
c *cli.Context,
resetType string, decisionOffset int,
domain, wid, rid string,
frontendClient frontend.Client,
) (resetBaseRunID string, decisionFinishID int64, err error) {
// default to the same runID
resetBaseRunID = rid
fmt.Println("resetType:", resetType)
switch resetType {
case resetTypeLastDecisionCompleted:
decisionFinishID, err = getLastDecisionTaskByType(ctx, domain, wid, rid, frontendClient, types.EventTypeDecisionTaskCompleted, decisionOffset)
if err != nil {
return
}
case resetTypeLastContinuedAsNew:
// this reset type may change the base runID
resetBaseRunID, decisionFinishID, err = getLastContinueAsNewID(ctx, domain, wid, rid, frontendClient)
if err != nil {
return
}
case resetTypeFirstDecisionCompleted:
decisionFinishID, err = getFirstDecisionTaskByType(ctx, domain, wid, rid, frontendClient, types.EventTypeDecisionTaskCompleted)
if err != nil {
return
}
case resetTypeBadBinary:
binCheckSum := c.String(FlagResetBadBinaryChecksum)
decisionFinishID, err = getBadDecisionCompletedID(ctx, domain, wid, rid, binCheckSum, frontendClient)
if err != nil {
return
}
case resetTypeDecisionCompletedTime:
earliestTime := parseTime(c.String(FlagEarliestTime), 0)
decisionFinishID, err = getEarliestDecisionID(ctx, domain, wid, rid, earliestTime, frontendClient)
if err != nil {
return
}
case resetTypeFirstDecisionScheduled:
decisionFinishID, err = getFirstDecisionTaskByType(ctx, domain, wid, rid, frontendClient, types.EventTypeDecisionTaskScheduled)
if err != nil {
return
}
// decisionFinishID is exclusive in reset API
decisionFinishID++
case resetTypeLastDecisionScheduled:
decisionFinishID, err = getLastDecisionTaskByType(ctx, domain, wid, rid, frontendClient, types.EventTypeDecisionTaskScheduled, decisionOffset)
if err != nil {
return
}
// decisionFinishID is exclusive in reset API
decisionFinishID++
default:
panic("not supported resetType")
}
return
}
func getFirstDecisionTaskByType(
ctx context.Context,
domain string,
workflowID string,
runID string,
frontendClient frontend.Client,
decisionType types.EventType,
) (decisionFinishID int64, err error) {
req := &types.GetWorkflowExecutionHistoryRequest{
Domain: domain,
Execution: &types.WorkflowExecution{
WorkflowID: workflowID,
RunID: runID,
},
MaximumPageSize: 1000,
NextPageToken: nil,
}
for {
resp, err := frontendClient.GetWorkflowExecutionHistory(ctx, req)
if err != nil {
return 0, printErrorAndReturn("GetWorkflowExecutionHistory failed", err)
}
for _, e := range resp.GetHistory().GetEvents() {
if e.GetEventType() == decisionType {
decisionFinishID = e.ID
return decisionFinishID, nil
}
}
if len(resp.NextPageToken) != 0 {
req.NextPageToken = resp.NextPageToken
} else {
break
}
}
if decisionFinishID == 0 {
return 0, printErrorAndReturn("Get DecisionFinishID failed", fmt.Errorf("no DecisionFinishID"))
}
return
}
func getCurrentRunID(ctx context.Context, domain, wid string, frontendClient frontend.Client) (string, error) {
resp, err := frontendClient.DescribeWorkflowExecution(ctx, &types.DescribeWorkflowExecutionRequest{
Domain: domain,
Execution: &types.WorkflowExecution{
WorkflowID: wid,
},
})
if err != nil {
return "", err
}
return resp.WorkflowExecutionInfo.Execution.GetRunID(), nil
}
func getBadDecisionCompletedID(ctx context.Context, domain, wid, rid, binChecksum string, frontendClient frontend.Client) (decisionFinishID int64, err error) {
resp, err := frontendClient.DescribeWorkflowExecution(ctx, &types.DescribeWorkflowExecutionRequest{
Domain: domain,
Execution: &types.WorkflowExecution{
WorkflowID: wid,
RunID: rid,
},
})
if err != nil {
return 0, printErrorAndReturn("DescribeWorkflowExecution failed", err)
}
_, p := execution.FindAutoResetPoint(clock.NewRealTimeSource(), &types.BadBinaries{
Binaries: map[string]*types.BadBinaryInfo{
binChecksum: {},
},
}, resp.WorkflowExecutionInfo.AutoResetPoints)
if p != nil {
decisionFinishID = p.GetFirstDecisionCompletedID()
}
if decisionFinishID == 0 {
return 0, printErrorAndReturn("Get DecisionFinishID failed", &types.BadRequestError{Message: "no DecisionFinishID"})
}
return
}
func getLastDecisionTaskByType(
ctx context.Context,
domain string,
workflowID string,
runID string,
frontendClient frontend.Client,
decisionType types.EventType,
decisionOffset int,
) (int64, error) {
// this fixedSizeQueue is for remembering the offset decision eventID
fixedSizeQueue := make([]int64, 0)
size := int(math.Abs(float64(decisionOffset))) + 1
req := &types.GetWorkflowExecutionHistoryRequest{
Domain: domain,
Execution: &types.WorkflowExecution{
WorkflowID: workflowID,
RunID: runID,
},
MaximumPageSize: 1000,
NextPageToken: nil,
}
for {
resp, err := frontendClient.GetWorkflowExecutionHistory(ctx, req)
if err != nil {
return 0, printErrorAndReturn("GetWorkflowExecutionHistory failed", err)
}
for _, e := range resp.GetHistory().GetEvents() {
if e.GetEventType() == decisionType {
decisionEventID := e.ID
fixedSizeQueue = append(fixedSizeQueue, decisionEventID)
if len(fixedSizeQueue) > size {
fixedSizeQueue = fixedSizeQueue[1:]
}
}
}
if len(resp.NextPageToken) != 0 {
req.NextPageToken = resp.NextPageToken
} else {
break
}
}
if len(fixedSizeQueue) == 0 {
return 0, printErrorAndReturn("Get DecisionFinishID failed", fmt.Errorf("no DecisionFinishID"))
}
return fixedSizeQueue[0], nil
}
func getLastContinueAsNewID(ctx context.Context, domain, wid, rid string, frontendClient frontend.Client) (resetBaseRunID string, decisionFinishID int64, err error) {
// get first event
req := &types.GetWorkflowExecutionHistoryRequest{
Domain: domain,
Execution: &types.WorkflowExecution{
WorkflowID: wid,
RunID: rid,
},
MaximumPageSize: 1,
NextPageToken: nil,
}
resp, err := frontendClient.GetWorkflowExecutionHistory(ctx, req)
if err != nil {
return "", 0, printErrorAndReturn("GetWorkflowExecutionHistory failed", err)
}
firstEvent := resp.History.Events[0]
resetBaseRunID = firstEvent.GetWorkflowExecutionStartedEventAttributes().GetContinuedExecutionRunID()
if resetBaseRunID == "" {
return "", 0, printErrorAndReturn("GetWorkflowExecutionHistory failed", fmt.Errorf("cannot get resetBaseRunID"))
}
req = &types.GetWorkflowExecutionHistoryRequest{
Domain: domain,
Execution: &types.WorkflowExecution{
WorkflowID: wid,
RunID: resetBaseRunID,
},
MaximumPageSize: 1000,
NextPageToken: nil,
}
for {
resp, err := frontendClient.GetWorkflowExecutionHistory(ctx, req)
if err != nil {
return "", 0, printErrorAndReturn("GetWorkflowExecutionHistory failed", err)
}
for _, e := range resp.GetHistory().GetEvents() {
if e.GetEventType() == types.EventTypeDecisionTaskCompleted {
decisionFinishID = e.ID
}
}
if len(resp.NextPageToken) != 0 {
req.NextPageToken = resp.NextPageToken
} else {
break
}
}
if decisionFinishID == 0 {
return "", 0, printErrorAndReturn("Get DecisionFinishID failed", fmt.Errorf("no DecisionFinishID"))
}
return
}
// CompleteActivity completes an activity
func CompleteActivity(c *cli.Context) {
domain := getRequiredGlobalOption(c, FlagDomain)
wid := getRequiredOption(c, FlagWorkflowID)
rid := getRequiredOption(c, FlagRunID)
activityID := getRequiredOption(c, FlagActivityID)
if len(activityID) == 0 {
ErrorAndExit("Invalid activityID", fmt.Errorf("activityID cannot be empty"))
}
result := getRequiredOption(c, FlagResult)
identity := getRequiredOption(c, FlagIdentity)
ctx, cancel := newContext(c)
defer cancel()
frontendClient := cFactory.ServerFrontendClient(c)
err := frontendClient.RespondActivityTaskCompletedByID(ctx, &types.RespondActivityTaskCompletedByIDRequest{
Domain: domain,
WorkflowID: wid,
RunID: rid,
ActivityID: activityID,
Result: []byte(result),
Identity: identity,
})
if err != nil {
ErrorAndExit("Completing activity failed", err)
} else {
fmt.Println("Complete activity successfully.")
}
}
// FailActivity fails an activity
func FailActivity(c *cli.Context) {
domain := getRequiredGlobalOption(c, FlagDomain)
wid := getRequiredOption(c, FlagWorkflowID)
rid := getRequiredOption(c, FlagRunID)
activityID := getRequiredOption(c, FlagActivityID)
if len(activityID) == 0 {
ErrorAndExit("Invalid activityID", fmt.Errorf("activityID cannot be empty"))
}
reason := getRequiredOption(c, FlagReason)
detail := getRequiredOption(c, FlagDetail)
identity := getRequiredOption(c, FlagIdentity)
ctx, cancel := newContext(c)
defer cancel()
frontendClient := cFactory.ServerFrontendClient(c)
err := frontendClient.RespondActivityTaskFailedByID(ctx, &types.RespondActivityTaskFailedByIDRequest{
Domain: domain,
WorkflowID: wid,
RunID: rid,
ActivityID: activityID,
Reason: common.StringPtr(reason),
Details: []byte(detail),
Identity: identity,
})
if err != nil {
ErrorAndExit("Failing activity failed", err)
} else {
fmt.Println("Fail activity successfully.")
}
}
// ObserveHistoryWithID show the process of running workflow
func ObserveHistoryWithID(c *cli.Context) {
domain := getRequiredGlobalOption(c, FlagDomain)
if !c.Args().Present() {
ErrorAndExit("Argument workflow_id is required.", nil)
}
wid := c.Args().First()
rid := ""
if c.NArg() >= 2 {
rid = c.Args().Get(1)
}
printWorkflowProgress(c, domain, wid, rid)
}
func getEarliestDecisionID(
ctx context.Context,
domain string, wid string,
rid string, earliestTime int64,
frontendClient frontend.Client,
) (decisionFinishID int64, err error) {
req := &types.GetWorkflowExecutionHistoryRequest{
Domain: domain,
Execution: &types.WorkflowExecution{
WorkflowID: wid,
RunID: rid,
},
MaximumPageSize: 1000,
NextPageToken: nil,
}
OuterLoop:
for {
resp, err := frontendClient.GetWorkflowExecutionHistory(ctx, req)
if err != nil {
return 0, printErrorAndReturn("GetWorkflowExecutionHistory failed", err)
}
for _, e := range resp.GetHistory().GetEvents() {
if e.GetEventType() == types.EventTypeDecisionTaskCompleted {
if e.GetTimestamp() >= earliestTime {
decisionFinishID = e.ID
break OuterLoop
}
}
}
if len(resp.NextPageToken) != 0 {
req.NextPageToken = resp.NextPageToken
} else {
break
}
}
if decisionFinishID == 0 {
return 0, printErrorAndReturn("Get DecisionFinishID failed", fmt.Errorf("no DecisionFinishID"))
}
return
}