common/pinot/pinot_client.go (105 lines of code) (raw):

// The MIT License (MIT) // Copyright (c) 2017-2020 Uber Technologies Inc. // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in all // copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE // SOFTWARE. 
package pinot

import (
	"encoding/json"
	"fmt"

	"github.com/startreedata/pinot-client-go/pinot"

	"github.com/uber/cadence/common/config"
	"github.com/uber/cadence/common/log"
	p "github.com/uber/cadence/common/persistence"
	"github.com/uber/cadence/common/types"
)

// PinotClient is a GenericClient implementation that runs SQL queries
// against a single Pinot table through a pinot-client-go connection.
type PinotClient struct {
	client      *pinot.Connection
	logger      log.Logger
	tableName   string
	serviceName string
}

// NewPinotClient builds a PinotClient from an established connection, a
// logger and the visibility config. The table and service names are read
// from pinotConfig, which must therefore be non-nil.
func NewPinotClient(client *pinot.Connection, logger log.Logger, pinotConfig *config.PinotVisibilityConfig) GenericClient {
	return &PinotClient{
		client:      client,
		logger:      logger,
		tableName:   pinotConfig.Table,
		serviceName: pinotConfig.ServiceName,
	}
}

// Search executes request.Query against the configured table and converts
// the rows into visibility records, applying request.Filter to each record.
//
// It returns a *types.InternalServiceError when the request is nil, when
// the query fails, or when the incoming page token cannot be decoded.
func (c *PinotClient) Search(request *SearchRequest) (*SearchResponse, error) {
	if request == nil {
		return nil, &types.InternalServiceError{
			Message: "Pinot Search failed, request is nil",
		}
	}

	resp, err := c.client.ExecuteSQL(c.tableName, request.Query)
	if err != nil {
		return nil, &types.InternalServiceError{
			Message: fmt.Sprintf("Pinot Search failed, %v", err),
		}
	}

	token, err := GetNextPageToken(request.ListRequest.NextPageToken)
	if err != nil {
		return nil, &types.InternalServiceError{
			Message: fmt.Sprintf("Get NextPage token failed, %v", err),
		}
	}

	return c.getInternalListWorkflowExecutionsResponse(resp, request.Filter, token, request.ListRequest.PageSize, request.MaxResultWindow)
}

// CountByQuery executes query and returns the value of the first column of
// the first result row as an int64.
//
// It returns (0, error) when the query itself fails and (-1, error) when
// the result is missing or is not an integer. All errors are
// *types.InternalServiceError.
func (c *PinotClient) CountByQuery(query string) (int64, error) {
	resp, err := c.client.ExecuteSQL(c.tableName, query)
	if err != nil {
		return 0, &types.InternalServiceError{
			Message: fmt.Sprintf("CountWorkflowExecutions ExecuteSQL failed, %v", err),
		}
	}

	// Guard every level before indexing: a missing result table or an empty
	// row set would otherwise panic on Rows[0][0].
	if resp == nil || resp.ResultTable == nil ||
		len(resp.ResultTable.Rows) == 0 || len(resp.ResultTable.Rows[0]) == 0 {
		return -1, &types.InternalServiceError{
			Message: fmt.Sprintf("CountWorkflowExecutions returned no result, query = %s", query),
		}
	}

	raw := resp.ResultTable.Rows[0][0]

	// Use the comma-ok form so an unexpected cell type becomes an error
	// instead of a panic.
	number, ok := raw.(json.Number)
	if !ok {
		err = fmt.Errorf("unexpected result type %T", raw)
	} else {
		var count int64
		count, err = number.Int64()
		if err == nil {
			return count, nil
		}
	}

	return -1, &types.InternalServiceError{
		Message: fmt.Sprintf("can't convert result to integer!, query = %s, query result = %v, err = %v",
			query, raw, err),
	}
}

// GetTableName returns the name of the Pinot table this client queries.
func (c *PinotClient) GetTableName() string {
	return c.tableName
}

// Pinot Response Translator
// We flattened the search attributes into columns in Pinot table
// This function converts the search result back to VisibilityRecord
//
// Each row is converted with ConvertSearchResultToVisibilityRecord and kept
// only if isRecordValid is nil or returns true for it. A nil or empty
// response yields an empty, non-nil result. When the number of rows equals
// pageSize, a next page token is produced whose From offset is advanced by
// the number of rows received; a nil token is treated as offset 0.
//
// maxResultWindow is accepted for signature compatibility and is not used
// by this function.
func (c *PinotClient) getInternalListWorkflowExecutionsResponse(
	resp *pinot.BrokerResponse,
	isRecordValid func(rec *p.InternalVisibilityWorkflowExecutionInfo) bool,
	token *PinotVisibilityPageToken,
	pageSize int,
	maxResultWindow int,
) (*p.InternalListWorkflowExecutionsResponse, error) {
	response := &p.InternalListWorkflowExecutionsResponse{}
	if resp == nil || resp.ResultTable == nil || resp.ResultTable.GetRowCount() == 0 {
		return response, nil
	}

	schema := resp.ResultTable.DataSchema // get the schema to map results
	columnNames := schema.ColumnNames
	actualHits := resp.ResultTable.Rows
	numOfActualHits := resp.ResultTable.GetRowCount()

	response.Executions = make([]*p.InternalVisibilityWorkflowExecutionInfo, 0, numOfActualHits)
	for i := 0; i < numOfActualHits; i++ {
		workflowExecutionInfo := ConvertSearchResultToVisibilityRecord(actualHits[i], columnNames, c.logger)
		if isRecordValid == nil || isRecordValid(workflowExecutionInfo) {
			response.Executions = append(response.Executions, workflowExecutionInfo)
		}
	}

	// A full page means the response is not the last page.
	if numOfActualHits == pageSize {
		// ES Search API support pagination using From and PageSize, but has limit that From+PageSize cannot exceed a threshold
		// In pinot we just skip (previous pages * page limit) items and take the next (number of page limit) items
		nextFrom := numOfActualHits
		if token != nil {
			nextFrom += token.From
		}

		nextPageToken, err := SerializePageToken(&PinotVisibilityPageToken{From: nextFrom})
		if err != nil {
			return nil, err
		}

		response.NextPageToken = make([]byte, len(nextPageToken))
		copy(response.NextPageToken, nextPageToken)
	}

	return response, nil
}

// getInternalGetClosedWorkflowExecutionResponse converts the first row of
// resp into a closed-workflow response using
// ConvertSearchResultToVisibilityRecord.
//
// It returns (nil, nil) when resp is nil, has no result table, or contains
// no rows.
func (c *PinotClient) getInternalGetClosedWorkflowExecutionResponse(resp *pinot.BrokerResponse) (
	*p.InternalGetClosedWorkflowExecutionResponse,
	error,
) {
	// Same guard as the list translator: without it a missing table or an
	// empty row set would panic on actualHits[0].
	if resp == nil || resp.ResultTable == nil || resp.ResultTable.GetRowCount() == 0 {
		return nil, nil
	}

	response := &p.InternalGetClosedWorkflowExecutionResponse{}
	schema := resp.ResultTable.DataSchema // get the schema to map results
	columnNames := schema.ColumnNames
	actualHits := resp.ResultTable.Rows
	response.Execution = ConvertSearchResultToVisibilityRecord(actualHits[0], columnNames, c.logger)

	return response, nil
}