// Sources/SparkConnect/DataFrameWriter.swift

//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
//
import Foundation

/// An interface used to write a `DataFrame` to external storage systems
/// (e.g. file systems, key-value stores, etc). Use `DataFrame.write` to access this.
public actor DataFrameWriter: Sendable {
  // Output data source format (e.g. "parquet", "csv"); nil means the session default.
  var source: String? = nil

  // Behavior when data/table already exists; one of
  // `overwrite`, `append`, `ignore`, `error`/`errorifexists`, or `default`.
  var saveMode: String = "default"

  // Extra key/value options passed through to the underlying data source.
  var extraOptions: CaseInsensitiveDictionary = CaseInsensitiveDictionary()

  var partitioningColumns: [String]? = nil

  var bucketColumnNames: [String]? = nil

  var numBuckets: Int32? = nil

  var sortColumnNames: [String]? = nil

  var clusteringColumns: [String]? = nil

  // The DataFrame whose content will be written out.
  let df: DataFrame

  init(df: DataFrame) {
    self.df = df
  }

  /// Specifies the output data source format.
  /// - Parameter source: A string.
  /// - Returns: A `DataFrameWriter`.
  @discardableResult
  public func format(_ source: String) -> DataFrameWriter {
    self.source = source
    return self
  }

  /// Specifies the behavior when data or table already exists. Options include:
  /// `overwrite`, `append`, `ignore`, `error` or `errorifexists` (default).
  ///
  /// - Parameter saveMode: A string for save mode.
  /// - Returns: A `DataFrameWriter`.
  @discardableResult
  public func mode(_ saveMode: String) -> DataFrameWriter {
    self.saveMode = saveMode
    return self
  }

  /// Adds an output option for the underlying data source.
  /// - Parameters:
  ///   - key: A key string.
  ///   - value: A value string.
  /// - Returns: A `DataFrameWriter`.
  @discardableResult
  public func option(_ key: String, _ value: String) -> DataFrameWriter {
    self.extraOptions[key] = value
    return self
  }

  /// Partitions the output by the given columns on the file system.
  /// - Parameter columns: Column names to partition by.
  /// - Returns: A `DataFrameWriter`.
  @discardableResult
  public func partitionBy(_ columns: String...) -> DataFrameWriter {
    self.partitioningColumns = columns
    return self
  }

  /// Buckets the output by the given columns.
  /// - Parameters:
  ///   - numBuckets: The number of buckets.
  ///   - columnNames: Column names to bucket by.
  /// - Returns: A `DataFrameWriter`.
  @discardableResult
  public func bucketBy(numBuckets: Int32, _ columnNames: String...) -> DataFrameWriter {
    self.numBuckets = numBuckets
    self.bucketColumnNames = columnNames
    return self
  }

  /// Sorts the output in each bucket by the given columns.
  /// - Parameter columnNames: Column names to sort by.
  /// - Returns: A `DataFrameWriter`.
  @discardableResult
  public func sortBy(_ columnNames: String...) -> DataFrameWriter {
    self.sortColumnNames = columnNames
    return self
  }

  /// Clusters the output by the given columns.
  /// - Parameter columnNames: Column names to cluster by.
  /// - Returns: A `DataFrameWriter`.
  @discardableResult
  public func clusterBy(_ columnNames: String...) -> DataFrameWriter {
    self.clusteringColumns = columnNames
    return self
  }

  /// Saves the content of the `DataFrame`, for data sources that don't require a path
  /// (e.g. external key-value stores).
  public func save() async throws {
    return try await saveInternal(nil)
  }

  /// Saves the content of the `DataFrame` at the specified path, for data sources backed
  /// by a local or distributed file system.
  /// - Parameter path: A path string.
  public func save(_ path: String) async throws {
    try await saveInternal(path)
  }

  // Builds and executes a path-based (non-table) write operation.
  private func saveInternal(_ path: String?) async throws {
    try await executeWriteOperation({
      var write = WriteOperation()
      if let path = path {
        write.path = path
      }
      return write
    })
  }

  /// Saves the content of the ``DataFrame`` as the specified table.
  /// - Parameter tableName: A table name.
  public func saveAsTable(_ tableName: String) async throws {
    try await executeWriteOperation({
      var write = WriteOperation()
      write.table.tableName = tableName
      write.table.saveMethod = .saveAsTable
      return write
    })
  }

  /// Inserts the content of the ``DataFrame`` to the specified table. It requires that the schema of
  /// the ``DataFrame`` is the same as the schema of the table. Unlike ``saveAsTable``,
  /// ``insertInto`` ignores the column names and just uses position-based resolution.
  /// - Parameter tableName: A table name.
  public func insertInto(_ tableName: String) async throws {
    try await executeWriteOperation({
      var write = WriteOperation()
      write.table.tableName = tableName
      write.table.saveMethod = .insertInto
      return write
    })
  }

  // Fills the partially-built `WriteOperation` produced by `f` with this writer's
  // accumulated configuration (mode, source, partitioning, bucketing, options, …)
  // and sends it to the server as a Spark Connect command.
  private func executeWriteOperation(_ f: () -> WriteOperation) async throws {
    var write = f()

    // Cannot both be set
    assert(!(!write.path.isEmpty && !write.table.tableName.isEmpty))

    let plan = await self.df.getPlan() as! Plan
    write.input = plan.root
    write.mode = self.saveMode.toSaveMode
    if let source = self.source {
      write.source = source
    }
    if let sortColumnNames = self.sortColumnNames {
      write.sortColumnNames = sortColumnNames
    }
    if let partitioningColumns = self.partitioningColumns {
      write.partitioningColumns = partitioningColumns
    }
    if let clusteringColumns = self.clusteringColumns {
      write.clusteringColumns = clusteringColumns
    }
    if let numBuckets = self.numBuckets {
      var bucketBy = WriteOperation.BucketBy()
      bucketBy.numBuckets = numBuckets
      if let bucketColumnNames = self.bucketColumnNames {
        bucketBy.bucketColumnNames = bucketColumnNames
      }
      write.bucketBy = bucketBy
    }
    for option in self.extraOptions.toStringDictionary() {
      write.options[option.key] = option.value
    }

    var command = Spark_Connect_Command()
    command.writeOperation = write
    try await df.spark.client.execute(df.spark.sessionID, command)
  }

  /// Saves the content of the `DataFrame` in CSV format at the specified path.
  /// - Parameter path: A path string.
  public func csv(_ path: String) async throws {
    self.source = "csv"
    return try await save(path)
  }

  /// Saves the content of the `DataFrame` in JSON format at the specified path.
  /// - Parameter path: A path string.
  public func json(_ path: String) async throws {
    self.source = "json"
    return try await save(path)
  }

  /// Saves the content of the `DataFrame` in XML format at the specified path.
  /// - Parameter path: A path string.
  public func xml(_ path: String) async throws {
    self.source = "xml"
    return try await save(path)
  }

  /// Saves the content of the `DataFrame` in ORC format at the specified path.
  /// - Parameter path: A path string.
  public func orc(_ path: String) async throws {
    self.source = "orc"
    return try await save(path)
  }

  /// Saves the content of the `DataFrame` in Parquet format at the specified path.
  /// - Parameter path: A path string.
  public func parquet(_ path: String) async throws {
    self.source = "parquet"
    return try await save(path)
  }

  /// Saves the content of the `DataFrame` in a text file at the specified path.
  /// The DataFrame must have only one column that is of string type.
  /// Each row becomes a new line in the output file.
  ///
  /// - Parameter path: A path string.
  public func text(_ path: String) async throws {
    self.source = "text"
    return try await save(path)
  }
}