def squared_distance()

in flink-ml-python/pyflink/ml/linalg.py [0:0]


    def squared_distance(self, other: Union[Vector, np.ndarray, Sized]) -> np.ndarray:
        """
        Squared distance of two Vectors.

        Examples:
        ::

            >>> sparse = SparseVector(4, [1, 3], [3.0, 4.0])
            >>> sparse.squared_distance(sparse)
            0.0
            >>> sparse.squared_distance(array.array('d', [1., 2., 3., 4.]))
            11.0
            >>> sparse.squared_distance(np.array([1., 2., 3., 4.]))
            11.0
            >>> sparse2 = SparseVector(4, [2], [1.0])
            >>> sparse.squared_distance(sparse2)
            26.0
            >>> sparse2.squared_distance(sparse)
            26.0

        :param other: A ``DenseVector``, a ``SparseVector``, a one-dimensional ``np.ndarray``,
                      or an ``array.array`` / ``list`` / ``tuple`` / ``range``; it must have
                      the same length as this vector.
        :return: The sum of the squared element-wise differences, as a NumPy scalar.
        :raises AssertionError: If ``len(self) != len(other)``.
        :raises ValueError: If ``other`` is an ndarray that is not one-dimensional, or if its
                            type is not one of the supported ones.
        """
        # Raised explicitly instead of through an `assert` statement so the check is not
        # stripped when Python runs with -O; exception type and message are unchanged.
        if len(self) != len(other):
            raise AssertionError("dimension mismatch")

        if isinstance(other, (np.ndarray, DenseVector)):
            if isinstance(other, np.ndarray):
                if other.ndim != 1:
                    raise ValueError(
                        "Cannot call squared_distance with %d-dimensional array" % other.ndim)
                dense = other
            else:
                dense = other.to_array()

            # Boolean mask of the positions at which this vector stores a value.
            # NOTE(review): pairing `dense[stored]` with `self._values` assumes that
            # `self._indices` is kept in ascending order -- confirm against the constructor.
            stored = np.zeros(dense.size, dtype=bool)
            stored[self._indices] = True

            # Stored positions contribute (other - value)^2 ...
            diff = dense[stored] - self._values
            result = np.dot(diff, diff)

            # ... and all remaining positions contribute other^2, since this vector has no
            # stored value there.
            remainder = dense[~stored]
            result += np.dot(remainder, remainder)
            return result

        if isinstance(other, SparseVector):
            self_idx, self_val = self._indices, self._values
            other_idx, other_val = other._indices, other._values
            n_self, n_other = len(self_idx), len(other_idx)

            # Two-pointer merge over both index arrays.
            # NOTE(review): assumes both index arrays are in ascending order -- confirm.
            i = 0
            j = 0
            result = 0.0
            while i < n_self and j < n_other:
                if self_idx[i] == other_idx[j]:
                    # Index stored in both vectors.
                    diff = self_val[i] - other_val[j]
                    result += diff * diff
                    i += 1
                    j += 1
                elif self_idx[i] < other_idx[j]:
                    # Index stored only in this vector.
                    result += self_val[i] * self_val[i]
                    i += 1
                else:
                    # Index stored only in `other`.
                    result += other_val[j] * other_val[j]
                    j += 1

            # At most one of the two vectors still has unvisited entries.
            while i < n_self:
                result += self_val[i] * self_val[i]
                i += 1
            while j < n_other:
                result += other_val[j] * other_val[j]
                j += 1

            # `np.float_` was removed in NumPy 2.0; `np.float64` is the type it aliased.
            return np.float64(result)  # type: ignore

        # Plain sequences are wrapped in a DenseVector and handled by the dense branch above.
        # ndarrays never reach this point, so they are not listed here.
        if isinstance(other, (array.array, list, tuple, range)):
            return self.squared_distance(DenseVector(other))
        raise ValueError('Cannot call with the type %s' % (type(other)))