# python/pyspark/shell.py (88 lines of code) (raw):
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
An interactive shell.
This file is designed to be launched as a PYTHONSTARTUP script.
"""
import atexit
import builtins
import os
import platform
import warnings
import sys
import pyspark
from pyspark.core.context import SparkContext
from pyspark.logger import SPARK_LOG_SCHEMA # noqa: F401
from pyspark.sql import SparkSession
from pyspark.sql.context import SQLContext
from pyspark.sql.utils import is_remote
from urllib.parse import urlparse
# While IPython runs a PYTHONSTARTUP script (and only then) it temporarily puts the
# script's parent directory on the Python path, so packages end up being searched
# under the `pyspark` directory: e.g. `import pandas` tries `pyspark.pandas`
# (see SPARK-42266). Undo that path entry when it is present.
if getattr(builtins, "__IPYTHON__", False) and "__file__" in globals():
    parent_dir = os.path.abspath(os.path.dirname(__file__))
    try:
        sys.path.remove(parent_dir)
    except ValueError:
        # The directory is not on the path, so there is nothing to undo.
        pass
# Create the session the shell exposes as `spark`, together with the companion
# names `sc`, `version` and `sql` (plus `sqlContext`/`sqlCtx` in classic mode).
# Any failure while creating the session is reported on stderr and ends the
# process with exit status 1.
if not is_remote():
    # Classic mode: forward the executor URI (when one is configured) before the
    # SparkContext is initialized.
    if os.environ.get("SPARK_EXECUTOR_URI"):
        SparkContext.setSystemProperty("spark.executor.uri", os.environ["SPARK_EXECUTOR_URI"])
    SparkContext._ensure_initialized()

    try:
        spark = SparkSession._create_shell_session()
    except Exception:
        import traceback

        warnings.warn("Failed to initialize Spark session.")
        traceback.print_exc(file=sys.stderr)
        sys.exit(1)

    sc = spark.sparkContext
    # Bind the current context as a default argument so the exit hook stops this
    # very object even if `sc` is rebound later in the shell.
    atexit.register(lambda _ctx=sc: _ctx.stop())

    # for compatibility
    sqlCtx = sqlContext = SQLContext._get_or_create(sc)
    version = sc.version
else:
    try:
        # Creates pyspark.sql.connect.SparkSession.
        spark = SparkSession.builder.getOrCreate()

        from pyspark.sql.connect.shell import PROGRESS_BAR_ENABLED

        # Check if the progress bar needs to be disabled: it defaults to enabled
        # ("1") when the variable is unset, and "true"/"false" (any case, with
        # surrounding whitespace ignored) are rewritten to "1"/"0".
        val = os.environ.setdefault(PROGRESS_BAR_ENABLED, "1")
        val = {"false": "0", "true": "1"}.get(val.lower().strip(), val)
        os.environ[PROGRESS_BAR_ENABLED] = val
        if val not in ("1", "0"):
            raise ValueError(
                f"Environment variable '{PROGRESS_BAR_ENABLED}' must "
                f"be set to either 1 or 0, found: {val}"
            )
    except Exception:
        import traceback

        warnings.warn("Failed to initialize Spark session.")
        traceback.print_exc(file=sys.stderr)
        sys.exit(1)

    # No SparkContext is exposed in remote mode; report the client package version.
    version = pyspark.__version__
    sc = None

sql = spark.sql
# Greet the user with the ASCII-art logo, stamped with the version resolved above.
print(r"""Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version %s
/_/
""" % version)
# Report the interpreter running the shell: its version followed by the first two
# fields of platform.python_build().
print("Using Python version %s (%s, %s)" % (platform.python_version(), *platform.python_build()))
# Tell the user what the shell is connected to and which names are available.
if not is_remote():
    print("Spark context Web UI available at %s" % sc.uiWebUrl)  # type: ignore[union-attr]
    print(
        "Spark context available as 'sc' (master = %s, app id = %s)."
        % (sc.master, sc.applicationId)  # type: ignore[union-attr]
    )
else:
    # SPARK_REMOTE is used whenever it is present in the environment (even if empty);
    # MASTER is only consulted when SPARK_REMOTE is absent.
    url = os.environ["SPARK_REMOTE"] if "SPARK_REMOTE" in os.environ else os.environ.get("MASTER")
    assert url is not None
    # A "local..." value is replaced by a localhost URL, only for display in the console.
    url = "sc://localhost" if url.startswith("local") else url
    print("Client connected to the Spark Connect server at %s" % urlparse(url).netloc)

print("SparkSession available as 'spark'.")
# The ./bin/pyspark script stores the old PYTHONSTARTUP value in OLD_PYTHONSTARTUP,
# which allows us to execute the user's PYTHONSTARTUP file:
_pythonstartup = os.environ.get("OLD_PYTHONSTARTUP")
if _pythonstartup and os.path.isfile(_pythonstartup):
    # Read the script as bytes so that compile() performs regular source-file
    # decoding (PEP 263 coding cookie, UTF-8 by default). Opening the file in text
    # mode would decode it with the platform's locale encoding instead, which can
    # corrupt non-ASCII source or fail with UnicodeDecodeError.
    with open(_pythonstartup, "rb") as f:
        code = compile(f.read(), _pythonstartup, "exec")
    # The file is already closed when the user's code runs. This is the user's own
    # startup script; it is executed in the current namespace.
    exec(code)