func()

in vermeer/apps/graphio/hugegraph.go [537:641]


// Init configures the writer from info.Params and prepares the target
// HugeGraph schema so that results can be written back as a vertex property.
//
// Steps, in order:
//  1. Read the "output.*" options (delimiter, graph name, credentials, server
//     address, property name, write type). When "output.hugegraph_property"
//     is empty, the value of "compute.algorithm" is used as the property name.
//  2. Split the graph name into graph space and graph via common.SplitHgName.
//  3. Validate the write type; only OLAP_COMMON, OLAP_SECONDARY and
//     OLAP_RANGE are accepted.
//  4. POST a property key definition (data type info.OutputType, cardinality
//     SINGLE) to the schema/propertykeys endpoint. Any status other than
//     200 OK or 202 Accepted is returned as an error containing the body.
//  5. For each label in info.HgVertexSchema.HgPSchema.Labels, PUT an
//     "append" request adding the property as a nullable key, and record the
//     label's vertex ID type from the "id_strategy" field of the response.
//  6. Build the batch vertex write URL and call hgw.initBody.
//
// An error is returned when the graph name cannot be split, the write type is
// unsupported, a request cannot be built or sent, a response cannot be read
// or decoded, or a vertex label response has no string "id_strategy".
func (hgw *HugegraphWriter) Init(info WriterInitInfo) error {
	hgw.delimiter = options.GetString(info.Params, "output.delimiter")
	hgName := options.GetString(info.Params, "output.hugegraph_name")
	hgw.username = options.GetString(info.Params, "output.hugegraph_username")
	hgw.password = options.GetString(info.Params, "output.hugegraph_password")
	hgw.serverAddr = options.GetString(info.Params, "output.hugegraph_server")
	hgw.writeBackName = options.GetString(info.Params, "output.hugegraph_property")
	olapWriteType := options.GetString(info.Params, "output.hugegraph_write_type")
	if hgw.writeBackName == "" {
		// Fall back to the algorithm name when no property name is given.
		hgw.writeBackName = options.GetString(info.Params, "compute.algorithm")
	}
	var err error
	hgw.hgSpace, hgw.hGraph, err = common.SplitHgName(hgName)
	if err != nil {
		return err
	}
	// NOTE(review): http.DefaultClient has no request timeout configured.
	hgw.client = http.DefaultClient

	switch olapWriteType {
	case "OLAP_COMMON", "OLAP_SECONDARY", "OLAP_RANGE":
		// Supported write types; nothing further to do.
	default:
		return fmt.Errorf("output.hugegraph_write_type options only support OLAP_COMMON,OLAP_SECONDARY,OLAP_RANGE")
	}

	// Create the property key that will hold the written-back values.
	propertyKey := addPropertyKey{Name: hgw.writeBackName, WriteType: olapWriteType,
		DataType: info.OutputType, Cardinality: "SINGLE"}
	propKeyBytes, err := json.Marshal(propertyKey)
	if err != nil {
		return err
	}
	propURL := fmt.Sprintf("%s/graphspaces/%s/graphs/%s/schema/propertykeys", hgw.serverAddr, hgw.hgSpace, hgw.hGraph)
	propReq, err := http.NewRequest(http.MethodPost, propURL, bytes.NewReader(propKeyBytes))
	if err != nil {
		return err
	}
	propReq.SetBasicAuth(hgw.username, hgw.password)
	propReq.Header.Set("Content-Type", "application/json")
	resp, err := hgw.client.Do(propReq)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	logrus.Infof("set propertykeys status:%v", resp.Status)
	if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted {
		readAll, err := io.ReadAll(resp.Body)
		if err != nil {
			return err
		}
		return fmt.Errorf("hugegraph propertykeys request failed:%v", string(readAll))
	}

	hgw.vertexIDStrategy = make(map[string]common.HgVertexIDType)
	// Append the new property to every HugeGraph vertex label.
	for _, labelName := range info.HgVertexSchema.HgPSchema.Labels {
		vertexLabelURL := fmt.Sprintf("%s/graphspaces/%s/graphs/%s/schema/vertexlabels/%s?action=append",
			hgw.serverAddr, hgw.hgSpace, hgw.hGraph, labelName)
		vertexLabelBody := addVertexLabel{Name: labelName, Properties: []string{hgw.writeBackName},
			NullableKeys: []string{hgw.writeBackName}}
		vertexLabelBytes, err := json.Marshal(vertexLabelBody)
		if err != nil {
			return err
		}
		vertexLabelReq, err := http.NewRequest(http.MethodPut, vertexLabelURL, bytes.NewReader(vertexLabelBytes))
		if err != nil {
			return err
		}
		vertexLabelReq.Header.Set("Content-Type", "application/json")
		vertexLabelReq.SetBasicAuth(hgw.username, hgw.password)
		labelResp, err := hgw.client.Do(vertexLabelReq)
		if err != nil {
			return err
		}
		logrus.Infof("set vertex label status:%v", labelResp.Status)
		respBodyAll, err := io.ReadAll(labelResp.Body)
		// Close right after reading so that none of the early returns below
		// leaks the body. A defer would pile up until Init returns. The
		// close error is ignored: the payload has already been consumed.
		_ = labelResp.Body.Close()
		if err != nil {
			return err
		}
		respBody := make(map[string]any)
		err = json.Unmarshal(respBodyAll, &respBody)
		if err != nil {
			return err
		}
		idStrategy, ok := respBody["id_strategy"].(string)
		if !ok {
			return fmt.Errorf("get vertex label id_strategy not correct %v,%v", respBody["id_strategy"], respBody)
		}
		switch idStrategy {
		case "AUTOMATIC", "CUSTOMIZE_NUMBER":
			hgw.vertexIDStrategy[labelName] = common.HgVertexIDTypeNumber
		case "PRIMARY_KEY", "CUSTOMIZE_STRING":
			hgw.vertexIDStrategy[labelName] = common.HgVertexIDTypeString
		default:
			// Unknown strategies are only logged; the label gets no entry in
			// hgw.vertexIDStrategy and Init carries on.
			logrus.Errorf("vertex label id_strategy not supported %v", idStrategy)
		}
	}
	hgw.writerURL = fmt.Sprintf("%s/graphspaces/%s/graphs/%s/graph/vertices/batch",
		hgw.serverAddr, hgw.hgSpace, hgw.hGraph)

	//for _, schema := range info.HgVertexSchema.Schema {
	//	hgw.updateStrategies[schema.PropKey] = "OVERRIDE"
	//}
	hgw.initBody()
	return nil
}