pkg/common/utils/mysql/doris.go (100 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package mysql import ( _ "github.com/go-sql-driver/mysql" "k8s.io/klog/v2" "sort" "strconv" "strings" ) const ( FE_FOLLOWER_ROLE = "FOLLOWER" FE_OBSERVE_ROLE = "OBSERVER" ) type Frontend struct { Name string `json:"name" db:"Name"` Host string `json:"host" db:"Host"` EditLogPort int `json:"edit_log_port" db:"EditLogPort"` HttpPort int `json:"http_port" db:"HttpPort"` QueryPort int `json:"query_port" db:"QueryPort"` RpcPort int `json:"rpc_port" db:"RpcPort"` ArrowFlightSqlPort int `json:"arrow_flight_sql_port" db:"ArrowFlightSqlPort"` Role string `json:"role" db:"Role"` IsMaster bool `json:"is_master" db:"IsMaster"` ClusterId string `json:"cluster_id" db:"ClusterId"` Join bool `json:"join" db:"Join"` Alive bool `json:"alive" db:"Alive"` ReplayedJournalId string `json:"replayed_journal_id" db:"ReplayedJournalId"` LastStartTime *string `json:"last_start_time" db:"LastStartTime"` LastHeartbeat *string `json:"last_heartbeat" db:"LastHeartbeat"` IsHelper bool `json:"is_helper" db:"IsHelper"` ErrMsg string `json:"err_msg" db:"ErrMsg"` Version *string `json:"version" db:"Version"` CurrentConnected string `json:"current_connected" db:"CurrentConnected"` } type Backend struct { BackendID string `json:"backend_id" db:"BackendId"` Host string `json:"host" db:"Host"` HeartbeatPort int `json:"heartbeat_port" db:"HeartbeatPort"` BePort int `json:"be_port" db:"BePort"` HttpPort int `json:"http_port" db:"HttpPort"` BrpcPort int `json:"brpc_port" db:"BrpcPort"` ArrowFlightSqlPort int `json:"arrow_flight_sql_port" db:"ArrowFlightSqlPort"` LastStartTime *string `json:"last_start_time" db:"LastStartTime"` LastHeartbeat *string `json:"last_heartbeat" db:"LastHeartbeat"` Alive bool `json:"alive" db:"Alive"` SystemDecommissioned bool `json:"system_decommissioned" db:"SystemDecommissioned"` TabletNum int64 `json:"tablet_num" db:"TabletNum"` DataUsedCapacity string `json:"data_used_capacity" db:"DataUsedCapacity"` TrashUsedCapacity string `json:"trash_used_capacity" db:"TrashUsedCapacity"` TrashUsedCapcacity string `json:"trash_used_capcacity" db:"TrashUsedCapcacity"` AvailCapacity string `json:"avail_capacity" db:"AvailCapacity"` TotalCapacity string `json:"total_capacity" db:"TotalCapacity"` UsedPct string `json:"used_pct" db:"UsedPct"` MaxDiskUsedPct string `json:"max_disk_used_pct" db:"MaxDiskUsedPct"` RemoteUsedCapacity string `json:"remote_used_capacity" db:"RemoteUsedCapacity"` Tag string `json:"tag" db:"Tag"` ErrMsg string `json:"err_msg" db:"ErrMsg"` Version *string `json:"version" db:"Version"` Status string `json:"status" db:"Status"` HeartbeatFailureCounter int `json:"heartbeat_failure_counter" db:"HeartbeatFailureCounter"` NodeRole string `json:"node_role" db:"NodeRole"` CpuCores string `json:"cpu_cores" db:"CpuCores"` Memory string `json:"memory" db:"Memory"` } // BuildSeqNumberToFrontendMap // input ipMap key is podIP,value is fe.podName(from 'kubectl get pods -owide') // return frontendMap key is fe pod index ,value is frontend func BuildSeqNumberToFrontendMap(frontends []*Frontend, ipMap map[string]string, podTemplateName string) (map[int]*Frontend, error) { frontendMap := make(map[int]*Frontend) for _, fe := range frontends { var podSignName string if strings.HasPrefix(fe.Host, podTemplateName) { // use fqdn, not need ipMap // podSignName like: doriscluster-sample-fe-0.doriscluster-sample-fe-internal.doris.svc.cluster.local podSignName = fe.Host } else { // use ip // podSignName like: doriscluster-sample-fe-0 podSignName = ipMap[fe.Host] } split := strings.Split(strings.Split(strings.Split(podSignName, podTemplateName)[1], ".")[0], "-") num, err := strconv.Atoi(split[len(split)-1]) if err != nil { klog.Errorf("buildSeqNumberToFrontend can not split pod name,pod name: %s,err:%s", podSignName, err.Error()) return nil, err } frontendMap[num] = fe } return frontendMap, nil } // FindNeedDeletedFrontends means descending sort fe by index and return top needRemovedAmount func FindNeedDeletedObservers(frontendMap map[int]*Frontend, needRemovedAmount int32) []*Frontend { var topFrontends []*Frontend if int(needRemovedAmount) <= len(frontendMap) { keys := make([]int, 0, len(frontendMap)) for k := range frontendMap { keys = append(keys, k) } sort.Slice(keys, func(i, j int) bool { return keys[i] > keys[j] }) for i := 0; i < int(needRemovedAmount); i++ { topFrontends = append(topFrontends, frontendMap[keys[i]]) } } else { klog.Errorf("findNeedDeletedFrontends frontendMap size(%d) not larger than needRemovedAmount(%d)", len(frontendMap), needRemovedAmount) } return topFrontends }