in vermeer/apps/graphio/hugegraph.go [537:641]
func (hgw *HugegraphWriter) Init(info WriterInitInfo) error {
hgw.delimiter = options.GetString(info.Params, "output.delimiter")
hgName := options.GetString(info.Params, "output.hugegraph_name")
hgw.username = options.GetString(info.Params, "output.hugegraph_username")
hgw.password = options.GetString(info.Params, "output.hugegraph_password")
hgw.serverAddr = options.GetString(info.Params, "output.hugegraph_server")
hgw.writeBackName = options.GetString(info.Params, "output.hugegraph_property")
olapWriteType := options.GetString(info.Params, "output.hugegraph_write_type")
if hgw.writeBackName == "" {
hgw.writeBackName = options.GetString(info.Params, "compute.algorithm")
}
var err error
hgw.hgSpace, hgw.hGraph, err = common.SplitHgName(hgName)
if err != nil {
return err
}
hgw.client = http.DefaultClient
switch olapWriteType {
case "OLAP_COMMON", "OLAP_SECONDARY", "OLAP_RANGE":
default:
return fmt.Errorf("output.hugegraph_write_type options only support OLAP_COMMON,OLAP_SECONDARY,OLAP_RANGE")
}
//创建一个propertyKey
propertyKey := addPropertyKey{Name: hgw.writeBackName, WriteType: olapWriteType,
DataType: info.OutputType, Cardinality: "SINGLE"}
propKeyBytes, err := json.Marshal(propertyKey)
if err != nil {
return err
}
propURL := fmt.Sprintf("%s/graphspaces/%s/graphs/%s/schema/propertykeys", hgw.serverAddr, hgw.hgSpace, hgw.hGraph)
propReq, err := http.NewRequest(http.MethodPost, propURL, bytes.NewReader(propKeyBytes))
if err != nil {
return err
}
propReq.SetBasicAuth(hgw.username, hgw.password)
propReq.Header.Set("Content-Type", "application/json")
resp, err := hgw.client.Do(propReq)
if err != nil {
return err
}
defer resp.Body.Close()
logrus.Infof("set propertykeys status:%v", resp.Status)
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted {
readAll, err := io.ReadAll(resp.Body)
if err != nil {
return err
}
return fmt.Errorf("hugegraph propertykeys request failed:%v", string(readAll))
}
hgw.vertexIDStrategy = make(map[string]common.HgVertexIDType)
//为hugegraph VertexLabel新增property
for _, labelName := range info.HgVertexSchema.HgPSchema.Labels {
vertexLabelURL := fmt.Sprintf("%s/graphspaces/%s/graphs/%s/schema/vertexlabels/%s?action=append",
hgw.serverAddr, hgw.hgSpace, hgw.hGraph, labelName)
vertexLabelBody := addVertexLabel{Name: labelName, Properties: []string{hgw.writeBackName},
NullableKeys: []string{hgw.writeBackName}}
vertexLabelBytes, err := json.Marshal(vertexLabelBody)
if err != nil {
return err
}
vertexLabelReq, err := http.NewRequest(http.MethodPut, vertexLabelURL, bytes.NewReader(vertexLabelBytes))
if err != nil {
return err
}
vertexLabelReq.Header.Set("Content-Type", "application/json")
vertexLabelReq.SetBasicAuth(hgw.username, hgw.password)
resp, err := hgw.client.Do(vertexLabelReq)
if err != nil {
return err
}
logrus.Infof("set vertex label status:%v", resp.Status)
respBodyAll, err := io.ReadAll(resp.Body)
if err != nil {
return err
}
respBody := make(map[string]any)
err = json.Unmarshal(respBodyAll, &respBody)
if err != nil {
return err
}
idStrategy, ok := respBody["id_strategy"].(string)
if !ok {
return fmt.Errorf("get vertex label id_strategy not correct %v,%v", respBody["id_strategy"], respBody)
}
switch idStrategy {
case "AUTOMATIC", "CUSTOMIZE_NUMBER":
hgw.vertexIDStrategy[labelName] = common.HgVertexIDTypeNumber
case "PRIMARY_KEY", "CUSTOMIZE_STRING":
hgw.vertexIDStrategy[labelName] = common.HgVertexIDTypeString
default:
logrus.Errorf("vertex label id_strategy not supported %v", idStrategy)
}
_ = resp.Body.Close()
}
hgw.writerURL = fmt.Sprintf("%s/graphspaces/%s/graphs/%s/graph/vertices/batch",
hgw.serverAddr, hgw.hgSpace, hgw.hGraph)
//for _, schema := range info.HgVertexSchema.Schema {
// hgw.updateStrategies[schema.PropKey] = "OVERRIDE"
//}
hgw.initBody()
return nil
}