private void openReaderConnection()

in odps-sdk/odps-sdk-core/src/main/java/com/aliyun/odps/tunnel/io/ArrowTunnelRecordReader.java [199:312]


  /**
   * Opens the HTTP download connection for an Arrow read and stores it in
   * {@code this.connection}.
   *
   * <p>Builds the request headers (tunnel version, accepted encoding derived from the
   * current {@code compression} option, optional tags) and query parameters (download id,
   * row range, optional column list, partition, quota name and modified-check switch),
   * issues a {@code GET} against the table resource, and then replaces
   * {@code this.compression} with the option matching the {@code Content-Encoding} the
   * server actually replied with.
   *
   * <p>If anything fails after the connection has been created, that connection is
   * disconnected before the exception propagates, so a failed open does not leak it.
   *
   * @param start                first value placed in the row-range parameter
   * @param count                second value placed in the row-range parameter
   * @param columns              columns to request; {@code null} or empty sends no column filter
   * @param restClient           client used to open the connection
   * @param session              download session supplying id, partition spec and table coordinates
   * @param disableModifiedCheck when {@code true}, the disable-modified-check parameter is sent
   * @throws IOException     declared for callers; I/O failures raised while opening are
   *                         rethrown as {@link TunnelException}
   * @throws TunnelException if the compression option is unsupported, the server replies with a
   *                         non-OK status or an unknown content encoding, or the request fails
   */
  private void openReaderConnection(long start, long count, List<Column> columns,
                                    RestClient restClient,
                                    TableTunnel.DownloadSession session,
                                    boolean disableModifiedCheck)
      throws IOException, TunnelException {
    HashMap<String, String> params = new HashMap<>();
    HashMap<String, String> headers = new HashMap<>();

    headers.put(Headers.CONTENT_LENGTH, String.valueOf(0));
    headers.put(HttpHeaders.HEADER_ODPS_TUNNEL_VERSION, String.valueOf(TunnelConstants.VERSION));

    // Advertise the encoding that corresponds to the currently configured compression.
    // NOTE(review): this dereferences this.compression, which this method itself may set to
    // null below when the server sends no Content-Encoding - confirm callers never re-enter
    // with a null option.
    switch (compression.algorithm) {
      case ODPS_RAW: {
        break;
      }
      case ODPS_ZLIB: {
        headers.put(Headers.ACCEPT_ENCODING, "deflate");
        break;
      }
      case ODPS_SNAPPY: {
        headers.put(Headers.ACCEPT_ENCODING, "x-snappy-framed");
        break;
      }
      case ODPS_ARROW_LZ4_FRAME: {
        headers.put(Headers.ACCEPT_ENCODING, "x-odps-lz4-frame");
        break;
      }
      default: {
        throw new TunnelException("invalid compression option.");
      }
    }

    Configuration conf = tableSession.getConfig();
    List<String> tags = conf.getTags();
    if (tags != null) {
      headers.put(HttpHeaders.HEADER_ODPS_TUNNEL_TAGS, String.join(",", tags));
    }

    if (columns != null && !columns.isEmpty()) {
      // Comma-separated column names, in the order given.
      StringBuilder columnNames = new StringBuilder();
      for (int i = 0; i < columns.size(); ++i) {
        if (i > 0) {
          columnNames.append(",");
        }
        columnNames.append(columns.get(i).getName());
      }
      params.put(TunnelConstants.RES_COLUMNS, columnNames.toString());
    }

    params.put(TunnelConstants.DOWNLOADID, session.getId());
    // Key with a null value; presumably rendered as a value-less query flag - verify
    // against RestClient.
    params.put("data", null);

    params.put(TunnelConstants.ROW_RANGE, "(" + start + "," + count + ")");

    String partitionSpec = session.getPartitionSpec();
    if (partitionSpec != null && partitionSpec.length() > 0) {
      params.put(TunnelConstants.RES_PARTITION, partitionSpec);
    }

    params.put(TunnelConstants.PARAM_ARROW, "");

    if (!StringUtils.isNullOrEmpty(conf.getQuotaName())) {
      params.put(TunnelConstants.PARAM_QUOTA_NAME, conf.getQuotaName());
    }

    if (disableModifiedCheck) {
      params.put(TunnelConstants.PARAM_DISABLE_MODIFIED_CHECK, "true");
    }

    Connection conn = null;
    boolean opened = false;
    try {
      String resource = ResourceBuilder.buildTableResource(
          session.getProjectName(), session.getSchemaName(), session.getTableName());
      conn = restClient.connect(resource, "GET", params, headers);

      Response resp = conn.getResponse();
      if (!resp.isOK()) {
        TunnelException err = new TunnelException(conn.getInputStream());
        err.setRequestId(resp.getHeader(HttpHeaders.HEADER_ODPS_REQUEST_ID));
        throw err;
      }

      // Stays null when the response carries no Content-Encoding header.
      CompressOption replyCompression = null;
      String contentEncoding = resp.getHeader(Headers.CONTENT_ENCODING);
      if (contentEncoding != null) {
        if (contentEncoding.equals("deflate")) {
          replyCompression = new CompressOption(CompressOption.CompressAlgorithm.ODPS_ZLIB,
                                                -1, 0);
        } else if (contentEncoding.equals("x-snappy-framed")) {
          replyCompression = new CompressOption(CompressOption.CompressAlgorithm.ODPS_SNAPPY,
                                                -1, 0);
        } else if (contentEncoding.equals("x-odps-lz4-frame")) {
          replyCompression =
              new CompressOption(CompressOption.CompressAlgorithm.ODPS_ARROW_LZ4_FRAME,
                                 -1, 0);
        } else {
          throw new TunnelException("invalid content encoding");
        }
      }
      // update to match server's selection of compress algorithm
      this.compression = replyCompression;
      this.connection = conn;
      opened = true;
    } catch (IOException e) {
      throw new TunnelException(e.getMessage(), e);
    } catch (TunnelException e) {
      throw e;
    } catch (OdpsException e) {
      throw new TunnelException(e.getMessage(), e);
    } finally {
      // On any failure, release the connection created by THIS call. The field
      // this.connection is only assigned on success, so it cannot be used for cleanup here.
      if (!opened && conn != null) {
        try {
          conn.disconnect();
        } catch (Exception ignored) {
          // Best-effort cleanup: the exception already propagating is the one callers need.
        }
      }
    }
  }