client/schema.go (346 lines of code) (raw):

// Copyright (c) 2017-2018 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package client import ( "bytes" "encoding/json" "fmt" "io/ioutil" "net/http" "strconv" "strings" "sync" "time" "github.com/uber-go/tally" metaCom "github.com/uber/aresdb/metastore/common" "github.com/uber/aresdb/utils" "go.uber.org/zap" ) // SchemaFetcher is the interface for fetch schema and enums type SchemaFetcher interface { // FetchAllSchemas fetches all schemas FetchAllSchemas() ([]*metaCom.Table, error) // FetchSchema fetch one schema for given table FetchSchema(table string) (*metaCom.Table, error) // FetchAllEnums fetches all enums for given table and column FetchAllEnums(tableName string, columnName string) ([]string, error) // ExtendEnumCases extends enum cases to given table column ExtendEnumCases(tableName, columnName string, enumCases []string) ([]int, error) } // httpSchemaFetcher is a http based schema fetcher type httpSchemaFetcher struct { httpClient http.Client metricScope tally.Scope address string } // CachedSchemaHandler handles schema and enum requests with cache type CachedSchemaHandler struct { *sync.RWMutex logger *zap.SugaredLogger metricScope tally.Scope schemaFetcher SchemaFetcher // mapping from table name to table schema schemas map[string]*TableSchema // map from table to columnID to enum dictionary // use columnID instead of name since column name can be reused // table names can be reused as well, deleting and adding a new table // will anyway requires job restart enumMappings map[string]map[int]enumDict // map from table to columnID to default enum id. Initialized during bootstrap // and will be set only if default value is non nil. enumDefaultValueMappings map[string]map[int]int } // NewCachedSchemaHandler creates a new cached schema handler func NewCachedSchemaHandler(logger *zap.SugaredLogger, scope tally.Scope, schamaFetcher SchemaFetcher) *CachedSchemaHandler { return &CachedSchemaHandler{ RWMutex: &sync.RWMutex{}, logger: logger, metricScope: scope, schemaFetcher: schamaFetcher, schemas: make(map[string]*TableSchema), enumMappings: make(map[string]map[int]enumDict), enumDefaultValueMappings: make(map[string]map[int]int), } } // NewHttpSchemaFetcher creates a new http schema fetcher func NewHttpSchemaFetcher(httpClient http.Client, address string, scope tally.Scope) SchemaFetcher { return &httpSchemaFetcher{ metricScope: scope, address: address, httpClient: httpClient, } } // Start starts the CachedSchemaHandler, if interval > 0, will start periodical refresh func (cf *CachedSchemaHandler) Start(interval int) { cf.FetchSchemas() if interval <= 0 { return } go func(refreshInterval int) { ticks := time.Tick(time.Duration(refreshInterval) * time.Second) for range ticks { cf.FetchSchemas() } }(interval) return } // TranslateEnum translates given enum value to its enumID func (cf *CachedSchemaHandler) TranslateEnum(tableName string, columnID int, value interface{}, caseInsensitive bool) (enumID int, err error) { if value == nil { return -1, nil } enumCase, ok := value.(string) if !ok { return 0, utils.StackError(nil, "Enum value should be string, but got: %T", value) } if caseInsensitive { enumCase = strings.ToLower(enumCase) } cf.RLock() // here it already make sure the enum dictionary exists in cache enumID, ok = cf.enumMappings[tableName][columnID][enumCase] cf.RUnlock() if !ok { cf.metricScope.Tagged( map[string]string{ "TableName": tableName, "ColumnID": strconv.Itoa(columnID), }, ).Counter("new_enum_case_rows_ignored").Inc(int64(1)) if defaultValue, ok := cf.enumDefaultValueMappings[tableName][columnID]; ok { return defaultValue, nil } return -1, nil } return enumID, nil } // FetchAllSchema fetch all schemas func (cf *CachedSchemaHandler) FetchAllSchema() error { tables, err := cf.schemaFetcher.FetchAllSchemas() if err != nil { return err } for _, table := range tables { cf.setTable(table) err := cf.fetchAndSetEnumCases(table) if err != nil { return err } } return nil } // FetchSchemas fetch schemas in schemas of CachedSchemaHandler func (cf *CachedSchemaHandler) FetchSchemas() { cf.RLock() tables := make([]string, 0, len(cf.schemas)) for tableName := range cf.schemas { tables = append(tables, tableName) } cf.RUnlock() for _, tableName := range tables { _, err := cf.FetchSchema(tableName) if err != nil { cf.logger.With("error", err.Error(), "table", tableName).Errorf("Failed to fetch table schema") } } return } // FetchSchema fetchs the schema of given table name func (cf *CachedSchemaHandler) FetchSchema(tableName string) (*TableSchema, error) { cf.RLock() schema, exist := cf.schemas[tableName] cf.RUnlock() if exist { return schema, nil } table, err := cf.schemaFetcher.FetchSchema(tableName) if err != nil { return nil, err } schema = cf.setTable(table) err = cf.fetchAndSetEnumCases(table) return schema, err } // PrepareEnumCases prepares enum cases func (cf *CachedSchemaHandler) PrepareEnumCases(tableName, columnName string, enumCases []string) error { newEnumCases := make([]string, 0, len(enumCases)) cf.RLock() schema, exist := cf.schemas[tableName] if !exist { cf.RUnlock() return nil } columnID, exist := schema.ColumnDict[columnName] if !exist { cf.RUnlock() return nil } caseInsensitive := schema.Table.Columns[columnID].CaseInsensitive disableAutoExpand := schema.Table.Columns[columnID].DisableAutoExpand for _, enumCase := range enumCases { if _, exist := cf.enumMappings[tableName][columnID][enumCase]; !exist { newEnumCases = append(newEnumCases, enumCase) } } cf.RUnlock() numNewEnumCases := len(newEnumCases) if disableAutoExpand && numNewEnumCases > 0 { // It's recommended to set up elk or sentry logging to catch this error. cf.logger.With( "TableName", tableName, "ColumnName", columnName, "ColumnID", columnID, "newEnumCasesSet", newEnumCases, "caseInsensitive", caseInsensitive, ).Warn("Finding new enum cases during ingestion but enum auto expansion is disabled") cf.metricScope.Tagged( map[string]string{ "TableName": tableName, "ColumnID": strconv.Itoa(columnID), }, ).Counter("new_enum_cases_ignored").Inc(int64(numNewEnumCases)) return nil } enumIDs, err := cf.schemaFetcher.ExtendEnumCases(tableName, columnName, newEnumCases) if err != nil { return err } cf.Lock() for index, enumCase := range newEnumCases { if caseInsensitive { enumCase = strings.ToLower(enumCase) } cf.enumMappings[tableName][columnID][enumCase] = enumIDs[index] } cf.Unlock() return nil } func (cf *CachedSchemaHandler) fetchAndSetEnumCases(table *metaCom.Table) error { enumMappings := make(map[int]enumDict) enumDefaultValueMappings := make(map[int]int) for columnID, column := range table.Columns { if column.Deleted { continue } enumMappings[columnID] = make(enumDict) caseInsensitive := column.CaseInsensitive var defValuePtr *string if column.DefaultValue != nil { var defValue = *column.DefaultValue if caseInsensitive { defValue = strings.ToLower(defValue) } defValuePtr = &defValue } if column.IsEnumBasedColumn() { enumCases, err := cf.schemaFetcher.FetchAllEnums(table.Name, column.Name) if err == nil { for enumID, enumCase := range enumCases { // Convert to lower case for comparison during ingestion. if caseInsensitive { enumCase = strings.ToLower(enumCase) } // all mapping should be pre created enumMappings[columnID][enumCase] = enumID if defValuePtr != nil { if *defValuePtr == enumCase { enumDefaultValueMappings[columnID] = enumID } } } } else { cf.metricScope.Tagged(map[string]string{ "table": table.Name, "columnID": strconv.Itoa(columnID), }).Counter("err_fetch_enum_dict").Inc(1) return utils.StackError(err, "Failed to fetch enum cases for table: %s, column: %d", table.Name, columnID) } } } cf.Lock() cf.enumMappings[table.Name] = enumMappings cf.enumDefaultValueMappings[table.Name] = enumDefaultValueMappings cf.Unlock() return nil } func (cf *CachedSchemaHandler) setTable(table *metaCom.Table) *TableSchema { columnDict := make(map[string]int) for columnID, column := range table.Columns { if !column.Deleted { columnDict[column.Name] = columnID } } schema := &TableSchema{ Table: table, ColumnDict: columnDict, } cf.Lock() cf.schemas[table.Name] = schema if _, tableExist := cf.enumMappings[table.Name]; !tableExist { cf.enumMappings[table.Name] = make(map[int]enumDict) cf.enumDefaultValueMappings[table.Name] = make(map[int]int) } for columnID, column := range table.Columns { if !column.Deleted && column.IsEnumBasedColumn() { if _, columnExist := cf.enumMappings[table.Name][columnID]; !columnExist { cf.enumMappings[table.Name][columnID] = make(enumDict) } } } cf.Unlock() return schema } func (hf *httpSchemaFetcher) FetchAllEnums(tableName, columnName string) ([]string, error) { var enumDictReponse []string resp, err := hf.httpClient.Get(hf.enumDictPath(tableName, columnName)) err = hf.readJSONResponse(resp, err, &enumDictReponse) return enumDictReponse, err } func (hf *httpSchemaFetcher) ExtendEnumCases(tableName, columnName string, enumCases []string) ([]int, error) { if len(enumCases) == 0 { return []int{}, nil } enumCasesRequest := enumCasesWrapper{ EnumCases: enumCases, } enumCasesBytes, err := json.Marshal(enumCasesRequest) if err != nil { return nil, utils.StackError(err, "Failed to marshal enum cases") } var enumIDs []int resp, err := hf.httpClient.Post(hf.enumDictPath(tableName, columnName), applicationJSONHeader, bytes.NewReader(enumCasesBytes)) err = hf.readJSONResponse(resp, err, &enumIDs) if err != nil { return nil, err } return enumIDs, nil } func (hf *httpSchemaFetcher) FetchSchema(tableName string) (*metaCom.Table, error) { var table metaCom.Table resp, err := hf.httpClient.Get(hf.tablePath(tableName)) err = hf.readJSONResponse(resp, err, &table) if err != nil { return nil, err } return &table, nil } func (hf *httpSchemaFetcher) FetchAllSchemas() ([]*metaCom.Table, error) { var tables []string resp, err := hf.httpClient.Get(hf.listTablesPath()) err = hf.readJSONResponse(resp, err, &tables) if err != nil { return nil, utils.StackError(err, "Failed to fetch table list") } var res []*metaCom.Table for _, tableName := range tables { table, err := hf.FetchSchema(tableName) if err != nil { hf.metricScope.Tagged(map[string]string{ "table": tableName, }).Counter("err_fetch_table").Inc(1) return nil, utils.StackError(err, "Failed to fetch schema error") } res = append(res, table) } return res, nil } func (hf *httpSchemaFetcher) readJSONResponse(response *http.Response, err error, data interface{}) error { if err != nil { return utils.StackError(err, "Failed call remote endpoint") } respBytes, err := ioutil.ReadAll(response.Body) if err != nil { return utils.StackError(err, "Failed to read response body") } if response.StatusCode != http.StatusOK { return utils.StackError(nil, "Received error response %d:%s from remote endpoint", response.StatusCode, respBytes) } err = json.Unmarshal(respBytes, data) fmt.Printf("resp: %s, data: %+v\n", string(respBytes), data) if err != nil { return utils.StackError(err, "Failed to unmarshal json") } return nil } func (hf *httpSchemaFetcher) tablePath(tableName string) string { return fmt.Sprintf("%s/%s", hf.listTablesPath(), tableName) } func (hf *httpSchemaFetcher) listTablesPath() string { return fmt.Sprintf("http://%s/schema/tables", hf.address) } func (hf *httpSchemaFetcher) enumDictPath(tableName, columnName string) string { return fmt.Sprintf("%s/%s/columns/%s/enum-cases", hf.listTablesPath(), tableName, columnName) }