Sources/SparkConnect/ArrowArray.swift (269 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
import Foundation
/// @nodoc
public protocol ArrowArrayHolder {
var type: ArrowType { get }
var length: UInt { get }
var nullCount: UInt { get }
var array: AnyArray { get }
var data: ArrowData { get }
var getBufferData: () -> [Data] { get }
var getBufferDataSizes: () -> [Int] { get }
var getArrowColumn: (ArrowField, [ArrowArrayHolder]) throws -> ArrowColumn { get }
}
/// @nodoc
public class ArrowArrayHolderImpl: ArrowArrayHolder {
public let data: ArrowData
public let type: ArrowType
public let length: UInt
public let nullCount: UInt
public let array: AnyArray
public let getBufferData: () -> [Data]
public let getBufferDataSizes: () -> [Int]
public let getArrowColumn: (ArrowField, [ArrowArrayHolder]) throws -> ArrowColumn
public init<T>(_ arrowArray: ArrowArray<T>) {
self.array = arrowArray
self.data = arrowArray.arrowData
self.length = arrowArray.length
self.type = arrowArray.arrowData.type
self.nullCount = arrowArray.nullCount
self.getBufferData = { () -> [Data] in
var bufferData = [Data]()
for buffer in arrowArray.arrowData.buffers {
bufferData.append(Data())
buffer.append(to: &bufferData[bufferData.count - 1])
}
return bufferData
}
self.getBufferDataSizes = { () -> [Int] in
var bufferDataSizes = [Int]()
for buffer in arrowArray.arrowData.buffers {
bufferDataSizes.append(Int(buffer.capacity))
}
return bufferDataSizes
}
self.getArrowColumn = {
(field: ArrowField, arrayHolders: [ArrowArrayHolder]) throws -> ArrowColumn in
var arrays = [ArrowArray<T>]()
for arrayHolder in arrayHolders {
if let array = arrayHolder.array as? ArrowArray<T> {
arrays.append(array)
}
}
return ArrowColumn(field, chunked: ChunkedArrayHolder(try ChunkedArray<T>(arrays)))
}
}
public static func loadArray( // swiftlint:disable:this cyclomatic_complexity
_ arrowType: ArrowType, with: ArrowData
) throws -> ArrowArrayHolder {
switch arrowType.id {
case .int8:
return try ArrowArrayHolderImpl(FixedArray<Int8>(with))
case .int16:
return try ArrowArrayHolderImpl(FixedArray<Int16>(with))
case .int32:
return try ArrowArrayHolderImpl(FixedArray<Int32>(with))
case .int64:
return try ArrowArrayHolderImpl(FixedArray<Int64>(with))
case .uint8:
return try ArrowArrayHolderImpl(FixedArray<UInt8>(with))
case .uint16:
return try ArrowArrayHolderImpl(FixedArray<UInt16>(with))
case .uint32:
return try ArrowArrayHolderImpl(FixedArray<UInt32>(with))
case .uint64:
return try ArrowArrayHolderImpl(FixedArray<UInt64>(with))
case .double:
return try ArrowArrayHolderImpl(FixedArray<Double>(with))
case .float:
return try ArrowArrayHolderImpl(FixedArray<Float>(with))
case .date32:
return try ArrowArrayHolderImpl(Date32Array(with))
case .date64:
return try ArrowArrayHolderImpl(Date64Array(with))
case .time32:
return try ArrowArrayHolderImpl(Time32Array(with))
case .time64:
return try ArrowArrayHolderImpl(Time64Array(with))
case .string:
return try ArrowArrayHolderImpl(StringArray(with))
case .boolean:
return try ArrowArrayHolderImpl(BoolArray(with))
case .binary:
return try ArrowArrayHolderImpl(BinaryArray(with))
case .strct:
return try ArrowArrayHolderImpl(StructArray(with))
default:
throw ArrowError.invalid("Array not found for type: \(arrowType)")
}
}
}
/// @nodoc
public class ArrowArray<T>: AsString, AnyArray {
public typealias ItemType = T
public let arrowData: ArrowData
public var nullCount: UInt { return self.arrowData.nullCount }
public var length: UInt { return self.arrowData.length }
public required init(_ arrowData: ArrowData) throws {
self.arrowData = arrowData
}
public func isNull(_ at: UInt) throws -> Bool {
if at >= self.length {
throw ArrowError.outOfBounds(index: Int64(at))
}
return self.arrowData.isNull(at)
}
public subscript(_ index: UInt) -> T? {
fatalError("subscript() has not been implemented")
}
public func asString(_ index: UInt) -> String {
if self[index] == nil {
return ""
}
return "\(self[index]!)"
}
public func asAny(_ index: UInt) -> Any? {
if self[index] == nil {
return nil
}
return self[index]!
}
}
/// @nodoc
public class FixedArray<T>: ArrowArray<T> {
public override subscript(_ index: UInt) -> T? {
if self.arrowData.isNull(index) {
return nil
}
let byteOffset = self.arrowData.stride * Int(index)
return self.arrowData.buffers[1].rawPointer.advanced(by: byteOffset).load(as: T.self)
}
}
/// @nodoc
public class StringArray: ArrowArray<String> {
public override subscript(_ index: UInt) -> String? {
let offsetIndex = MemoryLayout<Int32>.stride * Int(index)
if self.arrowData.isNull(index) {
return nil
}
let offsets = self.arrowData.buffers[1]
let values = self.arrowData.buffers[2]
var startIndex: Int32 = 0
if index > 0 {
startIndex = offsets.rawPointer.advanced(by: offsetIndex).load(as: Int32.self)
}
let endIndex = offsets.rawPointer.advanced(by: offsetIndex + MemoryLayout<Int32>.stride)
.load(as: Int32.self)
let arrayLength = Int(endIndex - startIndex)
let rawPointer = values.rawPointer.advanced(by: Int(startIndex))
.bindMemory(to: UInt8.self, capacity: arrayLength)
let buffer = UnsafeBufferPointer<UInt8>(start: rawPointer, count: arrayLength)
let byteArray = Array(buffer)
return String(data: Data(byteArray), encoding: .utf8)
}
}
/// @nodoc
public class BoolArray: ArrowArray<Bool> {
public override subscript(_ index: UInt) -> Bool? {
if self.arrowData.isNull(index) {
return nil
}
let valueBuffer = self.arrowData.buffers[1]
return BitUtility.isSet(index, buffer: valueBuffer)
}
}
/// @nodoc
public class Date32Array: ArrowArray<Date> {
public override subscript(_ index: UInt) -> Date? {
if self.arrowData.isNull(index) {
return nil
}
let byteOffset = self.arrowData.stride * Int(index)
let milliseconds = self.arrowData.buffers[1].rawPointer.advanced(by: byteOffset).load(
as: UInt32.self)
return Date(timeIntervalSince1970: TimeInterval(milliseconds * 86400))
}
}
/// @nodoc
public class Date64Array: ArrowArray<Date> {
public override subscript(_ index: UInt) -> Date? {
if self.arrowData.isNull(index) {
return nil
}
let byteOffset = self.arrowData.stride * Int(index)
let milliseconds = self.arrowData.buffers[1].rawPointer.advanced(by: byteOffset).load(
as: UInt64.self)
return Date(timeIntervalSince1970: TimeInterval(milliseconds / 1000))
}
}
/// @nodoc
public class Time32Array: FixedArray<Time32> {}
/// @nodoc
public class Time64Array: FixedArray<Time64> {}
/// @nodoc
public class BinaryArray: ArrowArray<Data> {
public struct Options {
public var printAsHex = false
public var printEncoding: String.Encoding = .utf8
}
public var options = Options()
public override subscript(_ index: UInt) -> Data? {
let offsetIndex = MemoryLayout<Int32>.stride * Int(index)
if self.arrowData.isNull(index) {
return nil
}
let offsets = self.arrowData.buffers[1]
let values = self.arrowData.buffers[2]
var startIndex: Int32 = 0
if index > 0 {
startIndex = offsets.rawPointer.advanced(by: offsetIndex).load(as: Int32.self)
}
let endIndex = offsets.rawPointer.advanced(by: offsetIndex + MemoryLayout<Int32>.stride)
.load(as: Int32.self)
let arrayLength = Int(endIndex - startIndex)
let rawPointer = values.rawPointer.advanced(by: Int(startIndex))
.bindMemory(to: UInt8.self, capacity: arrayLength)
let buffer = UnsafeBufferPointer<UInt8>(start: rawPointer, count: arrayLength)
let byteArray = Array(buffer)
return Data(byteArray)
}
public override func asString(_ index: UInt) -> String {
if self[index] == nil { return "" }
let data = self[index]!
if options.printAsHex {
return data.hexEncodedString()
} else {
return String(data: data, encoding: .utf8)!
}
}
}
/// @nodoc
public class StructArray: ArrowArray<[Any?]> {
public private(set) var arrowFields: [ArrowArrayHolder]?
public required init(_ arrowData: ArrowData) throws {
try super.init(arrowData)
var fields = [ArrowArrayHolder]()
for child in arrowData.children {
fields.append(try ArrowArrayHolderImpl.loadArray(child.type, with: child))
}
self.arrowFields = fields
}
public override subscript(_ index: UInt) -> [Any?]? {
if self.arrowData.isNull(index) {
return nil
}
if let fields = arrowFields {
var result = [Any?]()
for field in fields {
result.append(field.array.asAny(index))
}
return result
}
return nil
}
public override func asString(_ index: UInt) -> String {
if self.arrowData.isNull(index) {
return ""
}
var output = "{"
if let fields = arrowFields {
for fieldIndex in 0..<fields.count {
let asStr = fields[fieldIndex].array as? AsString
if fieldIndex == 0 {
output.append("\(asStr!.asString(index))")
} else {
output.append(",\(asStr!.asString(index))")
}
}
}
output += "}"
return output
}
}