common/persistence/sql/sql_visibility_store.go (360 lines of code) (raw):

// Copyright (c) 2017 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. 
package sql

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"time"

	workflow "github.com/uber/cadence/.gen/go/shared"
	"github.com/uber/cadence/common"
	"github.com/uber/cadence/common/config"
	"github.com/uber/cadence/common/log"
	p "github.com/uber/cadence/common/persistence"
	"github.com/uber/cadence/common/persistence/sql/sqlplugin"
	"github.com/uber/cadence/common/types"
	"github.com/uber/cadence/common/types/mapper/thrift"
)

type (
	// sqlVisibilityStore implements p.VisibilityStore on top of the
	// embedded sqlStore (SQL plugin handle plus logger).
	sqlVisibilityStore struct {
		sqlStore
	}

	// visibilityPageToken is the JSON-encoded pagination cursor handed back
	// to callers as NextPageToken. It records the start time and run ID of
	// the last row of the previous page.
	visibilityPageToken struct {
		Time  time.Time
		RunID string
	}
)

// NewSQLVisibilityStore creates an instance of VisibilityStore backed by the
// SQL database described by cfg. It returns the error from NewSQLDB if the
// database handle cannot be created.
func NewSQLVisibilityStore(cfg config.SQL, logger log.Logger) (p.VisibilityStore, error) {
	db, err := NewSQLDB(&cfg)
	if err != nil {
		return nil, err
	}
	return &sqlVisibilityStore{
		sqlStore: sqlStore{
			db:     db,
			logger: logger,
		},
	}, nil
}

// RecordWorkflowExecutionStarted inserts a visibility row for a newly started
// workflow execution. Database errors are translated by convertCommonErrors.
func (s *sqlVisibilityStore) RecordWorkflowExecutionStarted(
	ctx context.Context,
	request *p.InternalRecordWorkflowExecutionStartedRequest,
) error {
	_, err := s.db.InsertIntoVisibility(ctx, &sqlplugin.VisibilityRow{
		DomainID:         request.DomainUUID,
		WorkflowID:       request.WorkflowID,
		RunID:            request.RunID,
		StartTime:        request.StartTimestamp,
		ExecutionTime:    request.ExecutionTimestamp,
		WorkflowTypeName: request.WorkflowTypeName,
		Memo:             request.Memo.Data,
		Encoding:         string(request.Memo.GetEncoding()),
		IsCron:           request.IsCron,
		NumClusters:      request.NumClusters,
		UpdateTime:       request.UpdateTimestamp,
		ShardID:          request.ShardID,
	})
	if err != nil {
		return convertCommonErrors(s.db, "RecordWorkflowExecutionStarted", "", err)
	}
	return nil
}

// RecordWorkflowExecutionClosed writes the visibility row for a closed
// workflow execution via ReplaceIntoVisibility. It returns an
// InternalServiceError if the affected-row count cannot be read or is
// larger than two.
func (s *sqlVisibilityStore) RecordWorkflowExecutionClosed(
	ctx context.Context,
	request *p.InternalRecordWorkflowExecutionClosedRequest,
) error {
	closeTime := request.CloseTimestamp
	result, err := s.db.ReplaceIntoVisibility(ctx, &sqlplugin.VisibilityRow{
		DomainID:         request.DomainUUID,
		WorkflowID:       request.WorkflowID,
		RunID:            request.RunID,
		StartTime:        request.StartTimestamp,
		ExecutionTime:    request.ExecutionTimestamp,
		WorkflowTypeName: request.WorkflowTypeName,
		CloseTime:        &closeTime,
		CloseStatus:      common.Int32Ptr(int32(*thrift.FromWorkflowExecutionCloseStatus(&request.Status))),
		HistoryLength:    &request.HistoryLength,
		Memo:             request.Memo.Data,
		Encoding:         string(request.Memo.GetEncoding()),
		IsCron:           request.IsCron,
		NumClusters:      request.NumClusters,
		UpdateTime:       request.UpdateTimestamp,
		ShardID:          request.ShardID,
	})
	if err != nil {
		return convertCommonErrors(s.db, "RecordWorkflowExecutionClosed", "", err)
	}

	rowsAffected, err := result.RowsAffected()
	if err != nil {
		return &types.InternalServiceError{
			Message: fmt.Sprintf("RecordWorkflowExecutionClosed rowsAffected error: %v", err),
		}
	}
	// A replace either adds a new row (1) or deletes the old row and adds a
	// new one (2); anything above that is unexpected.
	if rowsAffected > 2 {
		return &types.InternalServiceError{
			Message: fmt.Sprintf("RecordWorkflowExecutionClosed unexpected numRows (%v) updated", rowsAffected),
		}
	}
	return nil
}

