tools/cli/admin_failover_commands.go (324 lines of code) (raw):
// Copyright (c) 2017-2020 Uber Technologies Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package cli
import (
"context"
"encoding/json"
"fmt"
"os/user"
"time"
"github.com/pborman/uuid"
"github.com/urfave/cli"
"github.com/uber/cadence/client/frontend"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/worker/failovermanager"
)
const (
defaultAbortReason = "Failover aborted through admin CLI"
defaultBatchFailoverSize = 20
defaultBatchFailoverWaitTimeInSeconds = 30
defaultFailoverWorkflowTimeoutInSeconds = 1200
)
type startParams struct {
targetCluster string
sourceCluster string
batchFailoverSize int
batchFailoverWaitTimeInSeconds int
failoverWorkflowTimeout int
failoverTimeout int
domains []string
drillWaitTime int
cron string
}
// AdminFailoverStart start failover workflow
func AdminFailoverStart(c *cli.Context) {
params := &startParams{
targetCluster: getRequiredOption(c, FlagTargetCluster),
sourceCluster: getRequiredOption(c, FlagSourceCluster),
batchFailoverSize: c.Int(FlagFailoverBatchSize),
batchFailoverWaitTimeInSeconds: c.Int(FlagFailoverWaitTime),
failoverTimeout: c.Int(FlagFailoverTimeout),
failoverWorkflowTimeout: c.Int(FlagExecutionTimeout),
domains: c.StringSlice(FlagFailoverDomains),
drillWaitTime: c.Int(FlagFailoverDrillWaitTime),
cron: c.String(FlagCronSchedule),
}
failoverStart(c, params)
}
// AdminFailoverPause pause failover workflow
func AdminFailoverPause(c *cli.Context) {
err := executePauseOrResume(c, getFailoverWorkflowID(c), true)
if err != nil {
ErrorAndExit("Failed to pause failover workflow", err)
}
fmt.Println("Failover paused on " + getFailoverWorkflowID(c))
}
// AdminFailoverResume resume a paused failover workflow
func AdminFailoverResume(c *cli.Context) {
err := executePauseOrResume(c, getFailoverWorkflowID(c), false)
if err != nil {
ErrorAndExit("Failed to resume failover workflow", err)
}
fmt.Println("Failover resumed on " + getFailoverWorkflowID(c))
}
// AdminFailoverQuery query a failover workflow
func AdminFailoverQuery(c *cli.Context) {
client := getCadenceClient(c)
tcCtx, cancel := newContext(c)
defer cancel()
workflowID := getFailoverWorkflowID(c)
runID := getRunID(c)
result := query(tcCtx, client, workflowID, runID)
request := &types.DescribeWorkflowExecutionRequest{
Domain: common.SystemLocalDomainName,
Execution: &types.WorkflowExecution{
WorkflowID: workflowID,
RunID: runID,
},
}
descResp, err := client.DescribeWorkflowExecution(tcCtx, request)
if err != nil {
ErrorAndExit("Failed to describe workflow", err)
}
if isWorkflowTerminated(descResp) {
result.State = failovermanager.WorkflowAborted
}
prettyPrintJSONObject(result)
}
// AdminFailoverAbort abort a failover workflow
func AdminFailoverAbort(c *cli.Context) {
client := getCadenceClient(c)
tcCtx, cancel := newContext(c)
defer cancel()
reason := c.String(FlagReason)
if len(reason) == 0 {
reason = defaultAbortReason
}
workflowID := getFailoverWorkflowID(c)
runID := getRunID(c)
request := &types.TerminateWorkflowExecutionRequest{
Domain: common.SystemLocalDomainName,
WorkflowExecution: &types.WorkflowExecution{
WorkflowID: workflowID,
RunID: runID,
},
Reason: reason,
}
err := client.TerminateWorkflowExecution(tcCtx, request)
if err != nil {
ErrorAndExit("Failed to abort failover workflow", err)
}
fmt.Println("Failover aborted")
}
// AdminFailoverRollback rollback a failover run
func AdminFailoverRollback(c *cli.Context) {
client := getCadenceClient(c)
tcCtx, cancel := newContext(c)
defer cancel()
runID := getRunID(c)
queryResult := query(tcCtx, client, failovermanager.FailoverWorkflowID, runID)
if isWorkflowRunning(queryResult) {
request := &types.TerminateWorkflowExecutionRequest{
Domain: common.SystemLocalDomainName,
WorkflowExecution: &types.WorkflowExecution{
WorkflowID: failovermanager.FailoverWorkflowID,
RunID: runID,
},
Reason: "Rollback",
Identity: getCliIdentity(),
}
err := client.TerminateWorkflowExecution(tcCtx, request)
if err != nil {
ErrorAndExit("Failed to terminate failover workflow", err)
}
}
// query again
queryResult = query(tcCtx, client, failovermanager.FailoverWorkflowID, runID)
var rollbackDomains []string
// rollback includes both success and failed domains to make sure no leftover domains
rollbackDomains = append(rollbackDomains, queryResult.SuccessDomains...)
rollbackDomains = append(rollbackDomains, queryResult.FailedDomains...)
params := &startParams{
targetCluster: queryResult.SourceCluster,
sourceCluster: queryResult.TargetCluster,
domains: rollbackDomains,
batchFailoverSize: c.Int(FlagFailoverBatchSize),
batchFailoverWaitTimeInSeconds: c.Int(FlagFailoverWaitTime),
failoverTimeout: c.Int(FlagFailoverTimeout),
failoverWorkflowTimeout: c.Int(FlagExecutionTimeout),
}
failoverStart(c, params)
}
// AdminFailoverList list failover runs
func AdminFailoverList(c *cli.Context) {
c.Set(FlagWorkflowID, getFailoverWorkflowID(c))
c.GlobalSet(FlagDomain, common.SystemLocalDomainName)
ListWorkflow(c)
}
func query(
tcCtx context.Context,
client frontend.Client,
workflowID string,
runID string) *failovermanager.QueryResult {
request := &types.QueryWorkflowRequest{
Domain: common.SystemLocalDomainName,
Execution: &types.WorkflowExecution{
WorkflowID: workflowID,
RunID: runID,
},
Query: &types.WorkflowQuery{
QueryType: failovermanager.QueryType,
},
}
queryResp, err := client.QueryWorkflow(tcCtx, request)
if err != nil {
ErrorAndExit("Failed to query failover workflow", err)
}
if queryResp.GetQueryResult() == nil {
ErrorAndExit("QueryResult has no value", nil)
}
var queryResult failovermanager.QueryResult
err = json.Unmarshal(queryResp.GetQueryResult(), &queryResult)
if err != nil {
ErrorAndExit("Unable to deserialize QueryResult", nil)
}
return &queryResult
}
func isWorkflowRunning(queryResult *failovermanager.QueryResult) bool {
return queryResult.State == failovermanager.WorkflowRunning ||
queryResult.State == failovermanager.WorkflowPaused
}
func getCadenceClient(c *cli.Context) frontend.Client {
svcClient := cFactory.ServerFrontendClient(c)
return svcClient
}
func getRunID(c *cli.Context) string {
if c.IsSet(FlagRunID) {
return c.String(FlagRunID)
}
return ""
}
func failoverStart(c *cli.Context, params *startParams) {
validateStartParams(params)
workflowID := failovermanager.FailoverWorkflowID
targetCluster := params.targetCluster
sourceCluster := params.sourceCluster
batchFailoverSize := params.batchFailoverSize
batchFailoverWaitTimeInSeconds := params.batchFailoverWaitTimeInSeconds
workflowTimeout := int32(params.failoverWorkflowTimeout)
domains := params.domains
drillWaitTime := time.Duration(params.drillWaitTime) * time.Second
var gracefulFailoverTimeoutInSeconds *int32
if params.failoverTimeout > 0 {
gracefulFailoverTimeoutInSeconds = common.Int32Ptr(int32(params.failoverTimeout))
}
client := getCadenceClient(c)
tcCtx, cancel := newContext(c)
defer cancel()
memo, err := getWorkflowMemo(map[string]interface{}{
common.MemoKeyForOperator: getOperator(),
})
if err != nil {
ErrorAndExit("Failed to serialize memo", err)
}
request := &types.StartWorkflowExecutionRequest{
Domain: common.SystemLocalDomainName,
RequestID: uuid.New(),
WorkflowID: workflowID,
WorkflowIDReusePolicy: types.WorkflowIDReusePolicyAllowDuplicate.Ptr(),
TaskList: &types.TaskList{Name: failovermanager.TaskListName},
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(workflowTimeout),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(defaultDecisionTimeoutInSeconds),
Memo: memo,
WorkflowType: &types.WorkflowType{Name: failovermanager.FailoverWorkflowTypeName},
}
if params.drillWaitTime > 0 {
request.WorkflowID = failovermanager.DrillWorkflowID
request.CronSchedule = params.cron
} else {
if len(params.cron) > 0 {
ErrorAndExit("The drill wait time is required when cron is specified.", nil)
}
// block if there is an on-going failover drill
if err := executePauseOrResume(c, failovermanager.DrillWorkflowID, true); err != nil {
switch err.(type) {
case *types.EntityNotExistsError:
break
case *types.WorkflowExecutionAlreadyCompletedError:
break
default:
ErrorAndExit("Failed to send pase signal to drill workflow", err)
}
}
fmt.Println("The failover drill workflow is paused. Please run 'cadence admin cluster failover resume --fd'" +
" to resume the drill workflow.")
}
foParams := failovermanager.FailoverParams{
TargetCluster: targetCluster,
SourceCluster: sourceCluster,
BatchFailoverSize: batchFailoverSize,
BatchFailoverWaitTimeInSeconds: batchFailoverWaitTimeInSeconds,
Domains: domains,
DrillWaitTime: drillWaitTime,
GracefulFailoverTimeoutInSeconds: gracefulFailoverTimeoutInSeconds,
}
input, err := json.Marshal(foParams)
if err != nil {
ErrorAndExit("Failed to serialize Failover Params", err)
}
request.Input = input
wf, err := client.StartWorkflowExecution(tcCtx, request)
if err != nil {
ErrorAndExit("Failed to start failover workflow", err)
}
fmt.Println("Failover workflow started")
fmt.Println("wid: " + workflowID)
fmt.Println("rid: " + wf.GetRunID())
}
func getFailoverWorkflowID(c *cli.Context) string {
if c.Bool(FlagFailoverDrill) {
return failovermanager.DrillWorkflowID
}
return failovermanager.FailoverWorkflowID
}
func getOperator() string {
user, err := user.Current()
if err != nil {
ErrorAndExit("Unable to get operator info", err)
}
return fmt.Sprintf("%s (username: %s)", user.Name, user.Username)
}
func isWorkflowTerminated(descResp *types.DescribeWorkflowExecutionResponse) bool {
return types.WorkflowExecutionCloseStatusTerminated.String() == descResp.GetWorkflowExecutionInfo().GetCloseStatus().String()
}
func executePauseOrResume(c *cli.Context, workflowID string, isPause bool) error {
client := getCadenceClient(c)
tcCtx, cancel := newContext(c)
defer cancel()
runID := getRunID(c)
var signalName string
if isPause {
signalName = failovermanager.PauseSignal
} else {
signalName = failovermanager.ResumeSignal
}
request := &types.SignalWorkflowExecutionRequest{
Domain: common.SystemLocalDomainName,
WorkflowExecution: &types.WorkflowExecution{
WorkflowID: workflowID,
RunID: runID,
},
SignalName: signalName,
Identity: getCliIdentity(),
}
return client.SignalWorkflowExecution(tcCtx, request)
}
func validateStartParams(params *startParams) {
if len(params.targetCluster) == 0 {
ErrorAndExit("targetCluster is not provided", nil)
}
if len(params.sourceCluster) == 0 {
ErrorAndExit("sourceCluster is not provided", nil)
}
if params.targetCluster == params.sourceCluster {
ErrorAndExit("targetCluster is same as sourceCluster", nil)
}
if params.batchFailoverSize <= 0 {
params.batchFailoverSize = defaultBatchFailoverSize
}
if params.batchFailoverWaitTimeInSeconds <= 0 {
params.batchFailoverWaitTimeInSeconds = defaultBatchFailoverWaitTimeInSeconds
}
if params.failoverWorkflowTimeout <= 0 {
params.failoverWorkflowTimeout = defaultFailoverWorkflowTimeoutInSeconds
}
}