vermeer/apps/graphio/hdfs_file.go (319 lines of code) (raw):

/* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package graphio import ( "bufio" "encoding/json" "fmt" "os/user" "path/filepath" "strings" "vermeer/apps/common" "vermeer/apps/options" "vermeer/apps/structure" "github.com/colinmarc/hdfs/v2" "github.com/colinmarc/hdfs/v2/hadoopconf" krb "github.com/jcmturner/gokrb5/v8/client" krbCfg "github.com/jcmturner/gokrb5/v8/config" krbKeytab "github.com/jcmturner/gokrb5/v8/keytab" "github.com/sirupsen/logrus" ) const useKrbYes = 1 func init() { LoadMakers[LoadTypeHdfs] = &HdfsMaker{} } type HdfsMaker struct{} func (a *HdfsMaker) CreateGraphLoader() GraphLoader { return &HdfsLoader{} } func (a *HdfsMaker) CreateGraphWriter() GraphWriter { return &HdfsWriter{} } func (a *HdfsMaker) MakeTasks(params map[string]string, taskID int32) ([]LoadPartition, error) { partID := int32(1) loadParts := make([]LoadPartition, 0) namenode := options.GetString(params, "load.hdfs_namenode") hdfsConfPath := options.GetString(params, "load.hdfs_conf_path") krbRealm := options.GetString(params, "load.krb_realm") krbName := options.GetString(params, "load.krb_name") keytabPath := options.GetString(params, "load.krb_keytab_path") krbConfPath := options.GetString(params, "load.krb_conf_path") useKrb := options.GetInt(params, "load.hdfs_use_krb") client, err := GetHdfsClient(namenode, hdfsConfPath, krbRealm, keytabPath, krbConfPath, krbName, useKrb) if err != nil { return nil, err } files := options.GetString(params, "load.vertex_files") dir := filepath.Dir(files) readDir, err := client.ReadDir(dir) if err != nil { return nil, err } for _, info := range readDir { newPath := filepath.Join(dir, info.Name()) match, err := filepath.Match(files, newPath) if err != nil { return nil, err } if match { part := LoadPartition{} part.Init(partID, taskID, LoadPartTypeVertex) part.Params = make(map[string]string) part.Params["load.file_path"] = newPath loadParts = append(loadParts, part) partID += 1 } } files = options.GetString(params, "load.edge_files") dir = filepath.Dir(files) readDir, err = client.ReadDir(dir) if err != nil { return nil, err } for _, info := range readDir { newPath := filepath.Join(dir, info.Name()) match, err := filepath.Match(files, newPath) if err != nil { return nil, err } if match { part := LoadPartition{} part.Init(partID, taskID, LoadPartTypeEdge) part.Params = make(map[string]string) part.Params["load.file_path"] = newPath loadParts = append(loadParts, part) partID += 1 } } for i := range loadParts { loadParts[i].Params["load.hdfs_namenode"] = params["load.hdfs_namenode"] loadParts[i].Params["load.hdfs_conf_path"] = params["load.hdfs_conf_path"] loadParts[i].Params["load.krb_realm"] = params["load.krb_realm"] loadParts[i].Params["load.krb_name"] = params["load.krb_name"] loadParts[i].Params["load.krb_keytab_path"] = params["load.krb_keytab_path"] loadParts[i].Params["load.krb_conf_path"] = params["load.krb_conf_path"] loadParts[i].Params["load.hdfs_use_krb"] = params["load.hdfs_use_krb"] } return loadParts, nil } type HdfsLoader struct { reader *bufio.Reader file *hdfs.FileReader client *hdfs.Client schema structure.PropertySchema filename string delimiter string count int useProperty bool } func (hl *HdfsLoader) Init(params map[string]string, schema structure.PropertySchema) error { hl.useProperty = false if options.GetInt(params, "load.use_property") == 1 { hl.useProperty = true hl.schema = schema } namenode := options.GetString(params, "load.hdfs_namenode") hdfsConfPath := options.GetString(params, "load.hdfs_conf_path") krbRealm := options.GetString(params, "load.krb_realm") krbName := options.GetString(params, "load.krb_name") keytabPath := options.GetString(params, "load.krb_keytab_path") krbConfPath := options.GetString(params, "load.krb_conf_path") useKrb := options.GetInt(params, "load.hdfs_use_krb") client, err := GetHdfsClient(namenode, hdfsConfPath, krbRealm, keytabPath, krbConfPath, krbName, useKrb) if err != nil { return err } hl.client = client hl.filename = options.GetString(params, "load.file_path") hl.file, err = hl.client.Open(hl.filename) if err != nil { return err } hl.reader = bufio.NewReader(hl.file) hl.delimiter = options.GetString(params, "load.delimiter") return nil } func (hl *HdfsLoader) ReadVertex(vertex *structure.Vertex, property *structure.PropertyValue) error { line, err := hl.reader.ReadString('\n') if err != nil { return err } line = strings.TrimSpace(line) ss := strings.Split(line, hl.delimiter) vertex.ID = ss[0] if hl.useProperty && len(ss) > 1 { propStr := strings.TrimSpace(line[len(ss[0]):]) property.LoadFromString(propStr, hl.schema) } hl.count += 1 return nil } func (hl *HdfsLoader) ReadEdge(edge *structure.Edge, property *structure.PropertyValue) error { line, err := hl.reader.ReadString('\n') if err != nil { return err } line = strings.TrimSpace(line) ss := strings.Split(line, hl.delimiter) if len(ss) < 2 { return fmt.Errorf("read edge format error") } edge.Source = ss[0] edge.Target = ss[1] if hl.useProperty && len(ss) > 2 { var ps string for i := 2; i < len(ss); i++ { ps += ss[i] } propStr := strings.TrimSpace(ps) property.LoadFromString(propStr, hl.schema) } hl.count += 1 return nil } func (hl *HdfsLoader) Name() string { return hl.filename } func (hl *HdfsLoader) ReadCount() int { return hl.count } func (hl *HdfsLoader) Close() { err := hl.file.Close() if err != nil { logrus.Errorf("hdfs loader file close error: %s", err) } err = hl.client.Close() if err != nil { logrus.Errorf("hdfs loader client close error: %s", err) } } type HdfsWriter struct { file *hdfs.FileWriter writer *bufio.Writer client *hdfs.Client count int delimiter string filePath string } func (hw *HdfsWriter) Init(info WriterInitInfo) error { hw.delimiter = options.GetString(info.Params, "output.delimiter") switch info.Mode { case WriteModeVertexValue: hw.filePath = options.GetString(info.Params, "output.file_path") zeroPad := 1 for info.MaxID /= 10; info.MaxID > 0; info.MaxID /= 10 { zeroPad++ } hw.filePath += "_" + common.ItoaPad(info.PartID, zeroPad) case WriteModeStatistics: hw.filePath = options.GetString(info.Params, "output.statistics_file_path") } namenode := options.GetString(info.Params, "output.hdfs_namenode") hdfsConfPath := options.GetString(info.Params, "output.hdfs_conf_path") krbRealm := options.GetString(info.Params, "output.krb_realm") krbName := options.GetString(info.Params, "output.krb_name") keytabPath := options.GetString(info.Params, "output.krb_keytab_path") krbConfPath := options.GetString(info.Params, "output.krb_conf_path") useKrb := options.GetInt(info.Params, "output.hdfs_use_krb") client, err := GetHdfsClient(namenode, hdfsConfPath, krbRealm, keytabPath, krbConfPath, krbName, useKrb) if err != nil { return err } hw.client = client //find dir path dirPath := filepath.Dir(hw.filePath) err = hw.client.MkdirAll(dirPath, 0755) if err != nil { logrus.Errorf("hdfs writer mkdir error: %s", err) return err } //清空已有同名文件实现覆盖写 _ = hw.client.Remove(hw.filePath) hw.file, err = hw.client.CreateFile(hw.filePath, 3, 128*1024*1024, 0666) if err != nil { logrus.Errorf("hdfs writer open file error: %s", err) return err } hw.writer = bufio.NewWriterSize(hw.file, 1*1024*1024) return nil } func (hw *HdfsWriter) WriteVertex(vertexValue WriteVertexValue) { builder := strings.Builder{} valueString := vertexValue.Value.ToString() builder.Grow(len(vertexValue.VertexID) + len(valueString) + 2) builder.WriteString(vertexValue.VertexID) builder.WriteString(hw.delimiter) builder.WriteString(valueString) builder.WriteString("\n") _, _ = hw.writer.WriteString(builder.String()) } func (hw *HdfsWriter) WriteCount() int { return hw.count } func (hw *HdfsWriter) WriteStatistics(statistics map[string]any) error { bytes, err := json.Marshal(statistics) if err != nil { return err } _, err = hw.writer.Write(bytes) if err != nil { return err } return nil } func (hw *HdfsWriter) Close() { err := hw.writer.Flush() if err != nil { logrus.Errorf("hdfs writer flush error: %s", err) } err = hw.file.Close() if err != nil { logrus.Errorf("hdfs writer file close error: %s", err) } err = hw.client.Close() if err != nil { logrus.Errorf("hdfs writer client close error: %s", err) } } func GetHdfsClient(namenode, hdfsConfPath, realm, keytabPath, krbConfPath, krbName string, useKrb int) (*hdfs.Client, error) { conf, err := hadoopconf.LoadFromEnvironment() if err != nil { return nil, err } if hdfsConfPath != "" { conf, err = hadoopconf.Load(hdfsConfPath) if err != nil { return nil, err } } option := hdfs.ClientOptionsFromConf(conf) if namenode != "" { option.Addresses = strings.Split(namenode, ",") } u, err := user.Current() if err != nil { return nil, err } option.User = u.Username if useKrb == useKrbYes { c, err := getKrb(krbName, realm, keytabPath, krbConfPath) if err != nil { return nil, err } option.KerberosClient = c option.KerberosServicePrincipleName = krbName } client, err := hdfs.NewClient(option) if err != nil { return nil, err } return client, nil } func getKrb(username, realm, ktPath, krbConfPath string) (*krb.Client, error) { krb5conf, err := krbCfg.Load(krbConfPath) if err != nil { return nil, err } kt, err := krbKeytab.Load(ktPath) if err != nil { return nil, err } client := krb.NewWithKeytab(username, realm, kt, krb5conf) return client, nil }