// RecordWorkflowExecutionUninitialized is a no-op for the SQL store.
func (s *sqlVisibilityStore) RecordWorkflowExecutionUninitialized(
	ctx context.Context,
	request *p.InternalRecordWorkflowExecutionUninitializedRequest,
) error {
	// temporary: not implemented, only implemented for ES
	return nil
}

// UpsertWorkflowExecution accepts requests that p.IsNopUpsertWorkflowRequest
// reports as no-ops and rejects everything else with
// p.ErrVisibilityOperationNotSupported.
func (s *sqlVisibilityStore) UpsertWorkflowExecution(
	_ context.Context,
	request *p.InternalUpsertWorkflowExecutionRequest,
) error {
	if p.IsNopUpsertWorkflowRequest(request) {
		return nil
	}
	return p.ErrVisibilityOperationNotSupported
}

// ListOpenWorkflowExecutions returns one page of visibility records for the
// domain, filtered by the request's start-time window, using the non-closed
// filter (Closed left false).
func (s *sqlVisibilityStore) ListOpenWorkflowExecutions(
	ctx context.Context,
	request *p.InternalListWorkflowExecutionsRequest,
) (*p.InternalListWorkflowExecutionsResponse, error) {
	return s.listWorkflowExecutions("ListOpenWorkflowExecutions", request.NextPageToken, request.EarliestTime, request.LatestTime,
		func(readLevel *visibilityPageToken) ([]sqlplugin.VisibilityRow, error) {
			return s.db.SelectFromVisibility(ctx, &sqlplugin.VisibilityFilter{
				DomainID:     request.DomainUUID,
				MinStartTime: &request.EarliestTime,
				MaxStartTime: &readLevel.Time,
				RunID:        &readLevel.RunID,
				PageSize:     &request.PageSize,
			})
		})
}

// ListClosedWorkflowExecutions returns one page of closed-workflow visibility
// records for the domain, filtered by the request's start-time window.
func (s *sqlVisibilityStore) ListClosedWorkflowExecutions(
	ctx context.Context,
	request *p.InternalListWorkflowExecutionsRequest,
) (*p.InternalListWorkflowExecutionsResponse, error) {
	return s.listWorkflowExecutions("ListClosedWorkflowExecutions", request.NextPageToken, request.EarliestTime, request.LatestTime,
		func(readLevel *visibilityPageToken) ([]sqlplugin.VisibilityRow, error) {
			return s.db.SelectFromVisibility(ctx, &sqlplugin.VisibilityFilter{
				DomainID:     request.DomainUUID,
				MinStartTime: &request.EarliestTime,
				MaxStartTime: &readLevel.Time,
				Closed:       true,
				RunID:        &readLevel.RunID,
				PageSize:     &request.PageSize,
			})
		})
}

// ListOpenWorkflowExecutionsByType is ListOpenWorkflowExecutions additionally
// filtered by workflow type name.
func (s *sqlVisibilityStore) ListOpenWorkflowExecutionsByType(
	ctx context.Context,
	request *p.InternalListWorkflowExecutionsByTypeRequest,
) (*p.InternalListWorkflowExecutionsResponse, error) {
	return s.listWorkflowExecutions("ListOpenWorkflowExecutionsByType", request.NextPageToken, request.EarliestTime, request.LatestTime,
		func(readLevel *visibilityPageToken) ([]sqlplugin.VisibilityRow, error) {
			return s.db.SelectFromVisibility(ctx, &sqlplugin.VisibilityFilter{
				DomainID:         request.DomainUUID,
				MinStartTime:     &request.EarliestTime,
				MaxStartTime:     &readLevel.Time,
				RunID:            &readLevel.RunID,
				WorkflowTypeName: &request.WorkflowTypeName,
				PageSize:         &request.PageSize,
			})
		})
}

