pypaimon/py4j/gateway_server.py (72 lines of code) (raw):
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import importlib.resources
import os
import platform
import signal
from subprocess import Popen, PIPE
from pypaimon.py4j import constants
def on_windows():
return platform.system() == "Windows"
def find_java_executable():
java_executable = "java.exe" if on_windows() else "java"
java_home = None
if java_home is None and "JAVA_HOME" in os.environ:
java_home = os.environ["JAVA_HOME"]
if java_home is not None:
java_executable = os.path.join(java_home, "bin", java_executable)
return java_executable
def launch_gateway_server_process(env):
java_executable = find_java_executable()
# TODO construct Java module log settings
log_settings = []
jvm_args = env.get(constants.PYPAIMON_JVM_ARGS, '').split()
classpath = _get_classpath(env)
main_args = env.get(constants.PYPAIMON_MAIN_ARGS, '').split()
command = [
java_executable,
*jvm_args,
# default jvm args
"-XX:+IgnoreUnrecognizedVMOptions",
"--add-opens=jdk.proxy2/jdk.proxy2=ALL-UNNAMED",
*log_settings,
"-cp",
classpath,
"-c",
constants.PYPAIMON_MAIN_CLASS,
*main_args
]
preexec_fn = None
if not on_windows():
def preexec_func():
# ignore ctrl-c / SIGINT
signal.signal(signal.SIGINT, signal.SIG_IGN)
preexec_fn = preexec_func
return Popen(list(filter(lambda c: len(c) != 0, command)),
stdin=PIPE, stderr=PIPE, preexec_fn=preexec_fn, env=env)
_JAVA_DEPS_PACKAGE = 'pypaimon.jars'
def _get_classpath(env):
classpath = []
# note that jars are not packaged in test
test_mode = os.environ.get(constants.PYPAIMON4J_TEST_MODE)
if not test_mode or test_mode.lower() != "true":
jars = importlib.resources.files(_JAVA_DEPS_PACKAGE)
one_jar = next(iter(jars.iterdir()), None)
if not one_jar:
raise ValueError("Haven't found necessary python-java-bridge jar, this is unexpected.")
builtin_java_classpath = os.path.join(os.path.dirname(str(one_jar)), '*')
classpath.append(builtin_java_classpath)
# user defined
if constants.PYPAIMON_JAVA_CLASSPATH in env:
classpath.append(env[constants.PYPAIMON_JAVA_CLASSPATH])
# hadoop
hadoop_classpath = _get_hadoop_classpath(env)
if hadoop_classpath is not None:
classpath.append(hadoop_classpath)
return os.pathsep.join(classpath)
_HADOOP_DEPS_PACKAGE = 'pypaimon.hadoop-deps'
def _get_hadoop_classpath(env):
if constants.PYPAIMON_HADOOP_CLASSPATH in env:
return env[constants.PYPAIMON_HADOOP_CLASSPATH]
elif 'HADOOP_CLASSPATH' in env:
return env['HADOOP_CLASSPATH']
else:
# use built-in hadoop
jars = importlib.resources.files(_HADOOP_DEPS_PACKAGE)
one_jar = next(iter(jars.iterdir()), None)
if not one_jar:
raise EnvironmentError(f"The built-in Hadoop environment has been broken, this \
is unexpected. You can set one of '{constants.PYPAIMON_HADOOP_CLASSPATH}' or \
'HADOOP_CLASSPATH' to continue.")
return os.path.join(os.path.dirname(str(one_jar)), '*')