in vermeer/apps/graphio/hugegraph.go [219:329]
// Init configures the loader from params and initializes the HStore scan
// handler for a single partition.
//
// Keys read from params:
//   - "load.use_property": property loading is switched on when the value is 1.
//   - "graph_name", "store_address": copied onto the loader as-is.
//   - "partition_id", "start_key", "end_key": base-10 unsigned integers that
//     must fit in 32 bits; a parse or range failure is logged and returned.
//   - "part_type": selects a vertex or edge scan. Any other value leaves the
//     scan type as SCAN_UNKNOWN with an empty condition.
//   - "load.hugegraph_vertex_condition" / "load.hugegraph_edge_condition":
//     scan condition for the selected part type.
//   - "load.vertex_property" / "load.edge_property": comma-separated property
//     IDs; the literal entry "label" turns on label loading instead of being
//     sent as an ID. Surrounding whitespace on each entry is ignored, and
//     entries that are not integers are logged and skipped.
//   - "load.hugestore_batchsize", "load.hugestore_batch_timeout": passed to the
//     scan request and handler; timeouts below 60 are raised to 60.
//
// It returns the first parse error encountered, or the error reported by the
// scan handler's Init.
func (hgl *HugegraphLoader) Init(params map[string]string, schema structure.PropertySchema) error {
	if options.GetInt(params, "load.use_property") == 1 {
		hgl.useProperty = true
	}
	hgl.useLabel = false
	hgl.schema = schema
	hgl.graphName = options.GetString(params, "graph_name")

	partitionID, err := strconv.ParseUint(options.GetString(params, "partition_id"), 10, 32)
	if err != nil {
		logrus.Errorf("get uint partition_id err:%v", err)
		return err
	}
	hgl.partitionID = uint32(partitionID)

	storeAddress := options.GetString(params, "store_address")
	hgl.storeAddr = storeAddress

	// The keys are stored as uint32, so parse with bitSize 32: an out-of-range
	// value is reported as an error instead of being silently truncated by the
	// conversion below.
	startKey, err := strconv.ParseUint(options.GetString(params, "start_key"), 10, 32)
	if err != nil {
		logrus.Errorf("get uint start_key err:%v", err)
		return err
	}
	hgl.startKey = uint32(startKey)

	endKey, err := strconv.ParseUint(options.GetString(params, "end_key"), 10, 32)
	if err != nil {
		logrus.Errorf("get uint end_key err:%v", err)
		return err
	}
	hgl.endKey = uint32(endKey)

	// Pick the scan type and its filter condition from the requested part type.
	var condition string
	scanType := hstore.ScanPartitionRequest_SCAN_UNKNOWN
	loadPartType := options.GetString(params, "part_type")
	switch loadPartType {
	case LoadPartTypeVertex:
		scanType = hstore.ScanPartitionRequest_SCAN_VERTEX
		condition = options.GetString(params, "load.hugegraph_vertex_condition")
	case LoadPartTypeEdge:
		scanType = hstore.ScanPartitionRequest_SCAN_EDGE
		condition = options.GetString(params, "load.hugegraph_edge_condition")
	}

	// Build the list of property IDs to request from the store.
	propertyLabels := make([]int64, 0)
	if hgl.useProperty {
		var propLabels []string
		switch loadPartType {
		case LoadPartTypeVertex:
			propLabels = strings.Split(options.GetString(params, "load.vertex_property"), ",")
		case LoadPartTypeEdge:
			propLabels = strings.Split(options.GetString(params, "load.edge_property"), ",")
		}
		for _, raw := range propLabels {
			// Trim once so that "1, 2" and " label" are handled the same way
			// as their unpadded forms for every check below.
			label := strings.TrimSpace(raw)
			if label == "" {
				continue
			}
			if label == "label" {
				hgl.useLabel = true
				continue
			}
			id, err := strconv.ParseInt(label, 10, 64)
			if err != nil {
				logrus.Errorf("property schema label type not int :%v", label)
				continue
			}
			propertyLabels = append(propertyLabels, id)
		}
	} else {
		// NOTE(review): -1 presumably tells the store to return no properties;
		// confirm against the HStore scan API.
		propertyLabels = []int64{-1}
	}

	// An edge scan with no non-negative property ID carries no usable
	// properties, so property handling is switched off for it.
	if loadPartType == LoadPartTypeEdge {
		hasProperty := false
		for _, id := range propertyLabels {
			if id >= 0 {
				hasProperty = true
				break
			}
		}
		if !hasProperty {
			hgl.useProperty = false
			logrus.Debugf("edge load without property")
		}
	}

	batchSize := options.GetInt(params, "load.hugestore_batchsize")
	scanRequest := &hstore.ScanPartitionRequest_ScanRequest{
		ScanRequest: &hstore.ScanPartitionRequest_Request{
			ScanType:    scanType,
			GraphName:   hgl.graphName,
			PartitionId: hgl.partitionID,
			//StartCode: hgl.startKey,
			//EndCode: hgl.endKey,
			//Condition
			Condition: condition,
			Boundary:  0x10,
			// Property IDs to return; leaving this empty returns all of them.
			Properties: propertyLabels,
			BatchSize:  int32(batchSize),
		},
	}

	// Enforce a floor of 60 on the configured batch timeout.
	storeTimeout := options.GetInt(params, "load.hugestore_batch_timeout")
	if storeTimeout < 60 {
		storeTimeout = 60
	}

	hgl.handler = &HStoreHandler{}
	err = hgl.handler.Init(scanRequest, storeAddress, int32(storeTimeout))
	if err != nil {
		logrus.Errorf("scan handler init error:%v", err)
		return err
	}
	return nil
}