in spark-connector/common/src/main/java/org/apache/spark/sql/odps/table/tunnel/read/TunnelTableBatchReadSession.java [83:189]
protected void planInputSplits() throws IOException {
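// Bucket pruning cannot be pushed into a tunnel download session, so any bucket filter is rejected up front.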
if (requiredBucketIds.size() > 0) {
throw new UnsupportedOperationException("Unsupported bucket pruning in tunnel env");
}
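// Only SIZE and ROW_OFFSET split modes can be planned through tunnel download sessions; any other mode is rejected below.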
if (splitOptions.getSplitMode().equals(SplitOptions.SplitMode.SIZE) ||
splitOptions.getSplitMode().equals(SplitOptions.SplitMode.ROW_OFFSET)) {
ExecutionEnvironment env = ExecutionEnvironment.create(settings);
Odps odps = env.createOdpsClient();
TableTunnel tunnel = new TableTunnel(odps);
tunnel.setEndpoint(env.getTunnelEndpoint(identifier.getProject()));
// TODO: support schema
Table table = odps.tables().get(identifier.getProject(),
identifier.getTable());
TableSchema tableSchema = table.getSchema();
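// The configured split number is reused here as the per-split size hint handed to getInputSplitsInternal.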
long splitSizeInBytes = splitOptions.getSplitNumber();
try {
List<TunnelInputSplit> splits = new ArrayList<>();
List<Column> requiredColumns = new ArrayList<>();
List<String> partitionKeys = new ArrayList<>();
if (table.isPartitioned()) {
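// Plan one download session per partition, honouring the partition filter if one was pushed down.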
// TODO: the public ODPS SDK needs table.getPartitionSpecs();
List<PartitionSpec> readPartitions = requiredPartitions.size() > 0 ?
requiredPartitions :
table.getPartitions().stream().map(Partition::getPartitionSpec).collect(Collectors.toList());
for (PartitionSpec partitionSpec : readPartitions) {
long size = table.getPartition(partitionSpec).getSize();
TableTunnel.DownloadSession session = createDownloadSession(
identifier.getProject(),
identifier.getTable(),
partitionSpec,
tunnel);
splits.addAll(
getInputSplitsInternal(session, size, splitSizeInBytes, partitionSpec, splitOptions));
}
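// Column pruning: fall back to the full schema when no data or partition columns were requested.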
if (requiredDataColumns.size() == 0 &&
requiredPartitionColumns.size() == 0) {
requiredColumns.addAll(tableSchema.getColumns());
requiredColumns.addAll(tableSchema.getPartitionColumns());
partitionKeys = tableSchema.getPartitionColumns().stream()
.map(Column::getName).collect(Collectors.toList());
} else {
if (requiredDataColumns.size() > 0) {
TableUtils.validateRequiredDataColumns(requiredDataColumns,
tableSchema.getColumns());
requiredColumns.addAll(requiredDataColumns
.stream()
.map(name -> tableSchema.getColumn(name.toLowerCase()))
.collect(Collectors.toList()));
}
if (requiredPartitionColumns.size() > 0) {
TableUtils.validateRequiredPartitionColumns(requiredPartitionColumns,
tableSchema.getPartitionColumns());
requiredColumns.addAll(requiredPartitionColumns
.stream()
.map(name -> tableSchema.getPartitionColumn(name.toLowerCase()))
.collect(Collectors.toList()));
partitionKeys = requiredPartitionColumns;
}
}
} else {
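// Non-partitioned table: partition filters are meaningless here, and a single download session covers the whole table.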
if (requiredPartitions.size() > 0) {
throw new UnsupportedOperationException(
"Partition filter not supported for none partitioned table");
}
long size = table.getSize();
TableTunnel.DownloadSession session = createDownloadSession(
identifier.getProject(),
identifier.getTable(),
null,
tunnel);
splits.addAll(
getInputSplitsInternal(session, size, splitSizeInBytes, null, splitOptions));
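// Validate and prune to the requested data columns, or read every column when none were specified.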
TableUtils.validateRequiredDataColumns(requiredDataColumns,
tableSchema.getColumns());
requiredColumns = requiredDataColumns.size() > 0 ?
requiredDataColumns.stream()
.map(name -> tableSchema.getColumn(name.toLowerCase()))
.collect(Collectors.toList()) : tableSchema.getColumns();
}
this.readSchema = DataSchema.newBuilder()
.columns(requiredColumns)
.partitionBy(partitionKeys)
.build();
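// SIZE-mode splits are byte ranges and ROW_OFFSET-mode splits are row ranges, so each gets its own assigner.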
if (splitOptions.getSplitMode().equals(SplitOptions.SplitMode.SIZE)) {
this.inputSplitAssigner = new TunnelInputSplitAssigner(splits);
} else {
this.inputSplitAssigner = new TunnelRowRangeInputSplitAssigner(splits);
}
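// Splits are planned locally, so there is no server-side session id to record.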
this.sessionId = "";
} catch (Exception exception) {
throw new IOException(exception);
}
} else {
throw new UnsupportedOperationException(
"Split mode: '" + splitOptions.getSplitMode() + "' not supported");
}
}