common/persistence/elasticsearch/es_visibility_store.go (848 lines of code) (raw):
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package elasticsearch
import (
"context"
"encoding/json"
"errors"
"fmt"
"math"
"regexp"
"strconv"
"strings"
"time"
"github.com/cch123/elasticsql"
"github.com/valyala/fastjson"
"github.com/uber/cadence/.gen/go/indexer"
workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/definition"
es "github.com/uber/cadence/common/elasticsearch"
"github.com/uber/cadence/common/elasticsearch/query"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/messaging"
p "github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/service"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/common/types/mapper/thrift"
)
// esPersistenceName is the store name reported by GetName.
const esPersistenceName = "elasticsearch"
// esVisibilityStore implements p.VisibilityStore on ElasticSearch.
// Write operations publish indexer messages through producer; read
// operations query the index named by index through esClient.
type esVisibilityStore struct {
	esClient es.GenericClient
	index    string
	producer messaging.Producer
	logger   log.Logger
	config   *service.Config
}

// Compile-time assertion that *esVisibilityStore satisfies p.VisibilityStore.
var _ p.VisibilityStore = (*esVisibilityStore)(nil)
// NewElasticSearchVisibilityStore creates a visibility store that reads from the
// given ElasticSearch index through esClient and publishes writes through
// producer. The logger is tagged with the ES visibility manager component.
func NewElasticSearchVisibilityStore(
	esClient es.GenericClient,
	index string,
	producer messaging.Producer,
	config *service.Config,
	logger log.Logger,
) p.VisibilityStore {
	store := &esVisibilityStore{
		esClient: esClient,
		index:    index,
		producer: producer,
		config:   config,
	}
	store.logger = logger.WithTags(tag.ComponentESVisibilityManager)
	return store
}
// Close is a no-op: it closes neither the ES client nor the producer.
func (v *esVisibilityStore) Close() {
	// intentionally empty
}

