protected void planInputSplits()

in odps-sdk/odps-sdk-table-api/src/main/java/com/aliyun/odps/table/read/impl/batch/TableBatchReadSessionImpl.java [100:190]


    /**
     * Creates the table read session on the server and waits for it to leave the
     * {@code INIT} state.
     *
     * <p>The session request produced by {@code generateReadSessionRequest()} is POSTed
     * through {@code retryHandler}; a successful response body is handed to
     * {@code loadResultFromJson} (presumably the place where {@code sessionStatus},
     * {@code sessionId} and {@code errorMessage} get populated - confirm against that
     * method). While the status is {@code INIT} the session is polled with
     * {@code reloadInputSplits()} at the configured async interval until the status
     * changes or the configured async timeout elapses.
     *
     * @throws IOException if the request fails, the session is still {@code INIT} when
     *                     the async timeout elapses, the session ends in any status other
     *                     than {@code NORMAL}, or the waiting thread is interrupted (the
     *                     interrupt flag is restored before throwing)
     */
    protected void planInputSplits() throws IOException {
        ensureClientInitialized();

        Map<String, String> headers = HttpUtils.createCommonHeader(settings);
        headers.put(Headers.CONTENT_TYPE, "application/json");

        Map<String, String> params = HttpUtils.createCommonParams(settings);
        params.put(ConfigConstants.SESSION_TYPE, getType().toString());

        try {
            String request = generateReadSessionRequest();
            if (logger.isDebugEnabled()) {
                logger.debug(String.format("Read table '%s'.\n"
                        + "Session request:\n"
                        + "%s", identifier.toString(), request));
            }

            String response = retryHandler.executeWithRetry(() -> {
                Response resp = restClient.stringRequest(
                        ResourceBuilder.buildTableSessionResource(
                                ConfigConstants.VERSION_1,
                                identifier.getProject(),
                                identifier.getSchema(),
                                identifier.getTable(),
                                null),
                        "POST", params, headers, request);
                if (!resp.isOK()) {
                    throw new TunnelException(resp.getHeader(HEADER_ODPS_REQUEST_ID),
                            new ByteArrayInputStream(resp.getBody()), resp.getStatus());
                }
                // Decode explicitly as UTF-8 (the request/response payload is JSON) rather
                // than relying on the platform default charset.
                String body = new String(resp.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                loadResultFromJson(body);
                return body;
            });

            if (sessionStatus != SessionStatus.NORMAL) {
                long asyncIntervalInMills = HttpUtils.getAsyncIntervalInMills(settings);
                long asyncTimeoutInMills = HttpUtils.getAsyncTimeoutInSeconds(settings) * 1000L;
                long startTime = System.currentTimeMillis();

                while (sessionStatus == SessionStatus.INIT) {
                    Thread.sleep(asyncIntervalInMills);

                    // Avoid paying for String.format on every poll when trace is off.
                    if (logger.isTraceEnabled()) {
                        logger.trace(String.format("Async read table: '%s', session id: %s",
                                identifier.toString(), sessionId));
                    }

                    response = reloadInputSplits();

                    // Only report a timeout while the session is still pending; a session
                    // that left INIT on this very poll must be judged by its final status
                    // below, not rejected because the deadline passed meanwhile.
                    if (sessionStatus == SessionStatus.INIT
                            && System.currentTimeMillis() - startTime >= asyncTimeoutInMills) {
                        throw new IOException(
                                String.format(
                                        "Create table read session timeout.\n"
                                                + "Table identifier: %s.\n"
                                                + "Session status: %s.\n"
                                                + "Session id: %s.\n"
                                                + "Error message: %s.",
                                        identifier.toString(),
                                        sessionStatus,
                                        sessionId,
                                        errorMessage));
                    }
                }
            }

            if (sessionStatus != SessionStatus.NORMAL) {
                throw new IOException(
                        String.format(
                                "Create table read session failed.\n"
                                        + "Table identifier: %s.\n"
                                        + "Session status: %s.\n"
                                        + "Session id: %s.\n"
                                        + "Error message: %s.",
                                identifier.toString(),
                                sessionStatus,
                                sessionId,
                                errorMessage));
            }

            if (logger.isDebugEnabled()) {
                logger.debug(String.format("Read table '%s'.\n"
                        + "Session response:\n"
                        + "%s", identifier.toString(), response));
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can still
            // observe the cancellation after it is translated to an IOException.
            Thread.currentThread().interrupt();
            throw new IOException(e.getMessage(), e);
        } catch (Exception e) {
            // Every failure is surfaced as IOException with the original exception as cause.
            throw new IOException(e.getMessage(), e);
        }
    }