vermeer/apps/common/hugegraph_tools.go (295 lines of code) (raw):

/* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package common import ( "context" "encoding/json" "errors" "fmt" "io" "net/http" "strconv" "strings" "sync" "time" "github.com/sirupsen/logrus" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/metadata" pd "vermeer/apps/protos/hugegraph-pd-grpc" hstore "vermeer/apps/protos/hugegraph-store-grpc" ) const ( HgVertexIDTypeString HgVertexIDType = iota HgVertexIDTypeNumber ) const ( HgValueTypeInt = "INT" HgValueTypeFloat = "FLOAT" HgValueTypeString = "TEXT" ) type HgVertexIDType uint16 func FindValidPD(pdIPAddress []string) (string, error) { // retry 3 times to find valid pd address var pdIPAddr string var err error for i := 0; i < 3; i++ { pdIPAddr, err = findValidPD(pdIPAddress) if err == nil && pdIPAddr != "" { return pdIPAddr, nil } logrus.Errorf("find pd address error:%v", err) time.Sleep(500 * time.Millisecond) } return pdIPAddr, err } func findValidPD(pdIPAddress []string) (string, error) { // 找出有效的pd address wg := &sync.WaitGroup{} var pdIPAddr string tempCtx, tempCancel := context.WithTimeout(context.Background(), 5*time.Second) pdAuthority := PDAuthority{} tempCtx = pdAuthority.SetAuthority(tempCtx) for _, addr := range pdIPAddress { wg.Add(1) go func(ctx context.Context, cancel context.CancelFunc, addr string) { defer wg.Done() var md metadata.MD pdConn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { logrus.Errorf("grpc dial error:%v", err) return } pdClient := pd.NewPDClient(pdConn) _, err = pdClient.GetMembers(ctx, &pd.GetMembersRequest{}, grpc.Header(&md)) pdAuthority.SetToken(md) _ = pdConn.Close() if err == nil { pdIPAddr = addr cancel() } }(tempCtx, tempCancel, addr) } wg.Wait() tempCancel() if len(pdIPAddr) == 0 { logrus.Errorf("hugegraph.pd_address unable to connect :%v", pdIPAddress) return "", errors.New("hugegraph.pd_address unable to connect") } return pdIPAddr, nil } func FindServerAddr(pdIPAddress string, hgName string, username string, password string) (string, error) { // retry 3 times to find valid server address var serverAddr string var err error for i := 0; i < 3; i++ { serverAddr, err = findServerAddr(pdIPAddress, hgName, username, password) if err == nil && serverAddr != "" { return serverAddr, nil } logrus.Errorf("find server address error:%v", err) time.Sleep(500 * time.Millisecond) } return serverAddr, err } func findServerAddr(pdIPAddress string, hgName string, username string, password string) (string, error) { // 找出有效的server address ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() pdConn, err := grpc.Dial(pdIPAddress, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { logrus.Errorf("dial pd error:%v", err) return "", err } defer pdConn.Close() pdAuthority := PDAuthority{} ctx = pdAuthority.SetAuthority(ctx) pdClient := pd.NewDiscoveryServiceClient(pdConn) resp, err := pdClient.GetNodes(ctx, &pd.Query{}) if err != nil { return "", err } sameSpaceServer := make([]string, 0) defaultSpaceServer := make([]string, 0) otherServer := make([]string, 0) hgSpace, hGraph, err := SplitHgName(hgName) if err != nil { return "", err } for _, info := range resp.GetInfo() { if info.GetLabels()["GRAPHSPACE"] == hgSpace { sameSpaceServer = append(sameSpaceServer, info.Address) } else if info.GetLabels()["GRAPHSPACE"] == "DEFAULT" { defaultSpaceServer = append(defaultSpaceServer, info.Address) } else { otherServer = append(otherServer, info.Address) } } // 先校验图空间名字对应的server地址 validServerAddr := testServerIsValid(sameSpaceServer, hgSpace, hGraph, username, password) // 其次再校验默认图空间 if len(validServerAddr) == 0 { validServerAddr = testServerIsValid(defaultSpaceServer, hgSpace, hGraph, username, password) } // 最后校验其他server是否可用 if len(validServerAddr) == 0 { validServerAddr = testServerIsValid(otherServer, hgSpace, hGraph, username, password) } if len(validServerAddr) == 0 { logrus.Errorf("hugegraph server address unable to connect :%v,%v,%v", sameSpaceServer, defaultSpaceServer, otherServer) return "", errors.New("hugegraph server address unable to connect") } return validServerAddr, nil } func testServerIsValid(serverAdds []string, hgSpace, hGraph, username, password string) string { tempCtx, tempCancel := context.WithTimeout(context.Background(), 5*time.Second) defer tempCancel() var validServerAddr string wg := &sync.WaitGroup{} for _, addr := range serverAdds { wg.Add(1) go func(addr string, ctx context.Context, cancel context.CancelFunc) { defer wg.Done() url := fmt.Sprintf("%v/graphspaces/%v/graphs/%v/schema?format=json", addr, hgSpace, hGraph) req, err := http.NewRequest(http.MethodGet, url, nil) req.SetBasicAuth(username, password) if err != nil { return } req.WithContext(ctx) client := &http.Client{Timeout: 5 * time.Second} resp, err := client.Do(req) if err != nil { return } defer resp.Body.Close() if resp.StatusCode == http.StatusOK { validServerAddr = addr cancel() return } }(addr, tempCtx, tempCancel) } wg.Wait() return validServerAddr } func GetHugegraphSchema(serverAddr, hgName, username, password string) (map[string]any, error) { hgSpace, hGraph, err := SplitHgName(hgName) if err != nil { logrus.Errorf("split hg name failed, %v", err.Error()) return nil, fmt.Errorf("split hg name failed, %v", err.Error()) } url := fmt.Sprintf("%v/graphspaces/%v/graphs/%v/schema?format=json", serverAddr, hgSpace, hGraph) // retry 3 times var propertyFromHg map[string]any for i := 0; i < 3; i++ { propertyFromHg, err = getHugegraphSchema(url, username, password) if err == nil { return propertyFromHg, nil } logrus.Errorf("get schema failed, %v", err.Error()) time.Sleep(500 * time.Millisecond) } return nil, err } func getHugegraphSchema(url, username, password string) (map[string]any, error) { req, err := http.NewRequest(http.MethodGet, url, nil) req.SetBasicAuth(username, password) if err != nil { logrus.Errorf("create http request failed, %v", err.Error()) return nil, fmt.Errorf("create http request failed, %v", err.Error()) } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() req.WithContext(ctx) client := &http.Client{Timeout: 5 * time.Second} resp, err := client.Do(req) if err != nil { logrus.Errorf("http request failed, %v", err.Error()) return nil, fmt.Errorf("http request failed, %v", err.Error()) } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { return nil, fmt.Errorf("read response body failed, %v", err.Error()) } if resp.StatusCode != http.StatusOK { logrus.Errorf("response status not 200 OK, Get:%v", resp.Status) return nil, fmt.Errorf("get schema failed, status code:%v, body:%v", resp.StatusCode, string(body)) } propertyFromHg := make(map[string]any) err = json.Unmarshal(body, &propertyFromHg) if err != nil { logrus.Errorf("unmarshal response body failed, %v", err.Error()) return nil, fmt.Errorf("unmarshal response body failed, %v", err.Error()) } return propertyFromHg, nil } func SplitHgName(hgName string) (string, string, error) { hgSplit := strings.Split(hgName, "/") if len(hgSplit) != 3 { return "", "", fmt.Errorf("hugegraph name not correct:%v", hgName) } return hgSplit[0], hgSplit[1], nil } func VariantToInt(variant *hstore.Variant) (int32, error) { switch *variant.Type { case hstore.VariantType_VT_INT: return variant.GetValueInt32(), nil case hstore.VariantType_VT_LONG: return int32(variant.GetValueInt64()), nil } return 0, fmt.Errorf("hstore variant type wrong :%v", *variant.Type) } func VariantToFloat(variant *hstore.Variant) (float32, error) { switch *variant.Type { case hstore.VariantType_VT_FLOAT: return variant.GetValueFloat(), nil case hstore.VariantType_VT_DOUBLE: return float32(variant.GetValueDouble()), nil } return 0, fmt.Errorf("hstore variant type wrong :%v", *variant.Type) } func VariantToString(variant *hstore.Variant) (string, error) { switch *variant.Type { case hstore.VariantType_VT_BOOLEAN: if variant.GetValueBoolean() { return "true", nil } return "false", nil case hstore.VariantType_VT_INT: return strconv.FormatInt(int64(variant.GetValueInt32()), 10), nil case hstore.VariantType_VT_LONG: return strconv.FormatInt(variant.GetValueInt64(), 10), nil case hstore.VariantType_VT_FLOAT: return strconv.FormatFloat(float64(variant.GetValueFloat()), 'E', -1, 32), nil case hstore.VariantType_VT_DOUBLE: return strconv.FormatFloat(variant.GetValueDouble(), 'E', -1, 64), nil case hstore.VariantType_VT_STRING: return variant.GetValueString(), nil case hstore.VariantType_VT_BYTES: return string(variant.GetValueBytes()), nil case hstore.VariantType_VT_DATETIME: return variant.GetValueDatetime(), nil } return "", fmt.Errorf("hstore variant type wrong :%v", *variant.Type) } const ( CredentialKey = "credential" CredentialValue = "dmVybWVlcjokMmEkMDQkTjg5cUhlMHY1anFOSktoUVpIblRkT0ZTR21pTm9pQTJCMmZkV3BWMkJ3cnRKSzcyZFhZRC4=" TokenKey = "Pd-Token" ) type PDAuthority struct { token string locker sync.RWMutex } func (pa *PDAuthority) SetAuthority(ctx context.Context) context.Context { pa.locker.RLock() defer pa.locker.RUnlock() md := metadata.New(map[string]string{ CredentialKey: CredentialValue, }) if pa.token != "" { md.Set(TokenKey, pa.token) } // logrus.Infof("send md:%v", md) ctx = metadata.NewOutgoingContext(ctx, md) return ctx } func (pa *PDAuthority) SetToken(md metadata.MD) { pa.locker.Lock() defer pa.locker.Unlock() // logrus.Infof("recv md:%v", md) token := md.Get(TokenKey) if len(token) == 1 { pa.token = token[0] } }