// GetName returns esPersistenceName.
func (v *esVisibilityStore) GetName() string { return esPersistenceName }
// RecordWorkflowExecutionStarted publishes an index message recording that a
// workflow execution started. The close-only arguments (close time, close
// status, history length) are passed as zero because RecordStarted ignores them.
func (v *esVisibilityStore) RecordWorkflowExecutionStarted(
	ctx context.Context,
	request *p.InternalRecordWorkflowExecutionStartedRequest,
) error {
	v.checkProducer()

	var (
		startNanos     = request.StartTimestamp.UnixNano()
		executionNanos = request.ExecutionTimestamp.UnixNano()
		// refreshed whenever the workflow execution is updated later on
		updateNanos = request.UpdateTimestamp.UnixNano()
	)
	message := createVisibilityMessage(
		request.DomainUUID,
		request.WorkflowID,
		request.RunID,
		request.WorkflowTypeName,
		request.TaskList,
		startNanos,
		executionNanos,
		request.TaskID,
		request.Memo.Data,
		request.Memo.GetEncoding(),
		request.IsCron,
		request.NumClusters,
		request.SearchAttributes,
		common.RecordStarted,
		0, // close time: ignored for RecordStarted
		0, // close status: ignored for RecordStarted
		0, // history length: ignored for RecordStarted
		updateNanos,
		int64(request.ShardID),
	)
	return v.producer.Publish(ctx, message)
}
// RecordWorkflowExecutionClosed publishes an index message recording that a
// workflow execution closed, including its close time, close status (converted
// to the thrift enum) and history length.
func (v *esVisibilityStore) RecordWorkflowExecutionClosed(
	ctx context.Context,
	request *p.InternalRecordWorkflowExecutionClosedRequest,
) error {
	v.checkProducer()

	var (
		startNanos     = request.StartTimestamp.UnixNano()
		executionNanos = request.ExecutionTimestamp.UnixNano()
		closeNanos     = request.CloseTimestamp.UnixNano()
		updateNanos    = request.UpdateTimestamp.UnixNano()
		closeStatus    = *thrift.FromWorkflowExecutionCloseStatus(&request.Status)
	)
	message := createVisibilityMessage(
		request.DomainUUID,
		request.WorkflowID,
		request.RunID,
		request.WorkflowTypeName,
		request.TaskList,
		startNanos,
		executionNanos,
		request.TaskID,
		request.Memo.Data,
		request.Memo.GetEncoding(),
		request.IsCron,
		request.NumClusters,
		request.SearchAttributes,
		common.RecordClosed,
		closeNanos,
		closeStatus,
		request.HistoryLength,
		updateNanos,
		int64(request.ShardID),
	)
	return v.producer.Publish(ctx, message)
}
// RecordWorkflowExecutionUninitialized publishes a message built by
// getVisibilityMessageForUninitializedWorkflow. That message carries only the
// workflow type, update time and shard ID, and has no start time.
func (v *esVisibilityStore) RecordWorkflowExecutionUninitialized(
	ctx context.Context,
	request *p.InternalRecordWorkflowExecutionUninitializedRequest,
) error {
	v.checkProducer()
	updateNanos := request.UpdateTimestamp.UnixNano()
	return v.producer.Publish(ctx, getVisibilityMessageForUninitializedWorkflow(
		request.DomainUUID,
		request.WorkflowID,
		request.RunID,
		request.WorkflowTypeName,
		updateNanos,
		request.ShardID,
	))
}
// UpsertWorkflowExecution publishes an index message with the
// UpsertSearchAttributes operation. The close-only arguments are passed as zero
// because that operation ignores them.
func (v *esVisibilityStore) UpsertWorkflowExecution(
	ctx context.Context,
	request *p.InternalUpsertWorkflowExecutionRequest,
) error {
	v.checkProducer()

	var (
		startNanos     = request.StartTimestamp.UnixNano()
		executionNanos = request.ExecutionTimestamp.UnixNano()
		updateNanos    = request.UpdateTimestamp.UnixNano()
	)
	message := createVisibilityMessage(
		request.DomainUUID,
		request.WorkflowID,
		request.RunID,
		request.WorkflowTypeName,
		request.TaskList,
		startNanos,
		executionNanos,
		request.TaskID,
		request.Memo.Data,
		request.Memo.GetEncoding(),
		request.IsCron,
		request.NumClusters,
		request.SearchAttributes,
		common.UpsertSearchAttributes,
		0, // close time: ignored for upserts
		0, // close status: ignored for upserts
		0, // history length: ignored for upserts
		updateNanos,
		request.ShardID,
	)
	return v.producer.Publish(ctx, message)
}
// ListOpenWorkflowExecutions lists open executions and keeps only records whose
// StartTime lies in [EarliestTime, LatestTime]. Search failures are returned as
// InternalServiceError.
func (v *esVisibilityStore) ListOpenWorkflowExecutions(
	ctx context.Context,
	request *p.InternalListWorkflowExecutionsRequest,
) (*p.InternalListWorkflowExecutionsResponse, error) {
	startedInWindow := func(rec *p.InternalVisibilityWorkflowExecutionInfo) bool {
		return !rec.StartTime.Before(request.EarliestTime) && !rec.StartTime.After(request.LatestTime)
	}
	searchRequest := &es.SearchRequest{
		Index:           v.index,
		ListRequest:     request,
		IsOpen:          true,
		Filter:          startedInWindow,
		MaxResultWindow: v.config.ESIndexMaxResultWindow(),
	}

	response, err := v.esClient.Search(ctx, searchRequest)
	if err != nil {
		return nil, &types.InternalServiceError{
			Message: fmt.Sprintf("ListOpenWorkflowExecutions failed, %v", err),
		}
	}
	return response, nil
}
// ListClosedWorkflowExecutions lists closed executions and keeps only records
// whose CloseTime lies in [EarliestTime, LatestTime]. Search failures are
// returned as InternalServiceError.
func (v *esVisibilityStore) ListClosedWorkflowExecutions(
	ctx context.Context,
	request *p.InternalListWorkflowExecutionsRequest,
) (*p.InternalListWorkflowExecutionsResponse, error) {
	closedInWindow := func(rec *p.InternalVisibilityWorkflowExecutionInfo) bool {
		return !rec.CloseTime.Before(request.EarliestTime) && !rec.CloseTime.After(request.LatestTime)
	}
	searchRequest := &es.SearchRequest{
		Index:           v.index,
		ListRequest:     request,
		IsOpen:          false,
		Filter:          closedInWindow,
		MaxResultWindow: v.config.ESIndexMaxResultWindow(),
	}

	response, err := v.esClient.Search(ctx, searchRequest)
	if err != nil {
		return nil, &types.InternalServiceError{
			Message: fmt.Sprintf("ListClosedWorkflowExecutions failed, %v", err),
		}
	}
	return response, nil
}
// ListOpenWorkflowExecutionsByType lists open executions of the given workflow
// type whose StartTime lies in [EarliestTime, LatestTime].
func (v *esVisibilityStore) ListOpenWorkflowExecutionsByType(
	ctx context.Context,
	request *p.InternalListWorkflowExecutionsByTypeRequest,
) (*p.InternalListWorkflowExecutionsResponse, error) {
	startedInWindow := func(rec *p.InternalVisibilityWorkflowExecutionInfo) bool {
		return !rec.StartTime.Before(request.EarliestTime) && !rec.StartTime.After(request.LatestTime)
	}
	searchRequest := &es.SearchRequest{
		Index:           v.index,
		ListRequest:     &request.InternalListWorkflowExecutionsRequest,
		IsOpen:          true,
		Filter:          startedInWindow,
		MatchQuery:      query.NewMatchQuery(es.WorkflowType, request.WorkflowTypeName),
		MaxResultWindow: v.config.ESIndexMaxResultWindow(),
	}

	response, err := v.esClient.Search(ctx, searchRequest)
	if err != nil {
		return nil, &types.InternalServiceError{
			Message: fmt.Sprintf("ListOpenWorkflowExecutionsByType failed, %v", err),
		}
	}
	return response, nil
}
// ListClosedWorkflowExecutionsByType lists closed executions of the given
// workflow type whose CloseTime lies in [EarliestTime, LatestTime].
func (v *esVisibilityStore) ListClosedWorkflowExecutionsByType(
	ctx context.Context,
	request *p.InternalListWorkflowExecutionsByTypeRequest,
) (*p.InternalListWorkflowExecutionsResponse, error) {
	closedInWindow := func(rec *p.InternalVisibilityWorkflowExecutionInfo) bool {
		return !rec.CloseTime.Before(request.EarliestTime) && !rec.CloseTime.After(request.LatestTime)
	}
	searchRequest := &es.SearchRequest{
		Index:           v.index,
		ListRequest:     &request.InternalListWorkflowExecutionsRequest,
		IsOpen:          false,
		Filter:          closedInWindow,
		MatchQuery:      query.NewMatchQuery(es.WorkflowType, request.WorkflowTypeName),
		MaxResultWindow: v.config.ESIndexMaxResultWindow(),
	}

	response, err := v.esClient.Search(ctx, searchRequest)
	if err != nil {
		return nil, &types.InternalServiceError{
			Message: fmt.Sprintf("ListClosedWorkflowExecutionsByType failed, %v", err),
		}
	}
	return response, nil
}
// ListOpenWorkflowExecutionsByWorkflowID lists open executions with the given
// workflow ID whose StartTime lies in [EarliestTime, LatestTime].
func (v *esVisibilityStore) ListOpenWorkflowExecutionsByWorkflowID(
	ctx context.Context,
	request *p.InternalListWorkflowExecutionsByWorkflowIDRequest,
) (*p.InternalListWorkflowExecutionsResponse, error) {
	startedInWindow := func(rec *p.InternalVisibilityWorkflowExecutionInfo) bool {
		return !rec.StartTime.Before(request.EarliestTime) && !rec.StartTime.After(request.LatestTime)
	}
	searchRequest := &es.SearchRequest{
		Index:           v.index,
		ListRequest:     &request.InternalListWorkflowExecutionsRequest,
		IsOpen:          true,
		Filter:          startedInWindow,
		MatchQuery:      query.NewMatchQuery(es.WorkflowID, request.WorkflowID),
		MaxResultWindow: v.config.ESIndexMaxResultWindow(),
	}

	response, err := v.esClient.Search(ctx, searchRequest)
	if err != nil {
		return nil, &types.InternalServiceError{
			Message: fmt.Sprintf("ListOpenWorkflowExecutionsByWorkflowID failed, %v", err),
		}
	}
	return response, nil
}
// ListClosedWorkflowExecutionsByWorkflowID lists closed executions with the
// given workflow ID whose CloseTime lies in [EarliestTime, LatestTime].
func (v *esVisibilityStore) ListClosedWorkflowExecutionsByWorkflowID(
	ctx context.Context,
	request *p.InternalListWorkflowExecutionsByWorkflowIDRequest,
) (*p.InternalListWorkflowExecutionsResponse, error) {
	closedInWindow := func(rec *p.InternalVisibilityWorkflowExecutionInfo) bool {
		return !rec.CloseTime.Before(request.EarliestTime) && !rec.CloseTime.After(request.LatestTime)
	}
	searchRequest := &es.SearchRequest{
		Index:           v.index,
		ListRequest:     &request.InternalListWorkflowExecutionsRequest,
		IsOpen:          false,
		Filter:          closedInWindow,
		MatchQuery:      query.NewMatchQuery(es.WorkflowID, request.WorkflowID),
		MaxResultWindow: v.config.ESIndexMaxResultWindow(),
	}

	response, err := v.esClient.Search(ctx, searchRequest)
	if err != nil {
		return nil, &types.InternalServiceError{
			Message: fmt.Sprintf("ListClosedWorkflowExecutionsByWorkflowID failed, %v", err),
		}
	}
	return response, nil
}
// ListClosedWorkflowExecutionsByStatus lists closed executions with the given
// close status whose CloseTime lies in [EarliestTime, LatestTime]. The status is
// matched using its thrift enum value.
func (v *esVisibilityStore) ListClosedWorkflowExecutionsByStatus(
	ctx context.Context,
	request *p.InternalListClosedWorkflowExecutionsByStatusRequest,
) (*p.InternalListWorkflowExecutionsResponse, error) {
	closedInWindow := func(rec *p.InternalVisibilityWorkflowExecutionInfo) bool {
		return !rec.CloseTime.Before(request.EarliestTime) && !rec.CloseTime.After(request.LatestTime)
	}
	thriftStatus := int32(*thrift.FromWorkflowExecutionCloseStatus(&request.Status))
	searchRequest := &es.SearchRequest{
		Index:           v.index,
		ListRequest:     &request.InternalListWorkflowExecutionsRequest,
		IsOpen:          false,
		Filter:          closedInWindow,
		MatchQuery:      query.NewMatchQuery(es.CloseStatus, thriftStatus),
		MaxResultWindow: v.config.ESIndexMaxResultWindow(),
	}

	response, err := v.esClient.Search(ctx, searchRequest)
	if err != nil {
		return nil, &types.InternalServiceError{
			Message: fmt.Sprintf("ListClosedWorkflowExecutionsByStatus failed, %v", err),
		}
	}
	return response, nil
}
// GetClosedWorkflowExecution looks up a single closed execution in the index.
// Lookup failures are returned as InternalServiceError.
func (v *esVisibilityStore) GetClosedWorkflowExecution(
	ctx context.Context,
	request *p.InternalGetClosedWorkflowExecutionRequest,
) (*p.InternalGetClosedWorkflowExecutionResponse, error) {
	response, err := v.esClient.SearchForOneClosedExecution(ctx, v.index, request)
	if err == nil {
		return response, nil
	}
	return nil, &types.InternalServiceError{
		Message: fmt.Sprintf("SearchForOneClosedExecution failed, %v", err),
	}
}
// DeleteWorkflowExecution publishes a delete message for the execution. The
// request's TaskID is used as the document version.
func (v *esVisibilityStore) DeleteWorkflowExecution(
	ctx context.Context,
	request *p.VisibilityDeleteWorkflowExecutionRequest,
) error {
	v.checkProducer()
	deletion := getVisibilityMessageForDeletion(request.DomainID, request.WorkflowID, request.RunID, request.TaskID)
	return v.producer.Publish(ctx, deletion)
}
// DeleteUninitializedWorkflowExecution deletes the visibility record of the given
// run only if it is an uninitialized record, i.e. one without a StartTime.
// It first counts matching records in the domain. When at least one exists, it
// delegates to DeleteWorkflowExecution; otherwise it does nothing. Errors from
// the count or the delete are returned unchanged.
func (v *esVisibilityStore) DeleteUninitializedWorkflowExecution(
	ctx context.Context,
	request *p.VisibilityDeleteWorkflowExecutionRequest,
) error {
	// The IDs must be SQL string literals. Unquoted, an ID such as
	// "3fa85f64-5717-..." is tokenized as a number/identifier followed by a
	// subtraction instead of being compared as a value.
	// NOTE(review): assumes DomainID/RunID never contain single quotes (UUIDs).
	countQuery := fmt.Sprintf(
		"StartTime = missing and DomainID = '%s' and RunID = '%s'",
		request.DomainID,
		request.RunID,
	)
	resp, err := v.CountWorkflowExecutions(ctx, &p.CountWorkflowExecutionsRequest{
		Domain: request.Domain,
		Query:  countQuery,
	})
	if err != nil {
		return err
	}
	if resp.Count <= 0 {
		// no uninitialized record for this run: nothing to delete
		return nil
	}
	return v.DeleteWorkflowExecution(ctx, request)
}
// ListWorkflowExecutions runs the SQL-like request.Query against the index.
// The query is translated to ES DSL by getESQueryDSL, which pages with either
// search_after or from+size depending on the page token. Invalid page tokens
// are returned as-is, query translation failures as BadRequestError, and
// search failures as InternalServiceError.
func (v *esVisibilityStore) ListWorkflowExecutions(
	ctx context.Context,
	request *p.ListWorkflowExecutionsByQueryRequest,
) (*p.InternalListWorkflowExecutionsResponse, error) {
	checkPageSize(request)

	pageToken, err := es.GetNextPageToken(request.NextPageToken)
	if err != nil {
		return nil, err
	}
	dsl, err := v.getESQueryDSL(request, pageToken)
	if err != nil {
		return nil, &types.BadRequestError{Message: fmt.Sprintf("Error when parse query: %v", err)}
	}

	searchRequest := &es.SearchByQueryRequest{
		Index:           v.index,
		Query:           dsl,
		NextPageToken:   request.NextPageToken,
		PageSize:        request.PageSize,
		MaxResultWindow: v.config.ESIndexMaxResultWindow(),
	}
	response, err := v.esClient.SearchByQuery(ctx, searchRequest)
	if err != nil {
		return nil, &types.InternalServiceError{
			Message: fmt.Sprintf("ListWorkflowExecutions failed, %v", err),
		}
	}
	return response, nil
}
// ScanWorkflowExecutions scans executions matching request.Query. The DSL is
// only built for the first page, when the token has no scroll ID; later pages
// pass an empty query and rely on the token. Query translation failures are
// returned as BadRequestError and scan failures as InternalServiceError.
func (v *esVisibilityStore) ScanWorkflowExecutions(
	ctx context.Context,
	request *p.ListWorkflowExecutionsByQueryRequest,
) (*p.InternalListWorkflowExecutionsResponse, error) {
	checkPageSize(request)

	pageToken, err := es.GetNextPageToken(request.NextPageToken)
	if err != nil {
		return nil, err
	}

	var dsl string
	if isFirstPage := len(pageToken.ScrollID) == 0; isFirstPage {
		if dsl, err = getESQueryDSLForScan(request); err != nil {
			return nil, &types.BadRequestError{Message: fmt.Sprintf("Error when parse query: %v", err)}
		}
	}

	response, err := v.esClient.ScanByQuery(ctx, &es.ScanByQueryRequest{
		Index:         v.index,
		Query:         dsl,
		NextPageToken: request.NextPageToken,
		PageSize:      request.PageSize,
	})
	if err != nil {
		return nil, &types.InternalServiceError{
			Message: fmt.Sprintf("ScanWorkflowExecutions failed, %v", err),
		}
	}
	return response, nil
}
// CountWorkflowExecutions counts executions matching request.Query. Query
// translation failures are returned as BadRequestError and count failures as
// InternalServiceError.
func (v *esVisibilityStore) CountWorkflowExecutions(
	ctx context.Context,
	request *p.CountWorkflowExecutionsRequest,
) (*p.CountWorkflowExecutionsResponse, error) {
	dsl, err := getESQueryDSLForCount(request)
	if err != nil {
		return nil, &types.BadRequestError{Message: fmt.Sprintf("Error when parse query: %v", err)}
	}

	total, err := v.esClient.CountByQuery(ctx, v.index, dsl)
	if err != nil {
		return nil, &types.InternalServiceError{
			Message: fmt.Sprintf("CountWorkflowExecutions failed. Error: %v", err),
		}
	}
	return &p.CountWorkflowExecutionsResponse{Count: total}, nil
}
// Compact JSON fragments that are searched for in, or injected into, the DSL
// produced by elasticsql.
const (
	jsonMissingCloseTime     = `{"missing":{"field":"CloseTime"}}`
	jsonMissingStartTime     = `{"missing":{"field":"StartTime"}}` // used to identify uninitialized workflow execution records
	jsonRangeOnExecutionTime = `{"range":{"ExecutionTime":`
	jsonSortForOpen          = `[{"StartTime":"desc"},{"RunID":"desc"}]`
	jsonSortWithTieBreaker   = `{"RunID":"desc"}`
)

