pypaimon/py4j/java_gateway.py (73 lines of code) (raw):

################################################################################ # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. ################################################################################ import os import shutil import struct import tempfile import time from logging import WARN from py4j.java_gateway import (java_import, logger, JavaGateway, GatewayParameters, CallbackServerParameters) from threading import RLock from pypaimon.py4j.gateway_server import launch_gateway_server_process from pypaimon.py4j import constants from pypaimon.py4j.util.exceptions import install_py4j_hooks _gateway = None _lock = RLock() def get_gateway(): # type: () -> JavaGateway global _gateway global _lock with _lock: if _gateway is None: # Set the level to WARN to mute the noisy INFO level logs logger.level = WARN _gateway = launch_gateway() callback_server = _gateway.get_callback_server() callback_server_listening_address = callback_server.get_listening_address() callback_server_listening_port = callback_server.get_listening_port() _gateway.jvm.org.apache.paimon.python.PythonEnvUtils.resetCallbackClient( _gateway.java_gateway_server, callback_server_listening_address, callback_server_listening_port) # import the paimon view import_paimon_view(_gateway) # TODO add exception handler for better exception stacktrace install_py4j_hooks() _gateway.entry_point.put("Watchdog", Watchdog()) return _gateway def launch_gateway(): # type: () -> JavaGateway """ launch jvm gateway """ # Create a temporary directory where the gateway server should write the connection information. conn_info_dir = tempfile.mkdtemp() try: fd, conn_info_file = tempfile.mkstemp(dir=conn_info_dir) os.close(fd) os.unlink(conn_info_file) env = dict(os.environ) env[constants.PYPAIMON_CONN_INFO_PATH] = conn_info_file p = launch_gateway_server_process(env) while not p.poll() and not os.path.isfile(conn_info_file): time.sleep(0.1) if not os.path.isfile(conn_info_file): stderr_info = p.stderr.read().decode('utf-8') raise RuntimeError( "Java gateway process exited before sending its port number.\nStderr:\n" + stderr_info ) with open(conn_info_file, "rb") as info: gateway_port = struct.unpack("!I", info.read(4))[0] finally: shutil.rmtree(conn_info_dir) # Connect to the gateway gateway = JavaGateway( gateway_parameters=GatewayParameters(port=gateway_port, auto_convert=True), callback_server_parameters=CallbackServerParameters( port=0, daemonize=True, daemonize_connections=True)) return gateway def import_paimon_view(gateway): java_import(gateway.jvm, "org.apache.paimon.table.*") java_import(gateway.jvm, "org.apache.paimon.options.Options") java_import(gateway.jvm, "org.apache.paimon.catalog.*") java_import(gateway.jvm, "org.apache.paimon.schema.Schema*") java_import(gateway.jvm, 'org.apache.paimon.types.*') java_import(gateway.jvm, 'org.apache.paimon.python.*') java_import(gateway.jvm, "org.apache.paimon.data.*") java_import(gateway.jvm, "org.apache.paimon.predicate.PredicateBuilder") class Watchdog(object): """ Used to provide to Java side to check whether its parent process is alive. """ def ping(self): time.sleep(10) return True class Java: implements = ["org.apache.paimon.python.PythonGatewayServer$Watchdog"]