// ListClosedWorkflowExecutionsByType is ListClosedWorkflowExecutions
// additionally filtered by workflow type name.
func (s *sqlVisibilityStore) ListClosedWorkflowExecutionsByType(
	ctx context.Context,
	request *p.InternalListWorkflowExecutionsByTypeRequest,
) (*p.InternalListWorkflowExecutionsResponse, error) {
	return s.listWorkflowExecutions("ListClosedWorkflowExecutionsByType", request.NextPageToken, request.EarliestTime, request.LatestTime,
		func(readLevel *visibilityPageToken) ([]sqlplugin.VisibilityRow, error) {
			return s.db.SelectFromVisibility(ctx, &sqlplugin.VisibilityFilter{
				DomainID:         request.DomainUUID,
				MinStartTime:     &request.EarliestTime,
				MaxStartTime:     &readLevel.Time,
				Closed:           true,
				RunID:            &readLevel.RunID,
				WorkflowTypeName: &request.WorkflowTypeName,
				PageSize:         &request.PageSize,
			})
		})
}

// ListOpenWorkflowExecutionsByWorkflowID is ListOpenWorkflowExecutions
// additionally filtered by workflow ID.
func (s *sqlVisibilityStore) ListOpenWorkflowExecutionsByWorkflowID(
	ctx context.Context,
	request *p.InternalListWorkflowExecutionsByWorkflowIDRequest,
) (*p.InternalListWorkflowExecutionsResponse, error) {
	return s.listWorkflowExecutions("ListOpenWorkflowExecutionsByWorkflowID", request.NextPageToken, request.EarliestTime, request.LatestTime,
		func(readLevel *visibilityPageToken) ([]sqlplugin.VisibilityRow, error) {
			return s.db.SelectFromVisibility(ctx, &sqlplugin.VisibilityFilter{
				DomainID:     request.DomainUUID,
				MinStartTime: &request.EarliestTime,
				MaxStartTime: &readLevel.Time,
				RunID:        &readLevel.RunID,
				WorkflowID:   &request.WorkflowID,
				PageSize:     &request.PageSize,
			})
		})
}

// ListClosedWorkflowExecutionsByWorkflowID is ListClosedWorkflowExecutions
// additionally filtered by workflow ID.
func (s *sqlVisibilityStore) ListClosedWorkflowExecutionsByWorkflowID(
	ctx context.Context,
	request *p.InternalListWorkflowExecutionsByWorkflowIDRequest,
) (*p.InternalListWorkflowExecutionsResponse, error) {
	return s.listWorkflowExecutions("ListClosedWorkflowExecutionsByWorkflowID", request.NextPageToken, request.EarliestTime, request.LatestTime,
		func(readLevel *visibilityPageToken) ([]sqlplugin.VisibilityRow, error) {
			return s.db.SelectFromVisibility(ctx, &sqlplugin.VisibilityFilter{
				DomainID:     request.DomainUUID,
				MinStartTime: &request.EarliestTime,
				MaxStartTime: &readLevel.Time,
				Closed:       true,
				RunID:        &readLevel.RunID,
				WorkflowID:   &request.WorkflowID,
				PageSize:     &request.PageSize,
			})
		})
}

// ListClosedWorkflowExecutionsByStatus is ListClosedWorkflowExecutions
// additionally filtered by close status.
func (s *sqlVisibilityStore) ListClosedWorkflowExecutionsByStatus(
	ctx context.Context,
	request *p.InternalListClosedWorkflowExecutionsByStatusRequest,
) (*p.InternalListWorkflowExecutionsResponse, error) {
	return s.listWorkflowExecutions("ListClosedWorkflowExecutionsByStatus", request.NextPageToken, request.EarliestTime, request.LatestTime,
		func(readLevel *visibilityPageToken) ([]sqlplugin.VisibilityRow, error) {
			return s.db.SelectFromVisibility(ctx, &sqlplugin.VisibilityFilter{
				DomainID:     request.DomainUUID,
				MinStartTime: &request.EarliestTime,
				MaxStartTime: &readLevel.Time,
				Closed:       true,
				RunID:        &readLevel.RunID,
				CloseStatus:  common.Int32Ptr(int32(*thrift.FromWorkflowExecutionCloseStatus(&request.Status))),
				PageSize:     &request.PageSize,
			})
		})
}

