in odps-sdk/odps-sdk-table-api/src/main/java/com/aliyun/odps/table/read/impl/batch/SplitArrowReaderImpl.java [117:193]
private void openReaderConnection(TableIdentifier identifier,
InputSplit split,
ReaderOptions options) throws IOException {
RestClient restClient = ExecutionEnvironment.create(options.getSettings())
.createHttpClient(identifier.getProject());
restClient.setRetryLogger(new RestClient.RetryLogger() {
@Override
public void onRetryLog(Throwable e, long retryCount, long retrySleepTime) {
logger.warn(String.format("Reader retry for session: %s, " +
"retryCount: %d, will retry in %d seconds.",
split.getSessionId(), retryCount, retrySleepTime / 1000), e);
}
});
TunnelRetryHandler retryHandler = new TableRetryHandler(restClient);
Map<String, String> headers = HttpUtils.createCommonHeader(options.getSettings());
if (options.getCompressionCodec().equals(CompressionCodec.ZSTD)) {
headers.put(Headers.ACCEPT_ENCODING, CompressionCodec.ZSTD.toString());
} else if (options.getCompressionCodec().equals(CompressionCodec.LZ4_FRAME)) {
headers.put(Headers.ACCEPT_ENCODING, CompressionCodec.LZ4_FRAME.toString());
}
Map<String, String> params = HttpUtils.createCommonParams(options.getSettings());
params.put(ConfigConstants.SESSION_ID, split.getSessionId());
if (split instanceof InputSplitWithRowRange) {
InputSplitWithRowRange rowRangeInputSplit = (InputSplitWithRowRange) split;
params.put(ConfigConstants.ROW_INDEX,
String.valueOf(rowRangeInputSplit.getRowRange().getStartIndex()));
params.put(ConfigConstants.ROW_COUNT,
String.valueOf(rowRangeInputSplit.getRowRange().getNumRecord()));
} else if (split instanceof InputSplitWithIndex) {
InputSplitWithIndex indexedInputSplit = (InputSplitWithIndex) split;
params.put(ConfigConstants.SPLIT_INDEX,
String.valueOf(indexedInputSplit.getSplitIndex()));
} else {
throw new UnsupportedOperationException("Unsupported split type: " + split);
}
params.put(ConfigConstants.MAX_BATCH_ROWS,
String.valueOf(options.getBatchRowCount()));
if (options.getBatchRawSize() != 0L) {
params.put(ConfigConstants.MAX_BATCH_RAW_SIZE, String.valueOf(options.getBatchRawSize()));
}
params.put(ConfigConstants.DATA_FORMAT_TYPE,
options.getDataFormat().getType().toString());
params.put(ConfigConstants.DATA_FORMAT_VERSION,
options.getDataFormat().getVersion().toString());
try {
String resource = ResourceBuilder.buildTableDataResource(
ConfigConstants.VERSION_1,
identifier.getProject(),
identifier.getSchema(),
identifier.getTable());
retryHandler.executeWithRetry(() -> {
try {
this.connection = restClient.connect(resource, "GET", params, headers);
Response resp = connection.getResponse();
this.requestId = resp.getHeader(HttpHeaders.HEADER_ODPS_REQUEST_ID);
if (!resp.isOK()) {
throw new TunnelException(requestId, connection.getInputStream(),
resp.getStatus());
}
} catch (Exception e) {
disconnect();
throw e;
}
return null;
});
} catch (Exception e) {
disconnect();
logger.error("Open split reader failed", e);
throw new IOException(e.getMessage(), e);
}
}