Sources/SparkConnect/DataFrame.swift
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
import Atomics
import Foundation
import GRPCCore
import GRPCNIOTransportHTTP2
import GRPCProtobuf
import Synchronization
/// A distributed collection of data organized into named columns.
///
/// A DataFrame is equivalent to a relational table in Spark SQL, and can be created using various
/// functions in ``SparkSession``. Once created, it can be manipulated using the various domain-specific
/// language (DSL) functions defined in: ``DataFrame``, ``Column``, and functions.
///
/// ## Creating DataFrames
///
/// DataFrames can be created from various sources:
///
/// ```swift
/// // From a range
/// let df1 = try await spark.range(1, 100)
///
/// // From a SQL query
/// let df2 = try await spark.sql("SELECT * FROM users")
///
/// // From files
/// let df3 = try await spark.read.csv("data.csv")
/// ```
///
/// ## Common Operations
///
/// ### Transformations
///
/// ```swift
/// // Select specific columns
/// let names = try await df.select("name", "age")
///
/// // Filter rows
/// let adults = try await df.filter("age >= 18")
///
/// // Group and aggregate
/// let stats = try await df.groupBy("department").agg("avg(salary)", "count(*)")
/// ```
///
/// ### Actions
///
/// ```swift
/// // Show the first 20 rows
/// try await df.show()
///
/// // Collect all data to the driver
/// let rows = try await df.collect()
///
/// // Count rows
/// let count = try await df.count()
/// ```
///
/// ## Topics
///
/// ### Basic Information
/// - ``columns``
/// - ``schema``
/// - ``dtypes``
/// - ``sparkSession``
///
/// ### Data Collection
/// - ``count()``
/// - ``collect()``
/// - ``head(_:)``
/// - ``tail(_:)``
/// - ``show()``
/// - ``show(_:)``
/// - ``show(_:_:)``
/// - ``show(_:_:_:)``
///
/// ### Transformation Operations
/// - ``toDF(_:)``
/// - ``select(_:)``
/// - ``selectExpr(_:)``
/// - ``filter(_:)``
/// - ``where(_:)``
/// - ``sort(_:)``
/// - ``orderBy(_:)``
/// - ``limit(_:)``
/// - ``offset(_:)``
/// - ``drop(_:)``
/// - ``dropDuplicates(_:)``
/// - ``dropDuplicatesWithinWatermark(_:)``
/// - ``distinct()``
/// - ``withColumnRenamed(_:_:)``
///
/// ### Join Operations
/// - ``join(_:)``
/// - ``join(_:_:_:)``
/// - ``join(_:joinExprs:)``
/// - ``join(_:joinExprs:joinType:)``
/// - ``crossJoin(_:)``
/// - ``lateralJoin(_:)``
/// - ``lateralJoin(_:joinType:)``
/// - ``lateralJoin(_:joinExprs:)``
/// - ``lateralJoin(_:joinExprs:joinType:)``
///
/// ### Set Operations
/// - ``union(_:)``
/// - ``unionAll(_:)``
/// - ``unionByName(_:_:)``
/// - ``intersect(_:)``
/// - ``intersectAll(_:)``
/// - ``except(_:)``
/// - ``exceptAll(_:)``
///
/// ### Partitioning
/// - ``repartition(_:)``
/// - ``repartition(_:_:)``
/// - ``repartitionByExpression(_:_:)``
/// - ``coalesce(_:)``
///
/// ### Grouping Operations
/// - ``groupBy(_:)``
/// - ``rollup(_:)``
/// - ``cube(_:)``
///
/// ### Persistence
/// - ``cache()``
/// - ``persist(storageLevel:)``
/// - ``unpersist(blocking:)``
/// - ``storageLevel``
///
/// ### Schema Information
/// - ``printSchema()``
/// - ``printSchema(_:)``
/// - ``explain()``
/// - ``explain(_:)``
///
/// ### View Creation
/// - ``createTempView(_:)``
/// - ``createOrReplaceTempView(_:)``
/// - ``createGlobalTempView(_:)``
/// - ``createOrReplaceGlobalTempView(_:)``
///
/// ### Write Operations
/// - ``write``
/// - ``writeTo(_:)``
///
/// ### Sampling
/// - ``sample(_:_:_:)``
/// - ``sample(_:_:)``
/// - ``sample(_:)``
///
/// ### Statistics
/// - ``describe(_:)``
/// - ``summary(_:)``
///
/// ### Utility Methods
/// - ``isEmpty()``
/// - ``isLocal()``
/// - ``isStreaming()``
/// - ``inputFiles()``
/// - ``semanticHash()``
/// - ``sameSemantics(other:)``
///
/// ### Internal Methods
/// - ``rdd()``
/// - ``getPlan()``
public actor DataFrame: Sendable {
var spark: SparkSession
var plan: Plan
private var _schema: DataType? = nil
private var batches: [RecordBatch] = [RecordBatch]()
/// Create a new `DataFrame` instance with the given Spark session and plan.
/// - Parameters:
/// - spark: A ``SparkSession`` instance to use.
/// - plan: A plan to execute.
init(spark: SparkSession, plan: Plan) {
self.spark = spark
self.plan = plan
}
/// Create a new `DataFrame` instance with the given SparkSession and a SQL statement.
/// - Parameters:
/// - spark: A `SparkSession` instance to use.
/// - sqlText: A SQL statement.
/// - posArgs: An optional array of positional argument values.
init(spark: SparkSession, sqlText: String, _ posArgs: [Sendable]? = nil) async throws {
self.spark = spark
if let posArgs {
self.plan = sqlText.toSparkConnectPlan(posArgs)
} else {
self.plan = sqlText.toSparkConnectPlan
}
}
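/// Create a new `DataFrame` instance with the given SparkSession, a SQL statement, and named arguments.
/// - Parameters:
/// - spark: A `SparkSession` instance to use.
/// - sqlText: A SQL statement.
/// - args: A dictionary of named argument values.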
init(spark: SparkSession, sqlText: String, _ args: [String: Sendable]) async throws {
self.spark = spark
self.plan = sqlText.toSparkConnectPlan(args)
}
public func getPlan() -> Sendable {
return self.plan
}
/// Set the schema. This is used to store the analyzed schema response from the `Spark Connect` server.
/// - Parameter schema: The analyzed schema of this ``DataFrame``.
private func setSchema(_ schema: DataType) {
self._schema = schema
}
/// Add `Apache Arrow`'s `RecordBatch`s to the internal array.
/// - Parameter batches: An array of ``RecordBatch``.
private func addBatches(_ batches: [RecordBatch]) {
self.batches.append(contentsOf: batches)
}
/// Return the `SparkSession` of this `DataFrame`.
public var sparkSession: SparkSession {
get async throws {
return self.spark
}
}
/// A method to access the underlying Spark `RDD`.
/// In `Spark Connect`, this feature is not allowed by design.
public func rdd() throws {
// SQLSTATE: 0A000
// [UNSUPPORTED_CONNECT_FEATURE.RDD]
// Feature is not supported in Spark Connect: Resilient Distributed Datasets (RDDs).
throw SparkConnectError.UnsupportedOperationException
}
/// Return an array of column name strings
public var columns: [String] {
get async throws {
var columns: [String] = []
try await analyzePlanIfNeeded()
for field in self._schema!.struct.fields {
columns.append(field.name)
}
return columns
}
}
/// Returns the schema as a `JSON` string because the internal type ``DataType`` cannot be exposed.
public var schema: String {
get async throws {
try await analyzePlanIfNeeded()
return try self._schema!.jsonString()
}
}
/// Returns all column names and their data types as an array.
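///
/// For example, assuming a two-column query result:
///
/// ```swift
/// let df = try await spark.sql("SELECT 1 AS id, 'a' AS name")
/// for (name, dataType) in try await df.dtypes {
///   print("\(name): \(dataType)")
/// }
/// ```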
public var dtypes: [(String, String)] {
get async throws {
try await analyzePlanIfNeeded()
return try self._schema!.struct.fields.map { ($0.name, try $0.dataType.simpleString) }
}
}
private func withGPRC<Result: Sendable>(
_ f: (GRPCClient<GRPCNIOTransportHTTP2.HTTP2ClientTransport.Posix>) async throws -> Result
) async throws -> Result {
try await withGRPCClient(
transport: .http2NIOPosix(
target: .dns(host: spark.client.host, port: spark.client.port),
transportSecurity: .plaintext
),
interceptors: spark.client.getIntercepters()
) { client in
return try await f(client)
}
}
private func analyzePlanIfNeeded() async throws {
if self._schema != nil {
return
}
try await withGPRC { client in
let service = Spark_Connect_SparkConnectService.Client(wrapping: client)
let response = try await service.analyzePlan(
spark.client.getAnalyzePlanRequest(spark.sessionID, plan))
self.setSchema(response.schema.schema)
}
}
/// Returns the total number of rows.
/// - Returns: An `Int64` value.
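///
/// For example:
///
/// ```swift
/// let df = try await spark.range(1, 100)
/// let total = try await df.count()
/// print(total)
/// ```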
@discardableResult
public func count() async throws -> Int64 {
let counter = Atomic(Int64(0))
try await withGPRC { client in
let service = Spark_Connect_SparkConnectService.Client(wrapping: client)
try await service.executePlan(spark.client.getExecutePlanRequest(plan)) {
response in
for try await m in response.messages {
counter.add(m.arrowBatch.rowCount, ordering: .relaxed)
}
}
}
return counter.load(ordering: .relaxed)
}
/// Execute the plan and try to fill `schema` and `batches`.
private func execute() async throws {
// Clear all existing batches.
self.batches.removeAll()
try await withGPRC { client in
let service = Spark_Connect_SparkConnectService.Client(wrapping: client)
try await service.executePlan(spark.client.getExecutePlanRequest(plan)) {
response in
for try await m in response.messages {
if m.hasSchema {
// The original schema should arrive before ArrowBatches
await self.setSchema(m.schema)
}
let ipcStreamBytes = m.arrowBatch.data
if !ipcStreamBytes.isEmpty && m.arrowBatch.rowCount > 0 {
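// The bytes are assumed to follow the Arrow IPC stream framing used by Spark Connect:
// a schema message and a record batch message, each prefixed by a continuation token
// (0xFFFFFFFF) and a 4-byte metadata size, followed by an 8-byte end-of-stream marker,
// which is why 8 bytes are subtracted when computing the data body size below.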
let IPC_CONTINUATION_TOKEN = Int32(-1)
// Schema
assert(ipcStreamBytes[0..<4].int32 == IPC_CONTINUATION_TOKEN)
let schemaSize = Int64(ipcStreamBytes[4..<8].int32)
let schema = Data(ipcStreamBytes[8..<(8 + schemaSize)])
// Arrow IPC Data
assert(
ipcStreamBytes[(8 + schemaSize)..<(8 + schemaSize + 4)].int32
== IPC_CONTINUATION_TOKEN)
var pos: Int64 = 8 + schemaSize + 4
let dataHeaderSize = Int64(ipcStreamBytes[pos..<(pos + 4)].int32)
pos += 4
let dataHeader = Data(ipcStreamBytes[pos..<(pos + dataHeaderSize)])
pos += dataHeaderSize
let dataBodySize = Int64(ipcStreamBytes.count) - pos - 8
let dataBody = Data(ipcStreamBytes[pos..<(pos + dataBodySize)])
// Read ArrowBatches
let reader = ArrowReader()
let arrowResult = ArrowReader.makeArrowReaderResult()
_ = reader.fromMessage(schema, dataBody: Data(), result: arrowResult)
_ = reader.fromMessage(dataHeader, dataBody: dataBody, result: arrowResult)
await self.addBatches(arrowResult.batches)
}
}
}
}
}
/// Execute the plan and return the result as ``[Row]``.
/// - Returns: ``[Row]``
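///
/// For example:
///
/// ```swift
/// let rows = try await spark.sql("SELECT 'Alice' AS name").collect()
/// for row in rows {
///   print(try row.get(0) as! String)
/// }
/// ```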
public func collect() async throws -> [Row] {
try await execute()
var result: [Row] = []
for batch in self.batches {
for i in 0..<batch.length {
var values: [Sendable?] = []
for column in batch.columns {
if column.data.isNull(i) {
values.append(nil)
} else {
let array = column.array
switch column.data.type.info {
case .primitiveInfo(.boolean):
values.append(array.asAny(i) as? Bool)
case .primitiveInfo(.int8):
values.append(array.asAny(i) as? Int8)
case .primitiveInfo(.int16):
values.append(array.asAny(i) as? Int16)
case .primitiveInfo(.int32):
values.append(array.asAny(i) as? Int32)
case .primitiveInfo(.int64):
values.append(array.asAny(i) as! Int64)
case .primitiveInfo(.float):
values.append(array.asAny(i) as? Float)
case .primitiveInfo(.double):
values.append(array.asAny(i) as? Double)
case .primitiveInfo(.date32):
values.append(array.asAny(i) as! Date)
case ArrowType.ArrowBinary:
values.append((array as! AsString).asString(i).utf8)
case .complexInfo(.strct):
values.append((array as! AsString).asString(i))
default:
values.append(array.asAny(i) as? String)
}
}
}
result.append(Row(valueArray: values))
}
}
return result
}
/// Displays the top 20 rows of ``DataFrame`` in a tabular form.
public func show() async throws {
try await show(20)
}
/// Displays the top 20 rows of ``DataFrame`` in a tabular form.
/// - Parameter truncate: Whether to truncate long strings. If true, strings longer than 20 characters will be
/// truncated and all cells will be aligned right.
public func show(_ truncate: Bool) async throws {
try await show(20, truncate)
}
/// Displays the ``DataFrame`` in a tabular form.
/// - Parameters:
/// - numRows: Number of rows to show
/// - truncate: Whether to truncate long strings. If true, strings longer than 20 characters will be truncated
/// and all cells will be aligned right.
public func show(_ numRows: Int32 = 20, _ truncate: Bool = true) async throws {
try await show(numRows, truncate ? 20 : 0)
}
/// Displays the ``DataFrame`` in a tabular form.
/// - Parameters:
/// - numRows: Number of rows to show
/// - truncate: If set to more than 0, truncates strings to `truncate` characters and all cells will be aligned right.
/// - vertical: If set to true, prints output rows vertically (one line per column value).
public func show(_ numRows: Int32, _ truncate: Int32, _ vertical: Bool = false) async throws {
let rows = try await showString(numRows, truncate, vertical).collect()
assert(rows.count == 1)
assert(rows[0].length == 1)
print(try rows[0].get(0) as! String)
}
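/// Returns a single-row, single-column ``DataFrame`` containing the tabular string representation
/// used by ``show(_:_:_:)``.
/// - Parameters:
/// - numRows: Number of rows to show.
/// - truncate: If set to more than 0, truncates strings to `truncate` characters.
/// - vertical: If set to true, prints output rows vertically (one line per column value).
/// - Returns: A ``DataFrame``.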
func showString(_ numRows: Int32, _ truncate: Int32, _ vertical: Bool) -> DataFrame {
let plan = SparkConnectClient.getShowString(self.plan.root, numRows, truncate, vertical)
return DataFrame(spark: self.spark, plan: plan)
}
/// Selects a subset of existing columns using column names.
/// - Parameter cols: Column names
/// - Returns: A ``DataFrame`` with subset of columns.
public func select(_ cols: String...) -> DataFrame {
return DataFrame(spark: self.spark, plan: SparkConnectClient.getProject(self.plan.root, cols))
}
/// Selects a subset of existing columns using column names.
/// - Parameter cols: Column names
/// - Returns: A ``DataFrame`` with subset of columns.
public func toDF(_ cols: String...) -> DataFrame {
let df = if cols.isEmpty {
DataFrame(spark: self.spark, plan: self.plan)
} else {
DataFrame(spark: self.spark, plan: SparkConnectClient.getProject(self.plan.root, cols))
}
return df
}
/// Projects a set of expressions and returns a new ``DataFrame``.
/// - Parameter exprs: Expression strings
/// - Returns: A ``DataFrame`` with subset of columns.
public func selectExpr(_ exprs: String...) -> DataFrame {
return DataFrame(spark: self.spark, plan: SparkConnectClient.getProjectExprs(self.plan.root, exprs))
}
/// Returns a new Dataset with the given columns dropped. This is a no-op for column names that don't exist in the schema.
/// - Parameter cols: Column names
/// - Returns: A ``DataFrame`` without the specified columns.
public func drop(_ cols: String...) -> DataFrame {
return DataFrame(spark: self.spark, plan: SparkConnectClient.getDrop(self.plan.root, cols))
}
/// Returns a new ``DataFrame`` that contains only the unique rows from this ``DataFrame``.
/// This is an alias for `distinct`. If column names are given, Spark considers only those columns.
/// - Parameter cols: Column names
/// - Returns: A ``DataFrame``.
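///
/// For example, assuming a hypothetical `name` column:
///
/// ```swift
/// // Remove fully duplicated rows
/// let unique = await df.dropDuplicates()
///
/// // Consider only the `name` column when detecting duplicates
/// let uniqueNames = await df.dropDuplicates("name")
/// ```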
public func dropDuplicates(_ cols: String...) -> DataFrame {
let plan = SparkConnectClient.getDropDuplicates(self.plan.root, cols, withinWatermark: false)
return DataFrame(spark: self.spark, plan: plan)
}
/// Returns a new Dataset with duplicate rows removed, within watermark.
/// If column names are given, Spark considers only those columns.
/// - Parameter cols: Column names
/// - Returns: A ``DataFrame``.
public func dropDuplicatesWithinWatermark(_ cols: String...) -> DataFrame {
let plan = SparkConnectClient.getDropDuplicates(self.plan.root, cols, withinWatermark: true)
return DataFrame(spark: self.spark, plan: plan)
}
/// Computes basic statistics for numeric and string columns, including count, mean, stddev, min,
/// and max. If no columns are given, this function computes statistics for all numerical or
/// string columns.
/// - Parameter cols: Column names.
/// - Returns: A ``DataFrame`` containing basic statistics.
public func describe(_ cols: String...) -> DataFrame {
return DataFrame(spark: self.spark, plan: SparkConnectClient.getDescribe(self.plan.root, cols))
}
/// Computes specified statistics for numeric and string columns. Available statistics are:
/// count, mean, stddev, min, max, arbitrary approximate percentiles specified as a percentage (e.g. 75%),
/// count_distinct, and approx_count_distinct. If no statistics are given, this function computes count, mean,
/// stddev, min, approximate quartiles (percentiles at 25%, 50%, and 75%), and max.
/// - Parameter statistics: Statistics names.
/// - Returns: A ``DataFrame`` containing specified statistics.
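///
/// For example:
///
/// ```swift
/// // Default statistics
/// try await df.summary().show()
///
/// // Only the requested statistics
/// try await df.summary("count", "min", "25%", "75%", "max").show()
/// ```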
public func summary(_ statistics: String...) -> DataFrame {
return DataFrame(spark: self.spark, plan: SparkConnectClient.getSummary(self.plan.root, statistics))
}
/// Returns a new Dataset with a column renamed. This is a no-op if the schema doesn't contain `existingName`.
/// - Parameters:
/// - existingName: An existing column name to be renamed.
/// - newName: A new column name.
/// - Returns: A ``DataFrame`` with the renamed column.
public func withColumnRenamed(_ existingName: String, _ newName: String) -> DataFrame {
return withColumnRenamed([existingName: newName])
}
/// Returns a new Dataset with columns renamed. This is a no-op for column names that don't exist in the schema.
/// - Parameters:
/// - colNames: A list of existing column names to be renamed.
/// - newColNames: A list of new column names.
/// - Returns: A ``DataFrame`` with the renamed columns.
public func withColumnRenamed(_ colNames: [String], _ newColNames: [String]) -> DataFrame {
let dic = Dictionary(uniqueKeysWithValues: zip(colNames, newColNames))
return DataFrame(spark: self.spark, plan: SparkConnectClient.getWithColumnRenamed(self.plan.root, dic))
}
/// Returns a new Dataset with columns renamed. This is a no-op for column names that don't exist in the schema.
/// - Parameter colsMap: A dictionary of existing column name and new column name.
/// - Returns: A ``DataFrame`` with the renamed columns.
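///
/// For example, renaming hypothetical `dept` and `loc` columns:
///
/// ```swift
/// let renamed = await df.withColumnRenamed(["dept": "department", "loc": "location"])
/// ```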
public func withColumnRenamed(_ colsMap: [String: String]) -> DataFrame {
return DataFrame(spark: self.spark, plan: SparkConnectClient.getWithColumnRenamed(self.plan.root, colsMap))
}
/// Filters rows using the given condition.
///
/// The condition should be a SQL expression that evaluates to a boolean value.
///
/// ```swift
/// // Filter with simple condition
/// let adults = df.filter("age >= 18")
///
/// // Filter with complex condition
/// let qualifiedUsers = df.filter("age >= 21 AND department = 'Engineering'")
///
/// // Filter with SQL functions
/// let recent = df.filter("date_diff(current_date(), join_date) < 30")
/// ```
///
/// - Parameter conditionExpr: A SQL expression string for filtering
/// - Returns: A new DataFrame containing only rows that match the condition
public func filter(_ conditionExpr: String) -> DataFrame {
return DataFrame(spark: self.spark, plan: SparkConnectClient.getFilter(self.plan.root, conditionExpr))
}
/// Filters rows using the given condition (alias for filter).
///
/// This method is an alias for ``filter(_:)`` and behaves identically.
///
/// ```swift
/// let highSalary = df.where("salary > 100000")
/// ```
///
/// - Parameter conditionExpr: A SQL expression string for filtering
/// - Returns: A new DataFrame containing only rows that match the condition
public func `where`(_ conditionExpr: String) -> DataFrame {
return filter(conditionExpr)
}
/// Returns a new DataFrame sorted by the specified columns.
///
/// By default, sorts in ascending order. Use `desc("column")` for descending order.
///
/// ```swift
/// // Sort by single column (ascending)
/// let sorted = df.sort("age")
///
/// // Sort by multiple columns
/// let multiSort = df.sort("department", "salary")
///
/// // Sort with mixed order
/// let mixedSort = df.sort("department", "desc(salary)")
/// ```
///
/// - Parameter cols: Column names or expressions to sort by
/// - Returns: A new DataFrame sorted by the specified columns
public func sort(_ cols: String...) -> DataFrame {
return DataFrame(spark: self.spark, plan: SparkConnectClient.getSort(self.plan.root, cols))
}
/// Return a new ``DataFrame`` sorted by the specified column(s).
/// - Parameter cols: Column names.
/// - Returns: A sorted ``DataFrame``
public func orderBy(_ cols: String...) -> DataFrame {
return DataFrame(spark: self.spark, plan: SparkConnectClient.getSort(self.plan.root, cols))
}
/// Limits the result count to the number specified.
///
/// This transformation is often used for:
/// - Previewing data
/// - Reducing data size for testing
/// - Implementing pagination
///
/// ```swift
/// // Get top 10 records
/// let top10 = df.limit(10)
///
/// // Preview data
/// let preview = df.filter("status = 'active'").limit(100)
/// ```
///
/// - Parameter n: Maximum number of rows to return
/// - Returns: A new DataFrame with at most n rows
public func limit(_ n: Int32) -> DataFrame {
return DataFrame(spark: self.spark, plan: SparkConnectClient.getLimit(self.plan.root, n))
}
/// Returns a new Dataset by skipping the first `n` rows.
/// - Parameter n: Number of rows to skip.
/// - Returns: A subset of the rows
public func offset(_ n: Int32) -> DataFrame {
return DataFrame(spark: self.spark, plan: SparkConnectClient.getOffset(self.plan.root, n))
}
/// Returns a new ``Dataset`` by sampling a fraction of rows, using a user-supplied seed.
/// - Parameters:
/// - withReplacement: Sample with replacement or not.
/// - fraction: Fraction of rows to generate, range [0.0, 1.0].
/// - seed: Seed for sampling.
/// - Returns: A subset of the records.
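///
/// For example:
///
/// ```swift
/// // Roughly 10% of the rows without replacement, reproducible via a fixed seed
/// let sampled = await df.sample(false, 0.1, 42)
/// print(try await sampled.count())
/// ```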
public func sample(_ withReplacement: Bool, _ fraction: Double, _ seed: Int64) -> DataFrame {
return DataFrame(spark: self.spark, plan: SparkConnectClient.getSample(self.plan.root, withReplacement, fraction, seed))
}
/// Returns a new ``Dataset`` by sampling a fraction of rows, using a random seed.
/// - Parameters:
/// - withReplacement: Sample with replacement or not.
/// - fraction: Fraction of rows to generate, range [0.0, 1.0].
/// - Returns: A subset of the records.
public func sample(_ withReplacement: Bool, _ fraction: Double) -> DataFrame {
return sample(withReplacement, fraction, Int64.random(in: Int64.min...Int64.max))
}
/// Returns a new ``Dataset`` by sampling a fraction of rows (without replacement), using a
/// user-supplied seed.
/// - Parameters:
/// - fraction: Fraction of rows to generate, range [0.0, 1.0].
/// - seed: Seed for sampling.
/// - Returns: A subset of the records.
public func sample(_ fraction: Double, _ seed: Int64) -> DataFrame {
return sample(false, fraction, seed)
}
/// Returns a new ``Dataset`` by sampling a fraction of rows (without replacement), using a
/// random seed.
/// - Parameters:
/// - fraction: Fraction of rows to generate, range [0.0, 1.0].
/// - Returns: A subset of the records.
public func sample(_ fraction: Double) -> DataFrame {
return sample(false, fraction)
}
/// Returns the first n rows.
///
/// This method is useful for quickly examining the contents of a DataFrame.
///
/// ```swift
/// // Get the first row
/// let firstRow = try await df.head()
///
/// // Get the first 5 rows
/// let firstFive = try await df.head(5)
/// ```
///
/// - Parameter n: Number of rows to return (default: 1)
/// - Returns: An array of ``Row`` objects
/// - Throws: `SparkConnectError` if the operation fails
public func head(_ n: Int32 = 1) async throws -> [Row] {
return try await limit(n).collect()
}
/// Returns the last `n` rows.
/// - Parameter n: The number of rows.
/// - Returns: ``[Row]``
public func tail(_ n: Int32) async throws -> [Row] {
let lastN = DataFrame(spark: spark, plan: SparkConnectClient.getTail(self.plan.root, n))
return try await lastN.collect()
}
/// Returns true if the `collect` and `take` methods can be run locally
/// (without any Spark executors).
/// - Returns: True if the plan is local.
public func isLocal() async throws -> Bool {
try await withGPRC { client in
let service = Spark_Connect_SparkConnectService.Client(wrapping: client)
let response = try await service.analyzePlan(spark.client.getIsLocal(spark.sessionID, plan))
return response.isLocal.isLocal
}
}
/// Returns true if this `DataFrame` contains one or more sources that continuously return data as it
/// arrives.
/// - Returns: True if a plan is streaming.
public func isStreaming() async throws -> Bool {
try await withGPRC { client in
let service = Spark_Connect_SparkConnectService.Client(wrapping: client)
let response = try await service.analyzePlan(spark.client.getIsStreaming(spark.sessionID, plan))
return response.isStreaming.isStreaming
}
}
/// Checks if the ``DataFrame`` is empty and returns a boolean value.
/// - Returns: `true` if the ``DataFrame`` is empty, `false` otherwise.
public func isEmpty() async throws -> Bool {
return try await select().limit(1).count() == 0
}
/// Persists this DataFrame with the default storage level (MEMORY_AND_DISK).
///
/// Caching can significantly improve performance when a DataFrame is accessed multiple times.
/// The cached data is stored in memory and/or disk depending on the storage level.
///
/// ```swift
/// // Cache a frequently used DataFrame
/// let cachedDf = try await df.cache()
///
/// // Use the cached DataFrame multiple times
/// let count1 = try await cachedDf.count()
/// let count2 = try await cachedDf.filter("age > 30").count()
/// ```
///
/// - Returns: The cached DataFrame
/// - Throws: `SparkConnectError` if the operation fails
public func cache() async throws -> DataFrame {
return try await persist()
}
/// Persist this `DataFrame` with the given storage level.
/// - Parameter storageLevel: A storage level to apply.
/// - Returns: The persisted `DataFrame`.
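///
/// For example, a minimal sketch assuming `MEMORY_ONLY` is among the available ``StorageLevel`` values:
///
/// ```swift
/// let df = try await spark.range(1, 100).persist(storageLevel: StorageLevel.MEMORY_ONLY)
/// try await df.count()
/// ```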
@discardableResult
public func persist(storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK) async throws
-> DataFrame
{
try await withGPRC { client in
let service = Spark_Connect_SparkConnectService.Client(wrapping: client)
_ = try await service.analyzePlan(
spark.client.getPersist(spark.sessionID, plan, storageLevel))
}
return self
}
/// Mark the `DataFrame` as non-persistent, and remove all blocks for it from memory and disk.
/// This will not un-persist any cached data that is built upon this `DataFrame`.
/// - Parameter blocking: Whether to block until all blocks are deleted.
/// - Returns: A `DataFrame`
@discardableResult
public func unpersist(blocking: Bool = false) async throws -> DataFrame {
try await withGPRC { client in
let service = Spark_Connect_SparkConnectService.Client(wrapping: client)
_ = try await service.analyzePlan(spark.client.getUnpersist(spark.sessionID, plan, blocking))
}
return self
}
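/// Returns the current ``StorageLevel`` of this ``DataFrame``.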
public var storageLevel: StorageLevel {
get async throws {
try await withGPRC { client in
let service = Spark_Connect_SparkConnectService.Client(wrapping: client)
return try await service
.analyzePlan(spark.client.getStorageLevel(spark.sessionID, plan)).getStorageLevel.storageLevel.toStorageLevel
}
}
}
/// Returns a `hashCode` of the logical query plan against this ``DataFrame``.
/// - Returns: A hashcode value.
public func semanticHash() async throws -> Int32 {
return try await self.spark.semanticHash(self.plan)
}
/// Returns `true` when the logical query plans inside both ``Dataset``s are equal and therefore
/// return the same results.
/// - Parameter other: A ``DataFrame`` to compare.
/// - Returns: Whether the both logical plans are equal.
public func sameSemantics(other: DataFrame) async throws -> Bool {
return try await self.spark.sameSemantics(self.plan, other.getPlan() as! Plan)
}
/// Prints the physical plan to the console for debugging purposes.
public func explain() async throws {
try await explain("simple")
}
/// Prints the plans (logical and physical) to the console for debugging purposes.
/// - Parameter extended: If `false`, prints only the physical plan.
public func explain(_ extended: Bool) async throws {
if extended {
try await explain("extended")
} else {
try await explain("simple")
}
}
/// Prints the plans (logical and physical) with a format specified by a given explain mode.
/// - Parameter mode: the expected output format of plans;
/// `simple`, `extended`, `codegen`, `cost`, `formatted`.
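///
/// For example:
///
/// ```swift
/// try await df.explain("formatted")
/// ```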
public func explain(_ mode: String) async throws {
try await withGPRC { client in
let service = Spark_Connect_SparkConnectService.Client(wrapping: client)
let response = try await service.analyzePlan(spark.client.getExplain(spark.sessionID, plan, mode))
print(response.explain.explainString)
}
}
/// Returns a best-effort snapshot of the files that compose this Dataset. This method simply
/// asks each constituent BaseRelation for its respective files and takes the union of all
/// results. Depending on the source relations, this may not find all input files. Duplicates are removed.
/// - Returns: An array of file path strings.
public func inputFiles() async throws -> [String] {
try await withGPRC { client in
let service = Spark_Connect_SparkConnectService.Client(wrapping: client)
let response = try await service.analyzePlan(spark.client.getInputFiles(spark.sessionID, plan))
return response.inputFiles.files
}
}
/// Prints the schema to the console in a nice tree format.
public func printSchema() async throws {
try await printSchema(Int32.max)
}
/// Prints the schema up to the given level to the console in a nice tree format.
/// - Parameter level: A level to be printed.
public func printSchema(_ level: Int32) async throws {
try await withGPRC { client in
let service = Spark_Connect_SparkConnectService.Client(wrapping: client)
let response = try await service.analyzePlan(spark.client.getTreeString(spark.sessionID, plan, level))
print(response.treeString.treeString)
}
}
/// Join with another DataFrame.
///
/// This performs an inner join and requires a subsequent join predicate.
/// For other join types, use the overloaded methods with join type parameter.
///
/// ```swift
/// // Basic join (requires join condition)
/// let joined = await df1.join(df2)
/// .where("df1.id = df2.user_id")
///
/// // Join on a common column
/// let result = await users.join(orders, "id")
/// ```
///
/// - Parameter right: The DataFrame to join with
/// - Returns: A new DataFrame representing the join result
public func join(_ right: DataFrame) async -> DataFrame {
let right = await (right.getPlan() as! Plan).root
let plan = SparkConnectClient.getJoin(self.plan.root, right, JoinType.inner)
return DataFrame(spark: self.spark, plan: plan)
}
/// Equi-join with another DataFrame using the given column.
///
/// This method performs an equi-join on a single column that exists in both DataFrames.
///
/// ```swift
/// // Inner join on a single column
/// let joined = await users.join(orders, "user_id")
///
/// // Left outer join
/// let leftJoined = await users.join(orders, "user_id", "left")
///
/// // Other join types: "inner", "outer", "left", "right", "semi", "anti"
/// ```
///
/// - Parameters:
/// - right: The DataFrame to join with
/// - usingColumn: Column name that exists in both DataFrames
/// - joinType: Type of join (default: "inner")
/// - Returns: A new DataFrame with the join result
public func join(_ right: DataFrame, _ usingColumn: String, _ joinType: String = "inner") async -> DataFrame {
await join(right, [usingColumn], joinType)
}
/// Equi-join with another `DataFrame` using the given columns.
/// - Parameters:
/// - other: Right side of the join operation.
/// - usingColumns: Names of the columns to join on. These columns must exist on both sides.
/// - joinType: A join type name (default: `inner`).
/// - Returns: A `DataFrame`.
public func join(_ other: DataFrame, _ usingColumns: [String], _ joinType: String = "inner") async -> DataFrame {
let right = await (other.getPlan() as! Plan).root
let plan = SparkConnectClient.getJoin(
self.plan.root,
right,
joinType.toJoinType,
usingColumns: usingColumns
)
return DataFrame(spark: self.spark, plan: plan)
}
/// Inner join with another `DataFrame` using the given join expression.
/// - Parameters:
/// - right: Right side of the join operation.
/// - joinExprs: A join expression string.
/// - Returns: A `DataFrame`.
public func join(_ right: DataFrame, joinExprs: String) async -> DataFrame {
return await join(right, joinExprs: joinExprs, joinType: "inner")
}
/// Join with another `DataFrame` using the given join expression and join type.
/// - Parameters:
/// - right: Right side of the join operation.
/// - joinExprs: A join expression string.
/// - joinType: A join type name.
/// - Returns: A `DataFrame`.
public func join(_ right: DataFrame, joinExprs: String, joinType: String) async -> DataFrame {
let rightPlan = await (right.getPlan() as! Plan).root
let plan = SparkConnectClient.getJoin(
self.plan.root,
rightPlan,
joinType.toJoinType,
joinCondition: joinExprs
)
return DataFrame(spark: self.spark, plan: plan)
}
/// Explicit cartesian join with another `DataFrame`.
/// - Parameter right: Right side of the join operation.
/// - Returns: A `DataFrame`.
public func crossJoin(_ right: DataFrame) async -> DataFrame {
let rightPlan = await (right.getPlan() as! Plan).root
let plan = SparkConnectClient.getJoin(self.plan.root, rightPlan, JoinType.cross)
return DataFrame(spark: self.spark, plan: plan)
}
/// Lateral join with another ``DataFrame``.
///
/// Behaves like a SQL `JOIN LATERAL`.
///
/// - Parameters:
/// - right: Right side of the join operation.
/// - Returns: A ``DataFrame``.
public func lateralJoin(_ right: DataFrame) async -> DataFrame {
let rightPlan = await (right.getPlan() as! Plan).root
let plan = SparkConnectClient.getLateralJoin(
self.plan.root,
rightPlan,
JoinType.inner
)
return DataFrame(spark: self.spark, plan: plan)
}
/// Lateral join with another ``DataFrame``.
///
/// Behaves like a SQL `JOIN LATERAL`.
///
/// - Parameters:
/// - right: Right side of the join operation.
/// - joinType: One of `inner` (default), `cross`, `left`, `leftouter`, `left_outer`.
/// - Returns: A ``DataFrame``.
public func lateralJoin(_ right: DataFrame, joinType: String) async -> DataFrame {
let rightPlan = await (right.getPlan() as! Plan).root
let plan = SparkConnectClient.getLateralJoin(
self.plan.root,
rightPlan,
joinType.toJoinType
)
return DataFrame(spark: self.spark, plan: plan)
}
/// Lateral join with another ``DataFrame``.
///
/// Behaves like a SQL `JOIN LATERAL`.
///
/// - Parameters:
/// - right: Right side of the join operation.
/// - joinExprs: A join expression string.
/// - Returns: A ``DataFrame``.
public func lateralJoin(_ right: DataFrame, joinExprs: String) async -> DataFrame {
let rightPlan = await (right.getPlan() as! Plan).root
let plan = SparkConnectClient.getLateralJoin(
self.plan.root,
rightPlan,
JoinType.inner,
joinCondition: joinExprs
)
return DataFrame(spark: self.spark, plan: plan)
}
/// Lateral join with another ``DataFrame``.
///
/// Behaves like a SQL `JOIN LATERAL`.
///
/// - Parameters:
/// - right: Right side of the join operation.
/// - joinType: One of `inner` (default), `cross`, `left`, `leftouter`, `left_outer`.
/// - joinExprs: A join expression string.
/// - Returns: A ``DataFrame``.
public func lateralJoin(
_ right: DataFrame, joinExprs: String, joinType: String = "inner"
) async -> DataFrame {
let rightPlan = await (right.getPlan() as! Plan).root
let plan = SparkConnectClient.getLateralJoin(
self.plan.root,
rightPlan,
joinType.toJoinType,
joinCondition: joinExprs
)
return DataFrame(spark: self.spark, plan: plan)
}
/// Returns a new `DataFrame` containing rows in this `DataFrame` but not in another `DataFrame`.
/// This is equivalent to `EXCEPT DISTINCT` in SQL.
/// - Parameter other: A `DataFrame` to exclude.
/// - Returns: A `DataFrame`.
public func except(_ other: DataFrame) async -> DataFrame {
let right = await (other.getPlan() as! Plan).root
let plan = SparkConnectClient.getSetOperation(self.plan.root, right, SetOpType.except)
return DataFrame(spark: self.spark, plan: plan)
}
/// Returns a new `DataFrame` containing rows in this `DataFrame` but not in another `DataFrame` while
/// preserving the duplicates. This is equivalent to `EXCEPT ALL` in SQL.
/// - Parameter other: A `DataFrame` to exclude.
/// - Returns: A `DataFrame`.
public func exceptAll(_ other: DataFrame) async -> DataFrame {
let right = await (other.getPlan() as! Plan).root
let plan = SparkConnectClient.getSetOperation(self.plan.root, right, SetOpType.except, isAll: true)
return DataFrame(spark: self.spark, plan: plan)
}
/// Returns a new `DataFrame` containing rows only in both this `DataFrame` and another `DataFrame`.
/// This is equivalent to `INTERSECT` in SQL.
/// - Parameter other: A `DataFrame` to intersect with.
/// - Returns: A `DataFrame`.
public func intersect(_ other: DataFrame) async -> DataFrame {
let right = await (other.getPlan() as! Plan).root
let plan = SparkConnectClient.getSetOperation(self.plan.root, right, SetOpType.intersect)
return DataFrame(spark: self.spark, plan: plan)
}
/// Returns a new `DataFrame` containing rows only in both this `DataFrame` and another `DataFrame` while
/// preserving the duplicates. This is equivalent to `INTERSECT ALL` in SQL.
/// - Parameter other: A `DataFrame` to intersect with.
/// - Returns: A `DataFrame`.
public func intersectAll(_ other: DataFrame) async -> DataFrame {
let right = await (other.getPlan() as! Plan).root
let plan = SparkConnectClient.getSetOperation(self.plan.root, right, SetOpType.intersect, isAll: true)
return DataFrame(spark: self.spark, plan: plan)
}
/// Returns a new `DataFrame` containing union of rows in this `DataFrame` and another `DataFrame`.
/// This is equivalent to `UNION ALL` in SQL. To do a SQL-style set union (that does
/// deduplication of elements), use this function followed by ``distinct()``.
/// Also, as standard in SQL, this function resolves columns by position (not by name).
/// - Parameter other: A `DataFrame` to union with.
/// - Returns: A `DataFrame`.
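///
/// For example, with two ``DataFrame``s that share the same schema:
///
/// ```swift
/// let unioned = await df1.union(df2)
///
/// // SQL-style `UNION` (with deduplication)
/// let deduplicated = await df1.union(df2).distinct()
/// ```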
public func union(_ other: DataFrame) async -> DataFrame {
let right = await (other.getPlan() as! Plan).root
let plan = SparkConnectClient.getSetOperation(self.plan.root, right, SetOpType.union, isAll: true)
return DataFrame(spark: self.spark, plan: plan)
}
/// Returns a new `DataFrame` containing union of rows in this `DataFrame` and another `DataFrame`.
/// This is an alias of `union`.
/// - Parameter other: A `DataFrame` to union with.
/// - Returns: A `DataFrame`.
public func unionAll(_ other: DataFrame) async -> DataFrame {
return await union(other)
}
/// Returns a new `DataFrame` containing union of rows in this `DataFrame` and another `DataFrame`.
/// The difference between this function and ``union(_:)`` is that this function resolves columns by
/// name (not by position).
/// When the parameter `allowMissingColumns` is `true`, the set of column names in this and the other
/// `DataFrame` can differ; missing columns will be filled with null. Further, the missing columns
/// of this `DataFrame` will be added at the end of the schema of the union result.
/// - Parameters:
/// - other: A `DataFrame` to union with.
/// - allowMissingColumns: Whether to allow missing columns (default: `false`).
/// - Returns: A `DataFrame`.
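///
/// For example, assuming `df1` has columns `(a, b)` and `df2` has columns `(b, a)`:
///
/// ```swift
/// let byName = await df1.unionByName(df2)
///
/// // Allow the schemas to differ; missing columns are filled with nulls
/// let merged = await df1.unionByName(df2, true)
/// ```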
public func unionByName(_ other: DataFrame, _ allowMissingColumns: Bool = false) async -> DataFrame {
let right = await (other.getPlan() as! Plan).root
let plan = SparkConnectClient.getSetOperation(
self.plan.root,
right,
SetOpType.union,
isAll: true,
byName: true,
allowMissingColumns: allowMissingColumns
)
return DataFrame(spark: self.spark, plan: plan)
}
private func buildRepartition(numPartitions: Int32, shuffle: Bool) -> DataFrame {
let plan = SparkConnectClient.getRepartition(self.plan.root, numPartitions, shuffle)
return DataFrame(spark: self.spark, plan: plan)
}
private func buildRepartitionByExpression(numPartitions: Int32?, partitionExprs: [String]) -> DataFrame {
let plan = SparkConnectClient.getRepartitionByExpression(self.plan.root, partitionExprs, numPartitions)
return DataFrame(spark: self.spark, plan: plan)
}
/// Returns a new ``DataFrame`` that has exactly `numPartitions` partitions.
/// - Parameter numPartitions: The number of partitions.
/// - Returns: A `DataFrame`.
public func repartition(_ numPartitions: Int32) -> DataFrame {
return buildRepartition(numPartitions: numPartitions, shuffle: true)
}
/// Returns a new ``DataFrame`` partitioned by the given partitioning expressions, using
/// `spark.sql.shuffle.partitions` as number of partitions. The resulting Dataset is hash
/// partitioned.
/// - Parameter partitionExprs: The partition expression strings.
/// - Returns: A `DataFrame`.
public func repartition(_ partitionExprs: String...) -> DataFrame {
return buildRepartitionByExpression(numPartitions: nil, partitionExprs: partitionExprs)
}
/// Returns a new ``DataFrame`` partitioned by the given partitioning expressions into
/// `numPartitions` partitions. The resulting Dataset is hash partitioned.
/// - Parameters:
/// - numPartitions: The number of partitions.
/// - partitionExprs: The partition expression strings.
/// - Returns: A `DataFrame`.
public func repartition(_ numPartitions: Int32, _ partitionExprs: String...) -> DataFrame {
return buildRepartitionByExpression(numPartitions: numPartitions, partitionExprs: partitionExprs)
}
/// Returns a new ``DataFrame`` partitioned by the given partitioning expressions. If `numPartitions` is
/// `nil`, `spark.sql.shuffle.partitions` is used as the number of partitions. The resulting Dataset is
/// hash partitioned.
/// - Parameters:
/// - numPartitions: The number of partitions, or `nil` to use `spark.sql.shuffle.partitions`.
/// - partitionExprs: The partition expression strings.
/// - Returns: A `DataFrame`.
public func repartitionByExpression(_ numPartitions: Int32?, _ partitionExprs: String...) -> DataFrame {
return buildRepartitionByExpression(numPartitions: numPartitions, partitionExprs: partitionExprs)
}
/// Returns a new ``DataFrame`` that has exactly `numPartitions` partitions, when fewer partitions
/// are requested. If a larger number of partitions is requested, it will stay at the current
/// number of partitions. Similar to coalesce defined on an `RDD`, this operation results in a
/// narrow dependency, e.g. if you go from 1000 partitions to 100 partitions, there will not be a
/// shuffle, instead each of the 100 new partitions will claim 10 of the current partitions.
/// - Parameter numPartitions: The number of partitions.
/// - Returns: A `DataFrame`.
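///
/// For example:
///
/// ```swift
/// // Reduce to 10 partitions without a shuffle
/// let narrowed = await df.coalesce(10)
///
/// // Redistribute into 200 partitions with a shuffle
/// let shuffled = await df.repartition(200)
/// ```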
public func coalesce(_ numPartitions: Int32) -> DataFrame {
return buildRepartition(numPartitions: numPartitions, shuffle: false)
}
/// Returns a new ``Dataset`` that contains only the unique rows from this ``Dataset``.
/// This is an alias for `dropDuplicates`.
/// - Returns: A `DataFrame`.
public func distinct() -> DataFrame {
return dropDuplicates()
}
/// Groups the DataFrame using the specified columns.
///
/// This method is used to perform aggregations on groups of data.
/// After grouping, you can apply aggregation functions like count(), sum(), avg(), etc.
///
/// ```swift
/// // Group by single column
/// let byDept = df.groupBy("department")
/// .agg(count("*").alias("employee_count"))
///
/// // Group by multiple columns
/// let byDeptAndLocation = df.groupBy("department", "location")
/// .agg(
/// avg("salary").alias("avg_salary"),
/// max("salary").alias("max_salary")
/// )
/// ```
///
/// - Parameter cols: Column names to group by
/// - Returns: A ``GroupedData`` object for aggregation operations
public func groupBy(_ cols: String...) -> GroupedData {
return GroupedData(self, GroupType.groupby, cols)
}
/// Create a multi-dimensional rollup for the current ``DataFrame`` using the specified columns, so we
/// can run aggregation on them.
/// - Parameter cols: Grouping column names.
/// - Returns: A ``GroupedData``.
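///
/// For example, assuming a `sales` ``DataFrame`` with hypothetical `city`, `year`, and `amount` columns:
///
/// ```swift
/// try await sales.rollup("city", "year").agg("sum(amount)").show()
/// ```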
public func rollup(_ cols: String...) -> GroupedData {
return GroupedData(self, GroupType.rollup, cols)
}
/// Create a multi-dimensional cube for the current ``DataFrame`` using the specified columns, so we
/// can run aggregation on them.
/// - Parameter cols: Grouping column names.
/// - Returns: A ``GroupedData``.
public func cube(_ cols: String...) -> GroupedData {
return GroupedData(self, GroupType.cube, cols)
}
/// Creates a local temporary view using the given name. The lifetime of this temporary view is
/// tied to the `SparkSession` that was used to create this ``DataFrame``.
/// - Parameter viewName: A view name.
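///
/// For example, assuming a hypothetical `age` column:
///
/// ```swift
/// try await df.createTempView("people")
/// let adults = try await spark.sql("SELECT * FROM people WHERE age >= 18")
/// ```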
public func createTempView(_ viewName: String) async throws {
try await createTempView(viewName, replace: false, global: false)
}
/// Creates a local temporary view using the given name. The lifetime of this temporary view is
/// tied to the `SparkSession` that was used to create this ``DataFrame``.
/// - Parameter viewName: A view name.
public func createOrReplaceTempView(_ viewName: String) async throws {
try await createTempView(viewName, replace: true, global: false)
}
/// Creates a global temporary view using the given name. The lifetime of this temporary view is
/// tied to this Spark application, but is cross-session.
/// - Parameter viewName: A view name.
public func createGlobalTempView(_ viewName: String) async throws {
try await createTempView(viewName, replace: false, global: true)
}
/// Creates a global temporary view using the given name. The lifetime of this temporary view is
/// tied to this Spark application, but is cross-session.
/// - Parameter viewName: A view name.
public func createOrReplaceGlobalTempView(_ viewName: String) async throws {
try await createTempView(viewName, replace: true, global: true)
}
func createTempView(_ viewName: String, replace: Bool, global: Bool) async throws {
try await spark.client.createTempView(self.plan.root, viewName, replace: replace, isGlobal: global)
}
/// Returns a ``DataFrameWriter`` that can be used to write non-streaming data.
public var write: DataFrameWriter {
get {
return DataFrameWriter(df: self)
}
}
/// Create a write configuration builder for v2 sources.
/// - Parameter table: A table name, e.g., `catalog.db.table`.
/// - Returns: A ``DataFrameWriterV2`` instance.
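///
/// A minimal sketch, assuming the returned ``DataFrameWriterV2`` exposes an `append()` action:
///
/// ```swift
/// try await df.writeTo("catalog.db.events").append()
/// ```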
public func writeTo(_ table: String) -> DataFrameWriterV2 {
return DataFrameWriterV2(table, self)
}
}