public static FlinkOperatorConfiguration fromConfiguration()

in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java [81:228]


    /**
     * Builds a {@link FlinkOperatorConfiguration} from the given operator configuration.
     *
     * <p>Most values are read directly from {@code operatorConfig}. Two values also depend on the
     * process environment:
     *
     * <ul>
     *   <li>The Flink service host override is set to {@code "localhost"} when the {@code
     *       ENV_KUBERNETES_SERVICE_HOST} environment variable is absent, and is {@code null}
     *       otherwise.
     *   <li>The watched namespaces come from the {@code ENV_WATCH_NAMESPACES} environment
     *       variable when it is set, and from the configuration otherwise.
     * </ul>
     *
     * @param operatorConfig the configuration to read operator options from
     * @return a new operator configuration populated from {@code operatorConfig} and the
     *     environment
     */
    public static FlinkOperatorConfiguration fromConfiguration(Configuration operatorConfig) {
        Duration reconcileInterval =
                operatorConfig.get(KubernetesOperatorConfigOptions.OPERATOR_RECONCILE_INTERVAL);

        // Use the generic getter like every other option here; the typed getInteger/getString
        // accessors on Configuration are deprecated and return the same value.
        int reconcilerMaxParallelism =
                operatorConfig.get(KubernetesOperatorConfigOptions.OPERATOR_RECONCILE_PARALLELISM);

        Duration restApiReadyDelay =
                operatorConfig.get(
                        KubernetesOperatorConfigOptions.OPERATOR_OBSERVER_REST_READY_DELAY);

        Duration progressCheckInterval =
                operatorConfig.get(
                        KubernetesOperatorConfigOptions.OPERATOR_OBSERVER_PROGRESS_CHECK_INTERVAL);

        Duration flinkClientTimeout =
                operatorConfig.get(KubernetesOperatorConfigOptions.OPERATOR_FLINK_CLIENT_TIMEOUT);

        Duration flinkCancelJobTimeout =
                operatorConfig.get(
                        KubernetesOperatorConfigOptions.OPERATOR_FLINK_CLIENT_CANCEL_TIMEOUT);

        Duration flinkShutdownClusterTimeout =
                operatorConfig.get(
                        KubernetesOperatorConfigOptions.OPERATOR_RESOURCE_CLEANUP_TIMEOUT);

        String artifactsBaseDir =
                operatorConfig.get(
                        KubernetesOperatorConfigOptions.OPERATOR_USER_ARTIFACTS_BASE_DIR);

        Integer savepointHistoryCountThreshold =
                operatorConfig.get(
                        KubernetesOperatorConfigOptions
                                .OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT_THRESHOLD);
        Duration savepointHistoryAgeThreshold =
                operatorConfig.get(
                        KubernetesOperatorConfigOptions
                                .OPERATOR_SAVEPOINT_HISTORY_MAX_AGE_THRESHOLD);

        Boolean exceptionStackTraceEnabled =
                operatorConfig.get(
                        KubernetesOperatorConfigOptions.OPERATOR_EXCEPTION_STACK_TRACE_ENABLED);
        int exceptionStackTraceLengthThreshold =
                operatorConfig.get(
                        KubernetesOperatorConfigOptions.OPERATOR_EXCEPTION_STACK_TRACE_MAX_LENGTH);
        int exceptionFieldLengthThreshold =
                operatorConfig.get(
                        KubernetesOperatorConfigOptions.OPERATOR_EXCEPTION_FIELD_MAX_LENGTH);
        int exceptionThrowableCountThreshold =
                operatorConfig.get(
                        KubernetesOperatorConfigOptions
                                .OPERATOR_EXCEPTION_THROWABLE_LIST_MAX_COUNT);
        Map<String, String> exceptionLabelMapper =
                operatorConfig.get(KubernetesOperatorConfigOptions.OPERATOR_EXCEPTION_LABEL_MAPPER);

        // Not running in k8s when the service host variable is absent: point at localhost to
        // simplify local development. Otherwise no override is applied (null).
        String flinkServiceHostOverride =
                getEnv(ENV_KUBERNETES_SERVICE_HOST).isEmpty() ? "localhost" : null;

        // The environment variable wins; if it is not set, fall back to the config file (whose
        // default, if neither is set, is all namespaces). The env var is looked up only once.
        String namespacesSpec =
                EnvUtils.get(ENV_WATCH_NAMESPACES)
                        .orElseGet(
                                () ->
                                        operatorConfig.get(
                                                KubernetesOperatorConfigOptions
                                                        .OPERATOR_WATCHED_NAMESPACES));
        Set<String> watchedNamespaces =
                new HashSet<>(Arrays.asList(namespacesSpec.split(NAMESPACES_SPLITTER_KEY)));

        boolean dynamicNamespacesEnabled =
                operatorConfig.get(
                        KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_NAMESPACES_ENABLED);

        boolean josdkMetricsEnabled =
                operatorConfig.get(KubernetesOperatorMetricOptions.OPERATOR_JOSDK_METRICS_ENABLED);

        boolean kubernetesClientMetricsEnabled =
                operatorConfig.get(
                        KubernetesOperatorMetricOptions.OPERATOR_KUBERNETES_CLIENT_METRICS_ENABLED);

        boolean kubernetesClientMetricsHttpResponseCodeGroupsEnabled =
                operatorConfig.get(
                        KubernetesOperatorMetricOptions
                                .OPERATOR_KUBERNETES_CLIENT_METRICS_HTTP_RESPONSE_CODE_GROUPS_ENABLED);

        int metricsHistogramSampleSize =
                operatorConfig.get(
                        KubernetesOperatorMetricOptions.OPERATOR_METRICS_HISTOGRAM_SAMPLE_SIZE);

        GenericRetry retryConfiguration = getRetryConfig(operatorConfig);
        RateLimiter rateLimiter = getRateLimiter(operatorConfig);

        String labelSelector =
                operatorConfig.get(KubernetesOperatorConfigOptions.OPERATOR_LABEL_SELECTOR);

        DeletionPropagation deletionPropagation =
                operatorConfig.get(KubernetesOperatorConfigOptions.RESOURCE_DELETION_PROPAGATION);

        boolean snapshotResourcesEnabled =
                operatorConfig.get(KubernetesOperatorConfigOptions.SNAPSHOT_RESOURCE_ENABLED);

        Duration slowRequestThreshold =
                operatorConfig.get(OPERATOR_KUBERNETES_SLOW_REQUEST_THRESHOLD);

        // Argument order must match the FlinkOperatorConfiguration constructor exactly; note
        // that it differs from the order in which the values are read above.
        return new FlinkOperatorConfiguration(
                reconcileInterval,
                reconcilerMaxParallelism,
                progressCheckInterval,
                restApiReadyDelay,
                flinkClientTimeout,
                flinkServiceHostOverride,
                watchedNamespaces,
                dynamicNamespacesEnabled,
                josdkMetricsEnabled,
                metricsHistogramSampleSize,
                kubernetesClientMetricsEnabled,
                kubernetesClientMetricsHttpResponseCodeGroupsEnabled,
                flinkCancelJobTimeout,
                flinkShutdownClusterTimeout,
                artifactsBaseDir,
                savepointHistoryCountThreshold,
                savepointHistoryAgeThreshold,
                retryConfiguration,
                rateLimiter,
                exceptionStackTraceEnabled,
                exceptionStackTraceLengthThreshold,
                exceptionFieldLengthThreshold,
                exceptionThrowableCountThreshold,
                exceptionLabelMapper,
                labelSelector,
                getLeaderElectionConfig(operatorConfig),
                deletionPropagation,
                snapshotResourcesEnabled,
                slowRequestThreshold);
    }