func executeUpdateWorkflowBatchTransaction()

in common/persistence/nosql/nosqlplugin/cassandra/workflow_utils.go [160:273]


// executeUpdateWorkflowBatchTransaction executes batch through
// session.MapExecuteBatchCAS and, when the batch is not applied, inspects the
// rows returned by the call to report which condition failed.
//
// It returns:
//   - nil when the batch was applied;
//   - the error from MapExecuteBatchCAS, unchanged, when the call itself fails;
//   - a *nosqlplugin.WorkflowOperationConditionFailure otherwise, with
//     ShardRangeIDNotMatch set when a shard row carries a range_id different
//     from shardCondition.RangeID, CurrentWorkflowConditionFailInfo set when
//     the current_run_id differs from the requested conditional run ID, and
//     UnknownConditionFailureDetails set for a next_event_id mismatch or when
//     no specific cause could be identified (in which case the message
//     contains a dump of every returned row).
//
// When several conditions mismatch at once, the range ID mismatch is reported
// first, then the current run ID mismatch, then the next event ID mismatch.
func executeUpdateWorkflowBatchTransaction(
	session gocql.Session,
	batch gocql.Batch,
	currentWorkflowRequest *nosqlplugin.CurrentWorkflowWriteRequest,
	previousNextEventIDCondition int64,
	shardCondition *nosqlplugin.ShardCondition,
) error {
	previous := make(map[string]interface{})
	applied, iter, err := session.MapExecuteBatchCAS(batch, previous)
	defer func() {
		if iter != nil {
			// Best effort: the outcome of the batch is already known here, so a
			// failure to close the iterator is deliberately ignored.
			_ = iter.Close()
		}
	}()

	if err != nil {
		return err
	}

	if applied {
		return nil
	}

	requestRunID := currentWorkflowRequest.Row.RunID
	requestRangeID := shardCondition.RangeID
	requestConditionalRunID := ""
	if currentWorkflowRequest.Condition != nil {
		requestConditionalRunID = currentWorkflowRequest.Condition.GetCurrentRunID()
	}

	// There can be three reasons why the query does not get applied: the RangeID has changed, or the next_event_id or current_run_id check failed.
	// Check the row info returned by Cassandra to figure out which one it is.
	rangeIDMismatch := false
	actualRangeID := int64(0)
	nextEventIDMismatch := false
	actualNextEventID := int64(0)
	runIDMismatch := false
	actualCurrRunID := ""
	var allPrevious []map[string]interface{}

	for {
		rowType, ok := previous["type"].(int)
		if !ok {
			// This should never happen, as all our rows have the type field.
			break
		}

		// Checked assertion: a row without a usable run_id must not panic the
		// diagnosis path; it simply cannot match either execution case below.
		runID, hasRunID := "", false
		if id, isUUID := previous["run_id"].(gocql.UUID); isUUID {
			runID, hasRunID = id.String(), true
		}

		switch {
		case rowType == rowTypeShard:
			if actualRangeID, ok = previous["range_id"].(int64); ok && actualRangeID != requestRangeID {
				// UpdateWorkflowExecution failed because rangeID was modified
				rangeIDMismatch = true
			}
		case rowType == rowTypeExecution && hasRunID && runID == requestRunID:
			if actualNextEventID, ok = previous["next_event_id"].(int64); ok && actualNextEventID != previousNextEventIDCondition {
				// UpdateWorkflowExecution failed because next event ID is unexpected
				nextEventIDMismatch = true
			}
		case rowType == rowTypeExecution && hasRunID && runID == permanentRunID:
			// Checked assertion for the same reason as run_id above: when the
			// column is absent or null the comparison is skipped and the
			// failure is reported through the generic row dump instead.
			if currRunID, isUUID := previous["current_run_id"].(gocql.UUID); isUUID {
				actualCurrRunID = currRunID.String()
				if requestConditionalRunID != "" && actualCurrRunID != requestConditionalRunID {
					// UpdateWorkflowExecution failed because current_run_id is unexpected
					runIDMismatch = true
				}
			}
		}

		allPrevious = append(allPrevious, previous)
		previous = make(map[string]interface{})
		// Keep scanning until the iterator is exhausted so that every returned
		// row is both checked and available for the diagnostic dump below.
		// The nil guard mirrors the one in the deferred Close above.
		if iter == nil || !iter.MapScan(previous) {
			break
		}
	}

	if rangeIDMismatch {
		return &nosqlplugin.WorkflowOperationConditionFailure{
			ShardRangeIDNotMatch: common.Int64Ptr(actualRangeID),
		}
	}

	if runIDMismatch {
		msg := fmt.Sprintf("Failed to update mutable state. requestConditionalRunID: %v, Actual Value: %v",
			requestConditionalRunID, actualCurrRunID)
		return &nosqlplugin.WorkflowOperationConditionFailure{
			CurrentWorkflowConditionFailInfo: &msg,
		}
	}

	if nextEventIDMismatch {
		msg := fmt.Sprintf("Failed to update mutable state. previousNextEventIDCondition: %v, actualNextEventID: %v, Request Current RunID: %v",
			previousNextEventIDCondition, actualNextEventID, requestRunID)
		return &nosqlplugin.WorkflowOperationConditionFailure{
			UnknownConditionFailureDetails: &msg,
		}
	}

	// At this point we only know that the write was not applied. Dump every
	// returned row, prefixing each column with the index of the row it came
	// from. Column order within a row follows Go map iteration order and is
	// therefore not stable between calls.
	var columns []string
	for rowIndex, row := range allPrevious {
		for k, v := range row {
			columns = append(columns, fmt.Sprintf("%v: %s=%v", rowIndex, k, v))
		}
	}

	msg := fmt.Sprintf("Failed to update mutable state. ShardID: %v, RangeID: %v, previousNextEventIDCondition: %v, requestConditionalRunID: %v, columns: (%v)",
		shardCondition.ShardID, requestRangeID, previousNextEventIDCondition, requestConditionalRunID, strings.Join(columns, ","))
	return &nosqlplugin.WorkflowOperationConditionFailure{
		UnknownConditionFailureDetails: &msg,
	}
}