spark-connector/common/src/main/java/org/apache/spark/sql/odps/table/tunnel/read/TunnelArrowSplitReader.java [147:169]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    private void initMetrics() {
        this.bytesCount = new BytesCount();
        this.recordCount = new RecordCount();
        this.metrics = new Metrics();
        this.metrics.register(bytesCount);
        this.metrics.register(recordCount);
    }

    private void init(TableIdentifier identifier,
                      TunnelInputSplit split,
                      DataSchema requiredSchema,
                      ReaderOptions options) throws IOException {
        String project = identifier.getProject();
        String table = identifier.getTable();
        // TODO: support schema
        PartitionSpec partitionSpec = split.getPartitionSpec();
        String downloadId = split.getSessionId();
        long startIndex = split.getRowRange().getStartIndex();
        long numRecord = split.getRowRange().getNumRecord();

        Set<String> partitionKeys = new HashSet<>(requiredSchema.getPartitionKeys());
        List<Column> readDataColumns = requiredSchema.getColumns()
                .stream()
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



spark-connector/common/src/main/java/org/apache/spark/sql/odps/table/tunnel/read/TunnelSplitRecordReader.java [115:137]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    private void initMetrics() {
        this.bytesCount = new BytesCount();
        this.recordCount = new RecordCount();
        this.metrics = new Metrics();
        this.metrics.register(bytesCount);
        this.metrics.register(recordCount);
    }

    private void init(TableIdentifier identifier,
                      TunnelInputSplit split,
                      DataSchema requiredSchema,
                      ReaderOptions options) throws IOException {
        String project = identifier.getProject();
        String table = identifier.getTable();
        // TODO: support schema
        PartitionSpec partitionSpec = split.getPartitionSpec();
        String downloadId = split.getSessionId();
        long startIndex = split.getRowRange().getStartIndex();
        long numRecord = split.getRowRange().getNumRecord();

        Set<String> partitionKeys = new HashSet<>(requiredSchema.getPartitionKeys());
        List<Column> readDataColumns = requiredSchema.getColumns()
                .stream()
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



