ballista/client/src/extension.rs (189 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. use ballista_core::extension::SessionConfigHelperExt; pub use ballista_core::extension::{SessionConfigExt, SessionStateExt}; use ballista_core::{ serde::protobuf::{scheduler_grpc_client::SchedulerGrpcClient, CreateSessionParams}, utils::create_grpc_client_connection, }; use datafusion::{ error::DataFusionError, execution::SessionState, prelude::{SessionConfig, SessionContext}, }; use url::Url; const DEFAULT_SCHEDULER_PORT: u16 = 50050; /// Module provides [SessionContextExt] which adds `standalone*` and `remote*` /// methods to [SessionContext]. /// /// Provided methods set up [SessionContext] with [BallistaQueryPlanner](ballista_core::utils), which /// handles running plans on Ballista clusters. /// ///```no_run /// use ballista::prelude::SessionContextExt; /// use datafusion::prelude::SessionContext; /// /// # #[tokio::main] /// # async fn main() -> datafusion::error::Result<()> { /// let ctx: SessionContext = SessionContext::remote("df://localhost:50050").await?; /// # Ok(()) /// # } ///``` /// /// [SessionContextExt::standalone()] provides an easy way to start up /// local cluster. It is an optional feature which should be enabled /// with `standalone` /// ///```no_run /// use ballista::prelude::SessionContextExt; /// use datafusion::prelude::SessionContext; /// /// # #[tokio::main] /// # async fn main() -> datafusion::error::Result<()> { /// let ctx: SessionContext = SessionContext::standalone().await?; /// # Ok(()) /// # } ///``` /// /// There are still few limitations on query distribution, thus not all /// [SessionContext] functionalities are supported. /// #[async_trait::async_trait] pub trait SessionContextExt { /// Creates a context for executing queries against a standalone Ballista scheduler instance /// /// It wills start local ballista cluster with scheduler and executor. #[cfg(feature = "standalone")] async fn standalone() -> datafusion::error::Result<SessionContext>; /// Creates a context for executing queries against a standalone Ballista scheduler instance /// with custom session state. /// /// It wills start local ballista cluster with scheduler and executor. #[cfg(feature = "standalone")] async fn standalone_with_state( state: SessionState, ) -> datafusion::error::Result<SessionContext>; /// Creates a context for executing queries against a remote Ballista scheduler instance async fn remote(url: &str) -> datafusion::error::Result<SessionContext>; /// Creates a context for executing queries against a remote Ballista scheduler instance /// with custom session state async fn remote_with_state( url: &str, state: SessionState, ) -> datafusion::error::Result<SessionContext>; } #[async_trait::async_trait] impl SessionContextExt for SessionContext { async fn remote_with_state( url: &str, state: SessionState, ) -> datafusion::error::Result<SessionContext> { let config = state.config(); let scheduler_url = Extension::parse_url(url)?; log::info!( "Connecting to Ballista scheduler at {}", scheduler_url.clone() ); let remote_session_id = Extension::setup_remote(config, scheduler_url.clone()).await?; log::info!( "Server side SessionContext created with session id: {}", remote_session_id ); let session_state = state.upgrade_for_ballista(scheduler_url, remote_session_id)?; Ok(SessionContext::new_with_state(session_state)) } async fn remote(url: &str) -> datafusion::error::Result<SessionContext> { let config = SessionConfig::new_with_ballista(); let scheduler_url = Extension::parse_url(url)?; log::info!( "Connecting to Ballista scheduler at: {}", scheduler_url.clone() ); let remote_session_id = Extension::setup_remote(&config, scheduler_url.clone()).await?; log::info!( "Server side SessionContext created with session id: {}", remote_session_id ); let session_state = SessionState::new_ballista_state(scheduler_url, remote_session_id)?; Ok(SessionContext::new_with_state(session_state)) } #[cfg(feature = "standalone")] async fn standalone_with_state( state: SessionState, ) -> datafusion::error::Result<SessionContext> { let (remote_session_id, scheduler_url) = Extension::setup_standalone(Some(&state)).await?; let session_state = state.upgrade_for_ballista(scheduler_url, remote_session_id.clone())?; log::info!( "Server side SessionContext created with session id: {}", remote_session_id ); Ok(SessionContext::new_with_state(session_state)) } #[cfg(feature = "standalone")] async fn standalone() -> datafusion::error::Result<Self> { log::info!("Running in local mode. Scheduler will be run in-proc"); let (remote_session_id, scheduler_url) = Extension::setup_standalone(None).await?; let session_state = SessionState::new_ballista_state(scheduler_url, remote_session_id.clone())?; log::info!( "Server side SessionContext created with session id: {}", remote_session_id ); Ok(SessionContext::new_with_state(session_state)) } } struct Extension {} impl Extension { fn parse_url(url: &str) -> datafusion::error::Result<String> { let url = Url::parse(url).map_err(|e| DataFusionError::Configuration(e.to_string()))?; let host = url.host().ok_or(DataFusionError::Configuration( "hostname should be provided".to_string(), ))?; let port = url.port().unwrap_or(DEFAULT_SCHEDULER_PORT); let scheduler_url = format!("http://{}:{}", &host, port); Ok(scheduler_url) } #[cfg(feature = "standalone")] async fn setup_standalone( session_state: Option<&SessionState>, ) -> datafusion::error::Result<(String, String)> { use ballista_core::{serde::BallistaCodec, utils::default_config_producer}; let addr = match session_state { None => ballista_scheduler::standalone::new_standalone_scheduler() .await .map_err(|e| DataFusionError::Configuration(e.to_string()))?, Some(session_state) => { ballista_scheduler::standalone::new_standalone_scheduler_from_state( session_state, ) .await .map_err(|e| DataFusionError::Configuration(e.to_string()))? } }; let config = session_state .map(|s| s.config().clone()) .unwrap_or_else(default_config_producer); let scheduler_url = format!("http://localhost:{}", addr.port()); let mut scheduler = loop { match SchedulerGrpcClient::connect(scheduler_url.clone()).await { Err(_) => { tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; log::info!("Attempting to connect to in-proc scheduler..."); } Ok(scheduler) => break scheduler, } }; let remote_session_id = scheduler .create_session(CreateSessionParams { settings: config.to_key_value_pairs(), }) .await .map_err(|e| DataFusionError::Execution(format!("{e:?}")))? .into_inner() .session_id; let concurrent_tasks = config.ballista_standalone_parallelism(); match session_state { None => { ballista_executor::new_standalone_executor( scheduler, concurrent_tasks, BallistaCodec::default(), ) .await .map_err(|e| DataFusionError::Configuration(e.to_string()))?; } Some(session_state) => { ballista_executor::new_standalone_executor_from_state( scheduler, concurrent_tasks, session_state, ) .await .map_err(|e| DataFusionError::Configuration(e.to_string()))?; } } Ok((remote_session_id, scheduler_url)) } async fn setup_remote( config: &SessionConfig, scheduler_url: String, ) -> datafusion::error::Result<String> { let connection = create_grpc_client_connection(scheduler_url.clone()) .await .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?; let limit = config.ballista_grpc_client_max_message_size(); let mut scheduler = SchedulerGrpcClient::new(connection) .max_encoding_message_size(limit) .max_decoding_message_size(limit); let remote_session_id = scheduler .create_session(CreateSessionParams { settings: config.to_key_value_pairs(), }) .await .map_err(|e| DataFusionError::Execution(format!("{e:?}")))? .into_inner() .session_id; Ok(remote_session_id) } }