pyspark/graphar_pyspark/util.py (144 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Bindings to org.apache.graphar.util."""
from __future__ import annotations
from typing import Optional
from pyspark.sql import DataFrame
from graphar_pyspark import GraphArSession, _check_session
class IndexGenerator:
    """Helpers for generating indices for vertex/edge DataFrames.

    Every public method is a thin static wrapper: it calls ``_check_session()``
    (presumably to verify that the GraphAr session has been initialized --
    confirm against ``graphar_pyspark``), forwards its arguments to the
    corresponding method of ``GraphArSession.graphar.util.IndexGenerator`` on
    the JVM side, and wraps the returned Java DataFrame into a PySpark
    ``DataFrame`` bound to ``GraphArSession.ss``.
    """

    @staticmethod
    def construct_vertex_index_mapping(
        vertex_df: DataFrame,
        primary_key: str,
    ) -> DataFrame:
        """Generate a vertex index mapping from the primary key.

        :param vertex_df: input vertex DataFrame.
        :param primary_key: the primary key of the vertex.
        :returns: a DataFrame containing two columns: vertex index & primary key.
        """
        _check_session()

        return DataFrame(
            GraphArSession.graphar.util.IndexGenerator.constructVertexIndexMapping(
                vertex_df._jdf,
                primary_key,
            ),
            GraphArSession.ss,
        )

    @staticmethod
    def generate_vertex_index_column(vertex_df: DataFrame) -> DataFrame:
        """Add a column containing the vertex index to the DataFrame.

        :param vertex_df: the input vertex DataFrame.
        :returns: a DataFrame that contains a new vertex index column.
        """
        _check_session()

        return DataFrame(
            GraphArSession.graphar.util.IndexGenerator.generateVertexIndexColumn(
                vertex_df._jdf,
            ),
            GraphArSession.ss,
        )

    @staticmethod
    def generate_vertex_index_column_and_index_mapping(
        vertex_df: DataFrame,
        primary_key: str = "",
    ) -> tuple[DataFrame, DataFrame]:
        """Add an index column and generate a new index mapping.

        :param vertex_df: the input vertex DataFrame.
        :param primary_key: the primary key of the vertex. Defaults to an empty
            string, which is passed to the JVM implementation unchanged.
        :returns: a pair ``(vertex DataFrame with index, mapping DataFrame)``.
        """
        _check_session()

        # The JVM side returns a Scala Tuple2; unpack it via its _1()/_2() accessors.
        jvm_res = GraphArSession.graphar.util.IndexGenerator.generateVertexIndexColumnAndIndexMapping(
            vertex_df._jdf,
            primary_key,
        )

        return (
            DataFrame(jvm_res._1(), GraphArSession.ss),
            DataFrame(jvm_res._2(), GraphArSession.ss),
        )

    @staticmethod
    def generate_edge_index_column(edge_df: DataFrame) -> DataFrame:
        """Add a column containing the edge index to the input edge DataFrame.

        :param edge_df: DataFrame with edges.
        :returns: DataFrame with edges and index.
        """
        _check_session()

        return DataFrame(
            GraphArSession.graphar.util.IndexGenerator.generateEdgeIndexColumn(
                edge_df._jdf,
            ),
            GraphArSession.ss,
        )

    @staticmethod
    def generate_src_index_for_edges_from_mapping(
        edge_df: DataFrame,
        src_column_name: str,
        src_index_mapping: DataFrame,
    ) -> DataFrame:
        """Join the edge table with the vertex index mapping for the source column.

        :param edge_df: edges DataFrame.
        :param src_column_name: name of the source column used as the join column.
        :param src_index_mapping: source vertex index mapping DataFrame.
        :returns: DataFrame with index.
        """
        _check_session()

        return DataFrame(
            GraphArSession.graphar.util.IndexGenerator.generateSrcIndexForEdgesFromMapping(
                edge_df._jdf,
                src_column_name,
                src_index_mapping._jdf,
            ),
            GraphArSession.ss,
        )

    @staticmethod
    def generate_dst_index_for_edges_from_mapping(
        edge_df: DataFrame,
        dst_column_name: str,
        dst_index_mapping: DataFrame,
    ) -> DataFrame:
        """Join the edge table with the vertex index mapping for the destination column.

        :param edge_df: edges DataFrame.
        :param dst_column_name: name of the destination column used as the join column.
        :param dst_index_mapping: destination vertex index mapping DataFrame.
        :returns: DataFrame with index.
        """
        _check_session()

        return DataFrame(
            GraphArSession.graphar.util.IndexGenerator.generateDstIndexForEdgesFromMapping(
                edge_df._jdf,
                dst_column_name,
                dst_index_mapping._jdf,
            ),
            GraphArSession.ss,
        )

    @staticmethod
    def generate_src_and_dst_index_for_edges_from_mapping(
        edge_df: DataFrame,
        src_column_name: Optional[str],
        dst_column_name: Optional[str],
        src_index_mapping: DataFrame,
        dst_index_mapping: DataFrame,
    ) -> DataFrame:
        """Join the edge table with the vertex index mapping for source & destination columns.

        If a column name is None, the first (source) or second (destination)
        column of ``edge_df`` is used instead.

        :param edge_df: edge DataFrame.
        :param src_column_name: source column, optional (the first column of
            ``edge_df`` is used if None).
        :param dst_column_name: destination column, optional (the second column of
            ``edge_df`` is used if None).
        :param src_index_mapping: source mapping DataFrame.
        :param dst_index_mapping: destination mapping DataFrame.
        :returns: DataFrame with indices.
        :raises IndexError: if a column name is None and ``edge_df`` has no column
            at the corresponding position.
        """
        _check_session()

        # Resolve positional defaults on the Python side so the JVM call always
        # receives concrete column names.
        if src_column_name is None:
            src_column_name = edge_df.columns[0]

        if dst_column_name is None:
            dst_column_name = edge_df.columns[1]

        return DataFrame(
            GraphArSession.graphar.util.IndexGenerator.generateSrcAndDstIndexForEdgesFromMapping(
                edge_df._jdf,
                src_column_name,
                dst_column_name,
                src_index_mapping._jdf,
                dst_index_mapping._jdf,
            ),
            GraphArSession.ss,
        )

    @staticmethod
    def generate_src_index_for_edges(
        edge_df: DataFrame,
        src_column_name: str,
    ) -> DataFrame:
        """Construct vertex index for the source column.

        Also available under the historical, misspelled name
        ``generate_scr_index_for_edges``.

        :param edge_df: edge DataFrame.
        :param src_column_name: source column.
        :returns: DataFrame with index.
        """
        _check_session()

        return DataFrame(
            GraphArSession.graphar.util.IndexGenerator.generateSrcIndexForEdges(
                edge_df._jdf,
                src_column_name,
            ),
            GraphArSession.ss,
        )

    # Backward-compatible alias: this method was originally published under the
    # misspelled name ("scr" instead of "src"); keep it so existing callers work.
    generate_scr_index_for_edges = generate_src_index_for_edges

    @staticmethod
    def generate_dst_index_for_edges(
        edge_df: DataFrame,
        dst_column_name: str,
    ) -> DataFrame:
        """Construct vertex index for the destination column.

        :param edge_df: edge DataFrame.
        :param dst_column_name: destination column.
        :returns: DataFrame with index.
        """
        _check_session()

        return DataFrame(
            GraphArSession.graphar.util.IndexGenerator.generateDstIndexForEdges(
                edge_df._jdf,
                dst_column_name,
            ),
            GraphArSession.ss,
        )

    @staticmethod
    def generate_src_and_dst_index_unitedly_for_edges(
        edge_df: DataFrame,
        src_column_name: str,
        dst_column_name: str,
    ) -> DataFrame:
        """Union and construct vertex index for source & destination columns.

        :param edge_df: edge DataFrame.
        :param src_column_name: source column name.
        :param dst_column_name: destination column name.
        :returns: DataFrame with index.
        """
        _check_session()

        return DataFrame(
            GraphArSession.graphar.util.IndexGenerator.generateSrcAndDstIndexUnitedlyForEdges(
                edge_df._jdf,
                src_column_name,
                dst_column_name,
            ),
            GraphArSession.ss,
        )