func()

in vermeer/apps/graphio/hugegraph.go [219:329]


func (hgl *HugegraphLoader) Init(params map[string]string, schema structure.PropertySchema) error {
	if options.GetInt(params, "load.use_property") == 1 {
		hgl.useProperty = true
	}
	hgl.useLabel = false
	hgl.schema = schema
	hgl.graphName = options.GetString(params, "graph_name")
	partitionId, err := strconv.ParseUint(options.GetString(params, "partition_id"), 10, 32)
	if err != nil {
		logrus.Errorf("get uint partition_id err:%v", err)
		return err
	}
	hgl.partitionID = uint32(partitionId)

	storeAddress := options.GetString(params, "store_address")
	hgl.storeAddr = storeAddress
	startKey, err := strconv.ParseUint(options.GetString(params, "start_key"), 10, 64)
	if err != nil {
		logrus.Errorf("get uint start_key err:%v", err)
		return err
	}
	hgl.startKey = uint32(startKey)

	endKey, err := strconv.ParseUint(options.GetString(params, "end_key"), 10, 64)
	if err != nil {
		logrus.Errorf("get uint end_key err:%v", err)
		return err
	}
	hgl.endKey = uint32(endKey)
	var condition string
	scanType := hstore.ScanPartitionRequest_SCAN_UNKNOWN
	loadPartType := options.GetString(params, "part_type")
	if loadPartType == LoadPartTypeVertex {
		scanType = hstore.ScanPartitionRequest_SCAN_VERTEX
		condition = options.GetString(params, "load.hugegraph_vertex_condition")
	} else if loadPartType == LoadPartTypeEdge {
		scanType = hstore.ScanPartitionRequest_SCAN_EDGE
		condition = options.GetString(params, "load.hugegraph_edge_condition")
	}

	propertyLabels := make([]int64, 0)
	if hgl.useProperty {
		var propLabels []string
		if loadPartType == LoadPartTypeVertex {
			propLabels = strings.Split(options.GetString(params, "load.vertex_property"), ",")
		} else if loadPartType == LoadPartTypeEdge {
			propLabels = strings.Split(options.GetString(params, "load.edge_property"), ",")
		}

		for _, label := range propLabels {
			if label == "" {
				continue
			}
			if strings.TrimSpace(label) == "label" {
				hgl.useLabel = true
				continue
			}
			iLabel, err := strconv.ParseInt(label, 10, 64)
			if err != nil {
				logrus.Errorf("property schema label type not int :%v", label)
				continue
			}
			propertyLabels = append(propertyLabels, iLabel)
		}
	} else {
		propertyLabels = []int64{-1}
	}
	if loadPartType == LoadPartTypeEdge {
		isVaild := false
		for _, prop := range propertyLabels {
			if prop >= 0 {
				isVaild = true
				break
			}
		}
		if !isVaild {
			hgl.useProperty = false
			logrus.Debugf("edge load without property")
		}
	}
	bacthSize := options.GetInt(params, "load.hugestore_batchsize")
	scanRequest := &hstore.ScanPartitionRequest_ScanRequest{
		ScanRequest: &hstore.ScanPartitionRequest_Request{
			ScanType:    scanType,
			GraphName:   hgl.graphName,
			PartitionId: hgl.partitionID,
			//StartCode:   hgl.startKey,
			//EndCode:     hgl.endKey,
			//Condition
			Condition: condition,
			Boundary:  0x10,
			//需要返回的property id,不填就返回全部。
			Properties: propertyLabels,
			BatchSize:  int32(bacthSize),
		},
	}

	storeTimeout := options.GetInt(params, "load.hugestore_batch_timeout")
	if storeTimeout < 60 {
		storeTimeout = 60
	}

	hgl.handler = &HStoreHandler{}
	err = hgl.handler.Init(scanRequest, storeAddress, int32(storeTimeout))
	if err != nil {
		logrus.Errorf("scan handler init error:%v", err)
		return err
	}

	return nil
}