// Top-level DSL field names that are set or removed by this store.
const (
	dslFieldSort        = "sort"
	dslFieldSearchAfter = "search_after"
	dslFieldFrom        = "from"
	dslFieldSize        = "size"
)

// defaultDateTimeFormat is the layout used to parse time values in queries,
// e.g. 2018-02-15T16:16:36-08:00.
const defaultDateTimeFormat = time.RFC3339

var (
	// timeKeys are the document fields whose query values are converted to UnixNano.
	timeKeys = map[string]bool{
		es.StartTime:     true,
		es.CloseTime:     true,
		es.ExecutionTime: true,
		es.UpdateTime:    true,
	}

	// rangeKeys are the keys, nested under a time or close-status field, whose
	// values are normalized.
	rangeKeys = map[string]bool{
		"from":  true,
		"to":    true,
		"gt":    true,
		"lt":    true,
		"query": true,
	}

	// missingStartTimeRegex matches the "missing StartTime" query emitted by elasticsql.
	missingStartTimeRegex = regexp.MustCompile(jsonMissingStartTime)
)
// getESQueryDSLForScan translates a scan request into ES DSL (see
// getCustomizedDSLFromSQL) and removes the sort clause from it.
func getESQueryDSLForScan(request *p.ListWorkflowExecutionsByQueryRequest) (string, error) {
	dsl, err := getCustomizedDSLFromSQL(getSQLFromListRequest(request), request.DomainUUID)
	if err != nil {
		return "", err
	}
	dsl.Del(dslFieldSort) // not needed for scan
	return dsl.String(), nil
}
// getESQueryDSLForCount translates a count request into ES DSL (see
// getCustomizedDSLFromSQL) and removes the paging and sort fields, which a
// count does not use.
func getESQueryDSLForCount(request *p.CountWorkflowExecutionsRequest) (string, error) {
	dsl, err := getCustomizedDSLFromSQL(getSQLFromCountRequest(request), request.DomainUUID)
	if err != nil {
		return "", err
	}
	for _, unused := range []string{dslFieldFrom, dslFieldSize, dslFieldSort} {
		dsl.Del(unused)
	}
	return dsl.String(), nil
}
// getESQueryDSL translates a list request into ES DSL.
//
// The DSL always carries a sort (see processSortField). Paging uses
// search_after when es.ShouldSearchAfter(token) holds and from=token.From
// otherwise. Backticks that elasticsql may leave around Attr.* names are
// removed from the final string.
//
// An error is returned when the query cannot be translated, the sort is
// invalid, or the search_after value derived from the token is not valid JSON.
func (v *esVisibilityStore) getESQueryDSL(request *p.ListWorkflowExecutionsByQueryRequest, token *es.ElasticVisibilityPageToken) (string, error) {
	sql := getSQLFromListRequest(request)
	dsl, err := getCustomizedDSLFromSQL(sql, request.DomainUUID)
	if err != nil {
		return "", err
	}

	sortField, err := v.processSortField(dsl)
	if err != nil {
		return "", err
	}

	if es.ShouldSearchAfter(token) {
		valueOfSearchAfter, err := v.getValueOfSearchAfterInJSON(token, sortField)
		if err != nil {
			return "", err
		}
		// The value is built from the client-supplied page token. Parse it with
		// error handling: MustParse would panic on a malformed value.
		searchAfter, err := fastjson.Parse(valueOfSearchAfter)
		if err != nil {
			return "", fmt.Errorf("invalid search_after value %q: %w", valueOfSearchAfter, err)
		}
		dsl.Set(dslFieldSearchAfter, searchAfter)
	} else { // use from+size
		dsl.Set(dslFieldFrom, fastjson.MustParse(strconv.Itoa(token.From)))
	}

	return cleanDSL(dsl.String()), nil
}
// getSQLFromListRequest wraps request.Query into a full SQL statement for
// elasticsql, limited to request.PageSize rows.
//
// An empty query selects everything, a query that is only an ORDER BY clause
// is appended as-is, and any other query becomes the WHERE clause.
func getSQLFromListRequest(request *p.ListWorkflowExecutionsByQueryRequest) string {
	trimmed := strings.TrimSpace(request.Query)
	switch {
	case trimmed == "":
		return fmt.Sprintf("select * from dummy limit %d", request.PageSize)
	case common.IsJustOrderByClause(trimmed):
		return fmt.Sprintf("select * from dummy %s limit %d", request.Query, request.PageSize)
	default:
		return fmt.Sprintf("select * from dummy where %s limit %d", request.Query, request.PageSize)
	}
}
// getSQLFromCountRequest wraps request.Query into a SQL statement for
// elasticsql. A blank query selects everything; otherwise it becomes the WHERE
// clause.
func getSQLFromCountRequest(request *p.CountWorkflowExecutionsRequest) string {
	if strings.TrimSpace(request.Query) != "" {
		return fmt.Sprintf("select * from dummy where %s", request.Query)
	}
	return "select * from dummy"
}
// getCustomizedDSLFromSQL converts sql to ES DSL with elasticsql and adapts the
// result for this store:
//   - "missing" queries on StartTime / CloseTime become "must_not exists";
//   - a range on ExecutionTime additionally requires ExecutionTime > 0;
//   - the query is restricted to domainID (when non-empty);
//   - time and close-status values are normalized to numeric strings.
func getCustomizedDSLFromSQL(sql string, domainID string) (*fastjson.Value, error) {
	converted, _, err := elasticsql.Convert(sql)
	if err != nil {
		return nil, err
	}
	dsl, err := fastjson.Parse(converted)
	if err != nil {
		return nil, err
	}

	// Re-serialize to compact JSON (no spaces) so the fragment checks below
	// match independently of elasticsql's formatting.
	compact := dsl.String()
	if strings.Contains(compact, jsonMissingStartTime) { // uninitialized records
		dsl = replaceQueryForUninitialized(dsl)
	}
	if strings.Contains(compact, jsonMissingCloseTime) { // open executions
		dsl = replaceQueryForOpen(dsl)
	}
	if strings.Contains(compact, jsonRangeOnExecutionTime) {
		addQueryForExecutionTime(dsl)
	}
	addDomainToQuery(dsl, domainID)

	if err := processAllValuesForKey(dsl, isCombinedKey, combinedProcessFunc); err != nil {
		return nil, err
	}
	return dsl, nil
}
// replaceQueryForOpen rewrites elasticsql's "missing CloseTime" query into the
// equivalent "must_not exists" query, because ES v6 only accepts the latter.
// Note it also means a temp limitation that we cannot support field missing search.
//
// The pattern is a fixed string, so a plain string replacement is used instead
// of compiling a regular expression on every call.
func replaceQueryForOpen(dsl *fastjson.Value) *fastjson.Value {
	rewritten := strings.ReplaceAll(
		dsl.String(),
		jsonMissingCloseTime,
		`{"bool":{"must_not":{"exists":{"field":"CloseTime"}}}}`,
	)
	return fastjson.MustParse(rewritten)
}
// replaceQueryForUninitialized rewrites elasticsql's "missing StartTime" query,
// which selects uninitialized execution records, into a "must_not exists"
// query, because ES v6 does not accept "missing".
func replaceQueryForUninitialized(dsl *fastjson.Value) *fastjson.Value {
	const mustNotExistStartTime = `{"bool":{"must_not":{"exists":{"field":"StartTime"}}}}`
	return fastjson.MustParse(missingStartTimeRegex.ReplaceAllString(dsl.String(), mustNotExistStartTime))
}
// addQueryForExecutionTime makes the query additionally require ExecutionTime > 0.
func addQueryForExecutionTime(dsl *fastjson.Value) {
	addMustQuery(dsl, `{"range" : {"ExecutionTime" : {"gt" : "0"}}}`)
}
// addDomainToQuery restricts the query to documents whose DomainID matches
// domainID. An empty domainID leaves the query unchanged.
func addDomainToQuery(dsl *fastjson.Value, domainID string) {
	if domainID == "" {
		return
	}
	addMustQuery(dsl, `{"match_phrase":{"DomainID":{"query":"`+domainID+`"}}}`)
}
// addMustQuery replaces the top-level bool query with a new bool query whose
// "must" list is [queryString, {"bool": <previous bool query>}].
// Wrapping, rather than appending to a flat bool query, keeps "should" (OR)
// clauses of the original query working correctly in query context.
func addMustQuery(dsl *fastjson.Value, queryString string) {
	previousBool := dsl.Get("query", "bool").String()
	wrapped := fastjson.MustParse(`{"must":[` + queryString + `,{"bool":` + previousBool + `}]}`)
	dsl.Get("query").Set("bool", wrapped)
}
// processSortField ensures the DSL has a deterministic sort and returns the
// primary sort field name.
//
// Without a sort (no ORDER BY), it installs the default StartTime desc, RunID
// desc and returns definition.StartTime. With a user-provided sort:
//   - exactly one sort entry, shaped as a {field: order} object, is accepted;
//   - the field must not be an IndexedValueTypeString field;
//   - RunID desc is appended as a tie-breaker.
//
// Violations are returned as errors.
func (v *esVisibilityStore) processSortField(dsl *fastjson.Value) (string, error) {
	if !dsl.Exists(dslFieldSort) { // set default sorting by StartTime desc
		dsl.Set(dslFieldSort, fastjson.MustParse(jsonSortForOpen))
		return definition.StartTime, nil
	}

	// user provided sorting using order by
	sortClauses := dsl.GetArray(dslFieldSort)
	if len(sortClauses) > 1 {
		return "", errors.New("only one field can be used to sort")
	}
	if len(sortClauses) == 0 {
		// An empty or non-array sort would otherwise panic on the [0] index below.
		return "", errors.New("unable to determine sort field")
	}

	sortObj, err := sortClauses[0].Object()
	if err != nil {
		return "", fmt.Errorf("invalid sort clause: %w", err)
	}
	var sortField string
	// Visit is the only way to read an object's keys in fastjson.
	sortObj.Visit(func(key []byte, _ *fastjson.Value) {
		sortField = string(key)
	})
	if sortField == "" {
		return "", errors.New("unable to determine sort field")
	}

	// sort validation to exclude IndexedValueTypeString
	if v.getFieldType(sortField) == types.IndexedValueTypeString {
		return "", errors.New("not able to sort by IndexedValueTypeString field, use IndexedValueTypeKeyword field")
	}

	// RunID is added as tie-breaker; the page token's TieBreaker is paired with
	// the sort value when building search_after.
	dsl.Get(dslFieldSort).Set("1", fastjson.MustParse(jsonSortWithTieBreaker))
	return sortField, nil
}
// getFieldType resolves the indexed value type of fieldName from the
// configured valid search attributes.
//
// Custom attributes may be referenced as "<definition.Attr>.<Name>"; that
// prefix is stripped before the lookup. Only a complete "Attr." prefix is
// stripped, so the bare name "Attr" or a name like "AttrFoo" is looked up
// unchanged instead of being sliced out of range.
//
// Unknown fields are logged, and the zero value from the map lookup is passed
// to the conversion.
func (v *esVisibilityStore) getFieldType(fieldName string) types.IndexedValueType {
	fieldName = strings.TrimPrefix(fieldName, definition.Attr+".")
	fieldType, ok := v.config.ValidSearchAttributes()[fieldName]
	if !ok {
		v.logger.Error("Unknown fieldName, validation should be done in frontend already", tag.Value(fieldName))
	}
	return common.ConvertIndexedValueTypeToInternalType(fieldType, v.logger)
}
// getValueOfSearchAfterInJSON builds the JSON array used as the ES search_after
// value: [<sort value from the token>, <tie-breaker from the token>].
//
// The page token round-trips through the client, so every type assertion is
// checked and string values are JSON-encoded. A malformed token yields an error
// rather than a panic or an invalid JSON fragment.
//
// When the sort field is absent from a document, ES reports a sentinel sort
// value:
//   - int/datetime/bool fields: a number outside the int64 range, clamped here
//     to MinInt64 / MaxInt64;
//   - double fields: the string "-Infinity" or "Infinity";
//   - keyword fields: null.
func (v *esVisibilityStore) getValueOfSearchAfterInJSON(token *es.ElasticVisibilityPageToken, sortField string) (string, error) {
	// json.Marshal cannot fail for a string; it yields a correctly escaped JSON string literal.
	quote := func(s string) string {
		encoded, _ := json.Marshal(s)
		return string(encoded)
	}

	var sortVal interface{}
	switch v.getFieldType(sortField) {
	case types.IndexedValueTypeInt, types.IndexedValueTypeDatetime, types.IndexedValueTypeBool:
		num, ok := token.SortValue.(json.Number)
		if !ok {
			return "", fmt.Errorf("unexpected sort value type %T for field %q", token.SortValue, sortField)
		}
		intVal, err := num.Int64()
		if err != nil {
			numErr, ok := err.(*strconv.NumError)
			if !ok || numErr.Num == "" {
				return "", err
			}
			// field not present, ES will return big int +-9223372036854776000
			if numErr.Num[0] == '-' { // desc
				intVal = math.MinInt64
			} else { // asc
				intVal = math.MaxInt64
			}
		}
		sortVal = intVal
	case types.IndexedValueTypeDouble:
		switch sv := token.SortValue.(type) {
		case json.Number:
			floatVal, err := sv.Float64()
			if err != nil {
				return "", err
			}
			sortVal = floatVal
		case string: // field not present, ES will return "-Infinity" or "Infinity"
			sortVal = quote(sv)
		default:
			return "", fmt.Errorf("unexpected sort value type %T for field %q", token.SortValue, sortField)
		}
	case types.IndexedValueTypeKeyword:
		switch sv := token.SortValue.(type) {
		case nil: // field not present, ES will return null
			sortVal = "null"
		case string:
			sortVal = quote(sv)
		default:
			return "", fmt.Errorf("unexpected sort value type %T for field %q", token.SortValue, sortField)
		}
	default:
		sortVal = token.SortValue
	}
	return fmt.Sprintf(`[%v, %s]`, sortVal, quote(token.TieBreaker)), nil
}
// checkProducer panics when the store has no message producer. Every write
// path calls it before publishing.
func (v *esVisibilityStore) checkProducer() {
	if v.producer != nil {
		return
	}
	// must be bug, check history setup
	panic("message producer is nil")
}
// createVisibilityMessage builds an indexer "index" message for a workflow
// execution visibility record, versioned by taskID.
//
// Fields set on the message:
//   - always: workflow type, start/execution/update times, task list, cron
//     flag, cluster count and shard ID;
//   - when memo is non-empty: the memo and its encoding;
//   - every search attribute, as binary data (it may override the fields above);
//   - for RecordClosed only: close time, close status and history length.
//
// Any operation other than RecordStarted, RecordClosed or
// UpsertSearchAttributes panics.
func createVisibilityMessage(
	// common parameters
	domainID string,
	wid,
	rid string,
	workflowTypeName string,
	taskList string,
	startTimeUnixNano int64,
	executionTimeUnixNano int64,
	taskID int64,
	memo []byte,
	encoding common.EncodingType,
	isCron bool,
	numClusters int16,
	searchAttributes map[string][]byte,
	visibilityOperation common.VisibilityOperation,
	// specific to certain status
	endTimeUnixNano int64, // close execution
	closeStatus workflow.WorkflowExecutionCloseStatus, // close execution
	historyLength int64, // close execution
	updateTimeUnixNano int64, // update execution,
	shardID int64,
) *indexer.Message {
	intField := func(value int64) *indexer.Field {
		return &indexer.Field{Type: &es.FieldTypeInt, IntData: common.Int64Ptr(value)}
	}
	stringField := func(value string) *indexer.Field {
		return &indexer.Field{Type: &es.FieldTypeString, StringData: common.StringPtr(value)}
	}
	binaryField := func(value []byte) *indexer.Field {
		return &indexer.Field{Type: &es.FieldTypeBinary, BinaryData: value}
	}

	fields := map[string]*indexer.Field{
		es.WorkflowType:  stringField(workflowTypeName),
		es.StartTime:     intField(startTimeUnixNano),
		es.ExecutionTime: intField(executionTimeUnixNano),
		es.TaskList:      stringField(taskList),
		es.IsCron:        {Type: &es.FieldTypeBool, BoolData: common.BoolPtr(isCron)},
		es.NumClusters:   intField(int64(numClusters)),
		es.UpdateTime:    intField(updateTimeUnixNano),
		es.ShardID:       intField(shardID),
	}
	if len(memo) > 0 {
		fields[es.Memo] = binaryField(memo)
		fields[es.Encoding] = stringField(string(encoding))
	}
	for name, data := range searchAttributes {
		fields[name] = binaryField(data)
	}

	var operation indexer.VisibilityOperation
	switch visibilityOperation {
	case common.RecordStarted:
		operation = indexer.VisibilityOperationRecordStarted
	case common.RecordClosed:
		fields[es.CloseTime] = intField(endTimeUnixNano)
		fields[es.CloseStatus] = intField(int64(closeStatus))
		fields[es.HistoryLength] = intField(historyLength)
		operation = indexer.VisibilityOperationRecordClosed
	case common.UpsertSearchAttributes:
		operation = indexer.VisibilityOperationUpsertSearchAttributes
	default:
		panic("VisibilityOperation not set")
	}

	indexType := indexer.MessageTypeIndex
	return &indexer.Message{
		MessageType:         &indexType,
		DomainID:            common.StringPtr(domainID),
		WorkflowID:          common.StringPtr(wid),
		RunID:               common.StringPtr(rid),
		Version:             common.Int64Ptr(taskID),
		Fields:              fields,
		VisibilityOperation: &operation,
	}
}
// getVisibilityMessageForDeletion builds an indexer "delete" message for one
// execution. docVersion is carried as the message Version.
func getVisibilityMessageForDeletion(domainID, workflowID, runID string, docVersion int64) *indexer.Message {
	deleteType := indexer.MessageTypeDelete
	return &indexer.Message{
		MessageType: &deleteType,
		DomainID:    common.StringPtr(domainID),
		WorkflowID:  common.StringPtr(workflowID),
		RunID:       common.StringPtr(runID),
		Version:     common.Int64Ptr(docVersion),
	}
}
// getVisibilityMessageForUninitializedWorkflow builds an indexer "create"
// message holding only the workflow type, update time and shard ID. No start
// time or version is set.
func getVisibilityMessageForUninitializedWorkflow(
	domainID string,
	wid,
	rid string,
	workflowTypeName string,
	updateTimeUnixNano int64, // update execution
	shardID int64,
) *indexer.Message {
	createType := indexer.MessageTypeCreate
	return &indexer.Message{
		MessageType: &createType,
		DomainID:    common.StringPtr(domainID),
		WorkflowID:  common.StringPtr(wid),
		RunID:       common.StringPtr(rid),
		Fields: map[string]*indexer.Field{
			es.WorkflowType: {Type: &es.FieldTypeString, StringData: common.StringPtr(workflowTypeName)},
			es.UpdateTime:   {Type: &es.FieldTypeInt, IntData: common.Int64Ptr(updateTimeUnixNano)},
			es.ShardID:      {Type: &es.FieldTypeInt, IntData: common.Int64Ptr(shardID)},
		},
	}
}
// checkPageSize sets request.PageSize to 1000 when it is zero. The request is
// modified in place.
func checkPageSize(request *p.ListWorkflowExecutionsByQueryRequest) {
	const defaultPageSize = 1000
	if request.PageSize != 0 {
		return
	}
	request.PageSize = defaultPageSize
}
// processAllValuesForKey walks dsl recursively:
//   - arrays are walked element by element;
//   - for each object key accepted by keyFilter, processFunc is called with the
//     containing object, the key and its value, and the walk does not descend
//     into that value;
//   - values under other keys are walked recursively;
//   - scalar values are ignored.
//
// The first error returned aborts the walk.
func processAllValuesForKey(
	dsl *fastjson.Value,
	keyFilter func(k string) bool,
	processFunc func(obj *fastjson.Object, key string, v *fastjson.Value) error,
) error {
	kind := dsl.Type()
	if kind == fastjson.TypeArray {
		for _, element := range dsl.GetArray() {
			if err := processAllValuesForKey(element, keyFilter, processFunc); err != nil {
				return err
			}
		}
		return nil
	}
	if kind != fastjson.TypeObject {
		return nil // scalars have no keys
	}

	object := dsl.GetObject()
	// Collect the keys before processing, since processFunc may call Set on the object.
	var keys []string
	object.Visit(func(key []byte, _ *fastjson.Value) {
		keys = append(keys, string(key))
	})
	for _, key := range keys {
		child := object.Get(key)
		var err error
		if keyFilter(key) {
			err = processFunc(object, key, child)
		} else {
			err = processAllValuesForKey(child, keyFilter, processFunc)
		}
		if err != nil {
			return err
		}
	}
	return nil
}
// isCombinedKey reports whether key's values need normalizing, i.e. whether it
// is a time field or the close status field.
func isCombinedKey(key string) bool {
	if isTimeKey(key) {
		return true
	}
	return isCloseStatusKey(key)
}

