in backends-clickhouse/src-celeborn/main/scala/org/apache/spark/shuffle/CHCelebornColumnarBatchSerializer.scala [68:197]
override def deserializeStream(in: InputStream): DeserializationStream = {
  new DeserializationStream {
    private var reader: CHStreamReader = null
    private val original_in: InputStream = if (in.equals(CelebornInputStream.empty())) {
      null
    } else {
      in
    }
    private var cb: ColumnarBatch = _
    private val isEmptyStream: Boolean = in.equals(CelebornInputStream.empty())
    private val forceCompress: Boolean =
      gluten_conf.isUseColumnarShuffleManager ||
        gluten_conf.isUseCelebornShuffleManager
    private var numBatchesTotal: Long = _
    private var numRowsTotal: Long = _
    // Guards against calling close() twice, which would duplicate the metrics.
    private val closeCalled: AtomicBoolean = new AtomicBoolean(false)
    override def asIterator: Iterator[Any] = {
      // This method is never called by shuffle code.
      throw new UnsupportedOperationException
    }
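    // The shuffle reader consumes records through this iterator. Keys are dummy values and
    // values are ColumnarBatches; an EOFException from readValue() marks the end of the stream.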
    override def asKeyValueIterator: Iterator[(Any, Any)] = new Iterator[(Any, Any)] {
      private var gotNext = false
      private var nextValue: (Any, Any) = _
      private var finished = false

      def getNext: (Any, Any) = {
        try {
          (readKey[Any](), readValue[Any]())
        } catch {
          case _: EOFException =>
            finished = true
            // Try to release memory immediately.
            closeReader()
            null
        }
      }

      override def hasNext: Boolean = {
        if (!isEmptyStream && !finished) {
          if (!gotNext) {
            nextValue = getNext
            gotNext = true
          }
        }
        !isEmptyStream && !finished
      }

      override def next(): (Any, Any) = {
        if (!hasNext) {
          throw new NoSuchElementException("End of stream")
        }
        gotNext = false
        nextValue
      }
    }
    override def readKey[T: ClassTag](): T = {
      // We skipped serialization of the key in writeKey(), so just return a dummy value since
      // it is going to be discarded anyway.
      null.asInstanceOf[T]
    }
    @throws(classOf[EOFException])
    override def readValue[T: ClassTag](): T = {
      if (cb != null) {
        cb.close()
        cb = null
      }
      var nativeBlock = getReader.next()
      while (nativeBlock.numRows() == 0) {
        if (nativeBlock.numColumns() == 0) {
          nativeBlock.close()
          this.close()
          throw new EOFException
        }
        nativeBlock = getReader.next()
      }
      val numRows = nativeBlock.numRows()
      numBatchesTotal += 1
      numRowsTotal += numRows
      cb = nativeBlock.toColumnarBatch
      cb.asInstanceOf[T]
    }
    override def readObject[T: ClassTag](): T = {
      // This method is never called by shuffle code.
      throw new UnsupportedOperationException
    }
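    // Flushes the accumulated metrics exactly once, then releases the current batch and the reader.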
    override def close(): Unit = {
      if (!closeCalled.compareAndSet(false, true)) {
        return
      }
      if (numBatchesTotal > 0) {
        readBatchNumRows.set(numRowsTotal.toDouble / numBatchesTotal)
      }
      numOutputRows += numRowsTotal
      if (cb != null) {
        cb.close()
        cb = null
      }
      closeReader()
    }
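    // Creates the native CHStreamReader on first use; for an empty input stream it is never created.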
    def getReader: CHStreamReader = {
      if (reader == null) {
        reader = new CHStreamReader(
          original_in,
          forceCompress,
          CHBackendSettings.useCustomizedShuffleCodec
        )
      }
      reader
    }

    def closeReader(): Unit = {
      if (reader != null) {
        reader.close()
        reader = null
      }
    }
  }
}