def add_jars_to_context_class_loader()

in flink-ml-python/pyflink/ml/__init__.py [0:0]


def add_jars_to_context_class_loader(jar_urls):
    """
    Add jars to Python gateway server for local compilation and local execution (i.e. minicluster).
    There are many component in Flink which won't be added to classpath by default. e.g. Kafka
    connector, JDBC connector, CSV format etc. This utility function can be used to hot load the
    jars.

    :param jar_urls: The list of jar urls.
    """
    gateway = get_gateway()
    # validate and normalize
    jar_urls = [gateway.jvm.java.net.URL(url) for url in jar_urls]
    context_classloader = gateway.jvm.Thread.currentThread().getContextClassLoader()
    existing_urls = []
    class_loader_name = context_classloader.getClass().getName()
    if class_loader_name == "java.net.URLClassLoader":
        existing_urls = set([url.toString() for url in context_classloader.getURLs()])
    if all([url.toString() in existing_urls for url in jar_urls]):
        # if urls all existed, no need to create new class loader.
        return

    URLClassLoaderClass = load_java_class("java.net.URLClassLoader")
    if is_instance_of(context_classloader, URLClassLoaderClass):
        if class_loader_name == "org.apache.flink.runtime.execution.librarycache." \
                                "FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader":
            ensureInner = context_classloader.getClass().getDeclaredMethod("ensureInner", None)
            ensureInner.setAccessible(True)
            context_classloader = ensureInner.invoke(context_classloader, None)

        addURL = URLClassLoaderClass.getDeclaredMethod(
            "addURL",
            to_jarray(
                gateway.jvm.Class,
                [load_java_class("java.net.URL")]))
        addURL.setAccessible(True)

        for url in jar_urls:
            addURL.invoke(context_classloader, to_jarray(get_gateway().jvm.Object, [url]))

    else:
        context_classloader = create_url_class_loader(jar_urls, context_classloader)
        gateway.jvm.Thread.currentThread().setContextClassLoader(context_classloader)