// combinedProcessFunc dispatches key to the time or close-status normalizer.
// Any other key is an error.
func combinedProcessFunc(obj *fastjson.Object, key string, value *fastjson.Value) error {
	switch {
	case isTimeKey(key):
		return timeProcessFunc(obj, key, value)
	case isCloseStatusKey(key):
		return closeStatusProcessFunc(obj, key, value)
	default:
		return fmt.Errorf("unknown es dsl key %v for processing value", key)
	}
}

// isTimeKey reports whether key is marked as a time field in timeKeys.
func isTimeKey(key string) bool {
	isTime := timeKeys[key]
	return isTime
}
// timeProcessFunc rewrites every value found under a rangeKeys key (from, to,
// gt, lt, query) inside a time field's query to a quoted UnixNano string.
// Values that already parse as int64 are left unchanged. Any other value must
// be a defaultDateTimeFormat (RFC3339) time, or its parse error is returned.
// The obj and key parameters are not used.
func timeProcessFunc(obj *fastjson.Object, key string, value *fastjson.Value) error {
	isRangeKey := func(k string) bool { return rangeKeys[k] }
	toUnixNano := func(parent *fastjson.Object, k string, val *fastjson.Value) error {
		raw := string(val.GetStringBytes())
		if _, err := strconv.ParseInt(raw, 10, 64); err == nil {
			return nil // already in int64 format
		}
		parsed, err := time.Parse(defaultDateTimeFormat, raw)
		if err != nil {
			return err
		}
		parent.Set(k, fastjson.MustParse(fmt.Sprintf(`"%v"`, parsed.UnixNano())))
		return nil
	}
	return processAllValuesForKey(value, isRangeKey, toUnixNano)
}
// isCloseStatusKey reports whether key is the close status field.
func isCloseStatusKey(key string) bool {
	return es.CloseStatus == key
}

