in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java [81:228]
public static FlinkOperatorConfiguration fromConfiguration(Configuration operatorConfig) {
Duration reconcileInterval =
operatorConfig.get(KubernetesOperatorConfigOptions.OPERATOR_RECONCILE_INTERVAL);
int reconcilerMaxParallelism =
operatorConfig.getInteger(
KubernetesOperatorConfigOptions.OPERATOR_RECONCILE_PARALLELISM);
Duration restApiReadyDelay =
operatorConfig.get(
KubernetesOperatorConfigOptions.OPERATOR_OBSERVER_REST_READY_DELAY);
Duration progressCheckInterval =
operatorConfig.get(
KubernetesOperatorConfigOptions.OPERATOR_OBSERVER_PROGRESS_CHECK_INTERVAL);
Duration flinkClientTimeout =
operatorConfig.get(KubernetesOperatorConfigOptions.OPERATOR_FLINK_CLIENT_TIMEOUT);
Duration flinkCancelJobTimeout =
operatorConfig.get(
KubernetesOperatorConfigOptions.OPERATOR_FLINK_CLIENT_CANCEL_TIMEOUT);
Duration flinkShutdownClusterTimeout =
operatorConfig.get(
KubernetesOperatorConfigOptions.OPERATOR_RESOURCE_CLEANUP_TIMEOUT);
String artifactsBaseDir =
operatorConfig.get(
KubernetesOperatorConfigOptions.OPERATOR_USER_ARTIFACTS_BASE_DIR);
Integer savepointHistoryCountThreshold =
operatorConfig.get(
KubernetesOperatorConfigOptions
.OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT_THRESHOLD);
Duration savepointHistoryAgeThreshold =
operatorConfig.get(
KubernetesOperatorConfigOptions
.OPERATOR_SAVEPOINT_HISTORY_MAX_AGE_THRESHOLD);
Boolean exceptionStackTraceEnabled =
operatorConfig.get(
KubernetesOperatorConfigOptions.OPERATOR_EXCEPTION_STACK_TRACE_ENABLED);
int exceptionStackTraceLengthThreshold =
operatorConfig.get(
KubernetesOperatorConfigOptions.OPERATOR_EXCEPTION_STACK_TRACE_MAX_LENGTH);
int exceptionFieldLengthThreshold =
operatorConfig.get(
KubernetesOperatorConfigOptions.OPERATOR_EXCEPTION_FIELD_MAX_LENGTH);
int exceptionThrowableCountThreshold =
operatorConfig.get(
KubernetesOperatorConfigOptions
.OPERATOR_EXCEPTION_THROWABLE_LIST_MAX_COUNT);
Map<String, String> exceptionLabelMapper =
operatorConfig.get(KubernetesOperatorConfigOptions.OPERATOR_EXCEPTION_LABEL_MAPPER);
String flinkServiceHostOverride = null;
if (getEnv(ENV_KUBERNETES_SERVICE_HOST).isEmpty()) {
// not running in k8s, simplify local development
flinkServiceHostOverride = "localhost";
}
Set<String> watchedNamespaces = null;
if (EnvUtils.get(ENV_WATCH_NAMESPACES).isEmpty()) {
// if the env var is not set use the config file, the default if neither set is
// all namespaces
watchedNamespaces =
new HashSet<>(
Arrays.asList(
operatorConfig
.get(
KubernetesOperatorConfigOptions
.OPERATOR_WATCHED_NAMESPACES)
.split(NAMESPACES_SPLITTER_KEY)));
} else {
watchedNamespaces =
new HashSet<>(
Arrays.asList(
EnvUtils.get(ENV_WATCH_NAMESPACES)
.get()
.split(NAMESPACES_SPLITTER_KEY)));
}
boolean dynamicNamespacesEnabled =
operatorConfig.get(
KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_NAMESPACES_ENABLED);
boolean josdkMetricsEnabled =
operatorConfig.get(KubernetesOperatorMetricOptions.OPERATOR_JOSDK_METRICS_ENABLED);
boolean kubernetesClientMetricsEnabled =
operatorConfig.get(
KubernetesOperatorMetricOptions.OPERATOR_KUBERNETES_CLIENT_METRICS_ENABLED);
boolean kubernetesClientMetricsHttpResponseCodeGroupsEnabled =
operatorConfig.get(
KubernetesOperatorMetricOptions
.OPERATOR_KUBERNETES_CLIENT_METRICS_HTTP_RESPONSE_CODE_GROUPS_ENABLED);
int metricsHistogramSampleSize =
operatorConfig.get(
KubernetesOperatorMetricOptions.OPERATOR_METRICS_HISTOGRAM_SAMPLE_SIZE);
GenericRetry retryConfiguration = getRetryConfig(operatorConfig);
RateLimiter rateLimiter = getRateLimiter(operatorConfig);
String labelSelector =
operatorConfig.getString(KubernetesOperatorConfigOptions.OPERATOR_LABEL_SELECTOR);
DeletionPropagation deletionPropagation =
operatorConfig.get(KubernetesOperatorConfigOptions.RESOURCE_DELETION_PROPAGATION);
boolean snapshotResourcesEnabled =
operatorConfig.get(KubernetesOperatorConfigOptions.SNAPSHOT_RESOURCE_ENABLED);
Duration slowRequestThreshold =
operatorConfig.get(OPERATOR_KUBERNETES_SLOW_REQUEST_THRESHOLD);
return new FlinkOperatorConfiguration(
reconcileInterval,
reconcilerMaxParallelism,
progressCheckInterval,
restApiReadyDelay,
flinkClientTimeout,
flinkServiceHostOverride,
watchedNamespaces,
dynamicNamespacesEnabled,
josdkMetricsEnabled,
metricsHistogramSampleSize,
kubernetesClientMetricsEnabled,
kubernetesClientMetricsHttpResponseCodeGroupsEnabled,
flinkCancelJobTimeout,
flinkShutdownClusterTimeout,
artifactsBaseDir,
savepointHistoryCountThreshold,
savepointHistoryAgeThreshold,
retryConfiguration,
rateLimiter,
exceptionStackTraceEnabled,
exceptionStackTraceLengthThreshold,
exceptionFieldLengthThreshold,
exceptionThrowableCountThreshold,
exceptionLabelMapper,
labelSelector,
getLeaderElectionConfig(operatorConfig),
deletionPropagation,
snapshotResourcesEnabled,
slowRequestThreshold);
}