in common/persistence/nosql/nosqlplugin/cassandra/workflow_utils.go [160:273]
// executeUpdateWorkflowBatchTransaction runs batch as a conditional (CAS) batch
// through session.MapExecuteBatchCAS and translates a non-applied result into a
// *nosqlplugin.WorkflowOperationConditionFailure.
//
// It returns nil when the batch was applied, and returns the error from
// MapExecuteBatchCAS unchanged when execution itself fails. When the batch was
// not applied, the rows handed back by the session are inspected and the first
// matching cause below is reported:
//
//  1. ShardRangeIDNotMatch: a shard row carries a range_id different from
//     shardCondition.RangeID.
//  2. CurrentWorkflowConditionFailInfo: the row keyed by permanentRunID carries
//     a current_run_id different from the one requested in
//     currentWorkflowRequest.Condition (only checked when a run ID was requested).
//  3. UnknownConditionFailureDetails: the execution row for
//     currentWorkflowRequest.Row.RunID carries a next_event_id different from
//     previousNextEventIDCondition.
//  4. UnknownConditionFailureDetails with a dump of every inspected row, when
//     none of the above could be established.
//
// Rows with missing or unexpectedly typed columns never cause a panic; they are
// simply not usable for classification and end up in the generic dump.
func executeUpdateWorkflowBatchTransaction(
	session gocql.Session,
	batch gocql.Batch,
	currentWorkflowRequest *nosqlplugin.CurrentWorkflowWriteRequest,
	previousNextEventIDCondition int64,
	shardCondition *nosqlplugin.ShardCondition,
) error {
	previous := make(map[string]interface{})
	applied, iter, err := session.MapExecuteBatchCAS(batch, previous)
	defer func() {
		if iter != nil {
			// Best-effort cleanup: the outcome of the batch is already known here.
			_ = iter.Close()
		}
	}()
	if err != nil {
		return err
	}
	if applied {
		return nil
	}

	requestRunID := currentWorkflowRequest.Row.RunID
	requestRangeID := shardCondition.RangeID
	requestConditionalRunID := ""
	if currentWorkflowRequest.Condition != nil {
		requestConditionalRunID = currentWorkflowRequest.Condition.GetCurrentRunID()
	}

	// There can be three reasons why the query does not get applied: the RangeID has changed,
	// or the next_event_id or current_run_id check failed.
	// Check the row info returned by Cassandra to figure out which one it is.
	var (
		rangeIDMismatch     bool
		actualRangeID       int64
		nextEventIDMismatch bool
		actualNextEventID   int64
		runIDMismatch       bool
		actualCurrRunID     string
		allPrevious         []map[string]interface{}
	)

	for {
		rowType, ok := previous["type"].(int)
		if !ok {
			// This should never happen, as all our rows have the type field.
			break
		}

		// Use the comma-ok form so that a row without a usable run_id cannot panic;
		// such a row can still be classified as a shard row below.
		runID := ""
		runUUID, hasRunID := previous["run_id"].(gocql.UUID)
		if hasRunID {
			runID = runUUID.String()
		}

		switch {
		case rowType == rowTypeShard:
			if rangeID, ok := previous["range_id"].(int64); ok && rangeID != requestRangeID {
				// UpdateWorkflowExecution failed because rangeID was modified
				rangeIDMismatch = true
				actualRangeID = rangeID
			}
		case rowType == rowTypeExecution && hasRunID && runID == requestRunID:
			if nextEventID, ok := previous["next_event_id"].(int64); ok && nextEventID != previousNextEventIDCondition {
				// UpdateWorkflowExecution failed because next event ID is unexpected
				nextEventIDMismatch = true
				actualNextEventID = nextEventID
			}
		case rowType == rowTypeExecution && hasRunID && runID == permanentRunID:
			if currRunUUID, ok := previous["current_run_id"].(gocql.UUID); ok && requestConditionalRunID != "" {
				if currRunID := currRunUUID.String(); currRunID != requestConditionalRunID {
					// UpdateWorkflowExecution failed because current_run_id is unexpected
					runIDMismatch = true
					actualCurrRunID = currRunID
				}
			}
		}

		allPrevious = append(allPrevious, previous)
		previous = make(map[string]interface{})
		// Cassandra returns the actual row that caused a condition failure, so the checks
		// above should always find the cause; stop once there is nothing more to scan.
		if iter == nil || !iter.MapScan(previous) {
			break
		}
	}

	if rangeIDMismatch {
		return &nosqlplugin.WorkflowOperationConditionFailure{
			ShardRangeIDNotMatch: common.Int64Ptr(actualRangeID),
		}
	}

	if runIDMismatch {
		msg := fmt.Sprintf("Failed to update mutable state. requestConditionalRunID: %v, Actual Value: %v",
			requestConditionalRunID, actualCurrRunID)
		return &nosqlplugin.WorkflowOperationConditionFailure{
			CurrentWorkflowConditionFailInfo: &msg,
		}
	}

	if nextEventIDMismatch {
		msg := fmt.Sprintf("Failed to update mutable state. previousNextEventIDCondition: %v, actualNextEventID: %v, Request Current RunID: %v",
			previousNextEventIDCondition, actualNextEventID, requestRunID)
		return &nosqlplugin.WorkflowOperationConditionFailure{
			UnknownConditionFailureDetails: &msg,
		}
	}

	// At this point we only know that the write was not applied.
	// Dump every inspected row, prefixing each column with the index of its row.
	var columns []string
	for rowIdx, row := range allPrevious {
		for k, v := range row {
			columns = append(columns, fmt.Sprintf("%v: %s=%v", rowIdx, k, v))
		}
	}

	msg := fmt.Sprintf("Failed to update mutable state. ShardID: %v, RangeID: %v, previousNextEventIDCondition: %v, requestConditionalRunID: %v, columns: (%v)",
		shardCondition.ShardID, requestRangeID, previousNextEventIDCondition, requestConditionalRunID, strings.Join(columns, ","))
	return &nosqlplugin.WorkflowOperationConditionFailure{
		UnknownConditionFailureDetails: &msg,
	}
}