plugins/spark_upgrade/spark_config/__init__.py
# Copyright (c) 2023 Uber Technologies, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
# except in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the
# License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing permissions and
# limitations under the License.

from typing import Any, List, Dict
from execute_piranha import ExecutePiranha
from polyglot_piranha import (
Filter,
OutgoingEdges,
Rule,
)
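
# Tree-sitter query: matches `new JavaSparkContext(...)` object creation expressions (Java).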
_JAVASPARKCONTEXT_OCE_QUERY = """(
(object_creation_expression
type: (_) @oce_typ
(#eq? @oce_typ "JavaSparkContext")
) @oce
)"""
_SPARK_SESSION_BUILDER_CHAIN_QUERY = """(
(method_invocation
object: (method_invocation
object: (identifier) @spark_session
name: (identifier) @receiver
)
(#eq? @spark_session "SparkSession")
(#eq? @receiver "builder")
) @mi
)"""
_EXPR_STMT_CHAIN_ENDS_WITH_GETORCREATE_QUERY = """(
(expression_statement
(method_invocation
name: (identifier) @last
)
(#match? @last "getOrCreate")
) @expr_stmt
)"""
_SCALA_CHAIN_ENDS_WITH_GETORCREATE_QUERY = """(
(field_expression
field: (identifier) @last_field
(#eq? @last_field "getOrCreate")
) @field_expr
)"""


class SparkConfigChange(ExecutePiranha):
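    """Piranha step that updates Spark configuration initialization sites.

    Rewrites `new SparkConf()` and `SparkSession.builder()` so they also set
    `spark.sql.legacy.timeParserPolicy` and `spark.sql.legacy.allowUntypedScalaUDF`,
    and migrates imports of the shaded Jetty `ArrayQueue` to `java.util.ArrayDeque`.

    Minimal usage sketch (assuming the `ExecutePiranha` base class makes the step
    callable; the actual entry point may differ):

        SparkConfigChange(paths_to_codebase=["/path/to/repo"], language="scala")()
    """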
def __init__(self, paths_to_codebase: List[str], language: str = "scala"):
super().__init__(
paths_to_codebase=paths_to_codebase,
substitutions={
"spark_conf": "SparkConf",
},
language=language,
)

    def step_name(self) -> str:
return "Spark Config Change"

    def get_rules(self) -> List[Rule]:
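        """Return the three rewrite rules for this step:

        1. `update_spark_conf_init`: append the legacy `.set(...)` calls to `new SparkConf()`.
        2. `update_spark_session_builder_init`: append the legacy `.config(...)` calls
           to `SparkSession.builder()`.
        3. `update_import_array_queue`: replace the shaded Jetty `ArrayQueue` import
           with `java.util.ArrayDeque`.
        """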
        # Filters cannot be added to a Rule after it is instantiated, so the full
        # filter set is assembled before constructing each rule.
fs = {
Filter(
not_enclosing_node='cs new SparkConf().set("spark.sql.legacy.timeParserPolicy","LEGACY").set("spark.sql.legacy.allowUntypedScalaUDF", "true")'
),
}
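        # The filter above skips occurrences already enclosed in the rewritten form,
        # so re-running the rule is a no-op; the guards below avoid rewriting calls
        # nested in a JavaSparkContext construction or in chains ending in getOrCreate.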
if self.language == "java":
fs.add(Filter(not_enclosing_node=_JAVASPARKCONTEXT_OCE_QUERY))
fs.add(
Filter(not_enclosing_node=_EXPR_STMT_CHAIN_ENDS_WITH_GETORCREATE_QUERY)
)
elif self.language == "scala":
fs.add(Filter(not_enclosing_node=_SCALA_CHAIN_ENDS_WITH_GETORCREATE_QUERY))
update_spark_conf_init = Rule(
name="update_spark_conf_init",
query="cs new SparkConf()",
replace_node="*",
replace='new SparkConf().set("spark.sql.legacy.timeParserPolicy","LEGACY").set("spark.sql.legacy.allowUntypedScalaUDF", "true")',
filters=fs,
)
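
        # Filters for the SparkSession.builder() rewrite; as above, the first filter
        # skips builder chains that already carry the legacy .config(...) calls.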
fs2 = {
Filter(
not_enclosing_node='cs SparkSession.builder().config("spark.sql.legacy.timeParserPolicy","LEGACY").config("spark.sql.legacy.allowUntypedScalaUDF", "true")'
)
}
if self.language == "java":
fs2.add(Filter(not_enclosing_node=_JAVASPARKCONTEXT_OCE_QUERY))
fs2.add(Filter(not_enclosing_node=_SPARK_SESSION_BUILDER_CHAIN_QUERY))
fs2.add(
Filter(not_enclosing_node=_EXPR_STMT_CHAIN_ENDS_WITH_GETORCREATE_QUERY)
)
elif self.language == "scala":
fs2.add(Filter(not_enclosing_node=_SCALA_CHAIN_ENDS_WITH_GETORCREATE_QUERY))
update_spark_session_builder_init = Rule(
name="update_spark_conf_init",
query="cs SparkSession.builder()",
replace_node="*",
replace='SparkSession.builder().config("spark.sql.legacy.timeParserPolicy","LEGACY").config("spark.sql.legacy.allowUntypedScalaUDF", "true")',
filters=fs2,
)
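
        # Rewrite imports of the shaded Jetty ArrayQueue to java.util.ArrayDeque;
        # only the Java pattern carries the trailing semicolon.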
update_import_array_queue = Rule(
name="update_import_array_queue",
query=(
"cs import org.spark_project.jetty.util.ArrayQueue;"
if self.language == "java"
else "cs import org.spark_project.jetty.util.ArrayQueue"
),
replace_node="*",
replace=(
"import java.util.ArrayDeque;"
if self.language == "java"
else "import java.util.ArrayDeque"
),
)
return [
update_spark_conf_init,
update_spark_session_builder_init,
update_import_array_queue,
]

    def get_edges(self) -> List[OutgoingEdges]:
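        """No rule-to-rule edges are needed for this step."""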
return []

    def summaries_to_custom_dict(self, _) -> Dict[str, Any]:
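        """This step reports no custom summary."""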
return {}