pypaimon/py4j/gateway_server.py (72 lines of code) (raw):

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
"""Helpers for launching the Java gateway server process used by pypaimon."""

import importlib.resources
import os
import platform
import signal
from subprocess import Popen, PIPE

from pypaimon.py4j import constants


def on_windows() -> bool:
    """Return True when ``platform.system()`` reports ``"Windows"``."""
    return platform.system() == "Windows"


def find_java_executable() -> str:
    """Return the Java launcher to execute.

    If the ``JAVA_HOME`` environment variable is set to a non-empty value,
    the result is ``<JAVA_HOME>/bin/java`` (``java.exe`` on Windows).
    Otherwise the bare executable name is returned.
    """
    java_executable = "java.exe" if on_windows() else "java"

    # An empty JAVA_HOME is treated like an unset one; joining it would
    # produce the meaningless relative path "bin/java".
    java_home = os.environ.get("JAVA_HOME")
    if java_home:
        return os.path.join(java_home, "bin", java_executable)
    return java_executable


def launch_gateway_server_process(env) -> Popen:
    """Spawn the Java gateway server and return its ``Popen`` handle.

    Args:
        env: Mapping of environment variables. It is consulted for the
            ``constants.PYPAIMON_JVM_ARGS`` / ``constants.PYPAIMON_MAIN_ARGS``
            entries (each split on whitespace), for the classpath settings
            read by ``_get_classpath``, and is passed unchanged as the
            child's environment.

    Returns:
        The ``subprocess.Popen`` object of the started JVM, with ``stdin``
        and ``stderr`` connected to pipes.

    Raises:
        ValueError: Propagated from ``_get_classpath`` when the bundled
            bridge jar cannot be found.
        EnvironmentError: Propagated from ``_get_hadoop_classpath`` when no
            Hadoop classpath is configured and the built-in one is missing.
    """
    java_executable = find_java_executable()

    # TODO construct Java module log settings
    log_settings = []
    jvm_args = env.get(constants.PYPAIMON_JVM_ARGS, '').split()
    classpath = _get_classpath(env)
    main_args = env.get(constants.PYPAIMON_MAIN_ARGS, '').split()

    command = [
        java_executable,
        *jvm_args,
        # default jvm args
        "-XX:+IgnoreUnrecognizedVMOptions",
        "--add-opens=jdk.proxy2/jdk.proxy2=ALL-UNNAMED",
        *log_settings,
        "-cp", classpath,
        "-c", constants.PYPAIMON_MAIN_CLASS,
        *main_args
    ]

    preexec_fn = None
    if not on_windows():
        def preexec_func():
            # ignore ctrl-c / SIGINT
            signal.signal(signal.SIGINT, signal.SIG_IGN)
        preexec_fn = preexec_func

    # Drop empty arguments (e.g. an empty classpath) before spawning.
    non_empty_command = [arg for arg in command if arg]
    # NOTE(review): stderr is a pipe; presumably the caller reads it -- confirm,
    # since an unread pipe can fill up and stall the child.
    return Popen(non_empty_command,
                 stdin=PIPE, stderr=PIPE, preexec_fn=preexec_fn, env=env)


_JAVA_DEPS_PACKAGE = 'pypaimon.jars'


def _get_classpath(env) -> str:
    """Build the JVM classpath string, joined with ``os.pathsep``.

    Entries are added in this order:

    1. A ``<dir>/*`` wildcard for the directory of the jars shipped in the
       ``pypaimon.jars`` package, unless the ``constants.PYPAIMON4J_TEST_MODE``
       variable of the current process environment (``os.environ``, not
       ``env``) equals ``"true"`` case-insensitively.
    2. ``env[constants.PYPAIMON_JAVA_CLASSPATH]`` when that key is present.
    3. The Hadoop classpath returned by ``_get_hadoop_classpath``.

    Raises:
        ValueError: If the bundled jar package contains no entries.
    """
    classpath = []

    # note that jars are not packaged in test
    test_mode = os.environ.get(constants.PYPAIMON4J_TEST_MODE)
    if not test_mode or test_mode.lower() != "true":
        jars = importlib.resources.files(_JAVA_DEPS_PACKAGE)
        # Any single entry is enough: only its parent directory is needed.
        one_jar = next(iter(jars.iterdir()), None)
        if not one_jar:
            raise ValueError(
                "Haven't found necessary python-java-bridge jar, this is unexpected.")
        builtin_java_classpath = os.path.join(os.path.dirname(str(one_jar)), '*')
        classpath.append(builtin_java_classpath)

    # user defined
    if constants.PYPAIMON_JAVA_CLASSPATH in env:
        classpath.append(env[constants.PYPAIMON_JAVA_CLASSPATH])

    # hadoop
    hadoop_classpath = _get_hadoop_classpath(env)
    if hadoop_classpath is not None:
        classpath.append(hadoop_classpath)

    return os.pathsep.join(classpath)


_HADOOP_DEPS_PACKAGE = 'pypaimon.hadoop-deps'


def _get_hadoop_classpath(env) -> str:
    """Return the Hadoop classpath to use.

    Lookup order: ``env[constants.PYPAIMON_HADOOP_CLASSPATH]``, then
    ``env['HADOOP_CLASSPATH']``, then a ``<dir>/*`` wildcard for the
    directory of the jars shipped in the ``pypaimon.hadoop-deps`` package.

    Raises:
        EnvironmentError: If neither variable is set and the built-in
            Hadoop package contains no entries.
    """
    if constants.PYPAIMON_HADOOP_CLASSPATH in env:
        return env[constants.PYPAIMON_HADOOP_CLASSPATH]
    if 'HADOOP_CLASSPATH' in env:
        return env['HADOOP_CLASSPATH']

    # use built-in hadoop
    jars = importlib.resources.files(_HADOOP_DEPS_PACKAGE)
    one_jar = next(iter(jars.iterdir()), None)
    if not one_jar:
        # Adjacent literals instead of in-string backslash continuations, so
        # no stray backslashes or indentation end up in the message.
        raise EnvironmentError(
            "The built-in Hadoop environment has been broken, this is unexpected. "
            f"You can set one of '{constants.PYPAIMON_HADOOP_CLASSPATH}' or "
            "'HADOOP_CLASSPATH' to continue.")
    return os.path.join(os.path.dirname(str(one_jar)), '*')