// closeStatusProcessFunc rewrites every value found under a rangeKeys key inside
// the close status query to the quoted integer value of the
// types.WorkflowExecutionCloseStatus it names. Values that already parse as
// int64 are left unchanged; names that cannot be unmarshalled produce an error.
// The obj and key parameters are not used.
func closeStatusProcessFunc(obj *fastjson.Object, key string, value *fastjson.Value) error {
	toStatusCode := func(parent *fastjson.Object, k string, val *fastjson.Value) error {
		text := string(val.GetStringBytes())
		if _, err := strconv.ParseInt(text, 10, 64); err == nil {
			return nil // already in int64 format
		}
		var status types.WorkflowExecutionCloseStatus
		if err := status.UnmarshalText([]byte(text)); err != nil {
			return err
		}
		parent.Set(k, fastjson.MustParse(fmt.Sprintf(`"%d"`, status)))
		return nil
	}
	return processAllValuesForKey(value, func(k string) bool { return rangeKeys[k] }, toStatusCode)
}
// attrBacktickRegex matches a custom search attribute name wrapped in
// backticks, e.g. `Attr.CustomStringField`, and captures the bare name in
// group 2. The dot is escaped so that it only matches a literal '.'. The
// pattern is compiled once rather than on every query.
var attrBacktickRegex = regexp.MustCompile("(`)(Attr\\.\\w+)(`)")

// cleanDSL strips the backticks that elasticsql may leave around attribute
// names for some operators (like "between and"), turning "`Attr.Name`" into
// "Attr.Name". The rest of the input is returned unchanged.
func cleanDSL(input string) string {
	return attrBacktickRegex.ReplaceAllString(input, `$2`)
}