in odps-sdk/odps-sdk-core/src/main/java/com/aliyun/odps/tunnel/io/RawTunnelRecordReader.java [245:359]
/**
 * Opens a download connection for a row range of a table download session and wraps it in a
 * {@link RawTunnelRecordReader}.
 *
 * <p>The request is a {@code GET} on the table resource built from the session's project,
 * schema and table names. The row range is sent as {@code "(start,count)"}.
 *
 * @param start                value placed first in the row-range parameter
 * @param count                value placed second in the row-range parameter
 * @param compress             requested compression; {@code null} or {@code ODPS_RAW} sends no
 *                             {@code Accept-Encoding} header
 * @param columns              columns to request; {@code null} or empty sends no column filter
 * @param restClient           client used to open the connection
 * @param session              download session supplying id, configuration, partition spec,
 *                             resource names and schema
 * @param disableModifiedCheck when {@code true}, adds the disable-modified-check parameter with
 *                             value {@code "true"}
 * @return a reader bound to the opened connection; the compression option handed to it is derived
 *         from the response's {@code Content-Encoding} header ({@code null} when absent)
 * @throws TunnelException if the compression algorithm is not supported, the response is not OK,
 *                         the response content encoding is not recognised, or an
 *                         {@link IOException} / {@link OdpsException} occurs while connecting
 * @throws IOException     declared by the signature; NOTE(review): presumably only reachable via
 *                         {@code Connection#disconnect()} during cleanup - confirm
 */
public static RawTunnelRecordReader createTableTunnelReader(long start, long count,
                                                            CompressOption compress,
                                                            List<Column> columns,
                                                            RestClient restClient,
                                                            TableTunnel.DownloadSession session,
                                                            boolean disableModifiedCheck)
    throws IOException, TunnelException {
  HashMap<String, String> params = new HashMap<String, String>();
  HashMap<String, String> headers = new HashMap<String, String>();

  headers.put(Headers.CONTENT_LENGTH, String.valueOf(0));
  headers.put(HttpHeaders.HEADER_ODPS_TUNNEL_VERSION, String.valueOf(TunnelConstants.VERSION));

  // Advertise the encoding matching the requested compression algorithm.
  if (compress != null) {
    switch (compress.algorithm) {
      case ODPS_RAW: {
        break;
      }
      case ODPS_ZLIB: {
        headers.put(Headers.ACCEPT_ENCODING, "deflate");
        break;
      }
      case ODPS_SNAPPY: {
        headers.put(Headers.ACCEPT_ENCODING, "x-snappy-framed");
        break;
      }
      case ODPS_LZ4_FRAME: {
        headers.put(Headers.ACCEPT_ENCODING, "x-lz4-frame");
        break;
      }
      default: {
        throw new TunnelException("invalid compression option.");
      }
    }
  }

  Configuration conf = session.getConfig();
  List<String> tags = conf.getTags();
  if (tags != null) {
    headers.put(HttpHeaders.HEADER_ODPS_TUNNEL_TAGS, String.join(",", tags));
  }

  // Column filter: comma-separated column names, omitted when no columns are given.
  if (columns != null && columns.size() != 0) {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < columns.size(); ++i) {
      sb.append(columns.get(i).getName());
      if (i != columns.size() - 1) {
        sb.append(",");
      }
    }
    params.put(TunnelConstants.RES_COLUMNS, sb.toString());
  }

  params.put(TunnelConstants.DOWNLOADID, session.getId());
  // Parameter with no value; presumably rendered as a bare "data" flag - confirm in RestClient.
  params.put("data", null);
  params.put(TunnelConstants.ROW_RANGE, "(" + start + "," + count + ")");

  String partitionSpec = session.getPartitionSpec();
  if (partitionSpec != null && partitionSpec.length() > 0) {
    params.put(TunnelConstants.RES_PARTITION, partitionSpec);
  }
  if (!StringUtils.isNullOrEmpty(conf.getQuotaName())) {
    params.put(TunnelConstants.PARAM_QUOTA_NAME, conf.getQuotaName());
  }
  if (disableModifiedCheck) {
    params.put(TunnelConstants.PARAM_DISABLE_MODIFIED_CHECK, "true");
  }

  Connection conn = null;
  // Becomes true only once the connection is owned by the returned reader. Every other exit
  // path (including TunnelException and unchecked exceptions) must release the connection.
  boolean handedOff = false;
  try {
    String resource = ResourceBuilder.buildTableResource(
        session.getProjectName(), session.getSchemaName(), session.getTableName());
    conn = restClient.connect(resource, "GET", params, headers);
    Response resp = conn.getResponse();

    if (!resp.isOK()) {
      // NOTE(review): assumes TunnelException(InputStream) reads the error body in its
      // constructor, so disconnecting afterwards does not lose the message - confirm.
      TunnelException err = new TunnelException(conn.getInputStream());
      err.setRequestId(resp.getHeader(HttpHeaders.HEADER_ODPS_REQUEST_ID));
      throw err;
    }

    // The server's Content-Encoding, not the requested one, decides how the body is decoded.
    CompressOption option = null;
    String contentEncoding = resp.getHeader(Headers.CONTENT_ENCODING);
    if (contentEncoding != null) {
      if ("deflate".equals(contentEncoding)) {
        option = new CompressOption(CompressOption.CompressAlgorithm.ODPS_ZLIB, -1, 0);
      } else if ("x-snappy-framed".equals(contentEncoding)) {
        option = new CompressOption(CompressOption.CompressAlgorithm.ODPS_SNAPPY, -1, 0);
      } else if ("x-lz4-frame".equals(contentEncoding)) {
        option = new CompressOption(CompressOption.CompressAlgorithm.ODPS_LZ4_FRAME, -1, 0);
      } else {
        throw new TunnelException("invalid content encoding");
      }
    }

    RawTunnelRecordReader reader =
        new RawTunnelRecordReader(session.getSchema(), columns, conn, option);
    handedOff = true;
    return reader;
  } catch (IOException e) {
    throw new TunnelException(e.getMessage(), e);
  } catch (TunnelException e) {
    // Already the right type; rethrow untouched so message and request id are preserved.
    throw e;
  } catch (OdpsException e) {
    throw new TunnelException(e.getMessage(), e);
  } finally {
    if (!handedOff && conn != null) {
      conn.disconnect();
    }
  }
}