in vermeer/apps/graphio/hugegraph.go [61:204]
// MakeTasks connects to the HugeGraph PD service, enumerates the partitions of
// the configured graph, and builds one vertex-load and one edge-load
// LoadPartition task per (selected) partition.
//
// params carries the "load.*" options (PD peers, graph name, optional explicit
// partition list, property/condition filters); taskID is stamped onto every
// generated LoadPartition. Returns the ordered task list, or an error when PD
// or store lookup fails, a partition has no leader shard, or a leader store is
// not in the Up state.
func (a *HugegraphMaker) MakeTasks(params map[string]string, taskID int32) ([]LoadPartition, error) {
	loadParts := make([]LoadPartition, 0)
	// Connect to PD to obtain the partition list of the graph.
	pdIPAddress := options.GetSliceString(params, "load.hg_pd_peers")
	graphName := options.GetString(params, "load.hugegraph_name")
	pdIPAddr, err := common.FindValidPD(pdIPAddress)
	if err != nil {
		return nil, err
	}
	// NOTE(review): this single 5s deadline covers every RPC issued below,
	// including the whole per-partition loop — confirm that is intended for
	// graphs with many partitions.
	ctx, cancel1 := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel1()
	pdConn, err := grpc.Dial(pdIPAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		logrus.Errorf("connect hugegraph-pb err:%v", err)
		return nil, err
	}
	// Close on every return path; previously the connection leaked whenever an
	// error returned mid-function. A failed Close is logged but no longer
	// discards an otherwise valid result.
	defer func() {
		if cerr := pdConn.Close(); cerr != nil {
			logrus.Errorf("hugegraph-pd close err:%v", cerr)
		}
	}()
	pdAuthority := common.PDAuthority{}
	ctx = pdAuthority.SetAuthority(ctx)
	var md metadata.MD
	pdClient := pd.NewPDClient(pdConn)
	partitionsResp, err := pdClient.QueryPartitions(ctx,
		&pd.QueryPartitionsRequest{Query: &metapb.PartitionQuery{GraphName: &graphName}}, grpc.Header(&md))
	if err != nil {
		logrus.Errorf("QueryPartitions err:%v", err)
		return nil, err
	}
	pdAuthority.SetToken(md)
	// Partitions explicitly requested by the user; an empty list means
	// "load every partition of the graph".
	userSetPartitions := options.GetSliceInt(params, "load.hg_partitions")
	userSetPartitionMap := make(map[uint32]struct{}, len(userSetPartitions))
	for _, partitionID := range userSetPartitions {
		userSetPartitionMap[uint32(partitionID)] = struct{}{}
	}
	logrus.Debugf("user partition:%v", userSetPartitions)
	partitions := partitionsResp.GetPartitions()
	leaderStore := make(map[uint64]string) // store_id -> store address cache
	partID := int32(1)
	for _, partition := range partitions {
		// When the user supplied a partition list, skip partitions outside it.
		if len(userSetPartitionMap) != 0 {
			if _, ok := userSetPartitionMap[partition.Id]; !ok {
				continue
			}
		}
		// Resolve the leader store_id of this partition via its shard group.
		ctx = pdAuthority.SetAuthority(ctx)
		var md metadata.MD
		shardGroup, err := pdClient.GetShardGroup(ctx, &pd.GetShardGroupRequest{GroupId: partition.Id}, grpc.Header(&md))
		if err != nil {
			logrus.Errorf("GetShardGroup err:%v", err)
			return nil, err
		}
		pdAuthority.SetToken(md)
		var leaderStoreID uint64
		leaderFound := false
		for _, shard := range shardGroup.GetShardGroup().GetShards() {
			if shard.Role == metapb.ShardRole_Leader {
				leaderStoreID = shard.StoreId
				leaderFound = true
				break
			}
		}
		// Previously a missing leader fell through with store_id 0 and issued
		// a bogus GetStore(0); fail fast with a clear error instead.
		if !leaderFound {
			logrus.Errorf("partition id:%v has no leader shard", partition.Id)
			return nil, fmt.Errorf("partition id:%v has no leader shard", partition.Id)
		}
		// Only look up the address of a leader store we have not seen yet.
		if _, ok := leaderStore[leaderStoreID]; !ok {
			ctx = pdAuthority.SetAuthority(ctx)
			var md metadata.MD
			storeResp, err := pdClient.GetStore(ctx, &pd.GetStoreRequest{StoreId: leaderStoreID}, grpc.Header(&md))
			if err != nil {
				logrus.Errorf("GetStore %v err:%v", leaderStoreID, err)
				return nil, err
			}
			pdAuthority.SetToken(md)
			if storeResp.Store.State != metapb.StoreState_Up {
				logrus.Errorf("store:%v state not up:%v", storeResp.GetStore().Address, storeResp.Store.State)
				logrus.Errorf("partition id:%v not available", partition.Id)
				return nil, fmt.Errorf("store:%v state not up:%v", storeResp.GetStore().Address, storeResp.Store.State)
			}
			leaderStore[leaderStoreID] = storeResp.GetStore().Address
		}
		// Subdivide one partition into n load tasks (not supported yet; n
		// stays 1 until parallel splitting is enabled below).
		partitionCount := partition.EndKey - partition.StartKey
		var n uint64 = 1
		//parallel := uint64(options.GetInt(params, "load.parallel"))
		//if partitionCount > parallel {
		//	n = parallel
		//}
		partCount := partitionCount / n
		for i := uint64(0); i < n; i++ {
			vStart := partition.StartKey + i*partCount
			vEnd := vStart + partCount
			// The last slice absorbs any remainder of the key range.
			if vEnd > partition.EndKey || i == n-1 {
				vEnd = partition.EndKey
			}
			vertexPart := LoadPartition{}
			vertexPart.Init(partID, taskID, LoadPartTypeVertex)
			vertexPart.Params = make(map[string]string)
			vertexPart.Params["graph_name"] = graphName
			vertexPart.Params["part_type"] = LoadPartTypeVertex
			vertexPart.Params["partition_id"] = strconv.FormatUint(uint64(partition.Id), 10)
			vertexPart.Params["store_id"] = strconv.FormatUint(leaderStoreID, 10)
			vertexPart.Params["store_address"] = leaderStore[leaderStoreID]
			vertexPart.Params["start_key"] = strconv.FormatUint(vStart, 10)
			vertexPart.Params["end_key"] = strconv.FormatUint(vEnd, 10)
			loadParts = append(loadParts, vertexPart)
			partID += 1
			// The edge task mirrors the vertex task over the same key range.
			edgePart := LoadPartition{}
			edgePart.Init(partID, taskID, LoadPartTypeEdge)
			edgePart.Params = make(map[string]string)
			edgePart.Params["graph_name"] = graphName
			edgePart.Params["part_type"] = LoadPartTypeEdge
			edgePart.Params["partition_id"] = vertexPart.Params["partition_id"]
			edgePart.Params["store_id"] = vertexPart.Params["store_id"]
			edgePart.Params["store_address"] = leaderStore[leaderStoreID]
			edgePart.Params["start_key"] = vertexPart.Params["start_key"]
			edgePart.Params["end_key"] = vertexPart.Params["end_key"]
			loadParts = append(loadParts, edgePart)
			partID += 1
		}
	}
	// Propagate the shared load options onto every generated task.
	for i := range loadParts {
		loadParts[i].Params["load.hugegraph_conditions"] = params["load.hugegraph_conditions"]
		loadParts[i].Params["load.vertex_property"] = params["load.vertex_property"]
		loadParts[i].Params["load.edge_property"] = params["load.edge_property"]
		loadParts[i].Params["load.hugegraph_vertex_condition"] = params["load.hugegraph_vertex_condition"]
		loadParts[i].Params["load.hugegraph_edge_condition"] = params["load.hugegraph_edge_condition"]
		loadParts[i].Params["load.hugestore_batch_timeout"] = params["load.hugestore_batch_timeout"]
		loadParts[i].Params["load.hugestore_batchsize"] = params["load.hugestore_batchsize"]
	}
	return loadParts, nil
}