override def deserializeStream()

in backends-clickhouse/src-celeborn/main/scala/org/apache/spark/shuffle/CHCelebornColumnarBatchSerializer.scala [68:197]


  /**
   * Builds a [[DeserializationStream]] that pulls native blocks from a [[CHStreamReader]] and
   * hands them out as [[ColumnarBatch]] values.
   *
   * Keys are not read from the stream: `readKey` always yields `null`. `asIterator` and
   * `readObject` are unsupported. Closing the stream publishes the collected row and batch
   * counters to `readBatchNumRows` and `numOutputRows`; repeated calls to `close()` are no-ops.
   *
   * @param in
   *   the shuffle input; when it equals `CelebornInputStream.empty()` no reader is created and
   *   the stream yields no elements
   * @return
   *   a stream whose values are the batches converted from the reader's blocks
   */
  override def deserializeStream(in: InputStream): DeserializationStream = {
    new DeserializationStream {
      // Evaluated once so that every empty-stream decision below agrees.
      private val isEmptyStream: Boolean = in.equals(CelebornInputStream.empty())
      // The empty stream is replaced by null; readValue() never builds a reader in that case.
      private val original_in: InputStream = if (isEmptyStream) null else in
      // Created lazily by getReader and dropped again by closeReader().
      private var reader: CHStreamReader = null
      // The batch most recently returned by readValue(); closed before the next one is read.
      private var cb: ColumnarBatch = _
      private val forceCompress: Boolean =
        gluten_conf.isUseColumnarShuffleManager ||
          gluten_conf.isUseCelebornShuffleManager

      private var numBatchesTotal: Long = _
      private var numRowsTotal: Long = _

      // Otherwise calling close() twice would cause replication of metrics.
      private val closeCalled: AtomicBoolean = new AtomicBoolean(false)

      override def asIterator: Iterator[Any] = {
        // This method is never called by shuffle code.
        throw new UnsupportedOperationException
      }

      /**
       * Iterates over (key, value) pairs, reading one pair ahead so that `hasNext` can detect
       * the end of the stream. Keys are always `null`.
       */
      override def asKeyValueIterator: Iterator[(Any, Any)] = new Iterator[(Any, Any)] {
        private var gotNext = false
        private var nextValue: (Any, Any) = _
        // An empty stream counts as exhausted from the start, so nothing is ever read from it.
        private var finished = isEmptyStream

        /** Reads the next pair, or marks the iterator finished and returns null on EOF. */
        def getNext: (Any, Any) = {
          try {
            (readKey[Any](), readValue[Any]())
          } catch {
            case _: EOFException =>
              finished = true
              // try to release memory immediately
              closeReader()
              null
          }
        }

        override def hasNext: Boolean = {
          if (!finished && !gotNext) {
            nextValue = getNext
            gotNext = true
          }
          // getNext may have flipped `finished`, so it is re-read here.
          !finished
        }

        override def next(): (Any, Any) = {
          if (!hasNext) {
            throw new NoSuchElementException("End of stream")
          }
          gotNext = false
          nextValue
        }
      }

      override def readKey[T: ClassTag](): T = {
        // We skipped serialization of the key in writeKey(), so just return a dummy value since
        // this is going to be discarded anyways.
        null.asInstanceOf[T]
      }

      /**
       * Returns the next non-empty block as a [[ColumnarBatch]], closing the previously returned
       * batch first.
       *
       * @throws java.io.EOFException
       *   when the stream is empty, or when the reader returns a block with zero rows and zero
       *   columns (in which case this stream is closed before throwing)
       */
      @throws(classOf[EOFException])
      override def readValue[T: ClassTag](): T = {
        if (isEmptyStream) {
          // original_in is null here, so do not construct a reader over it; signal
          // end-of-stream the same way the regular path does.
          throw new EOFException
        }

        if (cb != null) {
          cb.close()
          cb = null
        }

        var nativeBlock = getReader.next()
        while (nativeBlock.numRows() == 0) {
          if (nativeBlock.numColumns() == 0) {
            // Zero rows and zero columns is treated as the end-of-stream marker.
            nativeBlock.close()
            this.close()
            throw new EOFException
          }
          // NOTE(review): a zero-row block that still has columns is skipped without close();
          // confirm whether the reader owns such blocks or whether they need releasing here.
          nativeBlock = getReader.next()
        }
        val numRows = nativeBlock.numRows()

        numBatchesTotal += 1
        numRowsTotal += numRows
        cb = nativeBlock.toColumnarBatch
        cb.asInstanceOf[T]
      }

      override def readObject[T: ClassTag](): T = {
        // This method is never called by shuffle code.
        throw new UnsupportedOperationException
      }

      /**
       * Publishes the metrics, closes the current batch and the reader. Only the first call has
       * any effect.
       */
      override def close(): Unit = {
        if (closeCalled.compareAndSet(false, true)) {
          if (numBatchesTotal > 0) {
            // Average number of rows per batch read through this stream.
            readBatchNumRows.set(numRowsTotal.toDouble / numBatchesTotal)
          }
          numOutputRows += numRowsTotal
          if (cb != null) {
            cb.close()
            cb = null
          }
          closeReader()
        }
      }

      /** Returns the reader, creating it on first use. */
      def getReader: CHStreamReader = {
        if (reader == null) {
          reader = new CHStreamReader(
            original_in,
            forceCompress,
            CHBackendSettings.useCustomizedShuffleCodec
          )
        }
        reader
      }

      /** Closes and forgets the reader if one was created. */
      def closeReader(): Unit = {
        if (reader != null) {
          reader.close()
          reader = null
        }
      }
    }
  }