in odps-sdk/odps-sdk-core/src/main/java/com/aliyun/odps/tunnel/io/ArrowTunnelRecordReader.java [199:312]
/**
 * Issues the HTTP GET request that starts an Arrow download for a row range of the given
 * download session and, on success, stores the resulting connection in {@code this.connection}.
 *
 * <p>The request advertises the currently configured compression via {@code Accept-Encoding}.
 * After a successful response, {@code this.compression} is replaced with the algorithm named by
 * the response's {@code Content-Encoding} header, or {@code null} when that header is absent.
 *
 * <p>If anything fails after the connection has been created, that connection is disconnected
 * before the exception propagates; {@code this.connection} and {@code this.compression} are
 * left untouched in that case.
 *
 * @param start                first row of the requested range (sent as part of the row-range parameter)
 * @param count                number of rows requested (sent as part of the row-range parameter)
 * @param columns              columns to fetch; {@code null} or empty sends no column filter
 * @param restClient           client used to open the connection
 * @param session              download session supplying the id, partition spec and table coordinates
 * @param disableModifiedCheck when {@code true}, adds the "disable modified check" request parameter
 * @throws IOException     declared for compatibility; I/O failures raised while connecting or
 *                         reading the response are rethrown as {@link TunnelException}
 * @throws TunnelException if the compression option is invalid, the server answers with a non-OK
 *                         status, the response uses an unsupported content encoding, or an
 *                         I/O / ODPS error occurs while opening the connection
 */
private void openReaderConnection(long start, long count, List<Column> columns,
                                  RestClient restClient,
                                  TableTunnel.DownloadSession session,
                                  boolean disableModifiedCheck)
    throws IOException, TunnelException {
    HashMap<String, String> params = new HashMap<String, String>();
    HashMap<String, String> headers = new HashMap<String, String>();

    headers.put(Headers.CONTENT_LENGTH, String.valueOf(0));
    headers.put(HttpHeaders.HEADER_ODPS_TUNNEL_VERSION, String.valueOf(TunnelConstants.VERSION));

    // this.compression is overwritten at the end of this method and becomes null when the server
    // replies without a Content-Encoding header. Treat null as "request no encoding" so that a
    // later call on the same instance does not fail with a NullPointerException.
    if (compression != null) {
        switch (compression.algorithm) {
            case ODPS_RAW: {
                break;
            }
            case ODPS_ZLIB: {
                headers.put(Headers.ACCEPT_ENCODING, "deflate");
                break;
            }
            case ODPS_SNAPPY: {
                headers.put(Headers.ACCEPT_ENCODING, "x-snappy-framed");
                break;
            }
            case ODPS_ARROW_LZ4_FRAME: {
                headers.put(Headers.ACCEPT_ENCODING, "x-odps-lz4-frame");
                break;
            }
            default: {
                throw new TunnelException("invalid compression option.");
            }
        }
    }

    Configuration conf = tableSession.getConfig();
    List<String> tags = conf.getTags();
    if (tags != null) {
        headers.put(HttpHeaders.HEADER_ODPS_TUNNEL_TAGS, String.join(",", tags));
    }

    // Optional column projection, sent as a comma-separated list of column names.
    if (columns != null && columns.size() != 0) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < columns.size(); ++i) {
            if (i > 0) {
                sb.append(",");
            }
            sb.append(columns.get(i).getName());
        }
        params.put(TunnelConstants.RES_COLUMNS, sb.toString());
    }

    params.put(TunnelConstants.DOWNLOADID, session.getId());
    params.put("data", null);
    params.put(TunnelConstants.ROW_RANGE, "(" + start + "," + count + ")");

    String partitionSpec = session.getPartitionSpec();
    if (partitionSpec != null && partitionSpec.length() > 0) {
        params.put(TunnelConstants.RES_PARTITION, partitionSpec);
    }

    params.put(TunnelConstants.PARAM_ARROW, "");

    if (!StringUtils.isNullOrEmpty(conf.getQuotaName())) {
        params.put(TunnelConstants.PARAM_QUOTA_NAME, conf.getQuotaName());
    }
    if (disableModifiedCheck) {
        params.put(TunnelConstants.PARAM_DISABLE_MODIFIED_CHECK, "true");
    }

    Connection conn = null;
    // Set only after the connection has been handed over to this.connection; drives cleanup below.
    boolean established = false;
    try {
        String resource = ResourceBuilder.buildTableResource(
            session.getProjectName(), session.getSchemaName(), session.getTableName());
        conn = restClient.connect(resource, "GET", params, headers);

        Response resp = conn.getResponse();
        if (!resp.isOK()) {
            TunnelException err = new TunnelException(conn.getInputStream());
            err.setRequestId(resp.getHeader(HttpHeaders.HEADER_ODPS_REQUEST_ID));
            throw err;
        }

        CompressOption replyCompression = null;
        String contentEncoding = resp.getHeader(Headers.CONTENT_ENCODING);
        if (contentEncoding != null) {
            if (contentEncoding.equals("deflate")) {
                replyCompression =
                    new CompressOption(CompressOption.CompressAlgorithm.ODPS_ZLIB, -1, 0);
            } else if (contentEncoding.equals("x-snappy-framed")) {
                replyCompression =
                    new CompressOption(CompressOption.CompressAlgorithm.ODPS_SNAPPY, -1, 0);
            } else if (contentEncoding.equals("x-odps-lz4-frame")) {
                replyCompression =
                    new CompressOption(CompressOption.CompressAlgorithm.ODPS_ARROW_LZ4_FRAME, -1, 0);
            } else {
                throw new TunnelException("invalid content encoding");
            }
        }

        // update to match server's selection of compress algorithm
        this.compression = replyCompression;
        this.connection = conn;
        established = true;
    } catch (TunnelException e) {
        // Already the exception type callers expect; must be caught before OdpsException below
        // so it is not re-wrapped.
        throw e;
    } catch (IOException | OdpsException e) {
        throw new TunnelException(e.getMessage(), e);
    } finally {
        // Release the connection opened by THIS call if it was never handed over. The previous
        // code inspected the field this.connection here, which leaked the new connection and
        // could disconnect an unrelated, previously stored one.
        if (!established && conn != null) {
            try {
                conn.disconnect();
            } catch (Exception ignored) {
                // Best-effort cleanup: the original failure is already propagating and must not
                // be masked by a secondary error while disconnecting.
            }
        }
    }
}