//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//

import Foundation

/// An interface used to load a `DataFrame` from external storage systems
/// (e.g. file systems, key-value stores, etc). Use `SparkSession.read` to access this.
public actor DataFrameReader: Sendable {
  var source: String = ""
  var paths: [String] = []
  var extraOptions: CaseInsensitiveDictionary = CaseInsensitiveDictionary([:])
  var userSpecifiedSchemaDDL: String? = nil
  let sparkSession: SparkSession

  init(sparkSession: SparkSession) {
    self.sparkSession = sparkSession
  }

  /// Returns the specified table/view as a ``DataFrame``. If it's a table, it must support batch
  /// reading and the returned ``DataFrame`` is the batch scan query plan of this table. If it's a
  /// view, the returned ``DataFrame`` is simply the query plan of the view, which can be either a
  /// batch or a streaming query plan.
  ///
  /// - Parameter tableName: A qualified or unqualified name that designates a table or view. If a
  ///   database is specified, it identifies the table/view from the database. Otherwise, it first
  ///   attempts to find a temporary view with the given name and then matches the table/view from
  ///   the current database. Note that the global temporary view database is also valid here.
  /// - Returns: A ``DataFrame`` instance.
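  ///
  /// A minimal usage sketch (`spark` is assumed to be an existing `SparkSession`, and the
  /// table name is hypothetical):
  /// ```swift
  /// let df = await spark.read.table("people")
  /// ```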
  public func table(_ tableName: String) -> DataFrame {
    var namedTable = NamedTable()
    namedTable.unparsedIdentifier = tableName
    namedTable.options = self.extraOptions.toStringDictionary()
    var read = Read()
    read.namedTable = namedTable
    var relation = Relation()
    relation.read = read
    var plan = Plan()
    plan.opType = .root(relation)
    return DataFrame(spark: sparkSession, plan: plan)
  }

  /// Specifies the input data source format.
  /// - Parameter source: A data source name such as `parquet`, `csv`, `json`, `orc`, or `xml`.
  /// - Returns: A `DataFrameReader`.
  public func format(_ source: String) -> DataFrameReader {
    self.source = source
    return self
  }

  /// Adds an input option for the underlying data source.
  /// - Parameters:
  ///   - key: A key string.
  ///   - value: A value string.
  /// - Returns: A `DataFrameReader`.
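  ///
  /// For example, a minimal sketch (`spark`, the option names, and the path are illustrative
  /// assumptions):
  /// ```swift
  /// let df = await spark.read
  ///   .format("csv")
  ///   .option("header", "true")
  ///   .option("delimiter", ";")
  ///   .load("/tmp/people.csv")
  /// ```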
  public func option(_ key: String, _ value: String) -> DataFrameReader {
    self.extraOptions[key] = value
    return self
  }

  /// Specifies the input schema. Some data sources (e.g. JSON) can infer the input schema
  /// automatically from data. By specifying the schema here, the underlying data source can skip
  /// the schema inference step and thus speed up data loading.
  /// - Parameter schema: A DDL schema string. An invalid string causes a
  ///   `SparkConnectError.InvalidTypeException` to be thrown.
  /// - Returns: A `DataFrameReader`.
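  ///
  /// For example, a minimal sketch (`spark`, the DDL string, and the path are illustrative;
  /// `try await` is needed because the DDL is validated via the server):
  /// ```swift
  /// let df = try await spark.read
  ///   .schema("name STRING, age INT")
  ///   .json("/tmp/people.json")
  /// ```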
  @discardableResult
  public func schema(_ schema: String) async throws -> DataFrameReader {
    // Validate the DDL string up front by asking the server to parse it.
    do {
      try await sparkSession.client.ddlParse(schema)
    } catch {
      throw SparkConnectError.InvalidTypeException
    }
    self.userSpecifiedSchemaDDL = schema
    return self
  }

  /// Loads input as a `DataFrame`, for data sources that don't require a path (e.g. external
  /// key-value stores).
  /// - Returns: A `DataFrame`.
  public func load() -> DataFrame {
    return load([])
  }

  /// Loads input as a `DataFrame`, for data sources that require a path (e.g. data backed by a
  /// local or distributed file system).
  /// - Parameter path: A path string.
  /// - Returns: A `DataFrame`.
  public func load(_ path: String) -> DataFrame {
    return load([path])
  }

  /// Loads input as a `DataFrame`, for data sources that support multiple paths. This only works
  /// if the source is a `HadoopFsRelationProvider`.
  /// - Parameter paths: An array of path strings.
  /// - Returns: A `DataFrame`.
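  ///
  /// For example (the format and paths are hypothetical):
  /// ```swift
  /// let df = await spark.read.format("orc").load(["/data/2024", "/data/2025"])
  /// ```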
  public func load(_ paths: [String]) -> DataFrame {
    self.paths = paths
    var dataSource = DataSource()
    dataSource.format = self.source
    dataSource.paths = self.paths
    dataSource.options = self.extraOptions.toStringDictionary()
    if let userSpecifiedSchemaDDL = self.userSpecifiedSchemaDDL {
      dataSource.schema = userSpecifiedSchemaDDL
    }
    var read = Read()
    read.dataSource = dataSource
    var relation = Relation()
    relation.read = read
    var plan = Plan()
    plan.opType = .root(relation)
    return DataFrame(spark: sparkSession, plan: plan)
  }

  /// Loads a CSV file and returns the result as a `DataFrame`. See the documentation on the other
  /// overloaded `csv()` method for more details.
  /// - Parameter path: A path string.
  /// - Returns: A `DataFrame`.
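  ///
  /// For example (the option and path are illustrative):
  /// ```swift
  /// let df = await spark.read.option("header", "true").csv("/tmp/people.csv")
  /// ```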
  public func csv(_ path: String) -> DataFrame {
    self.source = "csv"
    return load(path)
  }

  /// Loads CSV files and returns the result as a `DataFrame`.
  /// This function goes through the input once to determine the input schema if `inferSchema`
  /// is enabled. To avoid scanning the entire input, disable the `inferSchema` option or specify
  /// the schema explicitly using `schema`.
  /// - Parameter paths: Path strings.
  /// - Returns: A `DataFrame`.
  public func csv(_ paths: String...) -> DataFrame {
    self.source = "csv"
    return load(paths)
  }

  /// Loads a JSON file and returns the result as a `DataFrame`.
  /// - Parameter path: A path string.
  /// - Returns: A `DataFrame`.
  public func json(_ path: String) -> DataFrame {
    self.source = "json"
    return load(path)
  }

  /// Loads JSON files and returns the result as a `DataFrame`.
  /// - Parameter paths: Path strings.
  /// - Returns: A `DataFrame`.
  public func json(_ paths: String...) -> DataFrame {
    self.source = "json"
    return load(paths)
  }

  /// Loads an XML file and returns the result as a `DataFrame`.
  /// - Parameter path: A path string.
  /// - Returns: A `DataFrame`.
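  ///
  /// For example (`rowTag` is a common XML data source option; the path is hypothetical):
  /// ```swift
  /// let df = await spark.read.option("rowTag", "person").xml("/tmp/people.xml")
  /// ```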
  public func xml(_ path: String) -> DataFrame {
    self.source = "xml"
    return load(path)
  }

  /// Loads XML files and returns the result as a `DataFrame`.
  /// - Parameter paths: Path strings.
  /// - Returns: A `DataFrame`.
  public func xml(_ paths: String...) -> DataFrame {
    self.source = "xml"
    return load(paths)
  }

  /// Loads an ORC file and returns the result as a `DataFrame`.
  /// - Parameter path: A path string.
  /// - Returns: A `DataFrame`.
  public func orc(_ path: String) -> DataFrame {
    self.source = "orc"
    return load(path)
  }

  /// Loads ORC files and returns the result as a `DataFrame`.
  /// - Parameter paths: Path strings.
  /// - Returns: A `DataFrame`.
  public func orc(_ paths: String...) -> DataFrame {
    self.source = "orc"
    return load(paths)
  }

  /// Loads a Parquet file and returns the result as a `DataFrame`.
  /// - Parameter path: A path string.
  /// - Returns: A `DataFrame`.
  public func parquet(_ path: String) -> DataFrame {
    self.source = "parquet"
    return load(path)
  }

  /// Loads Parquet files and returns the result as a `DataFrame`.
  /// - Parameter paths: Path strings.
  /// - Returns: A `DataFrame`.
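  ///
  /// For example (the paths are hypothetical):
  /// ```swift
  /// let df = await spark.read.parquet("/data/part1", "/data/part2")
  /// ```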
  public func parquet(_ paths: String...) -> DataFrame {
    self.source = "parquet"
    return load(paths)
  }
}