in repl/src/main/resources/fake_shell.py [0:0]
def main():
    sys_stdin = sys.stdin
    sys_stdout = sys.stdout
    sys_stderr = sys.stderr
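    # Replace the process-wide streams with in-memory buffers so that output
    # produced by executed statements can be captured.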
    if sys.version >= '3':
        sys.stdin = io.StringIO()
    else:
        sys.stdin = cStringIO.StringIO()

    sys.stdout = UnicodeDecodingStringIO()
    sys.stderr = UnicodeDecodingStringIO()
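    # LIVY_SPARK_MAJOR_VERSION selects the setup path below: Spark 2+ gets a
    # SparkSession, Spark 1 only a SQLContext/HiveContext.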
    spark_major_version = os.getenv("LIVY_SPARK_MAJOR_VERSION")

    try:
        listening_port = 0
        if os.environ.get("LIVY_TEST") != "true":
            # Load spark into the context
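            # exec into global_dict so these names are visible to code later
            # executed in the user's global namespace.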
            exec('from pyspark.sql import HiveContext', global_dict)
            exec('from pyspark.streaming import StreamingContext', global_dict)
            exec('import pyspark.cloudpickle as cloudpickle', global_dict)

            from py4j.java_gateway import java_import, JavaGateway, GatewayClient
            from pyspark.conf import SparkConf
            from pyspark.context import SparkContext
            from pyspark.sql import SQLContext, HiveContext, Row
            # Connect to the gateway
            gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"])
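            # Spark versions that enforce Py4J gateway authentication export
            # PYSPARK_GATEWAY_SECRET; if it (or GatewayParameters) is unavailable,
            # fall back to an unauthenticated GatewayClient connection.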
            try:
                from py4j.java_gateway import GatewayParameters
                gateway_secret = os.environ["PYSPARK_GATEWAY_SECRET"]
                gateway = JavaGateway(gateway_parameters=GatewayParameters(
                    port=gateway_port, auth_token=gateway_secret, auto_convert=True))
            except (ImportError, KeyError):
                gateway = JavaGateway(GatewayClient(port=gateway_port), auto_convert=True)
            # Import the classes used by PySpark
            java_import(gateway.jvm, "org.apache.spark.SparkConf")
            java_import(gateway.jvm, "org.apache.spark.api.java.*")
            java_import(gateway.jvm, "org.apache.spark.api.python.*")
            java_import(gateway.jvm, "org.apache.spark.ml.python.*")
            java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
            java_import(gateway.jvm, "org.apache.spark.resource.*")
            java_import(gateway.jvm, "org.apache.spark.sql.*")
            java_import(gateway.jvm, "org.apache.spark.sql.api.python.*")
            java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
            java_import(gateway.jvm, "scala.Tuple2")
            jsc = gateway.entry_point.sc()
            jconf = gateway.entry_point.sc().getConf()
            jsqlc = gateway.entry_point.hivectx() if gateway.entry_point.hivectx() is not None \
                else gateway.entry_point.sqlctx()

            conf = SparkConf(_jvm=gateway.jvm, _jconf=jconf)
            sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
            global_dict['sc'] = sc
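            # Expose 'spark' (SparkSession) and 'sqlContext' on Spark 2+;
            # Spark 1 only gets 'sqlContext'.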
            if spark_major_version >= "2":
                from pyspark.sql import SparkSession
                spark_session = SparkSession(sc, gateway.entry_point.sparkSession())
                sqlc = SQLContext(sc, spark_session, jsqlc)
                global_dict['sqlContext'] = sqlc
                global_dict['spark'] = spark_session
            else:
                sqlc = SQLContext(sc, jsqlc)
                global_dict['sqlContext'] = sqlc

                # LIVY-294: check whether HiveContext can be initialized successfully
                # and fall back to SQLContext if it cannot. Spark 1 only.
                code = textwrap.dedent("""