graphlearn_torch/python/data/unified_tensor.py (56 lines of code) (raw):

# Copyright 2022 Alibaba Group Holding Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

from typing import List

import torch

from .. import py_graphlearn_torch as pywrap


class UnifiedTensor(object):
    r""" Creates a CPU and GPUs unified Tensor for GPU direct access.

    For the tensor stored in the CPU memory, we use ZERO-COPY to provide
    efficient GPU access. For tensors stored in the GPU memory, p2p access
    between GPUs (such as NVLink) is required.

    This class is a thin Python wrapper around `pywrap.UnifiedTensor`; it
    additionally keeps a reference to the CPU-resident part (`cpu_part`) so
    that it can be handed to other processes by `share_ipc`.

    Args:
      current_device (int): An integer to represent the GPU device where the
        underlying cuda operation kernel is launched.
      dtype (torch.dtype): The data type of the tensor elements.
    """

    def __init__(self, current_device: int,
                 dtype: torch.dtype = torch.float32):
        self.current_device = current_device
        self.dtype = dtype
        self.unified_tensor = pywrap.UnifiedTensor(current_device, dtype)
        # Tensor stored in CPU memory; returned by `share_ipc` alongside the
        # CUDA ipc handles. Stays None until a CPU tensor is appended.
        self.cpu_part = None

    def __getitem__(self, ids):
        r""" Looks up `ids` in the underlying unified tensor.

        Args:
          ids: Indices to look up; moved to `current_device` with `.to()`
            before the underlying tensor is indexed.

        Returns:
          Whatever `pywrap.UnifiedTensor.__getitem__` returns for `ids`.
        """
        ids = ids.to(self.current_device)
        return self.unified_tensor[ids]

    def append_shared_tensor(self, shared_tensor) -> None:
        r""" Append from `SharedTensor`.

        Args:
          shared_tensor: A `pywrap.SharedTensor` object which means GPU tensor
            that can be shared with other GPUs.
        """
        self.unified_tensor.append_shared_tensor(shared_tensor)

    def append_cpu_tensor(self, cpu_tensor: torch.Tensor) -> None:
        r""" Append from CPU tensor.

        The tensor is also remembered in `self.cpu_part`, so that a later
        `share_ipc` call returns it together with the CUDA ipc handles.

        Args:
          cpu_tensor: A CPU torch.Tensor object which will be stored in pinned
            memory for ZERO-COPY.
        """
        # Record the CPU part first: without this, `share_ipc` would return
        # None for it and receivers would rebuild without the CPU rows.
        self.cpu_part = cpu_tensor
        self.unified_tensor.append_cpu_tensor(cpu_tensor)

    def init_from(self, tensors: List[torch.Tensor],
                  tensor_devices: List[int]) -> None:
        r""" Initialize from CPU torch.Tensors.

        Args:
          tensors: CPU torch.Tensors indicating the tensors that need to be
            stored on different GPUs and CPU.
          tensor_devices: The indices of devices indicating the location of
            the tensor storage, -1 means on CPU and a non-negative value is
            the index of the GPU that stores the tensor. Note that tensors
            and tensor_devices must correspond to each other.
        """
        # NOTE(review): unlike `append_cpu_tensor`, this path does not record
        # a CPU part in `self.cpu_part`, so `share_ipc` returns None for the
        # CPU part unless `append_cpu_tensor` was also called. Confirm whether
        # that is intended.
        self.unified_tensor.init_from(tensors, tensor_devices)

    @property
    def shape(self):
        r""" Shape as reported by the underlying `pywrap.UnifiedTensor`. """
        return self.unified_tensor.shape()

    @property
    def device(self):
        r""" Device as reported by the underlying `pywrap.UnifiedTensor`. """
        return self.unified_tensor.device()

    @property
    def numel(self):
        r""" Element count as reported by the underlying tensor.

        Note that this is a property, not a method as on `torch.Tensor`.
        """
        return self.unified_tensor.numel()

    def size(self, dim):
        r""" Returns the underlying tensor's size along dimension `dim`. """
        return self.unified_tensor.size(dim)

    def stride(self, dim):
        r""" Returns the underlying tensor's stride along dimension `dim`. """
        return self.unified_tensor.stride(dim)

    def share_ipc(self):
        r""" Shares ipc handles.

        Returns:
          A tuple `(cuda_ipc_list, cpu_part)`: a list of cuda ipcs, one per
          shared tensor of the underlying object, and the cpu part tensor
          (None if no CPU tensor has been appended).
        """
        shared_tensors = self.unified_tensor.share_cuda_ipc()
        cuda_ipc_list = [item.share_cuda_ipc() for item in shared_tensors]
        return cuda_ipc_list, self.cpu_part

    def from_ipc_handle(self, cuda_ipc_list, cpu_part) -> None:
        r""" Builds from ipc handle.

        Args:
          cuda_ipc_list: A list of CUDA ipcs, in the same order as
            tensors_devices.
          cpu_part: A CPU torch.Tensor, or None when there is no CPU part.
        """
        for ipc in cuda_ipc_list:
            shared_tensor = pywrap.SharedTensor()
            shared_tensor.from_cuda_ipc(ipc)
            self.append_shared_tensor(shared_tensor)
        # A missing or empty CPU part is skipped; `cpu_part` then keeps its
        # previous value.
        if cpu_part is not None and cpu_part.numel() > 0:
            # `append_cpu_tensor` both records `self.cpu_part` and appends to
            # the underlying tensor.
            self.append_cpu_tensor(cpu_part)

    @classmethod
    def new_from_ipc(cls, ipc_handles, current_device: int,
                     dtype: torch.dtype):
        r""" Creates `UnifiedTensor` from ipc handles.

        Args:
          ipc_handles: ipc handles consists of CUDA ipcs and cpu part
            torch.Tensor, i.e. the pair returned by `share_ipc`.
          current_device (int): An integer to represent the GPU device where
            the underlying cuda operation kernel is launched.
          dtype (torch.dtype): The data type of the tensor elements.

        Returns:
          A `UnifiedTensor` instance.
        """
        cuda_ipc_list, cpu_part = ipc_handles
        unified_tensor = cls(current_device, dtype)
        unified_tensor.from_ipc_handle(cuda_ipc_list, cpu_part)
        return unified_tensor