common/persistence/nosql/nosqlplugin/cassandra/workflow_parsing_utils.go (544 lines of code) (raw):
// Copyright (c) 2021 Uber Technologies, Inc.
// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package cassandra
import (
"time"
cql "github.com/gocql/gocql"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/checksum"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin"
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra/gocql"
"github.com/uber/cadence/common/types"
)
var _emptyUUID = cql.UUID{}
func parseWorkflowExecutionInfo(result map[string]interface{}) *persistence.InternalWorkflowExecutionInfo {
info := &persistence.InternalWorkflowExecutionInfo{}
var completionEventData []byte
var completionEventEncoding common.EncodingType
var autoResetPoints []byte
var autoResetPointsEncoding common.EncodingType
for k, v := range result {
switch k {
case "domain_id":
info.DomainID = v.(gocql.UUID).String()
case "workflow_id":
info.WorkflowID = v.(string)
case "run_id":
info.RunID = v.(gocql.UUID).String()
case "first_run_id":
info.FirstExecutionRunID = v.(gocql.UUID).String()
if info.FirstExecutionRunID == _emptyUUID.String() {
// for backward compatibility, the gocql library doesn't handle the null uuid correectly https://github.com/gocql/gocql/blob/master/marshal.go#L1807
info.FirstExecutionRunID = ""
} else if info.FirstExecutionRunID == emptyRunID {
info.FirstExecutionRunID = ""
}
case "parent_domain_id":
info.ParentDomainID = v.(gocql.UUID).String()
if info.ParentDomainID == emptyDomainID {
info.ParentDomainID = ""
}
case "parent_workflow_id":
info.ParentWorkflowID = v.(string)
case "parent_run_id":
info.ParentRunID = v.(gocql.UUID).String()
if info.ParentRunID == emptyRunID {
info.ParentRunID = ""
}
case "initiated_id":
info.InitiatedID = v.(int64)
case "completion_event_batch_id":
info.CompletionEventBatchID = v.(int64)
case "completion_event":
completionEventData = v.([]byte)
case "completion_event_data_encoding":
completionEventEncoding = common.EncodingType(v.(string))
case "auto_reset_points":
autoResetPoints = v.([]byte)
case "auto_reset_points_encoding":
autoResetPointsEncoding = common.EncodingType(v.(string))
case "task_list":
info.TaskList = v.(string)
case "workflow_type_name":
info.WorkflowTypeName = v.(string)
case "workflow_timeout":
info.WorkflowTimeout = common.SecondsToDuration(int64(v.(int)))
case "decision_task_timeout":
info.DecisionStartToCloseTimeout = common.SecondsToDuration(int64(v.(int)))
case "execution_context":
info.ExecutionContext = v.([]byte)
case "state":
info.State = v.(int)
case "close_status":
info.CloseStatus = v.(int)
case "last_first_event_id":
info.LastFirstEventID = v.(int64)
case "last_event_task_id":
info.LastEventTaskID = v.(int64)
case "next_event_id":
info.NextEventID = v.(int64)
case "last_processed_event":
info.LastProcessedEvent = v.(int64)
case "start_time":
info.StartTimestamp = v.(time.Time)
case "last_updated_time":
info.LastUpdatedTimestamp = v.(time.Time)
case "create_request_id":
info.CreateRequestID = v.(gocql.UUID).String()
case "signal_count":
info.SignalCount = int32(v.(int))
case "history_size":
info.HistorySize = v.(int64)
case "decision_version":
info.DecisionVersion = v.(int64)
case "decision_schedule_id":
info.DecisionScheduleID = v.(int64)
case "decision_started_id":
info.DecisionStartedID = v.(int64)
case "decision_request_id":
info.DecisionRequestID = v.(string)
case "decision_timeout":
info.DecisionTimeout = common.SecondsToDuration(int64(v.(int)))
case "decision_attempt":
info.DecisionAttempt = v.(int64)
case "decision_timestamp":
info.DecisionStartedTimestamp = time.Unix(0, v.(int64))
case "decision_scheduled_timestamp":
info.DecisionScheduledTimestamp = time.Unix(0, v.(int64))
case "decision_original_scheduled_timestamp":
info.DecisionOriginalScheduledTimestamp = time.Unix(0, v.(int64))
case "cancel_requested":
info.CancelRequested = v.(bool)
case "cancel_request_id":
info.CancelRequestID = v.(string)
case "sticky_task_list":
info.StickyTaskList = v.(string)
case "sticky_schedule_to_start_timeout":
info.StickyScheduleToStartTimeout = common.SecondsToDuration(int64(v.(int)))
case "client_library_version":
info.ClientLibraryVersion = v.(string)
case "client_feature_version":
info.ClientFeatureVersion = v.(string)
case "client_impl":
info.ClientImpl = v.(string)
case "attempt":
info.Attempt = int32(v.(int))
case "has_retry_policy":
info.HasRetryPolicy = v.(bool)
case "init_interval":
info.InitialInterval = common.SecondsToDuration(int64(v.(int)))
case "backoff_coefficient":
info.BackoffCoefficient = v.(float64)
case "max_interval":
info.MaximumInterval = common.SecondsToDuration(int64(v.(int)))
case "max_attempts":
info.MaximumAttempts = int32(v.(int))
case "expiration_time":
info.ExpirationTime = v.(time.Time)
case "non_retriable_errors":
info.NonRetriableErrors = v.([]string)
case "branch_token":
info.BranchToken = v.([]byte)
case "cron_schedule":
info.CronSchedule = v.(string)
case "expiration_seconds":
info.ExpirationInterval = common.SecondsToDuration(int64(v.(int)))
case "search_attributes":
info.SearchAttributes = v.(map[string][]byte)
case "memo":
info.Memo = v.(map[string][]byte)
case "partition_config":
info.PartitionConfig = v.(map[string]string)
}
}
info.CompletionEvent = persistence.NewDataBlob(completionEventData, completionEventEncoding)
info.AutoResetPoints = persistence.NewDataBlob(autoResetPoints, autoResetPointsEncoding)
return info
}
// TODO: remove this after all 2DC workflows complete
func parseReplicationState(
result map[string]interface{},
) *persistence.ReplicationState {
if len(result) == 0 {
return nil
}
info := &persistence.ReplicationState{}
for k, v := range result {
switch k {
case "current_version":
info.CurrentVersion = v.(int64)
case "start_version":
info.StartVersion = v.(int64)
case "last_write_version":
info.LastWriteVersion = v.(int64)
case "last_write_event_id":
info.LastWriteEventID = v.(int64)
case "last_replication_info":
info.LastReplicationInfo = make(map[string]*persistence.ReplicationInfo)
replicationInfoMap := v.(map[string]map[string]interface{})
for key, value := range replicationInfoMap {
info.LastReplicationInfo[key] = parseReplicationInfo(value)
}
}
}
return info
}
func parseReplicationInfo(
result map[string]interface{},
) *persistence.ReplicationInfo {
info := &persistence.ReplicationInfo{}
for k, v := range result {
switch k {
case "version":
info.Version = v.(int64)
case "last_event_id":
info.LastEventID = v.(int64)
}
}
return info
}
func parseActivityInfo(
domainID string,
result map[string]interface{},
) *persistence.InternalActivityInfo {
info := &persistence.InternalActivityInfo{}
var sharedEncoding common.EncodingType
var scheduledEventData, startedEventData []byte
for k, v := range result {
switch k {
case "version":
info.Version = v.(int64)
case "schedule_id":
info.ScheduleID = v.(int64)
case "scheduled_event_batch_id":
info.ScheduledEventBatchID = v.(int64)
case "scheduled_event":
scheduledEventData = v.([]byte)
case "scheduled_time":
info.ScheduledTime = v.(time.Time)
case "started_id":
info.StartedID = v.(int64)
case "started_event":
startedEventData = v.([]byte)
case "started_time":
info.StartedTime = v.(time.Time)
case "activity_id":
info.ActivityID = v.(string)
case "request_id":
info.RequestID = v.(string)
case "details":
info.Details = v.([]byte)
case "schedule_to_start_timeout":
info.ScheduleToStartTimeout = common.SecondsToDuration(int64(v.(int)))
case "schedule_to_close_timeout":
info.ScheduleToCloseTimeout = common.SecondsToDuration(int64(v.(int)))
case "start_to_close_timeout":
info.StartToCloseTimeout = common.SecondsToDuration(int64(v.(int)))
case "heart_beat_timeout":
info.HeartbeatTimeout = common.SecondsToDuration(int64(v.(int)))
case "cancel_requested":
info.CancelRequested = v.(bool)
case "cancel_request_id":
info.CancelRequestID = v.(int64)
case "last_hb_updated_time":
info.LastHeartBeatUpdatedTime = v.(time.Time)
case "timer_task_status":
info.TimerTaskStatus = int32(v.(int))
case "attempt":
info.Attempt = int32(v.(int))
case "task_list":
info.TaskList = v.(string)
case "started_identity":
info.StartedIdentity = v.(string)
case "has_retry_policy":
info.HasRetryPolicy = v.(bool)
case "init_interval":
info.InitialInterval = common.SecondsToDuration(int64(v.(int)))
case "backoff_coefficient":
info.BackoffCoefficient = v.(float64)
case "max_interval":
info.MaximumInterval = common.SecondsToDuration(int64(v.(int)))
case "max_attempts":
info.MaximumAttempts = (int32)(v.(int))
case "expiration_time":
info.ExpirationTime = v.(time.Time)
case "non_retriable_errors":
info.NonRetriableErrors = v.([]string)
case "last_failure_reason":
info.LastFailureReason = v.(string)
case "last_worker_identity":
info.LastWorkerIdentity = v.(string)
case "last_failure_details":
info.LastFailureDetails = v.([]byte)
case "event_data_encoding":
sharedEncoding = common.EncodingType(v.(string))
}
}
info.DomainID = domainID
info.ScheduledEvent = persistence.NewDataBlob(scheduledEventData, sharedEncoding)
info.StartedEvent = persistence.NewDataBlob(startedEventData, sharedEncoding)
return info
}
func parseTimerInfo(
result map[string]interface{},
) *persistence.TimerInfo {
info := &persistence.TimerInfo{}
for k, v := range result {
switch k {
case "version":
info.Version = v.(int64)
case "timer_id":
info.TimerID = v.(string)
case "started_id":
info.StartedID = v.(int64)
case "expiry_time":
info.ExpiryTime = v.(time.Time)
case "task_id":
// task_id is a misleading variable, it actually serves
// the purpose of indicating whether a timer task is
// generated for this timer info
info.TaskStatus = v.(int64)
}
}
return info
}
func parseChildExecutionInfo(
result map[string]interface{},
) *persistence.InternalChildExecutionInfo {
info := &persistence.InternalChildExecutionInfo{}
var encoding common.EncodingType
var initiatedData []byte
var startedData []byte
for k, v := range result {
switch k {
case "version":
info.Version = v.(int64)
case "initiated_id":
info.InitiatedID = v.(int64)
case "initiated_event_batch_id":
info.InitiatedEventBatchID = v.(int64)
case "initiated_event":
initiatedData = v.([]byte)
case "started_id":
info.StartedID = v.(int64)
case "started_workflow_id":
info.StartedWorkflowID = v.(string)
case "started_run_id":
info.StartedRunID = v.(gocql.UUID).String()
case "started_event":
startedData = v.([]byte)
case "create_request_id":
info.CreateRequestID = v.(gocql.UUID).String()
case "event_data_encoding":
encoding = common.EncodingType(v.(string))
case "domain_id":
info.DomainID = v.(gocql.UUID).String()
if info.DomainID == _emptyUUID.String() {
// for backward compatibility, the gocql library doesn't handle the null uuid correectly https://github.com/gocql/gocql/blob/master/marshal.go#L1807
info.DomainID = ""
}
case "domain_name":
info.DomainNameDEPRECATED = v.(string)
case "workflow_type_name":
info.WorkflowTypeName = v.(string)
case "parent_close_policy":
info.ParentClosePolicy = types.ParentClosePolicy(v.(int))
}
}
info.InitiatedEvent = persistence.NewDataBlob(initiatedData, encoding)
info.StartedEvent = persistence.NewDataBlob(startedData, encoding)
return info
}
func parseRequestCancelInfo(
result map[string]interface{},
) *persistence.RequestCancelInfo {
info := &persistence.RequestCancelInfo{}
for k, v := range result {
switch k {
case "version":
info.Version = v.(int64)
case "initiated_id":
info.InitiatedID = v.(int64)
case "initiated_event_batch_id":
info.InitiatedEventBatchID = v.(int64)
case "cancel_request_id":
info.CancelRequestID = v.(string)
}
}
return info
}
func parseSignalInfo(
result map[string]interface{},
) *persistence.SignalInfo {
info := &persistence.SignalInfo{}
for k, v := range result {
switch k {
case "version":
info.Version = v.(int64)
case "initiated_id":
info.InitiatedID = v.(int64)
case "initiated_event_batch_id":
info.InitiatedEventBatchID = v.(int64)
case "signal_request_id":
info.SignalRequestID = v.(gocql.UUID).String()
case "signal_name":
info.SignalName = v.(string)
case "input":
info.Input = v.([]byte)
case "control":
info.Control = v.([]byte)
}
}
return info
}
func parseHistoryEventBatchBlob(
result map[string]interface{},
) *persistence.DataBlob {
eventBatch := &persistence.DataBlob{Encoding: common.EncodingTypeJSON}
for k, v := range result {
switch k {
case "encoding_type":
eventBatch.Encoding = common.EncodingType(v.(string))
case "data":
eventBatch.Data = v.([]byte)
}
}
return eventBatch
}
func parseTimerTaskInfo(
result map[string]interface{},
) *persistence.TimerTaskInfo {
info := &persistence.TimerTaskInfo{}
for k, v := range result {
switch k {
case "domain_id":
info.DomainID = v.(gocql.UUID).String()
case "workflow_id":
info.WorkflowID = v.(string)
case "run_id":
info.RunID = v.(gocql.UUID).String()
case "visibility_ts":
info.VisibilityTimestamp = v.(time.Time)
case "task_id":
info.TaskID = v.(int64)
case "type":
info.TaskType = v.(int)
case "timeout_type":
info.TimeoutType = v.(int)
case "event_id":
info.EventID = v.(int64)
case "schedule_attempt":
info.ScheduleAttempt = v.(int64)
case "version":
info.Version = v.(int64)
}
}
return info
}
func parseTransferTaskInfo(
result map[string]interface{},
) *persistence.TransferTaskInfo {
info := &persistence.TransferTaskInfo{}
for k, v := range result {
switch k {
case "domain_id":
info.DomainID = v.(gocql.UUID).String()
case "workflow_id":
info.WorkflowID = v.(string)
case "run_id":
info.RunID = v.(gocql.UUID).String()
case "visibility_ts":
info.VisibilityTimestamp = v.(time.Time)
case "task_id":
info.TaskID = v.(int64)
case "target_domain_id":
info.TargetDomainID = v.(gocql.UUID).String()
case "target_domain_ids":
targetDomainIDs := make(map[string]struct{})
dList := mustConvertToSlice(result["target_domain_ids"])
for _, v := range dList {
targetDomainIDs[v.(gocql.UUID).String()] = struct{}{}
}
info.TargetDomainIDs = targetDomainIDs
case "target_workflow_id":
info.TargetWorkflowID = v.(string)
case "target_run_id":
info.TargetRunID = v.(gocql.UUID).String()
if info.TargetRunID == persistence.TransferTaskTransferTargetRunID {
info.TargetRunID = ""
}
case "target_child_workflow_only":
info.TargetChildWorkflowOnly = v.(bool)
case "task_list":
info.TaskList = v.(string)
case "type":
info.TaskType = v.(int)
case "schedule_id":
info.ScheduleID = v.(int64)
case "record_visibility":
info.RecordVisibility = v.(bool)
case "version":
info.Version = v.(int64)
}
}
return info
}
func parseCrossClusterTaskInfo(
result map[string]interface{},
) *persistence.CrossClusterTaskInfo {
info := (*persistence.CrossClusterTaskInfo)(parseTransferTaskInfo(result))
if persistence.CrossClusterTaskDefaultTargetRunID == persistence.TransferTaskTransferTargetRunID {
return info
}
// incase CrossClusterTaskDefaultTargetRunID is updated and not equal to TransferTaskTransferTargetRunID
if v, ok := result["target_run_id"]; ok {
info.TargetRunID = v.(gocql.UUID).String()
if info.TargetRunID == persistence.CrossClusterTaskDefaultTargetRunID {
info.TargetRunID = ""
}
}
return info
}
func parseReplicationTaskInfo(
result map[string]interface{},
) *nosqlplugin.ReplicationTask {
info := &persistence.InternalReplicationTaskInfo{}
for k, v := range result {
switch k {
case "domain_id":
info.DomainID = v.(gocql.UUID).String()
case "workflow_id":
info.WorkflowID = v.(string)
case "run_id":
info.RunID = v.(gocql.UUID).String()
case "task_id":
info.TaskID = v.(int64)
case "type":
info.TaskType = v.(int)
case "first_event_id":
info.FirstEventID = v.(int64)
case "next_event_id":
info.NextEventID = v.(int64)
case "version":
info.Version = v.(int64)
case "scheduled_id":
info.ScheduledID = v.(int64)
case "branch_token":
info.BranchToken = v.([]byte)
case "new_run_branch_token":
info.NewRunBranchToken = v.([]byte)
case "created_time":
info.CreationTime = time.Unix(0, v.(int64))
}
}
return info
}
func parseChecksum(result map[string]interface{}) checksum.Checksum {
csum := checksum.Checksum{}
if len(result) == 0 {
return csum
}
for k, v := range result {
switch k {
case "flavor":
csum.Flavor = checksum.Flavor(v.(int))
case "version":
csum.Version = v.(int)
case "value":
csum.Value = v.([]byte)
}
}
return csum
}