Sources/SparkConnect/ArrowBufferBuilder.swift (324 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. import Foundation /// @nodoc public protocol ArrowBufferBuilder { associatedtype ItemType var capacity: UInt { get } var length: UInt { get } var nullCount: UInt { get } var offset: UInt { get } init() throws func append(_ newValue: ItemType?) func isNull(_ index: UInt) -> Bool func resize(_ length: UInt) func finish() -> [ArrowBuffer] } public class BaseBufferBuilder { var nulls: ArrowBuffer public var offset: UInt = 0 public var capacity: UInt { return self.nulls.capacity } public var length: UInt = 0 public var nullCount: UInt = 0 init(_ nulls: ArrowBuffer) { self.nulls = nulls } public func isNull(_ index: UInt) -> Bool { return self.nulls.length == 0 || BitUtility.isSet(index + self.offset, buffer: self.nulls) } func resizeLength(_ data: ArrowBuffer, len: UInt = 0) -> UInt { if len == 0 || len < data.length * 2 { if data.length == 0 || data.length * 2 < ArrowBuffer.minLength { return ArrowBuffer.minLength } return UInt(data.length * 2) } return UInt(len * 2) } } public class ValuesBufferBuilder<T>: BaseBufferBuilder { var values: ArrowBuffer var stride: Int public override var capacity: UInt { return self.values.capacity } init(values: ArrowBuffer, nulls: ArrowBuffer, stride: Int = MemoryLayout<T>.stride) { self.stride = stride self.values = values super.init(nulls) } } public class FixedBufferBuilder<T>: ValuesBufferBuilder<T>, ArrowBufferBuilder { public typealias ItemType = T private let defaultVal: ItemType public required init() throws { self.defaultVal = try FixedBufferBuilder<T>.defaultValueForType() let values = ArrowBuffer.createBuffer(0, size: UInt(MemoryLayout<T>.stride)) let nulls = ArrowBuffer.createBuffer(0, size: UInt(MemoryLayout<UInt8>.stride)) super.init(values: values, nulls: nulls) } public func append(_ newValue: ItemType?) { let index = UInt(self.length) let byteIndex = self.stride * Int(index) self.length += 1 if length > self.values.length { self.resize(length) } if let val = newValue { BitUtility.setBit(index + self.offset, buffer: self.nulls) self.values.rawPointer.advanced(by: byteIndex).storeBytes(of: val, as: T.self) } else { self.nullCount += 1 BitUtility.clearBit(index + self.offset, buffer: self.nulls) self.values.rawPointer.advanced(by: byteIndex).storeBytes(of: defaultVal, as: T.self) } } public func resize(_ length: UInt) { if length > self.values.length { let resizeLength = resizeLength(self.values) var values = ArrowBuffer.createBuffer(resizeLength, size: UInt(MemoryLayout<T>.size)) var nulls = ArrowBuffer.createBuffer( resizeLength / 8 + 1, size: UInt(MemoryLayout<UInt8>.size)) ArrowBuffer.copyCurrent(self.values, to: &values, len: self.values.capacity) ArrowBuffer.copyCurrent(self.nulls, to: &nulls, len: self.nulls.capacity) self.values = values self.nulls = nulls } } public func finish() -> [ArrowBuffer] { let length = self.length var values = ArrowBuffer.createBuffer(length, size: UInt(MemoryLayout<T>.size)) var nulls = ArrowBuffer.createBuffer(length / 8 + 1, size: UInt(MemoryLayout<UInt8>.size)) ArrowBuffer.copyCurrent(self.values, to: &values, len: values.capacity) ArrowBuffer.copyCurrent(self.nulls, to: &nulls, len: nulls.capacity) return [nulls, values] } fileprivate static func defaultValueForType() throws -> T { let type = T.self if type == Int8.self { return Int8(0) as! T // swiftlint:disable:this force_cast } else if type == Int16.self { return Int16(0) as! T // swiftlint:disable:this force_cast } else if type == Int32.self { return Int32(0) as! T // swiftlint:disable:this force_cast } else if type == Int64.self { return Int64(0) as! T // swiftlint:disable:this force_cast } else if type == UInt8.self { return UInt8(0) as! T // swiftlint:disable:this force_cast } else if type == UInt16.self { return UInt16(0) as! T // swiftlint:disable:this force_cast } else if type == UInt32.self { return UInt32(0) as! T // swiftlint:disable:this force_cast } else if type == UInt64.self { return UInt64(0) as! T // swiftlint:disable:this force_cast } else if type == Float.self { return Float(0) as! T // swiftlint:disable:this force_cast } else if type == Double.self { return Double(0) as! T // swiftlint:disable:this force_cast } throw ArrowError.unknownType("Unable to determine default value") } } public class BoolBufferBuilder: ValuesBufferBuilder<Bool>, ArrowBufferBuilder { public typealias ItemType = Bool public required init() throws { let values = ArrowBuffer.createBuffer(0, size: UInt(MemoryLayout<UInt8>.stride)) let nulls = ArrowBuffer.createBuffer(0, size: UInt(MemoryLayout<UInt8>.stride)) super.init(values: values, nulls: nulls) } public func append(_ newValue: ItemType?) { let index = UInt(self.length) self.length += 1 if (length / 8) > self.values.length { self.resize(length) } if newValue != nil { BitUtility.setBit(index + self.offset, buffer: self.nulls) if newValue == true { BitUtility.setBit(index + self.offset, buffer: self.values) } else { BitUtility.clearBit(index + self.offset, buffer: self.values) } } else { self.nullCount += 1 BitUtility.clearBit(index + self.offset, buffer: self.nulls) BitUtility.clearBit(index + self.offset, buffer: self.values) } } public func resize(_ length: UInt) { if (length / 8) > self.values.length { let resizeLength = resizeLength(self.values) var values = ArrowBuffer.createBuffer(resizeLength, size: UInt(MemoryLayout<UInt8>.size)) var nulls = ArrowBuffer.createBuffer(resizeLength, size: UInt(MemoryLayout<UInt8>.size)) ArrowBuffer.copyCurrent(self.values, to: &values, len: self.values.capacity) ArrowBuffer.copyCurrent(self.nulls, to: &nulls, len: self.nulls.capacity) self.values = values self.nulls = nulls } } public func finish() -> [ArrowBuffer] { let length = self.length var values = ArrowBuffer.createBuffer(length, size: UInt(MemoryLayout<UInt8>.size)) var nulls = ArrowBuffer.createBuffer(length, size: UInt(MemoryLayout<UInt8>.size)) ArrowBuffer.copyCurrent(self.values, to: &values, len: values.capacity) ArrowBuffer.copyCurrent(self.nulls, to: &nulls, len: nulls.capacity) return [nulls, values] } } public class VariableBufferBuilder<T>: ValuesBufferBuilder<T>, ArrowBufferBuilder { public typealias ItemType = T var offsets: ArrowBuffer let binaryStride = MemoryLayout<UInt8>.stride public required init() throws { let values = ArrowBuffer.createBuffer(0, size: UInt(binaryStride)) let nulls = ArrowBuffer.createBuffer(0, size: UInt(binaryStride)) self.offsets = ArrowBuffer.createBuffer(0, size: UInt(MemoryLayout<Int32>.stride)) super.init(values: values, nulls: nulls, stride: binaryStride) } public func append(_ newValue: ItemType?) { let index = UInt(self.length) self.length += 1 let offsetIndex = MemoryLayout<Int32>.stride * Int(index) if self.length >= self.offsets.length { self.resize(UInt(self.offsets.length + 1)) } var binData: Data var isNull = false if let val = newValue { binData = getBytesFor(val)! } else { var nullVal = 0 isNull = true binData = Data(bytes: &nullVal, count: MemoryLayout<UInt32>.size) } var currentIndex: Int32 = 0 var currentOffset: Int32 = Int32(binData.count) if index > 0 { currentIndex = self.offsets.rawPointer.advanced(by: offsetIndex).load(as: Int32.self) currentOffset += currentIndex if currentOffset > self.values.length { self.value_resize(UInt(currentOffset)) } } if isNull { self.nullCount += 1 BitUtility.clearBit(index + self.offset, buffer: self.nulls) } else { BitUtility.setBit(index + self.offset, buffer: self.nulls) } binData.withUnsafeBytes { bufferPointer in let rawPointer = bufferPointer.baseAddress! self.values.rawPointer.advanced(by: Int(currentIndex)) .copyMemory(from: rawPointer, byteCount: binData.count) } self.offsets.rawPointer.advanced(by: (offsetIndex + MemoryLayout<Int32>.stride)) .storeBytes(of: currentOffset, as: Int32.self) } public func value_resize(_ length: UInt) { if length > self.values.length { let resizeLength = resizeLength(self.values, len: length) var values = ArrowBuffer.createBuffer(resizeLength, size: UInt(MemoryLayout<UInt8>.size)) ArrowBuffer.copyCurrent(self.values, to: &values, len: self.values.capacity) self.values = values } } public func resize(_ length: UInt) { if length > self.offsets.length { let resizeLength = resizeLength(self.offsets, len: length) var nulls = ArrowBuffer.createBuffer( resizeLength / 8 + 1, size: UInt(MemoryLayout<UInt8>.size)) var offsets = ArrowBuffer.createBuffer(resizeLength, size: UInt(MemoryLayout<Int32>.size)) ArrowBuffer.copyCurrent(self.nulls, to: &nulls, len: self.nulls.capacity) ArrowBuffer.copyCurrent(self.offsets, to: &offsets, len: self.offsets.capacity) self.nulls = nulls self.offsets = offsets } } public func finish() -> [ArrowBuffer] { let length = self.length var values = ArrowBuffer.createBuffer(self.values.length, size: UInt(MemoryLayout<UInt8>.size)) var nulls = ArrowBuffer.createBuffer(length / 8 + 1, size: UInt(MemoryLayout<UInt8>.size)) var offsets = ArrowBuffer.createBuffer(length, size: UInt(MemoryLayout<Int32>.size)) ArrowBuffer.copyCurrent(self.values, to: &values, len: values.capacity) ArrowBuffer.copyCurrent(self.nulls, to: &nulls, len: nulls.capacity) ArrowBuffer.copyCurrent(self.offsets, to: &offsets, len: offsets.capacity) return [nulls, offsets, values] } } public class AbstractWrapperBufferBuilder<T, U>: ArrowBufferBuilder { public typealias ItemType = T public var capacity: UInt { return self.bufferBuilder.capacity } public var length: UInt { return self.bufferBuilder.length } public var nullCount: UInt { return self.bufferBuilder.nullCount } public var offset: UInt { return self.bufferBuilder.offset } let bufferBuilder: FixedBufferBuilder<U> public required init() throws { self.bufferBuilder = try FixedBufferBuilder() } public func append(_ newValue: ItemType?) { fatalError("Method is not implemented") } public func isNull(_ index: UInt) -> Bool { return self.bufferBuilder.isNull(index) } public func resize(_ length: UInt) { self.bufferBuilder.resize(length) } public func finish() -> [ArrowBuffer] { return self.bufferBuilder.finish() } } public class Date32BufferBuilder: AbstractWrapperBufferBuilder<Date, Int32> { public override func append(_ newValue: ItemType?) { if let val = newValue { let daysSinceEpoch = Int32(val.timeIntervalSince1970 / 86400) self.bufferBuilder.append(daysSinceEpoch) } else { self.bufferBuilder.append(nil) } } } public class Date64BufferBuilder: AbstractWrapperBufferBuilder<Date, Int64> { public override func append(_ newValue: ItemType?) { if let val = newValue { let daysSinceEpoch = Int64(val.timeIntervalSince1970 * 1000) self.bufferBuilder.append(daysSinceEpoch) } else { self.bufferBuilder.append(nil) } } } public final class StructBufferBuilder: BaseBufferBuilder, ArrowBufferBuilder { public typealias ItemType = [Any?] var info: ArrowNestedType? public init() throws { let nulls = ArrowBuffer.createBuffer(0, size: UInt(MemoryLayout<UInt8>.stride)) super.init(nulls) } public func initializeTypeInfo(_ fields: [ArrowField]) { info = ArrowNestedType(ArrowType.ArrowStruct, fields: fields) } public func append(_ newValue: [Any?]?) { let index = UInt(self.length) self.length += 1 if length > self.nulls.length { self.resize(length) } if newValue != nil { BitUtility.setBit(index + self.offset, buffer: self.nulls) } else { self.nullCount += 1 BitUtility.clearBit(index + self.offset, buffer: self.nulls) } } public func resize(_ length: UInt) { if length > self.nulls.length { let resizeLength = resizeLength(self.nulls) var nulls = ArrowBuffer.createBuffer( resizeLength / 8 + 1, size: UInt(MemoryLayout<UInt8>.size)) ArrowBuffer.copyCurrent(self.nulls, to: &nulls, len: self.nulls.capacity) self.nulls = nulls } } public func finish() -> [ArrowBuffer] { let length = self.length var nulls = ArrowBuffer.createBuffer(length / 8 + 1, size: UInt(MemoryLayout<UInt8>.size)) ArrowBuffer.copyCurrent(self.nulls, to: &nulls, len: nulls.capacity) return [nulls] } }