in odps-sdk/odps-sdk-table-api/src/main/java/com/aliyun/odps/table/write/impl/batch/TableBatchWriteSessionImpl.java [203:298]
public void commit(WriterCommitMessage[] messages) throws IOException {
    // Commits this table write session.
    //
    // Flow:
    //   1. POST the commit request built from `messages` (through retryHandler).
    //   2. If the session is not COMMITTED after the POST, poll reloadSession()
    //      while the status is NORMAL or COMMITTING, until it leaves those
    //      states or the configured async timeout elapses.
    //   3. Any final status other than COMMITTED is reported as an IOException.
    //
    // Throws IllegalArgumentException when `messages` is null.
    // Throws IOException on timeout, on a non-COMMITTED final status, or when
    // any underlying call fails (the original exception is kept as the cause).
    // If the polling sleep is interrupted, the thread's interrupt flag is
    // restored before the IOException is thrown.
    ensureInitialized();
    if (messages == null) {
        throw new IllegalArgumentException("Invalid argument: messages.");
    }
    Preconditions.checkString(sessionId, "Table write session id");

    Map<String, String> headers = HttpUtils.createCommonHeader(settings);
    headers.put(Headers.CONTENT_TYPE, "application/json");
    Map<String, String> params = HttpUtils.createCommonParams(settings);
    params.put(ConfigConstants.SESSION_ID, sessionId);

    try {
        String commitRequest = generateCommitRequest(messages);
        if (logger.isDebugEnabled()) {
            logger.debug(String.format("Commit table '%s'.\n"
                                       + "Session request:\n"
                                       + "%s", identifier.toString(), commitRequest));
        }

        String response = retryHandler.executeWithRetry(() -> {
            Response resp = restClient.stringRequest(
                ResourceBuilder.buildTableCommitResource(
                    VERSION_1,
                    identifier.getProject(),
                    identifier.getSchema(),
                    identifier.getTable()),
                "POST", params, headers, commitRequest);
            if (!resp.isOK()) {
                throw new TunnelException(resp.getHeader(HEADER_ODPS_REQUEST_ID),
                                          new ByteArrayInputStream(resp.getBody()),
                                          resp.getStatus());
            }
            // Decode explicitly as UTF-8 rather than with the platform default
            // charset, so the JSON body is parsed the same way on every host.
            String body = new String(resp.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            loadResultFromJson(body);
            return body;
        });

        if (sessionStatus != SessionStatus.COMMITTED) {
            long asyncIntervalInMills = HttpUtils.getAsyncIntervalInMills(settings);
            long asyncTimeoutInMills = HttpUtils.getAsyncTimeoutInSeconds(settings) * 1000L;
            long startTime = System.currentTimeMillis();
            // NOTE(review): this loop relies on reloadSession() refreshing the
            // sessionStatus / errorMessage fields - confirm against its implementation.
            while (sessionStatus == SessionStatus.NORMAL
                   || sessionStatus == SessionStatus.COMMITTING) {
                Thread.sleep(asyncIntervalInMills);
                if (logger.isTraceEnabled()) {
                    logger.trace(String.format("Async commit table: '%s', session id: %s",
                                               identifier.toString(), sessionId));
                }
                response = reloadSession();

                // Only report a timeout while the commit is still pending. If the
                // last poll moved the session to a terminal state, fall through so
                // the real outcome (success or failure) is reported instead.
                boolean stillPending = sessionStatus == SessionStatus.NORMAL
                                       || sessionStatus == SessionStatus.COMMITTING;
                if (stillPending
                    && System.currentTimeMillis() - startTime >= asyncTimeoutInMills) {
                    throw new IOException(
                        String.format(
                            "Commit table write session timeout.\n"
                            + "Table identifier: %s.\n"
                            + "Session status: %s.\n"
                            + "Session id: %s.\n"
                            + "Error message: %s.",
                            identifier.toString(),
                            sessionStatus,
                            sessionId,
                            errorMessage));
                }
            }
        }

        if (sessionStatus != SessionStatus.COMMITTED) {
            throw new IOException(
                String.format(
                    "Commit table write session failed.\n"
                    + "Table identifier: %s.\n"
                    + "Session status: %s.\n"
                    + "Session id: %s.\n"
                    + "Error message: %s.",
                    identifier.toString(),
                    sessionStatus,
                    sessionId,
                    errorMessage));
        }

        if (logger.isDebugEnabled()) {
            logger.debug(String.format("Commit table '%s' success.\n"
                                       + "Session response:\n"
                                       + "%s", identifier.toString(), response));
        }
    } catch (IOException e) {
        // Already the declared exception type; rethrow as-is instead of
        // wrapping it in a second IOException with the same message.
        throw e;
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up the stack can
        // still observe that this thread was asked to stop.
        Thread.currentThread().interrupt();
        throw new IOException(e.getMessage(), e);
    } catch (Exception e) {
        throw new IOException(e.getMessage(), e);
    }
}