// GetClosedWorkflowExecution looks up the closed visibility record for the
// run ID in request.Execution. It returns EntityNotExistsError when the
// lookup reports sql.ErrNoRows or yields no rows; other database errors are
// translated by convertCommonErrors.
func (s *sqlVisibilityStore) GetClosedWorkflowExecution(
	ctx context.Context,
	request *p.InternalGetClosedWorkflowExecutionRequest,
) (*p.InternalGetClosedWorkflowExecutionResponse, error) {
	execution := request.Execution
	notFound := func() error {
		return &types.EntityNotExistsError{
			Message: fmt.Sprintf("Workflow execution not found.  WorkflowId: %v, RunId: %v",
				execution.GetWorkflowID(), execution.GetRunID()),
		}
	}

	rows, err := s.db.SelectFromVisibility(ctx, &sqlplugin.VisibilityFilter{
		DomainID: request.DomainUUID,
		Closed:   true,
		RunID:    &execution.RunID,
	})
	if err != nil {
		if err == sql.ErrNoRows {
			return nil, notFound()
		}
		return nil, convertCommonErrors(s.db, "GetClosedWorkflowExecution", "", err)
	}
	// Guard against an empty result with a nil error so rows[0] below
	// cannot panic.
	if len(rows) == 0 {
		return nil, notFound()
	}

	// Overwrite the identifying fields with the values from the request
	// before converting the row.
	rows[0].DomainID = request.DomainUUID
	rows[0].RunID = execution.GetRunID()
	rows[0].WorkflowID = execution.GetWorkflowID()
	return &p.InternalGetClosedWorkflowExecutionResponse{Execution: s.rowToInfo(&rows[0])}, nil
}

// DeleteWorkflowExecution deletes the visibility row identified by the
// request's domain ID and run ID.
func (s *sqlVisibilityStore) DeleteWorkflowExecution(
	ctx context.Context,
	request *p.VisibilityDeleteWorkflowExecutionRequest,
) error {
	_, err := s.db.DeleteFromVisibility(ctx, &sqlplugin.VisibilityFilter{
		DomainID: request.DomainID,
		RunID:    &request.RunID,
	})
	if err != nil {
		return convertCommonErrors(s.db, "DeleteWorkflowExecution", "", err)
	}
	return nil
}

// DeleteUninitializedWorkflowExecution is a no-op for the SQL store.
func (s *sqlVisibilityStore) DeleteUninitializedWorkflowExecution(
	ctx context.Context,
	request *p.VisibilityDeleteWorkflowExecutionRequest,
) error {
	// temporary: not implemented, only implemented for ES
	return nil
}

// ListWorkflowExecutions is not supported by the SQL store and always returns
// p.ErrVisibilityOperationNotSupported.
func (s *sqlVisibilityStore) ListWorkflowExecutions(
	_ context.Context,
	_ *p.ListWorkflowExecutionsByQueryRequest,
) (*p.InternalListWorkflowExecutionsResponse, error) {
	return nil, p.ErrVisibilityOperationNotSupported
}

// ScanWorkflowExecutions is not supported by the SQL store and always returns
// p.ErrVisibilityOperationNotSupported.
func (s *sqlVisibilityStore) ScanWorkflowExecutions(
	_ context.Context,
	_ *p.ListWorkflowExecutionsByQueryRequest,
) (*p.InternalListWorkflowExecutionsResponse, error) {
	return nil, p.ErrVisibilityOperationNotSupported
}

// CountWorkflowExecutions is not supported by the SQL store and always returns
// p.ErrVisibilityOperationNotSupported.
func (s *sqlVisibilityStore) CountWorkflowExecutions(
	_ context.Context,
	_ *p.CountWorkflowExecutionsRequest,
) (*p.CountWorkflowExecutionsResponse, error) {
	return nil, p.ErrVisibilityOperationNotSupported
}

