pyspark/graphar_pyspark/util.py (144 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

"""Bindings to org.apache.graphar.util."""

from __future__ import annotations

from typing import Optional

from pyspark.sql import DataFrame

from graphar_pyspark import GraphArSession, _check_session


class IndexGenerator:
    """IndexGenerator is an object to help generating the indices for vertex/edge DataFrames.

    Every method is a static, thin wrapper: it calls ``_check_session()``,
    forwards its arguments to the JVM-side ``IndexGenerator`` reached through
    ``GraphArSession.graphar.util`` (passing the underlying ``_jdf`` of each
    Python DataFrame), and wraps the returned JVM DataFrame in a Python
    ``DataFrame`` bound to ``GraphArSession.ss``.
    """

    @staticmethod
    def construct_vertex_index_mapping(
        vertex_df: DataFrame,
        primary_key: str,
    ) -> DataFrame:
        """Generate a vertex index mapping from the primary key.

        The resulting DataFrame contains two columns: vertex index & primary key.

        :param vertex_df: input vertex DataFrame.
        :param primary_key: the primary key of vertex.
        :returns: a DataFrame that contains two columns: vertex index & primary key.
        """
        _check_session()

        return DataFrame(
            GraphArSession.graphar.util.IndexGenerator.constructVertexIndexMapping(
                vertex_df._jdf,
                primary_key,
            ),
            GraphArSession.ss,
        )

    @staticmethod
    def generate_vertex_index_column(vertex_df: DataFrame) -> DataFrame:
        """Add a column containing the vertex index to the DataFrame.

        :param vertex_df: the input vertex DataFrame.
        :returns: DataFrame that contains a new vertex index column.
        """
        _check_session()

        return DataFrame(
            GraphArSession.graphar.util.IndexGenerator.generateVertexIndexColumn(
                vertex_df._jdf,
            ),
            GraphArSession.ss,
        )

    @staticmethod
    def generate_vertex_index_column_and_index_mapping(
        vertex_df: DataFrame,
        primary_key: str = "",
    ) -> tuple[DataFrame, DataFrame]:
        """Add an index column and generate a new index mapping.

        :param vertex_df: the input vertex DataFrame.
        :param primary_key: the primary key of vertex (defaults to an empty string).
        :returns: a 2-tuple: the new vertex DataFrame and the mapping DataFrame.
        """
        _check_session()

        # The JVM call returns a pair object; its two members are read through
        # the `_1()` / `_2()` accessors and wrapped individually.
        jvm_res = GraphArSession.graphar.util.IndexGenerator.generateVertexIndexColumnAndIndexMapping(
            vertex_df._jdf,
            primary_key,
        )

        return (
            DataFrame(jvm_res._1(), GraphArSession.ss),
            DataFrame(jvm_res._2(), GraphArSession.ss),
        )

    @staticmethod
    def generate_edge_index_column(edge_df: DataFrame) -> DataFrame:
        """Add a column containing the edge index to the input edge DataFrame.

        :param edge_df: DataFrame with edges.
        :returns: DataFrame with edges and index.
        """
        _check_session()

        return DataFrame(
            GraphArSession.graphar.util.IndexGenerator.generateEdgeIndexColumn(
                edge_df._jdf,
            ),
            GraphArSession.ss,
        )

    @staticmethod
    def generate_src_index_for_edges_from_mapping(
        edge_df: DataFrame,
        src_column_name: str,
        src_index_mapping: DataFrame,
    ) -> DataFrame:
        """Join the edge table with the vertex index mapping for source column.

        :param edge_df: edges DataFrame.
        :param src_column_name: join-column.
        :param src_index_mapping: mapping DataFrame.
        :returns: DataFrame with index.
        """
        _check_session()

        return DataFrame(
            GraphArSession.graphar.util.IndexGenerator.generateSrcIndexForEdgesFromMapping(
                edge_df._jdf,
                src_column_name,
                src_index_mapping._jdf,
            ),
            GraphArSession.ss,
        )

    @staticmethod
    def generate_dst_index_for_edges_from_mapping(
        edge_df: DataFrame,
        dst_column_name: str,
        dst_index_mapping: DataFrame,
    ) -> DataFrame:
        """Join the edge table with the vertex index mapping for destination column.

        :param edge_df: edges DataFrame.
        :param dst_column_name: join-column.
        :param dst_index_mapping: mapping DataFrame.
        :returns: DataFrame with index.
        """
        _check_session()

        return DataFrame(
            GraphArSession.graphar.util.IndexGenerator.generateDstIndexForEdgesFromMapping(
                edge_df._jdf,
                dst_column_name,
                dst_index_mapping._jdf,
            ),
            GraphArSession.ss,
        )

    @staticmethod
    def generate_src_and_dst_index_for_edges_from_mapping(
        edge_df: DataFrame,
        src_column_name: Optional[str],
        dst_column_name: Optional[str],
        src_index_mapping: DataFrame,
        dst_index_mapping: DataFrame,
    ) -> DataFrame:
        """Join the edge table with the vertex index mapping for source & destination columns.

        Assumes that the first and second columns are the src and dst columns if they are None.

        :param edge_df: edge DataFrame.
        :param src_column_name: src column, optional (the first col from edge_df will be used if None).
        :param dst_column_name: dst column, optional (the second col from edge_df will be used if None).
        :param src_index_mapping: source mapping DataFrame.
        :param dst_index_mapping: dest mapping DataFrame.
        :returns: DataFrame with indices.
        """
        _check_session()

        # Fall back to positional columns when the caller did not name them.
        if src_column_name is None:
            src_column_name = edge_df.columns[0]

        if dst_column_name is None:
            dst_column_name = edge_df.columns[1]

        return DataFrame(
            GraphArSession.graphar.util.IndexGenerator.generateSrcAndDstIndexForEdgesFromMapping(
                edge_df._jdf,
                src_column_name,
                dst_column_name,
                src_index_mapping._jdf,
                dst_index_mapping._jdf,
            ),
            GraphArSession.ss,
        )

    @staticmethod
    def generate_src_index_for_edges(
        edge_df: DataFrame,
        src_column_name: str,
    ) -> DataFrame:
        """Construct vertex index for source column.

        :param edge_df: edge DataFrame.
        :param src_column_name: source column.
        :returns: DataFrame with index.
        """
        _check_session()

        return DataFrame(
            GraphArSession.graphar.util.IndexGenerator.generateSrcIndexForEdges(
                edge_df._jdf,
                src_column_name,
            ),
            GraphArSession.ss,
        )

    @staticmethod
    def generate_scr_index_for_edges(
        edge_df: DataFrame,
        src_column_name: str,
    ) -> DataFrame:
        """Construct vertex index for source column.

        The name of this method is a historical misspelling ("scr" instead of
        "src"); it is kept so existing callers keep working and simply
        delegates to :meth:`generate_src_index_for_edges`.

        :param edge_df: edge DataFrame.
        :param src_column_name: source column.
        :returns: DataFrame with index.
        """
        return IndexGenerator.generate_src_index_for_edges(edge_df, src_column_name)

    @staticmethod
    def generate_dst_index_for_edges(
        edge_df: DataFrame,
        dst_column_name: str,
    ) -> DataFrame:
        """Construct vertex index for destination column.

        :param edge_df: edge DataFrame.
        :param dst_column_name: destination column.
        :returns: DataFrame with index.
        """
        _check_session()

        return DataFrame(
            GraphArSession.graphar.util.IndexGenerator.generateDstIndexForEdges(
                edge_df._jdf,
                dst_column_name,
            ),
            GraphArSession.ss,
        )

    @staticmethod
    def generate_src_and_dst_index_unitedly_for_edges(
        edge_df: DataFrame,
        src_column_name: str,
        dst_column_name: str,
    ) -> DataFrame:
        """Union and construct vertex index for source & destination columns.

        :param edge_df: edge DataFrame.
        :param src_column_name: source column name.
        :param dst_column_name: destination column name.
        :returns: DataFrame with index.
        """
        _check_session()

        return DataFrame(
            GraphArSession.graphar.util.IndexGenerator.generateSrcAndDstIndexUnitedlyForEdges(
                edge_df._jdf,
                src_column_name,
                dst_column_name,
            ),
            GraphArSession.ss,
        )