public void commit()

in odps-sdk/odps-sdk-table-api/src/main/java/com/aliyun/odps/table/write/impl/batch/TableBatchWriteSessionImpl.java [203:298]


    public void commit(WriterCommitMessage[] messages) throws IOException {
        ensureInitialized();

        if (messages == null) {
            throw new IllegalArgumentException("Invalid argument: messages.");
        }

        Preconditions.checkString(sessionId, "Table write session id");
        Map<String, String> headers = HttpUtils.createCommonHeader(settings);
        headers.put(Headers.CONTENT_TYPE, "application/json");

        Map<String, String> params = HttpUtils.createCommonParams(settings);
        params.put(ConfigConstants.SESSION_ID, sessionId);

        try {
            String commitRequest = generateCommitRequest(messages);
            if (logger.isDebugEnabled()) {
                logger.debug(String.format("Commit table '%s'.\n"
                        + "Session request:\n"
                        + "%s", identifier.toString(), commitRequest));
            }

            String response = retryHandler.executeWithRetry(() -> {
                        Response resp = restClient.stringRequest(ResourceBuilder.buildTableCommitResource(
                                        VERSION_1,
                                        identifier.getProject(),
                                        identifier.getSchema(),
                                        identifier.getTable()),
                                "POST", params, headers, commitRequest);
                        String body;
                        if (!resp.isOK()) {
                            throw new TunnelException(resp.getHeader(HEADER_ODPS_REQUEST_ID),
                                    new ByteArrayInputStream(resp.getBody()), resp.getStatus());
                        } else {
                            body = new String(resp.getBody());
                            loadResultFromJson(body);
                        }
                        return body;
                    }
            );

            if (sessionStatus != SessionStatus.COMMITTED) {
                long asyncIntervalInMills = HttpUtils.getAsyncIntervalInMills(settings);
                long asyncTimeoutInMills = HttpUtils.getAsyncTimeoutInSeconds(settings) * 1000L;
                long startTime = System.currentTimeMillis();

                while (sessionStatus == SessionStatus.NORMAL ||
                        sessionStatus == SessionStatus.COMMITTING) {
                    Thread.sleep(asyncIntervalInMills);

                    logger.trace(String.format("Async commit table: '%s', session id: %s",
                            identifier.toString(), sessionId));

                    response = reloadSession();

                    if (System.currentTimeMillis() - startTime >= asyncTimeoutInMills) {
                        throw new IOException(
                                String.format(
                                        "Commit table write session timeout.\n"
                                                + "Table identifier: %s.\n"
                                                + "Session status: %s.\n"
                                                + "Session id: %s.\n"
                                                + "Error message: %s.",
                                        identifier.toString(),
                                        sessionStatus,
                                        sessionId,
                                        errorMessage));
                    }
                }
            }

            if (sessionStatus != SessionStatus.COMMITTED) {
                throw new IOException(
                        String.format(
                                "Commit table write session failed.\n"
                                        + "Table identifier: %s.\n"
                                        + "Session status: %s.\n"
                                        + "Session id: %s.\n"
                                        + "Error message: %s.",
                                identifier.toString(),
                                sessionStatus,
                                sessionId,
                                errorMessage));
            } else {
                if (logger.isDebugEnabled()) {
                    logger.debug(String.format("Commit table '%s' success.\n"
                            + "Session response:\n"
                            + "%s", identifier.toString(), response));
                }
            }
        } catch (Exception e) {
            throw new IOException(e.getMessage(), e);
        } finally {
            // nothing
        }
    }