// rowToInfo converts a visibility row into the persistence-layer execution
// info. As a side effect it overwrites row.ExecutionTime with row.StartTime
// when the stored execution time is the Unix epoch (UnixNano() == 0).
// Close-related fields are populated only when the row has a close status.
func (s *sqlVisibilityStore) rowToInfo(row *sqlplugin.VisibilityRow) *p.InternalVisibilityWorkflowExecutionInfo {
	if row.ExecutionTime.UnixNano() == 0 {
		row.ExecutionTime = row.StartTime
	}
	info := &p.InternalVisibilityWorkflowExecutionInfo{
		WorkflowID:    row.WorkflowID,
		RunID:         row.RunID,
		TypeName:      row.WorkflowTypeName,
		StartTime:     row.StartTime,
		ExecutionTime: row.ExecutionTime,
		IsCron:        row.IsCron,
		NumClusters:   row.NumClusters,
		Memo:          p.NewDataBlob(row.Memo, common.EncodingType(row.Encoding)),
		UpdateTime:    row.UpdateTime,
		ShardID:       row.ShardID,
	}
	if row.CloseStatus != nil {
		status := workflow.WorkflowExecutionCloseStatus(*row.CloseStatus)
		info.Status = thrift.ToWorkflowExecutionCloseStatus(&status)
		// CloseTime and HistoryLength are independent nullable pointers;
		// check each one so a partially populated row cannot panic.
		if row.CloseTime != nil {
			info.CloseTime = *row.CloseTime
		}
		if row.HistoryLength != nil {
			info.HistoryLength = *row.HistoryLength
		}
	}
	return info
}

// listWorkflowExecutions is the shared pagination driver for the List*
// methods.
//
// The read level is decoded from pageToken when one is supplied; otherwise it
// starts at latestTime with an empty run ID. selectOp runs the actual query
// against that read level. A next-page token (start time and run ID of the
// last returned row) is emitted only while the last row's start time is
// strictly after earliestTime. opName is used only for error translation.
func (s *sqlVisibilityStore) listWorkflowExecutions(
	opName string,
	pageToken []byte,
	earliestTime time.Time,
	latestTime time.Time,
	selectOp func(readLevel *visibilityPageToken) ([]sqlplugin.VisibilityRow, error),
) (*p.InternalListWorkflowExecutionsResponse, error) {
	readLevel := &visibilityPageToken{Time: latestTime, RunID: ""}
	if len(pageToken) > 0 {
		var err error
		readLevel, err = s.deserializePageToken(pageToken)
		if err != nil {
			return nil, err
		}
	}

	rows, err := selectOp(readLevel)
	if err != nil {
		return nil, convertCommonErrors(s.db, opName, "", err)
	}
	if len(rows) == 0 {
		return &p.InternalListWorkflowExecutionsResponse{}, nil
	}

	infos := make([]*p.InternalVisibilityWorkflowExecutionInfo, len(rows))
	// Index into the slice rather than ranging by value: avoids copying each
	// row and avoids passing the address of the loop variable.
	for i := range rows {
		infos[i] = s.rowToInfo(&rows[i])
	}

	var nextPageToken []byte
	lastRow := &rows[len(rows)-1]
	if lastRow.StartTime.After(earliestTime) {
		nextPageToken, err = s.serializePageToken(&visibilityPageToken{
			Time:  lastRow.StartTime,
			RunID: lastRow.RunID,
		})
		if err != nil {
			return nil, err
		}
	}
	return &p.InternalListWorkflowExecutionsResponse{
		Executions:    infos,
		NextPageToken: nextPageToken,
	}, nil
}

// deserializePageToken decodes a JSON page token. On a decode error it
// returns a nil token together with the json.Unmarshal error.
func (s *sqlVisibilityStore) deserializePageToken(data []byte) (*visibilityPageToken, error) {
	var token visibilityPageToken
	if err := json.Unmarshal(data, &token); err != nil {
		return nil, err
	}
	return &token, nil
}

// serializePageToken encodes a page token as JSON.
func (s *sqlVisibilityStore) serializePageToken(token *visibilityPageToken) ([]byte, error) {
	return json.Marshal(token)
}