protected void planInputSplits()

in spark-connector/common/src/main/java/org/apache/spark/sql/odps/table/tunnel/read/TunnelTableBatchReadSession.java [83:189]


    /**
     * Plans the tunnel input splits for this read session and materializes both the
     * read schema ({@code this.readSchema}) and a matching split assigner
     * ({@code this.inputSplitAssigner}).
     *
     * <p>Only the {@code SIZE} and {@code ROW_OFFSET} split modes are supported, and
     * bucket pruning is rejected up front. For a partitioned table one download
     * session is created per required partition (or per existing partition when no
     * partition filter was supplied); for a non-partitioned table a single session
     * covers the whole table. Column projection is honored via
     * {@code requiredDataColumns} / {@code requiredPartitionColumns}; when both are
     * empty, every data and partition column is read.
     *
     * @throws IOException if session creation, partition listing, or column
     *         validation fails; the underlying exception is preserved as the cause
     * @throws UnsupportedOperationException if bucket pruning was requested or the
     *         configured split mode is not supported
     */
    protected void planInputSplits() throws IOException {
        if (!requiredBucketIds.isEmpty()) {
            throw new UnsupportedOperationException("Unsupported bucket pruning in tunnel env");
        }

        // Guard clause: reject unsupported modes early instead of nesting the whole
        // body inside an if/else.
        SplitOptions.SplitMode splitMode = splitOptions.getSplitMode();
        if (!SplitOptions.SplitMode.SIZE.equals(splitMode)
                && !SplitOptions.SplitMode.ROW_OFFSET.equals(splitMode)) {
            throw new UnsupportedOperationException(
                    "Split mode: '" + splitMode + "' not supported");
        }

        ExecutionEnvironment env = ExecutionEnvironment.create(settings);
        Odps odps = env.createOdpsClient();
        TableTunnel tunnel = new TableTunnel(odps);
        tunnel.setEndpoint(env.getTunnelEndpoint(identifier.getProject()));
        // TODO: support schema
        Table table = odps.tables().get(identifier.getProject(),
                identifier.getTable());
        TableSchema tableSchema = table.getSchema();
        // NOTE(review): getSplitNumber() is interpreted here as a split size in
        // bytes — confirm SIZE-mode options store the byte size in this field.
        long splitSizeInBytes = splitOptions.getSplitNumber();
        try {
            List<TunnelInputSplit> splits = new ArrayList<>();
            List<Column> requiredColumns = new ArrayList<>();
            List<String> partitionKeys = new ArrayList<>();

            if (table.isPartitioned()) {
                // TODO: public odps sdk need table.getPartitionSpecs();
                List<PartitionSpec> readPartitions = requiredPartitions.isEmpty()
                        ? table.getPartitions().stream()
                                .map(Partition::getPartitionSpec)
                                .collect(Collectors.toList())
                        : requiredPartitions;
                // One download session (and one batch of splits) per partition.
                for (PartitionSpec partitionSpec : readPartitions) {
                    long size = table.getPartition(partitionSpec).getSize();
                    TableTunnel.DownloadSession session = createDownloadSession(
                            identifier.getProject(),
                            identifier.getTable(),
                            partitionSpec,
                            tunnel);
                    splits.addAll(getInputSplitsInternal(
                            session, size, splitSizeInBytes, partitionSpec, splitOptions));
                }

                if (requiredDataColumns.isEmpty() && requiredPartitionColumns.isEmpty()) {
                    // No projection requested: read all data and partition columns.
                    requiredColumns.addAll(tableSchema.getColumns());
                    requiredColumns.addAll(tableSchema.getPartitionColumns());
                    partitionKeys = tableSchema.getPartitionColumns().stream()
                            .map(Column::getName)
                            .collect(Collectors.toList());
                } else {
                    if (!requiredDataColumns.isEmpty()) {
                        TableUtils.validateRequiredDataColumns(requiredDataColumns,
                                tableSchema.getColumns());
                        requiredColumns.addAll(requiredDataColumns.stream()
                                .map(name -> tableSchema.getColumn(name.toLowerCase()))
                                .collect(Collectors.toList()));
                    }

                    if (!requiredPartitionColumns.isEmpty()) {
                        TableUtils.validateRequiredPartitionColumns(requiredPartitionColumns,
                                tableSchema.getPartitionColumns());
                        requiredColumns.addAll(requiredPartitionColumns.stream()
                                .map(name -> tableSchema.getPartitionColumn(name.toLowerCase()))
                                .collect(Collectors.toList()));
                        partitionKeys = requiredPartitionColumns;
                    }
                    // NOTE(review): when only data columns are projected,
                    // partitionKeys stays empty — confirm this is intended.
                }
            } else {
                if (!requiredPartitions.isEmpty()) {
                    throw new UnsupportedOperationException(
                            "Partition filter not supported for none partitioned table");
                }

                // Single download session over the whole (non-partitioned) table.
                long size = table.getSize();
                TableTunnel.DownloadSession session = createDownloadSession(
                        identifier.getProject(),
                        identifier.getTable(),
                        null,
                        tunnel);
                splits.addAll(getInputSplitsInternal(
                        session, size, splitSizeInBytes, null, splitOptions));

                TableUtils.validateRequiredDataColumns(requiredDataColumns,
                        tableSchema.getColumns());

                requiredColumns = requiredDataColumns.isEmpty()
                        ? tableSchema.getColumns()
                        : requiredDataColumns.stream()
                                .map(name -> tableSchema.getColumn(name.toLowerCase()))
                                .collect(Collectors.toList());
            }

            this.readSchema = DataSchema.newBuilder()
                    .columns(requiredColumns)
                    .partitionBy(partitionKeys)
                    .build();

            this.inputSplitAssigner = SplitOptions.SplitMode.SIZE.equals(splitMode)
                    ? new TunnelInputSplitAssigner(splits)
                    : new TunnelRowRangeInputSplitAssigner(splits);

            this.sessionId = "";
        } catch (Exception exception) {
            // NOTE(review): this broad catch also converts the
            // UnsupportedOperationException thrown above (partition filter on a
            // non-partitioned table) into IOException; kept as-is so callers that
            // catch IOException keep working.
            throw new IOException(exception);
        }
    }