// Extracted chunk: vermeer/apps/graphio/hugegraph.go [61:204]

// MakeTasks queries hugegraph-pd for the partition layout of the graph named
// by "load.hugegraph_name" and builds load tasks: for each selected partition
// (and each key sub-range, currently always one) it emits one vertex-load and
// one edge-load LoadPartition pointing at the partition's leader store.
//
// Options read from params:
//   - load.hg_pd_peers:    candidate PD peer addresses; the first valid one is used.
//   - load.hugegraph_name: name of the graph whose partitions are loaded.
//   - load.hg_partitions:  optional whitelist of partition IDs; empty means all.
//
// Returns the generated LoadPartition slice, or an error if PD cannot be
// reached, any PD RPC fails, or a partition's leader store is unavailable.
func (a *HugegraphMaker) MakeTasks(params map[string]string, taskID int32) ([]LoadPartition, error) {
	loadParts := make([]LoadPartition, 0)

	// Find a reachable PD instance to query the graph's partition list.
	pdIPAddress := options.GetSliceString(params, "load.hg_pd_peers")
	graphName := options.GetString(params, "load.hugegraph_name")
	pdIPAddr, err := common.FindValidPD(pdIPAddress)
	if err != nil {
		return nil, err
	}

	// Establish the gRPC connection and query partitions.
	// NOTE(review): the 5s deadline covers every PD RPC issued below (one per
	// partition), not just the first call — confirm it is large enough for
	// clusters with many partitions.
	ctx, cancel1 := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel1()
	pdConn, err := grpc.Dial(pdIPAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		logrus.Errorf("connect hugegraph-pb err:%v", err)
		return nil, err
	}
	// Close on every exit path; previously the connection was only closed on
	// the success path and leaked whenever a later RPC returned an error.
	defer func() {
		if cerr := pdConn.Close(); cerr != nil {
			logrus.Errorf("hugegraph-pd close err:%v", cerr)
		}
	}()
	pdAuthority := common.PDAuthority{}
	ctx = pdAuthority.SetAuthority(ctx)
	var md metadata.MD
	pdClient := pd.NewPDClient(pdConn)
	partitionsResp, err := pdClient.QueryPartitions(ctx,
		&pd.QueryPartitionsRequest{Query: &metapb.PartitionQuery{GraphName: &graphName}}, grpc.Header(&md))
	if err != nil {
		logrus.Infof("QueryPartitions err:%v", err)
		return nil, err
	}
	pdAuthority.SetToken(md)

	// Optional user-selected partition whitelist; an empty set means "load all".
	userSetPartitions := options.GetSliceInt(params, "load.hg_partitions")
	userSetPartitionMap := make(map[uint32]struct{}, len(userSetPartitions))
	for _, partitionID := range userSetPartitions {
		userSetPartitionMap[uint32(partitionID)] = struct{}{}
	}
	logrus.Debugf("user partition:%v", userSetPartitions)

	partitions := partitionsResp.GetPartitions()

	// Cache leader store ID -> address so each leader is resolved only once.
	leaderStore := make(map[uint64]string)
	partID := int32(1)
	for _, partition := range partitions {
		// When a whitelist is configured, skip partitions not listed in it.
		if len(userSetPartitionMap) != 0 {
			if _, ok := userSetPartitionMap[partition.Id]; !ok {
				continue
			}
		}
		// Resolve the leader shard (and thus leader store) of this partition.
		ctx = pdAuthority.SetAuthority(ctx)
		var md metadata.MD
		shardGroup, err := pdClient.GetShardGroup(ctx, &pd.GetShardGroupRequest{GroupId: partition.Id}, grpc.Header(&md))
		if err != nil {
			logrus.Errorf("GetShardGroup err:%v", err)
			return nil, err
		}
		pdAuthority.SetToken(md)
		var leaderStoreID uint64
		leaderFound := false
		for _, shard := range shardGroup.GetShardGroup().GetShards() {
			if shard.Role == metapb.ShardRole_Leader {
				leaderStoreID = shard.StoreId
				leaderFound = true
				break
			}
		}
		if !leaderFound {
			// Previously this fell through with store ID 0 and queried PD for
			// store 0; fail fast with an explicit message instead.
			logrus.Errorf("partition id:%v has no leader shard", partition.Id)
			return nil, fmt.Errorf("partition id:%v has no leader shard", partition.Id)
		}
		// New leader: fetch its address, verify it is up, and cache it.
		if _, ok := leaderStore[leaderStoreID]; !ok {
			ctx = pdAuthority.SetAuthority(ctx)
			var md metadata.MD
			storeResp, err := pdClient.GetStore(ctx, &pd.GetStoreRequest{StoreId: leaderStoreID}, grpc.Header(&md))
			if err != nil {
				logrus.Errorf("GetStore %v err:%v", leaderStoreID, err)
				return nil, err
			}
			pdAuthority.SetToken(md)
			if storeResp.Store.State != metapb.StoreState_Up {
				logrus.Errorf("store:%v state not up:%v", storeResp.GetStore().Address, storeResp.Store.State)
				logrus.Errorf("partition id:%v not available", partition.Id)
				return nil, fmt.Errorf("store:%v state not up:%v", storeResp.GetStore().Address, storeResp.Store.State)
			}
			leaderStore[leaderStoreID] = storeResp.GetStore().Address
		}
		// Split one partition into n load tasks (not yet supported; n is 1).
		partitionCount := partition.EndKey - partition.StartKey
		var n uint64 = 1
		//parallel := uint64(options.GetInt(params, "load.parallel"))
		//if partitionCount > parallel {
		//	n = parallel
		//}
		partCount := partitionCount / n
		for i := uint64(0); i < n; i++ {
			vStart := partition.StartKey + i*partCount
			vEnd := vStart + partCount
			// Clamp the final (or overflowing) sub-range to the partition end.
			if vEnd > partition.EndKey || i == n-1 {
				vEnd = partition.EndKey
			}

			vertexPart := LoadPartition{}
			vertexPart.Init(partID, taskID, LoadPartTypeVertex)
			vertexPart.Params = make(map[string]string)
			vertexPart.Params["graph_name"] = graphName
			vertexPart.Params["part_type"] = LoadPartTypeVertex
			vertexPart.Params["partition_id"] = strconv.FormatUint(uint64(partition.Id), 10)
			vertexPart.Params["store_id"] = strconv.FormatUint(leaderStoreID, 10)
			vertexPart.Params["store_address"] = leaderStore[leaderStoreID]
			vertexPart.Params["start_key"] = strconv.FormatUint(vStart, 10)
			vertexPart.Params["end_key"] = strconv.FormatUint(vEnd, 10)
			loadParts = append(loadParts, vertexPart)
			partID += 1

			// The edge task mirrors the vertex task's range and store routing.
			edgePart := LoadPartition{}
			edgePart.Init(partID, taskID, LoadPartTypeEdge)
			edgePart.Params = make(map[string]string)
			edgePart.Params["graph_name"] = graphName
			edgePart.Params["part_type"] = LoadPartTypeEdge
			edgePart.Params["partition_id"] = vertexPart.Params["partition_id"]
			edgePart.Params["store_id"] = vertexPart.Params["store_id"]
			edgePart.Params["store_address"] = leaderStore[leaderStoreID]
			edgePart.Params["start_key"] = vertexPart.Params["start_key"]
			edgePart.Params["end_key"] = vertexPart.Params["end_key"]
			loadParts = append(loadParts, edgePart)
			partID += 1
		}
	}
	// Propagate load options common to every generated task.
	for i := range loadParts {
		loadParts[i].Params["load.hugegraph_conditions"] = params["load.hugegraph_conditions"]
		loadParts[i].Params["load.vertex_property"] = params["load.vertex_property"]
		loadParts[i].Params["load.edge_property"] = params["load.edge_property"]
		loadParts[i].Params["load.hugegraph_vertex_condition"] = params["load.hugegraph_vertex_condition"]
		loadParts[i].Params["load.hugegraph_edge_condition"] = params["load.hugegraph_edge_condition"]
		loadParts[i].Params["load.hugestore_batch_timeout"] = params["load.hugestore_batch_timeout"]
		loadParts[i].Params["load.hugestore_batchsize"] = params["load.hugestore_batchsize"]
	}
	return loadParts, nil
}