plugins/spark_upgrade/accessing_execution_plan.py:

# Copyright (c) 2023 Uber Technologies, Inc.
#
# <p>Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
# except in compliance with the License. You may obtain a copy of the License at
# <p>http://www.apache.org/licenses/LICENSE-2.0
#
# <p>Unless required by applicable law or agreed to in writing, software distributed under the
# License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing permissions and
# limitations under the License.

from typing import Any, List, Dict

from execute_piranha import ExecutePiranha

from polyglot_piranha import (
    Rule,
    Filter,
)


class AccessingExecutionPlan(ExecutePiranha):
    def __init__(self, paths_to_codebase: List[str]):
        super().__init__(
            paths_to_codebase=paths_to_codebase,
            substitutions={"queryExec": "queryExecution", "execPlan": "executedPlan"},
            language="scala",
        )

    def step_name(self) -> str:
        return "Access execution plans"

    def get_rules(self) -> List[Rule]:
        # Rewrites accesses of `<dataframe>.queryExecution.executedPlan` so they go
        # through the AQE wrapper's `initialPlan` instead.
        transform_IDFModel_args = Rule(
            name="accessing_execution_plan",
            query="cs :[dataframe].queryExecution.executedPlan",
            replace_node="*",
            replace="@dataframe.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec].initialPlan",
            holes={"queryExec", "execPlan"},
            filters={
                Filter(
                    # Only rewrite occurrences inside a `var` definition that does not
                    # already access `initialPlan` (keeps the rule from re-applying).
                    enclosing_node="(var_definition) @var_def",
                    not_contains=[
                        """(
                            (field_expression
                                field: (identifier) @field_id
                                (#eq? @field_id "initialPlan")
                            ) @field_expr
                        )"""
                    ],
                )
            },
        )
        return [transform_IDFModel_args]

    def summaries_to_custom_dict(self, _) -> Dict[str, Any]:
        return {}
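
For context, a minimal before/after sketch of the Scala change this rule encodes (the `df` and `plan` names are hypothetical; the enclosing `var` definition mirrors the `(var_definition)` filter above, and occurrences that already read `initialPlan` are skipped by the `not_contains` clause). With adaptive query execution enabled, `executedPlan` is an `AdaptiveSparkPlanExec`, whose `initialPlan` field holds the physical plan before adaptive re-optimization; the cast assumes `import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec` is in scope.

    // before
    var plan = df.queryExecution.executedPlan

    // after
    var plan = df.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec].initialPlan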