ballista/scheduler/src/config.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Ballista scheduler specific configuration

use crate::cluster::DistributionPolicy;
use crate::SessionBuilder;
use ballista_core::{config::TaskSchedulingPolicy, ConfigProducer};
use datafusion_proto::logical_plan::LogicalExtensionCodec;
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use std::sync::Arc;

#[cfg(feature = "build-binary")]
include!(concat!(
    env!("OUT_DIR"),
    "/scheduler_configure_me_config.rs"
));

/// Configuration for the Ballista scheduler, covering how jobs and tasks are scheduled
#[derive(Clone)]
pub struct SchedulerConfig {
    /// Namespace of this scheduler. Schedulers using the same cluster storage and namespace
    /// will share global cluster state.
    pub namespace: String,
    /// The external hostname of the scheduler
    pub external_host: String,
    /// The bind host for the scheduler's gRPC service
    pub bind_host: String,
    /// The bind port for the scheduler's gRPC service
    pub bind_port: u16,
    /// The task scheduling policy for the scheduler
    pub scheduling_policy: TaskSchedulingPolicy,
    /// The event loop buffer size. For a high-throughput system, a larger value such as 1000000 is recommended
    pub event_loop_buffer_size: u32,
    /// Policy for distributing tasks to available executor slots. For a cluster with a single scheduler, round-robin is recommended
    pub task_distribution: TaskDistributionPolicy,
    /// The delayed interval (in seconds) for cleaning up finished job data, mainly the shuffle data; 0 means the clean-up is disabled
    pub finished_job_data_clean_up_interval_seconds: u64,
    /// The delayed interval (in seconds) for cleaning up finished job state stored in the backend; 0 means the clean-up is disabled
    pub finished_job_state_clean_up_interval_seconds: u64,
    /// The route endpoint for proxying Flight SQL results via the scheduler
    pub advertise_flight_sql_endpoint: Option<String>,
    /// If provided, submitted jobs which do not have tasks scheduled will be resubmitted after `job_resubmit_interval_ms`
    /// milliseconds
    pub job_resubmit_interval_ms: Option<u64>,
    /// Configuration for Ballista cluster storage
    pub cluster_storage: ClusterStorageConfig,
    /// Time in seconds to allow an executor for graceful shutdown. Once an executor signals it has entered the Terminating status,
    /// the scheduler should only consider the executor dead after this time interval has elapsed
    pub executor_termination_grace_period: u64,
    /// The maximum expected processing time of a scheduler event (microseconds). Zero means disabled.
    pub scheduler_event_expected_processing_duration: u64,
    /// The maximum size of a decoded message on the gRPC server side.
    pub grpc_server_max_decoding_message_size: u32,
    /// The maximum size of an encoded message on the gRPC server side.
    pub grpc_server_max_encoding_message_size: u32,
    /// The executor timeout in seconds. It should be longer than the executor's heartbeat interval.
    pub executor_timeout_seconds: u64,
    /// The interval (in seconds) for checking for expired or dead executors
    pub expire_dead_executor_interval_seconds: u64,
    /// [ConfigProducer] override option
    pub override_config_producer: Option<ConfigProducer>,
    /// [SessionBuilder] override option
    pub override_session_builder: Option<SessionBuilder>,
    /// [LogicalExtensionCodec] override option
    pub override_logical_codec: Option<Arc<dyn LogicalExtensionCodec>>,
    /// [PhysicalExtensionCodec] override option
    pub override_physical_codec: Option<Arc<dyn PhysicalExtensionCodec>>,
}

impl Default for SchedulerConfig {
    fn default() -> Self {
        Self {
            namespace: String::default(),
            external_host: "localhost".into(),
            bind_port: 50050,
            bind_host: "127.0.0.1".into(),
            scheduling_policy: Default::default(),
            event_loop_buffer_size: 10000,
            task_distribution: Default::default(),
            finished_job_data_clean_up_interval_seconds: 300,
            finished_job_state_clean_up_interval_seconds: 3600,
            advertise_flight_sql_endpoint: None,
            cluster_storage: Default::default(),
            job_resubmit_interval_ms: None,
            executor_termination_grace_period: 0,
            scheduler_event_expected_processing_duration: 0,
            grpc_server_max_decoding_message_size: 16777216,
            grpc_server_max_encoding_message_size: 16777216,
            executor_timeout_seconds: 180,
            expire_dead_executor_interval_seconds: 15,
            override_config_producer: None,
            override_session_builder: None,
            override_logical_codec: None,
            override_physical_codec: None,
        }
    }
}

impl SchedulerConfig {
    pub fn scheduler_name(&self) -> String {
        format!("{}:{}", self.external_host, self.bind_port)
    }

    pub fn is_push_staged_scheduling(&self) -> bool {
        matches!(self.scheduling_policy, TaskSchedulingPolicy::PushStaged)
    }

    pub fn with_namespace(mut self, namespace: impl Into<String>) -> Self {
        self.namespace = namespace.into();
        self
    }

    pub fn with_hostname(mut self, hostname: impl Into<String>) -> Self {
        self.external_host = hostname.into();
        self
    }

    pub fn with_port(mut self, port: u16) -> Self {
        self.bind_port = port;
        self
    }

    pub fn with_scheduler_policy(mut self, policy: TaskSchedulingPolicy) -> Self {
        self.scheduling_policy = policy;
        self
    }

    pub fn with_event_loop_buffer_size(mut self, buffer_size: u32) -> Self {
        self.event_loop_buffer_size = buffer_size;
        self
    }

    pub fn with_finished_job_data_clean_up_interval_seconds(
        mut self,
        interval_seconds: u64,
    ) -> Self {
        self.finished_job_data_clean_up_interval_seconds = interval_seconds;
        self
    }

    pub fn with_finished_job_state_clean_up_interval_seconds(
        mut self,
        interval_seconds: u64,
    ) -> Self {
        self.finished_job_state_clean_up_interval_seconds = interval_seconds;
        self
    }

    pub fn with_advertise_flight_sql_endpoint(
        mut self,
        endpoint: Option<String>,
    ) -> Self {
        self.advertise_flight_sql_endpoint = endpoint;
        self
    }

    pub fn with_task_distribution(mut self, policy: TaskDistributionPolicy) -> Self {
        self.task_distribution = policy;
        self
    }

    pub fn with_cluster_storage(mut self, config: ClusterStorageConfig) -> Self {
        self.cluster_storage = config;
        self
    }

    pub fn with_job_resubmit_interval_ms(mut self, interval_ms: u64) -> Self {
        self.job_resubmit_interval_ms = Some(interval_ms);
        self
    }

    pub fn with_remove_executor_wait_secs(mut self, value: u64) -> Self {
        self.executor_termination_grace_period = value;
        self
    }

    pub fn with_grpc_server_max_decoding_message_size(mut self, value: u32) -> Self {
        self.grpc_server_max_decoding_message_size = value;
        self
    }

    pub fn with_grpc_server_max_encoding_message_size(mut self, value: u32) -> Self {
        self.grpc_server_max_encoding_message_size = value;
        self
    }

    pub fn with_override_config_producer(
        mut self,
        override_config_producer: ConfigProducer,
    ) -> Self {
        self.override_config_producer = Some(override_config_producer);
        self
    }

    pub fn with_override_session_builder(
        mut self,
        override_session_builder: SessionBuilder,
    ) -> Self {
        self.override_session_builder = Some(override_session_builder);
        self
    }
}
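
// A minimal usage sketch (not part of the original file): it shows how the
// builder-style `with_*` methods above compose on top of `Default`. The
// hostname, port, and consistent-hash parameters below are illustrative
// assumptions rather than recommended settings.
#[allow(dead_code)]
fn example_push_staged_config() -> SchedulerConfig {
    SchedulerConfig::default()
        .with_namespace("example-namespace")
        .with_hostname("scheduler.internal")
        .with_port(50050)
        .with_scheduler_policy(TaskSchedulingPolicy::PushStaged)
        .with_task_distribution(TaskDistributionPolicy::ConsistentHash {
            num_replicas: 31,
            tolerance: 0,
        })
}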

#[derive(Clone, Debug, Default)]
pub enum ClusterStorageConfig {
    #[default]
    Memory,
}

/// Policy for distributing tasks to available executor slots
///
/// It needs to be visible to code generated by configure_me
#[derive(Clone, Copy, Debug, serde::Deserialize)]
#[cfg_attr(feature = "build-binary", derive(clap::ValueEnum))]
pub enum TaskDistribution {
    /// Eagerly assign tasks to executor slots. This will assign as many task slots per executor
    /// as are currently available
    Bias,
    /// Distribute tasks evenly across executors. This will iterate through the available executors
    /// and assign one task to each executor until all tasks are assigned.
    RoundRobin,
    /// 1. First, bind tasks that do not scan source files using the [`RoundRobin`] policy.
    /// 2. Then, for a task that scans source files, calculate a hash value based on its input files
    ///    and bind it to an executor according to the consistent hashing policy.
    /// 3. If needed, work stealing can be enabled based on the tolerance of the consistent hashing.
    ConsistentHash,
}

#[cfg(feature = "build-binary")]
impl std::str::FromStr for TaskDistribution {
    type Err = String;

    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        clap::ValueEnum::from_str(s, true)
    }
}

#[cfg(feature = "build-binary")]
impl configure_me::parse_arg::ParseArgFromStr for TaskDistribution {
    fn describe_type<W: std::fmt::Write>(mut writer: W) -> std::fmt::Result {
        write!(writer, "The executor slots policy for the scheduler")
    }
}

#[derive(Debug, Clone, Default)]
pub enum TaskDistributionPolicy {
    /// Eagerly assign tasks to executor slots. This will assign as many task slots per executor
    /// as are currently available
    #[default]
    Bias,
    /// Distribute tasks evenly across executors. This will iterate through the available executors
    /// and assign one task to each executor until all tasks are assigned.
    RoundRobin,
    /// 1. First, bind tasks that do not scan source files using the [`RoundRobin`] policy.
    /// 2. Then, for a task that scans source files, calculate a hash value based on its input files
    ///    and bind it to an executor according to the consistent hashing policy.
    /// 3. If needed, work stealing can be enabled based on the tolerance of the consistent hashing.
    ConsistentHash {
        num_replicas: usize,
        tolerance: usize,
    },
    /// User-provided task distribution policy
    Custom(Arc<dyn DistributionPolicy>),
}

#[cfg(feature = "build-binary")]
impl TryFrom<Config> for SchedulerConfig {
    type Error = ballista_core::error::BallistaError;

    fn try_from(opt: Config) -> Result<Self, Self::Error> {
        let task_distribution = match opt.task_distribution {
            TaskDistribution::Bias => TaskDistributionPolicy::Bias,
            TaskDistribution::RoundRobin => TaskDistributionPolicy::RoundRobin,
            TaskDistribution::ConsistentHash => {
                let num_replicas = opt.consistent_hash_num_replicas as usize;
                let tolerance = opt.consistent_hash_tolerance as usize;
                TaskDistributionPolicy::ConsistentHash {
                    num_replicas,
                    tolerance,
                }
            }
        };

        let config = SchedulerConfig {
            namespace: opt.namespace,
            external_host: opt.external_host,
            bind_port: opt.bind_port,
            bind_host: opt.bind_host,
            scheduling_policy: opt.scheduler_policy,
            event_loop_buffer_size: opt.event_loop_buffer_size,
            task_distribution,
            finished_job_data_clean_up_interval_seconds: opt
                .finished_job_data_clean_up_interval_seconds,
            finished_job_state_clean_up_interval_seconds: opt
                .finished_job_state_clean_up_interval_seconds,
            advertise_flight_sql_endpoint: opt.advertise_flight_sql_endpoint,
            cluster_storage: Default::default(),
            job_resubmit_interval_ms: (opt.job_resubmit_interval_ms > 0)
                .then_some(opt.job_resubmit_interval_ms),
            executor_termination_grace_period: opt.executor_termination_grace_period,
            scheduler_event_expected_processing_duration: opt
                .scheduler_event_expected_processing_duration,
            grpc_server_max_decoding_message_size: opt
                .grpc_server_max_decoding_message_size,
            grpc_server_max_encoding_message_size: opt
                .grpc_server_max_encoding_message_size,
            executor_timeout_seconds: opt.executor_timeout_seconds,
            expire_dead_executor_interval_seconds: opt
                .expire_dead_executor_interval_seconds,
            override_config_producer: None,
            override_logical_codec: None,
            override_physical_codec: None,
            override_session_builder: None,
        };
        Ok(config)
    }
}
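
// A small, hedged test sketch (not part of the original file). It exercises only
// items defined above: the defaults, the derived scheduler name, and the
// push-staged check; the expected values mirror the `Default` impl above.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn builders_compose_on_top_of_defaults() {
        let config = SchedulerConfig::default()
            .with_hostname("example-host")
            .with_port(50055)
            .with_scheduler_policy(TaskSchedulingPolicy::PushStaged);

        // scheduler_name() combines the external host and bind port.
        assert_eq!(config.scheduler_name(), "example-host:50055");
        assert!(config.is_push_staged_scheduling());
        // Fields untouched by the builders keep their default values.
        assert_eq!(config.executor_timeout_seconds, 180);
        assert_eq!(config.expire_dead_executor_interval_seconds, 15);
    }
}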