core/maxframe/lib/sparse/matrix.py (186 lines of code) (raw):

#!/usr/bin/env python # -*- coding: utf-8 -*- # Copyright 1999-2025 Alibaba Group Holding Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from collections.abc import Iterable from typing import List import numpy as np from .array import SparseArray, SparseNDArray from .core import ( cp, cps, get_array_module, get_sparse_module, issparse, naked, splinalg, sps, ) def zeros_sparse_matrix(shape, dtype=float, gpu=False): m = sps if not gpu else cps return SparseMatrix(m.csr_matrix(shape, dtype=np.dtype(dtype))) def diag_sparse_matrix(v, k=0, gpu=False): v = naked(v) if gpu and get_array_module(v) is not cp: v = cp.asarray(v) if not gpu and get_array_module(v) is not np: v = v.get() if v.ndim == 1: sparse_m = sps if not gpu else cps m = n = v.size + k mat = sparse_m.spdiags(v[None], [k], m, n, format="csr") return SparseMatrix(mat) else: assert v.ndim == 2 sparse_m = sps if not gpu else cps sparse_eye = sparse_m.eye(v.shape[0], v.shape[1], k=k) mat = sparse_eye.multiply(v).tocoo() size = sparse_eye.nnz col = mat.col - max(k, 0) row = get_array_module(col).zeros((len(col),)) return SparseNDArray( sparse_m.csr_matrix((mat.data, (row, col)), shape=(1, size)), shape=(size,) ) def eye_sparse_matrix(N, M=None, k=0, dtype=float, gpu=False): m = sps if not gpu else cps return SparseMatrix(m.eye(N, n=M, k=k, dtype=dtype, format="csr")) def triu_sparse_matrix(m, k=0, gpu=False): m = naked(m) if gpu and get_array_module(m) is not cp: m = cp.asarray(m) if not gpu and get_array_module(m) is not np: m = m.get() sparse_m = sps if not gpu else cps mat = sparse_m.triu(m, k=k) return SparseMatrix(mat) def tril_sparse_matrix(m, k=0, gpu=False): m = naked(m) if gpu and get_array_module(m) is not cp: m = cp.asarray(m) if not gpu and get_array_module(m) is not np: m = m.get() sparse_m = sps if not gpu else cps mat = sparse_m.tril(m, k=k) return SparseMatrix(mat) def where(cond, x, y): cond, x, y = [SparseMatrix(i) if issparse(i) else i for i in (cond, x, y)] return cond * x + (cond * (-y) + y) def lu_sparse_matrix(a): a = naked(a) a = a.tocsc() super_lu = splinalg.splu( a, permc_spec="NATURAL", diag_pivot_thresh=0, options={"SymmetricMode": True} ) l_ = super_lu.L u = super_lu.U p = sps.lil_matrix(a.shape) p[super_lu.perm_r.copy(), np.arange(a.shape[1])] = 1 return ( SparseMatrix(p), SparseMatrix(l_), SparseMatrix(u), ) def solve_triangular_sparse_matrix(a, b, lower=False, sparse=True): a = naked(a) b = b.toarray() if issparse(b) else b x = splinalg.spsolve_triangular(a, b, lower=lower) if sparse: spx = ( sps.csr_matrix(x).reshape(x.shape[0], 1) if len(x.shape) == 1 else sps.csr_matrix(x) ) return SparseNDArray(spx, shape=x.shape) else: return x def block(arrs: List[List[SparseArray]]) -> SparseArray: mats = [] for dim_arrs in arrs: mats.append([naked(a) for a in dim_arrs]) return SparseNDArray(sps.bmat(mats, format="csr")) class SparseMatrix(SparseArray): __slots__ = ("spmatrix",) def __init__(self, spmatrix, shape=()): if shape and len(shape) != 2: raise ValueError("Only accept 2-d array") if isinstance(spmatrix, SparseMatrix): self.spmatrix = spmatrix.spmatrix else: self.spmatrix = spmatrix.tocsr() @property def shape(self): return self.spmatrix.shape @property def size(self): return int(np.prod(self.shape)) def transpose(self, axes=None): assert axes is None or tuple(axes) == (1, 0) return SparseMatrix(self.spmatrix.transpose()) @property def T(self): return SparseMatrix(self.spmatrix.T) def dot(self, other, sparse=True): other_shape = other.shape try: other = naked(other) except TypeError: return NotImplemented if sparse: if len(other_shape) == 1: x = self.spmatrix.dot(other.T) else: x = self.spmatrix.dot(other) else: a = self.spmatrix.toarray() if issparse(other): other = other.toarray().reshape(other_shape) x = a.dot(other) if issparse(x): shape = (x.shape[0],) if len(other_shape) == 1 else x.shape return SparseNDArray(x, shape=shape) return get_array_module(x).asarray(x) def concatenate(self, other, axis=0): try: other = naked(other) except TypeError: return NotImplemented if issparse(other): xps = get_sparse_module(self.spmatrix) if axis not in (0, 1): raise ValueError("axis can only be 0 or 1") method = xps.vstack if axis == 0 else xps.hstack x = method((self.spmatrix, other)) else: xp = get_array_module(self.spmatrix) x = xp.concatenate((self.spmatrix.toarray(), other), axis=axis) if issparse(x): return SparseMatrix(x) return get_array_module(x).asarray(x) def _reduction( self, method_name, axis=None, dtype=None, keepdims=None, todense=False, **kw ): # TODO: support keepdims if isinstance(axis, tuple): if sorted(axis) != [0, 1]: assert len(axis) == 1 axis = axis[0] else: axis = None if todense: x = self.spmatrix.toarray() x = getattr(get_array_module(x), method_name)(x, axis=axis, **kw) else: x = getattr(self.spmatrix, method_name)(axis=axis, **kw) if not isinstance(axis, Iterable): axis = (axis,) axis = list(range(len(self.shape))) if axis is None else axis shape = tuple( s if i not in axis else 1 for i, s in enumerate(self.shape) if keepdims or i not in axis ) m = get_array_module(x) if issparse(x): return SparseNDArray(x, shape=shape) if m.isscalar(x): if keepdims: return m.array([x])[0].reshape((1,) * self.ndim) else: return m.array([x])[0] else: return m.asarray(x).reshape(shape)