private void openReaderConnection()

in odps-sdk/odps-sdk-table-api/src/main/java/com/aliyun/odps/table/read/impl/batch/SplitArrowReaderImpl.java [117:193]


    private void openReaderConnection(TableIdentifier identifier,
                                      InputSplit split,
                                      ReaderOptions options) throws IOException {
        RestClient restClient = ExecutionEnvironment.create(options.getSettings())
                .createHttpClient(identifier.getProject());
        restClient.setRetryLogger(new RestClient.RetryLogger() {
            @Override
            public void onRetryLog(Throwable e, long retryCount, long retrySleepTime) {
                logger.warn(String.format("Reader retry for session: %s, " +
                                "retryCount: %d, will retry in %d seconds.",
                        split.getSessionId(), retryCount, retrySleepTime / 1000), e);
            }
        });

        TunnelRetryHandler retryHandler = new TableRetryHandler(restClient);

        Map<String, String> headers = HttpUtils.createCommonHeader(options.getSettings());
        if (options.getCompressionCodec().equals(CompressionCodec.ZSTD)) {
            headers.put(Headers.ACCEPT_ENCODING, CompressionCodec.ZSTD.toString());
        } else if (options.getCompressionCodec().equals(CompressionCodec.LZ4_FRAME)) {
            headers.put(Headers.ACCEPT_ENCODING, CompressionCodec.LZ4_FRAME.toString());
        }

        Map<String, String> params = HttpUtils.createCommonParams(options.getSettings());
        params.put(ConfigConstants.SESSION_ID, split.getSessionId());

        if (split instanceof InputSplitWithRowRange) {
            InputSplitWithRowRange rowRangeInputSplit = (InputSplitWithRowRange) split;
            params.put(ConfigConstants.ROW_INDEX,
                    String.valueOf(rowRangeInputSplit.getRowRange().getStartIndex()));
            params.put(ConfigConstants.ROW_COUNT,
                    String.valueOf(rowRangeInputSplit.getRowRange().getNumRecord()));
        } else if (split instanceof InputSplitWithIndex) {
            InputSplitWithIndex indexedInputSplit = (InputSplitWithIndex) split;
            params.put(ConfigConstants.SPLIT_INDEX,
                    String.valueOf(indexedInputSplit.getSplitIndex()));
        } else {
            throw new UnsupportedOperationException("Unsupported split type: " + split);
        }

        params.put(ConfigConstants.MAX_BATCH_ROWS,
                String.valueOf(options.getBatchRowCount()));
        if (options.getBatchRawSize() != 0L) {
            params.put(ConfigConstants.MAX_BATCH_RAW_SIZE, String.valueOf(options.getBatchRawSize()));
        }
        params.put(ConfigConstants.DATA_FORMAT_TYPE,
                options.getDataFormat().getType().toString());
        params.put(ConfigConstants.DATA_FORMAT_VERSION,
                options.getDataFormat().getVersion().toString());
        try {
            String resource = ResourceBuilder.buildTableDataResource(
                    ConfigConstants.VERSION_1,
                    identifier.getProject(),
                    identifier.getSchema(),
                    identifier.getTable());

            retryHandler.executeWithRetry(() -> {
                try {
                    this.connection = restClient.connect(resource, "GET", params, headers);
                    Response resp = connection.getResponse();
                    this.requestId = resp.getHeader(HttpHeaders.HEADER_ODPS_REQUEST_ID);
                    if (!resp.isOK()) {
                        throw new TunnelException(requestId, connection.getInputStream(),
                                resp.getStatus());
                    }
                } catch (Exception e) {
                    disconnect();
                    throw e;
                }
                return null;
            });
        } catch (Exception e) {
            disconnect();
            logger.error("Open split reader failed", e);
            throw new IOException(e.getMessage(), e);
        }
    }