graphlearn_torch/python/partition/random_partitioner.py (46 lines of code) (raw):

# Copyright 2022 Alibaba Group Holding Limited. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================== from typing import List, Dict, Optional, Tuple, Union import torch from ..typing import NodeType, EdgeType, TensorDataType from .base import PartitionerBase, PartitionBook # Implementation of a random partitioner. class RandomPartitioner(PartitionerBase): r""" Random partitioner for graph topology and features. Args: output_dir: The output root directory for partitioned results. num_parts: Number of partitions. num_nodes: Number of graph nodes, should be a dict for hetero data. edge_index: The edge index data of graph edges, should be a dict for hetero data. node_feat: The node feature data, should be a dict for hetero data. node_feat_dtype: The data type of node features. edge_feat: The edge feature data, should be a dict for hetero data. edge_feat_dtype: The data type of edge features. edge_assign_strategy: The assignment strategy when partitioning edges, should be 'by_src' or 'by_dst'. chunk_size: The chunk size for partitioning. """ def __init__( self, output_dir: str, num_parts: int, num_nodes: Union[int, Dict[NodeType, int]], edge_index: Union[TensorDataType, Dict[EdgeType, TensorDataType]], node_feat: Optional[Union[TensorDataType, Dict[NodeType, TensorDataType]]] = None, node_feat_dtype: torch.dtype = torch.float32, edge_feat: Optional[Union[TensorDataType, Dict[EdgeType, TensorDataType]]] = None, edge_feat_dtype: torch.dtype = torch.float32, edge_weights: Optional[Union[TensorDataType, Dict[EdgeType, TensorDataType]]] = None, edge_assign_strategy: str = 'by_src', chunk_size: int = 10000, ): super().__init__(output_dir, num_parts, num_nodes, edge_index, node_feat, node_feat_dtype, edge_feat, edge_feat_dtype, edge_weights, edge_assign_strategy, chunk_size) def _partition_node( self, ntype: Optional[NodeType] = None ) -> Tuple[List[torch.Tensor], PartitionBook]: if 'hetero' == self.data_cls: assert ntype is not None node_num = self.num_nodes[ntype] else: node_num = self.num_nodes ids = torch.arange(node_num, dtype=torch.int64) partition_book = ids % self.num_parts rand_order = torch.randperm(ids.size(0)) partition_book = partition_book[rand_order] partition_results = [] for pidx in range(self.num_parts): mask = (partition_book == pidx) partition_results.append(torch.masked_select(ids, mask)) return partition_results, partition_book def _cache_node( self, ntype: Optional[NodeType] = None ) -> List[Optional[torch.Tensor]]: return [None for _ in range(self.num_parts)]