in odps-sdk/odps-sdk-table-api/src/main/java/com/aliyun/odps/table/read/impl/batch/TableBatchReadSessionImpl.java [100:190]
/**
 * Creates the table read session by POSTing the session request, then waits
 * for the session to leave the {@code INIT} state.
 *
 * <p>The initial response body is passed to {@code loadResultFromJson}
 * (presumably this is what populates {@code sessionStatus},
 * {@code sessionId} and {@code errorMessage} - confirm against that method).
 * While the status is {@code INIT}, the method sleeps for the configured
 * async interval and calls {@code reloadInputSplits()} until the status
 * changes or the configured async timeout elapses.
 *
 * @throws IOException if the request fails, the session is still
 *     {@code INIT} when the async timeout elapses, the session ends in any
 *     status other than {@code NORMAL}, or the calling thread is
 *     interrupted while waiting (the interrupt flag is restored first).
 *     Any other exception raised along the way is wrapped, with the
 *     original kept as the cause.
 */
protected void planInputSplits() throws IOException {
    ensureClientInitialized();
    Map<String, String> headers = HttpUtils.createCommonHeader(settings);
    headers.put(Headers.CONTENT_TYPE, "application/json");
    Map<String, String> params = HttpUtils.createCommonParams(settings);
    params.put(ConfigConstants.SESSION_TYPE, getType().toString());
    try {
        String request = generateReadSessionRequest();
        if (logger.isDebugEnabled()) {
            logger.debug(String.format("Read table '%s'.\n"
                    + "Session request:\n"
                    + "%s", identifier.toString(), request));
        }
        String response = retryHandler.executeWithRetry(() -> {
            Response resp = restClient.stringRequest(
                    ResourceBuilder.buildTableSessionResource(
                            ConfigConstants.VERSION_1,
                            identifier.getProject(),
                            identifier.getSchema(),
                            identifier.getTable(),
                            null),
                    "POST", params, headers, request);
            if (!resp.isOK()) {
                throw new TunnelException(resp.getHeader(HEADER_ODPS_REQUEST_ID),
                        new ByteArrayInputStream(resp.getBody()), resp.getStatus());
            }
            // Decode explicitly as UTF-8: the payload is JSON, and the platform
            // default charset is not guaranteed to be UTF-8 on every JVM.
            String body = new String(resp.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            loadResultFromJson(body);
            return body;
        });

        // Session creation may complete asynchronously: poll while it is INIT.
        if (sessionStatus == SessionStatus.INIT) {
            long asyncIntervalInMills = HttpUtils.getAsyncIntervalInMills(settings);
            long asyncTimeoutInMills = HttpUtils.getAsyncTimeoutInSeconds(settings) * 1000L;
            long startTime = System.currentTimeMillis();
            while (sessionStatus == SessionStatus.INIT) {
                Thread.sleep(asyncIntervalInMills);
                logger.trace(String.format("Async read table: '%s', session id: %s",
                        identifier.toString(), sessionId));
                response = reloadInputSplits();
                // Only report a timeout while the session is still pending; a
                // session that left INIT on this very poll is handled below
                // according to its actual status.
                if (sessionStatus == SessionStatus.INIT
                        && System.currentTimeMillis() - startTime >= asyncTimeoutInMills) {
                    throw new IOException(
                            String.format(
                                    "Create table read session timeout.\n"
                                            + "Table identifier: %s.\n"
                                            + "Session status: %s.\n"
                                            + "Session id: %s.\n"
                                            + "Error message: %s.",
                                    identifier.toString(),
                                    sessionStatus,
                                    sessionId,
                                    errorMessage));
                }
            }
        }

        if (sessionStatus != SessionStatus.NORMAL) {
            throw new IOException(
                    String.format(
                            "Create table read session failed.\n"
                                    + "Table identifier: %s.\n"
                                    + "Session status: %s.\n"
                                    + "Session id: %s.\n"
                                    + "Error message: %s.",
                            identifier.toString(),
                            sessionStatus,
                            sessionId,
                            errorMessage));
        }
        if (logger.isDebugEnabled()) {
            logger.debug(String.format("Read table '%s'.\n"
                    + "Session response:\n"
                    + "%s", identifier.toString(), response));
        }
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up the stack can
        // still observe that this thread was asked to stop.
        Thread.currentThread().interrupt();
        throw new IOException(e.getMessage(), e);
    } catch (Exception e) {
        // Callers only see IOException; keep the original as the cause.
        throw new IOException(e.getMessage(), e);
    }
}