common/persistence/pinot/pinot_visibility_store.go (867 lines of code) (raw):

// Copyright (c) 2017 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. package pinotvisibility import ( "context" "encoding/json" "fmt" "strings" "time" "github.com/uber/cadence/.gen/go/indexer" workflow "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/common" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/messaging" p "github.com/uber/cadence/common/persistence" pnt "github.com/uber/cadence/common/pinot" "github.com/uber/cadence/common/service" "github.com/uber/cadence/common/types" "github.com/uber/cadence/common/types/mapper/thrift" ) const ( pinotPersistenceName = "pinot" DescendingOrder = "DESC" AcendingOrder = "ASC" DomainID = "DomainID" WorkflowID = "WorkflowID" RunID = "RunID" WorkflowType = "WorkflowType" CloseStatus = "CloseStatus" HistoryLength = "HistoryLength" TaskList = "TaskList" IsCron = "IsCron" NumClusters = "NumClusters" ShardID = "ShardID" Attr = "Attr" StartTime = "StartTime" CloseTime = "CloseTime" UpdateTime = "UpdateTime" ExecutionTime = "ExecutionTime" IsDeleted = "IsDeleted" // used for Pinot deletion/rolling upsert only, not visible to user EventTimeMs = "EventTimeMs" // used for Pinot deletion/rolling upsert only, not visible to user // used to be micro second oneMicroSecondInNano = int64(time.Microsecond / time.Nanosecond) ) type ( pinotVisibilityStore struct { pinotClient pnt.GenericClient producer messaging.Producer logger log.Logger config *service.Config pinotQueryValidator *pnt.VisibilityQueryValidator } ) var _ p.VisibilityStore = (*pinotVisibilityStore)(nil) func NewPinotVisibilityStore( pinotClient pnt.GenericClient, config *service.Config, producer messaging.Producer, logger log.Logger, ) p.VisibilityStore { if producer == nil { // must be bug, check history setup logger.Fatal("message producer is nil") } return &pinotVisibilityStore{ pinotClient: pinotClient, producer: producer, logger: logger.WithTags(tag.ComponentPinotVisibilityManager), config: config, pinotQueryValidator: pnt.NewPinotQueryValidator(config.ValidSearchAttributes()), } } func (v *pinotVisibilityStore) Close() { // Not needed for pinot, just keep for visibility store interface } func (v *pinotVisibilityStore) GetName() string { return pinotPersistenceName } func (v *pinotVisibilityStore) RecordWorkflowExecutionStarted( ctx context.Context, request *p.InternalRecordWorkflowExecutionStartedRequest, ) error { msg, err := createVisibilityMessage( request.DomainUUID, request.WorkflowID, request.RunID, request.WorkflowTypeName, request.TaskList, request.StartTimestamp.UnixMilli(), request.ExecutionTimestamp.UnixMilli(), request.TaskID, request.Memo.Data, request.Memo.GetEncoding(), request.IsCron, request.NumClusters, -1, // represent invalid close time, means open workflow execution -1, // represent invalid close status, means open workflow execution 0, // will be updated when workflow execution updates request.UpdateTimestamp.UnixMilli(), int64(request.ShardID), request.SearchAttributes, false, ) if err != nil { return err } return v.producer.Publish(ctx, msg) } func (v *pinotVisibilityStore) RecordWorkflowExecutionClosed(ctx context.Context, request *p.InternalRecordWorkflowExecutionClosedRequest) error { msg, err := createVisibilityMessage( request.DomainUUID, request.WorkflowID, request.RunID, request.WorkflowTypeName, request.TaskList, request.StartTimestamp.UnixMilli(), request.ExecutionTimestamp.UnixMilli(), request.TaskID, request.Memo.Data, request.Memo.GetEncoding(), request.IsCron, request.NumClusters, request.CloseTimestamp.UnixMilli(), *thrift.FromWorkflowExecutionCloseStatus(&request.Status), request.HistoryLength, request.UpdateTimestamp.UnixMilli(), int64(request.ShardID), request.SearchAttributes, false, ) if err != nil { return err } return v.producer.Publish(ctx, msg) } func (v *pinotVisibilityStore) RecordWorkflowExecutionUninitialized(ctx context.Context, request *p.InternalRecordWorkflowExecutionUninitializedRequest) error { msg, err := createVisibilityMessage( request.DomainUUID, request.WorkflowID, request.RunID, request.WorkflowTypeName, "", -1, -1, 0, nil, "", false, 0, -1, // represent invalid close time, means open workflow execution -1, // represent invalid close status, means open workflow execution 0, // will be updated when workflow execution updates request.UpdateTimestamp.UnixMilli(), request.ShardID, nil, false, ) if err != nil { return err } return v.producer.Publish(ctx, msg) } func (v *pinotVisibilityStore) UpsertWorkflowExecution(ctx context.Context, request *p.InternalUpsertWorkflowExecutionRequest) error { msg, err := createVisibilityMessage( request.DomainUUID, request.WorkflowID, request.RunID, request.WorkflowTypeName, request.TaskList, request.StartTimestamp.UnixMilli(), request.ExecutionTimestamp.UnixMilli(), request.TaskID, request.Memo.Data, request.Memo.GetEncoding(), request.IsCron, request.NumClusters, -1, // represent invalid close time, means open workflow execution -1, // represent invalid close status, means open workflow execution 0, // will not be used request.UpdateTimestamp.UnixMilli(), request.ShardID, request.SearchAttributes, false, ) if err != nil { return err } return v.producer.Publish(ctx, msg) } func (v *pinotVisibilityStore) DeleteWorkflowExecution( ctx context.Context, request *p.VisibilityDeleteWorkflowExecutionRequest, ) error { msg, err := createDeleteVisibilityMessage( request.DomainID, request.WorkflowID, request.RunID, true, ) if err != nil { return err } return v.producer.Publish(ctx, msg) } func (v *pinotVisibilityStore) DeleteUninitializedWorkflowExecution( ctx context.Context, request *p.VisibilityDeleteWorkflowExecutionRequest, ) error { // verify if it is uninitialized workflow execution record // if it is, then call the existing delete method to delete query := fmt.Sprintf("StartTime = missing and DomainID = '%s' and RunID = '%s'", request.DomainID, request.RunID) queryRequest := &p.CountWorkflowExecutionsRequest{ Domain: request.Domain, Query: query, } resp, err := v.CountWorkflowExecutions(ctx, queryRequest) if err != nil { return err } if resp.Count > 0 { if err = v.DeleteWorkflowExecution(ctx, request); err != nil { return err } } return nil } func (v *pinotVisibilityStore) ListOpenWorkflowExecutions( ctx context.Context, request *p.InternalListWorkflowExecutionsRequest, ) (*p.InternalListWorkflowExecutionsResponse, error) { isRecordValid := func(rec *p.InternalVisibilityWorkflowExecutionInfo) bool { return !request.EarliestTime.After(rec.StartTime) && !rec.StartTime.After(request.LatestTime) } query, err := getListWorkflowExecutionsQuery(v.pinotClient.GetTableName(), request, false) if err != nil { v.logger.Error(fmt.Sprintf("failed to build list workflow executions query %v", err)) return nil, err } req := &pnt.SearchRequest{ Query: query, IsOpen: true, Filter: isRecordValid, MaxResultWindow: v.config.ESIndexMaxResultWindow(), ListRequest: request, } return v.pinotClient.Search(req) } func (v *pinotVisibilityStore) ListClosedWorkflowExecutions( ctx context.Context, request *p.InternalListWorkflowExecutionsRequest, ) (*p.InternalListWorkflowExecutionsResponse, error) { isRecordValid := func(rec *p.InternalVisibilityWorkflowExecutionInfo) bool { return !request.EarliestTime.After(rec.CloseTime) && !rec.CloseTime.After(request.LatestTime) } query, err := getListWorkflowExecutionsQuery(v.pinotClient.GetTableName(), request, true) if err != nil { v.logger.Error(fmt.Sprintf("failed to build list workflow executions query %v", err)) return nil, err } req := &pnt.SearchRequest{ Query: query, IsOpen: true, Filter: isRecordValid, MaxResultWindow: v.config.ESIndexMaxResultWindow(), ListRequest: request, } return v.pinotClient.Search(req) } func (v *pinotVisibilityStore) ListOpenWorkflowExecutionsByType(ctx context.Context, request *p.InternalListWorkflowExecutionsByTypeRequest) (*p.InternalListWorkflowExecutionsResponse, error) { isRecordValid := func(rec *p.InternalVisibilityWorkflowExecutionInfo) bool { return !request.EarliestTime.After(rec.StartTime) && !rec.StartTime.After(request.LatestTime) } query, err := getListWorkflowExecutionsByTypeQuery(v.pinotClient.GetTableName(), request, false) if err != nil { v.logger.Error(fmt.Sprintf("failed to build list workflow executions by type query %v", err)) return nil, err } req := &pnt.SearchRequest{ Query: query, IsOpen: true, Filter: isRecordValid, MaxResultWindow: v.config.ESIndexMaxResultWindow(), ListRequest: &request.InternalListWorkflowExecutionsRequest, } return v.pinotClient.Search(req) } func (v *pinotVisibilityStore) ListClosedWorkflowExecutionsByType(ctx context.Context, request *p.InternalListWorkflowExecutionsByTypeRequest) (*p.InternalListWorkflowExecutionsResponse, error) { isRecordValid := func(rec *p.InternalVisibilityWorkflowExecutionInfo) bool { return !request.EarliestTime.After(rec.CloseTime) && !rec.CloseTime.After(request.LatestTime) } query, err := getListWorkflowExecutionsByTypeQuery(v.pinotClient.GetTableName(), request, true) if err != nil { v.logger.Error(fmt.Sprintf("failed to build list workflow executions by type query %v", err)) return nil, err } req := &pnt.SearchRequest{ Query: query, IsOpen: true, Filter: isRecordValid, MaxResultWindow: v.config.ESIndexMaxResultWindow(), ListRequest: &request.InternalListWorkflowExecutionsRequest, } return v.pinotClient.Search(req) } func (v *pinotVisibilityStore) ListOpenWorkflowExecutionsByWorkflowID(ctx context.Context, request *p.InternalListWorkflowExecutionsByWorkflowIDRequest) (*p.InternalListWorkflowExecutionsResponse, error) { isRecordValid := func(rec *p.InternalVisibilityWorkflowExecutionInfo) bool { return !request.EarliestTime.After(rec.StartTime) && !rec.StartTime.After(request.LatestTime) } query, err := getListWorkflowExecutionsByWorkflowIDQuery(v.pinotClient.GetTableName(), request, false) if err != nil { v.logger.Error(fmt.Sprintf("failed to build list workflow executions by workflowID query %v", err)) return nil, err } req := &pnt.SearchRequest{ Query: query, IsOpen: true, Filter: isRecordValid, MaxResultWindow: v.config.ESIndexMaxResultWindow(), ListRequest: &request.InternalListWorkflowExecutionsRequest, } return v.pinotClient.Search(req) } func (v *pinotVisibilityStore) ListClosedWorkflowExecutionsByWorkflowID(ctx context.Context, request *p.InternalListWorkflowExecutionsByWorkflowIDRequest) (*p.InternalListWorkflowExecutionsResponse, error) { isRecordValid := func(rec *p.InternalVisibilityWorkflowExecutionInfo) bool { return !request.EarliestTime.After(rec.CloseTime) && !rec.CloseTime.After(request.LatestTime) } query, err := getListWorkflowExecutionsByWorkflowIDQuery(v.pinotClient.GetTableName(), request, true) if err != nil { v.logger.Error(fmt.Sprintf("failed to build list workflow executions by workflowID query %v", err)) return nil, err } req := &pnt.SearchRequest{ Query: query, IsOpen: true, Filter: isRecordValid, MaxResultWindow: v.config.ESIndexMaxResultWindow(), ListRequest: &request.InternalListWorkflowExecutionsRequest, } return v.pinotClient.Search(req) } func (v *pinotVisibilityStore) ListClosedWorkflowExecutionsByStatus(ctx context.Context, request *p.InternalListClosedWorkflowExecutionsByStatusRequest) (*p.InternalListWorkflowExecutionsResponse, error) { isRecordValid := func(rec *p.InternalVisibilityWorkflowExecutionInfo) bool { return !request.EarliestTime.After(rec.CloseTime) && !rec.CloseTime.After(request.LatestTime) } query, err := getListWorkflowExecutionsByStatusQuery(v.pinotClient.GetTableName(), request) if err != nil { v.logger.Error(fmt.Sprintf("failed to build list workflow executions by status query %v", err)) return nil, err } req := &pnt.SearchRequest{ Query: query, IsOpen: true, Filter: isRecordValid, MaxResultWindow: v.config.ESIndexMaxResultWindow(), ListRequest: &request.InternalListWorkflowExecutionsRequest, } return v.pinotClient.Search(req) } func (v *pinotVisibilityStore) GetClosedWorkflowExecution(ctx context.Context, request *p.InternalGetClosedWorkflowExecutionRequest) (*p.InternalGetClosedWorkflowExecutionResponse, error) { query := getGetClosedWorkflowExecutionQuery(v.pinotClient.GetTableName(), request) req := &pnt.SearchRequest{ Query: query, IsOpen: true, Filter: nil, MaxResultWindow: v.config.ESIndexMaxResultWindow(), ListRequest: &p.InternalListWorkflowExecutionsRequest{ // create a new request to avoid nil pointer exceptions DomainUUID: request.DomainUUID, Domain: request.Domain, EarliestTime: time.Time{}, LatestTime: time.Time{}, PageSize: 1, NextPageToken: nil, }, } resp, err := v.pinotClient.Search(req) if err != nil { return nil, &types.InternalServiceError{ Message: fmt.Sprintf("Pinot GetClosedWorkflowExecution failed, %v", err), } } return &p.InternalGetClosedWorkflowExecutionResponse{ Execution: resp.Executions[0], }, nil } func (v *pinotVisibilityStore) ListWorkflowExecutions(ctx context.Context, request *p.ListWorkflowExecutionsByQueryRequest) (*p.InternalListWorkflowExecutionsResponse, error) { checkPageSize(request) query, err := v.getListWorkflowExecutionsByQueryQuery(v.pinotClient.GetTableName(), request) if err != nil { v.logger.Error(fmt.Sprintf("failed to build list workflow executions query %v", err)) return nil, err } req := &pnt.SearchRequest{ Query: query, IsOpen: true, Filter: nil, MaxResultWindow: v.config.ESIndexMaxResultWindow(), ListRequest: &p.InternalListWorkflowExecutionsRequest{ DomainUUID: request.DomainUUID, Domain: request.Domain, EarliestTime: time.Time{}, LatestTime: time.Time{}, NextPageToken: request.NextPageToken, PageSize: request.PageSize, }, } return v.pinotClient.Search(req) } func (v *pinotVisibilityStore) ScanWorkflowExecutions(ctx context.Context, request *p.ListWorkflowExecutionsByQueryRequest) (*p.InternalListWorkflowExecutionsResponse, error) { checkPageSize(request) query, err := v.getListWorkflowExecutionsByQueryQuery(v.pinotClient.GetTableName(), request) if err != nil { v.logger.Error(fmt.Sprintf("failed to build scan workflow executions query %v", err)) return nil, err } req := &pnt.SearchRequest{ Query: query, IsOpen: true, Filter: nil, MaxResultWindow: v.config.ESIndexMaxResultWindow(), ListRequest: &p.InternalListWorkflowExecutionsRequest{ DomainUUID: request.DomainUUID, Domain: request.Domain, EarliestTime: time.Time{}, LatestTime: time.Time{}, NextPageToken: request.NextPageToken, PageSize: request.PageSize, }, } return v.pinotClient.Search(req) } func (v *pinotVisibilityStore) CountWorkflowExecutions(ctx context.Context, request *p.CountWorkflowExecutionsRequest) (*p.CountWorkflowExecutionsResponse, error) { query := v.getCountWorkflowExecutionsQuery(v.pinotClient.GetTableName(), request) resp, err := v.pinotClient.CountByQuery(query) if err != nil { return nil, &types.InternalServiceError{ Message: fmt.Sprintf("CountClosedWorkflowExecutions failed, %v", err), } } return &p.CountWorkflowExecutionsResponse{ Count: resp, }, nil } // a new function to create visibility message for deletion // don't use the other function and provide some nil values because it may cause nil pointer exceptions func createDeleteVisibilityMessage(domainID string, wid, rid string, isDeleted bool, ) (*indexer.PinotMessage, error) { m := make(map[string]interface{}) m[DomainID] = domainID m[WorkflowID] = wid m[RunID] = rid m[IsDeleted] = isDeleted m[EventTimeMs] = time.Now().UnixMilli() serializedMsg, err := json.Marshal(m) if err != nil { return nil, err } msg := &indexer.PinotMessage{ WorkflowID: common.StringPtr(wid), Payload: serializedMsg, } return msg, nil } func createVisibilityMessage( // common parameters domainID string, wid, rid string, workflowTypeName string, taskList string, startTimeUnixMilli int64, executionTimeUnixMilli int64, taskID int64, memo []byte, encoding common.EncodingType, isCron bool, numClusters int16, // specific to certain status closeTimeUnixMilli int64, // close execution closeStatus workflow.WorkflowExecutionCloseStatus, // close execution historyLength int64, // close execution updateTimeUnixMilli int64, // update execution, shardID int64, rawSearchAttributes map[string][]byte, isDeleted bool, ) (*indexer.PinotMessage, error) { m := make(map[string]interface{}) // loop through all input parameters m[DomainID] = domainID m[WorkflowID] = wid m[RunID] = rid m[WorkflowType] = workflowTypeName m[TaskList] = taskList m[StartTime] = startTimeUnixMilli m[ExecutionTime] = executionTimeUnixMilli m[IsCron] = isCron m[NumClusters] = numClusters m[CloseTime] = closeTimeUnixMilli m[CloseStatus] = int(closeStatus) m[HistoryLength] = historyLength m[UpdateTime] = updateTimeUnixMilli m[ShardID] = shardID m[IsDeleted] = isDeleted m[EventTimeMs] = updateTimeUnixMilli // same as update time when record is upserted, could not use updateTime directly since this will be modified by Pinot SearchAttributes := make(map[string]interface{}) var err error for key, value := range rawSearchAttributes { value, err = isTimeStruct(value) if err != nil { return nil, err } var val interface{} err = json.Unmarshal(value, &val) if err != nil { return nil, err } SearchAttributes[key] = val } m[Attr] = SearchAttributes serializedMsg, err := json.Marshal(m) if err != nil { return nil, err } msg := &indexer.PinotMessage{ WorkflowID: common.StringPtr(wid), Payload: serializedMsg, } return msg, nil } // check if value is time.Time type // if it is, convert it to unixMilli // if it isn't time, return the original value func isTimeStruct(value []byte) ([]byte, error) { var time time.Time err := json.Unmarshal(value, &time) if err == nil { unixTime := time.UnixMilli() value, err = json.Marshal(unixTime) if err != nil { return nil, err } } return value, nil } /****************************** Request Translator ******************************/ type PinotQuery struct { query string filters PinotQueryFilter sorters string limits string } type PinotQueryFilter struct { string } func NewPinotQuery(tableName string) PinotQuery { return PinotQuery{ query: fmt.Sprintf("SELECT *\nFROM %s\n", tableName), filters: PinotQueryFilter{}, sorters: "", limits: "", } } func NewPinotCountQuery(tableName string) PinotQuery { return PinotQuery{ query: fmt.Sprintf("SELECT COUNT(*)\nFROM %s\n", tableName), filters: PinotQueryFilter{}, sorters: "", limits: "", } } func (q *PinotQuery) String() string { return fmt.Sprintf("%s%s%s%s", q.query, q.filters.string, q.sorters, q.limits) } func (q *PinotQuery) concatSorter(sorter string) { q.sorters += sorter + "\n" } func (q *PinotQuery) addPinotSorter(orderBy string, order string) { if q.sorters == "" { q.sorters = "Order BY " } else { q.sorters += ", " } q.sorters += fmt.Sprintf("%s %s\n", orderBy, order) } func (q *PinotQuery) addOffsetAndLimits(offset int, limit int) { q.limits += fmt.Sprintf("LIMIT %d, %d\n", offset, limit) } func (f *PinotQueryFilter) checkFirstFilter() { if f.string == "" { f.string = "WHERE " } else { f.string += "AND " } } func (f *PinotQueryFilter) addEqual(obj string, val interface{}) { f.checkFirstFilter() if _, ok := val.(string); ok { val = fmt.Sprintf("'%s'", val) } else { val = fmt.Sprintf("%v", val) } quotedVal := fmt.Sprintf("%s", val) f.string += fmt.Sprintf("%s = %s\n", obj, quotedVal) } // addQuery adds a complete query into the filter func (f *PinotQueryFilter) addQuery(query string) { f.checkFirstFilter() f.string += fmt.Sprintf("%s\n", query) } // addGte check object is greater than or equals to val func (f *PinotQueryFilter) addGte(obj string, val int) { f.checkFirstFilter() f.string += fmt.Sprintf("%s >= %s\n", obj, fmt.Sprintf("%v", val)) } // addLte check object is less than val func (f *PinotQueryFilter) addLt(obj string, val interface{}) { f.checkFirstFilter() f.string += fmt.Sprintf("%s < %s\n", obj, fmt.Sprintf("%v", val)) } func (f *PinotQueryFilter) addTimeRange(obj string, earliest interface{}, latest interface{}) { f.checkFirstFilter() f.string += fmt.Sprintf("%s BETWEEN %v AND %v\n", obj, earliest, latest) } func (v *pinotVisibilityStore) getCountWorkflowExecutionsQuery(tableName string, request *p.CountWorkflowExecutionsRequest) string { if request == nil { return "" } query := NewPinotCountQuery(tableName) // need to add Domain ID query.filters.addEqual(DomainID, request.DomainUUID) query.filters.addEqual(IsDeleted, false) requestQuery := strings.TrimSpace(request.Query) // if customized query is empty, directly return if requestQuery == "" { return query.String() } requestQuery = filterPrefix(requestQuery) comparExpr, _ := parseOrderBy(requestQuery) comparExpr, err := v.pinotQueryValidator.ValidateQuery(comparExpr) if err != nil { v.logger.Error(fmt.Sprintf("pinot query validator error: %s", err)) } comparExpr = filterPrefix(comparExpr) if comparExpr != "" { query.filters.addQuery(comparExpr) } return query.String() } func (v *pinotVisibilityStore) getListWorkflowExecutionsByQueryQuery(tableName string, request *p.ListWorkflowExecutionsByQueryRequest) (string, error) { if request == nil { return "", nil } token, err := pnt.GetNextPageToken(request.NextPageToken) if err != nil { return "", fmt.Errorf("next page token: %w", err) } query := NewPinotQuery(tableName) // need to add Domain ID query.filters.addEqual(DomainID, request.DomainUUID) query.filters.addEqual(IsDeleted, false) requestQuery := strings.TrimSpace(request.Query) // if customized query is empty, directly return if requestQuery == "" { query.addOffsetAndLimits(token.From, request.PageSize) return query.String(), nil } requestQuery = filterPrefix(requestQuery) if common.IsJustOrderByClause(requestQuery) { query.concatSorter(requestQuery) query.addOffsetAndLimits(token.From, request.PageSize) return query.String(), nil } comparExpr, orderBy := parseOrderBy(requestQuery) comparExpr, err = v.pinotQueryValidator.ValidateQuery(comparExpr) if err != nil { return "", fmt.Errorf("pinot query validator error: %w, query: %s", err, request.Query) } comparExpr = filterPrefix(comparExpr) if comparExpr != "" { query.filters.addQuery(comparExpr) } if orderBy != "" { query.concatSorter(orderBy) } // MUST HAVE! because pagination wouldn't work without order by clause! if query.sorters == "" { query.addPinotSorter(StartTime, "DESC") } query.addOffsetAndLimits(token.From, request.PageSize) return query.String(), nil } func filterPrefix(query string) string { prefix := fmt.Sprintf("`%s.", Attr) postfix := "`" query = strings.ReplaceAll(query, prefix, "") return strings.ReplaceAll(query, postfix, "") } /* Can have cases: 1. A = B 2. A < B 3. A > B 4. A <= B 5. A >= B */ func splitElement(element string) (string, string, string) { if element == "" { return "", "", "" } listLE := strings.Split(element, "<=") listGE := strings.Split(element, ">=") listE := strings.Split(element, "=") listL := strings.Split(element, "<") listG := strings.Split(element, ">") if len(listLE) > 1 { return strings.TrimSpace(listLE[0]), strings.TrimSpace(listLE[1]), "<=" } if len(listGE) > 1 { return strings.TrimSpace(listGE[0]), strings.TrimSpace(listGE[1]), ">=" } if len(listE) > 1 { return strings.TrimSpace(listE[0]), strings.TrimSpace(listE[1]), "=" } if len(listL) > 1 { return strings.TrimSpace(listL[0]), strings.TrimSpace(listL[1]), "<" } if len(listG) > 1 { return strings.TrimSpace(listG[0]), strings.TrimSpace(listG[1]), ">" } return "", "", "" } /* Order by XXX DESC -> if startWith("Order by") -> return "", element CustomizedString = 'cannot be used in order by' -> if last character is ‘ or " -> return element, "" CustomizedInt = 1 (without order by clause) -> if !contains("Order by") -> return element, "" CustomizedString = 'cannot be used in order by' Order by XXX DESC -> Find the index x of last appearance of "order by" -> return element[0, x], element[x, len] CustomizedInt = 1 Order by XXX DESC -> Find the index x of last appearance of "order by" -> return element[0, x], element[x, len] */ func parseOrderBy(element string) (string, string) { // case 1: when order by query also passed in if common.IsJustOrderByClause(element) { return "", element } // case 2: when last element is a string if element[len(element)-1] == '\'' || element[len(element)-1] == '"' { return element, "" } // case 3: when last element doesn't contain "order by" if !strings.Contains(strings.ToLower(element), "order by") { return element, "" } // case 4: general case elementArray := strings.Split(element, " ") orderByIndex := findLastOrderBy(elementArray) // find the last appearance of "order by" is the answer if orderByIndex == 0 { return element, "" } return strings.Join(elementArray[:orderByIndex], " "), strings.Join(elementArray[orderByIndex:], " ") } func findLastOrderBy(list []string) int { for i := len(list) - 2; i >= 0; i-- { if strings.Contains(list[i], "\"") || strings.Contains(list[i], "'") { return 0 // means order by is inside a string } if strings.ToLower(list[i]) == "order" && strings.ToLower(list[i+1]) == "by" { return i } } return 0 } func getListWorkflowExecutionsQuery(tableName string, request *p.InternalListWorkflowExecutionsRequest, isClosed bool) (string, error) { if request == nil { return "", nil } token, err := pnt.GetNextPageToken(request.NextPageToken) if err != nil { return "", fmt.Errorf("next page token: %w", err) } from := token.From pageSize := request.PageSize query := NewPinotQuery(tableName) query.filters.addEqual(DomainID, request.DomainUUID) query.filters.addEqual(IsDeleted, false) earliest := request.EarliestTime.UnixMilli() - oneMicroSecondInNano latest := request.LatestTime.UnixMilli() + oneMicroSecondInNano if isClosed { query.filters.addTimeRange(CloseTime, earliest, latest) // convert Unix Time to miliseconds query.filters.addGte(CloseStatus, 0) } else { query.filters.addTimeRange(StartTime, earliest, latest) // convert Unix Time to miliseconds query.filters.addLt(CloseStatus, 0) query.filters.addEqual(CloseTime, -1) } query.addPinotSorter(StartTime, DescendingOrder) query.addOffsetAndLimits(from, pageSize) return query.String(), nil } func getListWorkflowExecutionsByTypeQuery(tableName string, request *p.InternalListWorkflowExecutionsByTypeRequest, isClosed bool) (string, error) { if request == nil { return "", nil } query := NewPinotQuery(tableName) query.filters.addEqual(DomainID, request.DomainUUID) query.filters.addEqual(IsDeleted, false) query.filters.addEqual(WorkflowType, request.WorkflowTypeName) earliest := request.EarliestTime.UnixMilli() - oneMicroSecondInNano latest := request.LatestTime.UnixMilli() + oneMicroSecondInNano if isClosed { query.filters.addTimeRange(CloseTime, earliest, latest) // convert Unix Time to miliseconds query.filters.addGte(CloseStatus, 0) } else { query.filters.addTimeRange(StartTime, earliest, latest) // convert Unix Time to miliseconds query.filters.addLt(CloseStatus, 0) query.filters.addEqual(CloseTime, -1) } query.addPinotSorter(StartTime, DescendingOrder) token, err := pnt.GetNextPageToken(request.NextPageToken) if err != nil { return "", fmt.Errorf("next page token: %w", err) } from := token.From pageSize := request.PageSize query.addOffsetAndLimits(from, pageSize) return query.String(), nil } func getListWorkflowExecutionsByWorkflowIDQuery(tableName string, request *p.InternalListWorkflowExecutionsByWorkflowIDRequest, isClosed bool) (string, error) { if request == nil { return "", nil } query := NewPinotQuery(tableName) query.filters.addEqual(DomainID, request.DomainUUID) query.filters.addEqual(IsDeleted, false) query.filters.addEqual(WorkflowID, request.WorkflowID) earliest := request.EarliestTime.UnixMilli() - oneMicroSecondInNano latest := request.LatestTime.UnixMilli() + oneMicroSecondInNano if isClosed { query.filters.addTimeRange(CloseTime, earliest, latest) // convert Unix Time to miliseconds query.filters.addGte(CloseStatus, 0) } else { query.filters.addTimeRange(StartTime, earliest, latest) // convert Unix Time to miliseconds query.filters.addLt(CloseStatus, 0) query.filters.addEqual(CloseTime, -1) } query.addPinotSorter(StartTime, DescendingOrder) token, err := pnt.GetNextPageToken(request.NextPageToken) if err != nil { return "", fmt.Errorf("next page token: %w", err) } from := token.From pageSize := request.PageSize query.addOffsetAndLimits(from, pageSize) return query.String(), nil } func getListWorkflowExecutionsByStatusQuery(tableName string, request *p.InternalListClosedWorkflowExecutionsByStatusRequest) (string, error) { if request == nil { return "", nil } query := NewPinotQuery(tableName) query.filters.addEqual(DomainID, request.DomainUUID) query.filters.addEqual(IsDeleted, false) status := "0" switch request.Status.String() { case "COMPLETED": status = "0" case "FAILED": status = "1" case "CANCELED": status = "2" case "TERMINATED": status = "3" case "CONTINUED_AS_NEW": status = "4" case "TIMED_OUT": status = "5" } query.filters.addEqual(CloseStatus, status) query.filters.addTimeRange(CloseTime, request.EarliestTime.UnixMilli(), request.LatestTime.UnixMilli()) // convert Unix Time to miliseconds query.addPinotSorter(StartTime, DescendingOrder) token, err := pnt.GetNextPageToken(request.NextPageToken) if err != nil { return "", fmt.Errorf("next page token: %w", err) } from := token.From pageSize := request.PageSize query.addOffsetAndLimits(from, pageSize) return query.String(), nil } func getGetClosedWorkflowExecutionQuery(tableName string, request *p.InternalGetClosedWorkflowExecutionRequest) string { if request == nil { return "" } query := NewPinotQuery(tableName) query.filters.addEqual(DomainID, request.DomainUUID) query.filters.addEqual(IsDeleted, false) query.filters.addGte(CloseStatus, 0) query.filters.addEqual(WorkflowID, request.Execution.GetWorkflowID()) rid := request.Execution.GetRunID() if rid != "" { query.filters.addEqual(RunID, rid) } return query.String() } func checkPageSize(request *p.ListWorkflowExecutionsByQueryRequest) { if request.PageSize == 0 { request.PageSize = 1000 } }