in odps-sdk/odps-sdk-core/src/main/java/com/aliyun/odps/tunnel/io/RawTunnelRecordReader.java [57:218]
/**
 * Issues the instance-tunnel download GET request and wraps the open connection in a
 * {@link RawTunnelRecordReader}.
 *
 * <p>The request is built from the arguments as follows:
 * <ul>
 *   <li>{@code compress.algorithm} selects the {@code Accept-Encoding} header (none for raw).</li>
 *   <li>Non-empty {@code columns} are sent as a comma separated list of column names.</li>
 *   <li>In long-polling mode the schema is requested in the stream, the task name (and the
 *       query id, unless it is {@code -1}) is sent, a row range is only sent when
 *       {@code start > 0} or {@code count > 0}, and {@code sizeLimit} is only sent when
 *       it is positive.</li>
 *   <li>Otherwise the session's download id and the row range {@code (start,count)} are
 *       always sent.</li>
 * </ul>
 *
 * <p>In long-polling mode this method also writes back to {@code session}: the record count
 * taken from the response header (0 when the header is absent) and the table schema, taken
 * either from the schema response header or from the reader itself.
 *
 * <p>On success the returned reader holds the connection. On any failure after the
 * connection was opened, the connection is disconnected before the exception propagates.
 *
 * @param start       index of the first row to read; must not be negative in long-polling mode
 * @param count       number of rows to read; in long-polling mode a negative value together
 *                    with a positive {@code start} requests everything from {@code start} on
 * @param sizeLimit   result size limit, only used in long-polling mode and only when positive
 * @param compress    compression option whose algorithm is advertised to the server
 * @param columns     columns to download, may be {@code null} or empty to send no column filter
 * @param restClient  client used to open the connection
 * @param session     download session supplying project, instance, config and ids
 * @param longPolling whether to use the long-polling request form described above
 * @return a reader bound to the opened connection
 * @throws TunnelException if the arguments are invalid, the server answers with a non-OK
 *                         status or an unsupported content encoding, or an
 *                         {@link IOException} / {@code OdpsException} occurs while connecting
 * @throws IOException     declared for callers; may surface from disconnecting the connection
 */
public static RawTunnelRecordReader createInstanceTunnelReader(long start, long count, long sizeLimit,
                                                               CompressOption compress,
                                                               List<Column> columns,
                                                               RestClient restClient,
                                                               InstanceTunnel.DownloadSession session,
                                                               boolean longPolling)
    throws TunnelException, IOException {
  HashMap<String, String> params = new HashMap<>();
  HashMap<String, String> headers = new HashMap<>();

  headers.put(Headers.CONTENT_LENGTH, String.valueOf(0));
  headers.put(HttpHeaders.HEADER_ODPS_TUNNEL_VERSION, String.valueOf(TunnelConstants.VERSION));

  // Advertise the encoding matching the requested compression algorithm.
  switch (compress.algorithm) {
    case ODPS_RAW: {
      break;
    }
    case ODPS_ZLIB: {
      headers.put(Headers.ACCEPT_ENCODING, "deflate");
      break;
    }
    case ODPS_SNAPPY: {
      headers.put(Headers.ACCEPT_ENCODING, "x-snappy-framed");
      break;
    }
    case ODPS_LZ4_FRAME: {
      headers.put(Headers.ACCEPT_ENCODING, "x-lz4-frame");
      break;
    }
    default: {
      throw new TunnelException("invalid compression option.");
    }
  }

  Configuration conf = session.getConfig();
  List<String> tags = conf.getTags();
  if (tags != null) {
    headers.put(HttpHeaders.HEADER_ODPS_TUNNEL_TAGS, String.join(",", tags));
  }

  if (columns != null && columns.size() != 0) {
    StringBuilder columnNames = new StringBuilder();
    for (int i = 0; i < columns.size(); ++i) {
      if (i > 0) {
        columnNames.append(",");
      }
      columnNames.append(columns.get(i).getName());
    }
    params.put(TunnelConstants.RES_COLUMNS, columnNames.toString());
  }

  if (session.getEnableLimit()) {
    params.put(TunnelConstants.INSTANCE_TUNNEL_LIMIT_ENABLED, null);
  }

  if (!StringUtils.isNullOrEmpty(conf.getQuotaName())) {
    params.put(TunnelConstants.PARAM_QUOTA_NAME, conf.getQuotaName());
  }

  params.put("data", null);

  if (longPolling) {
    // get schema from http stream
    params.put(TunnelConstants.SCHEMA_IN_STREAM, null);
    params.put(TunnelConstants.CACHED, null);
    params.put(TunnelConstants.TASK_NAME, session.getTaskName());
    if (session.getQueryId() != -1) {
      params.put(TunnelConstants.QUERY_ID, String.valueOf(session.getQueryId()));
    }

    if (start < 0) {
      throw new TunnelException("The specified row range is not valid. start index is negative.");
    }

    // either count or start is set, ROW_RANGE should be set in HTTP parameters
    if (count > 0 || start > 0) {
      if (start > 0 && count < 0) {
        // only start is set: read from start to the end
        params.put(TunnelConstants.ROW_RANGE, "(" + start + "," + Long.MAX_VALUE + ")");
      } else {
        // limit mode, otherwise unlimited
        params.put(TunnelConstants.ROW_RANGE, "(" + start + "," + count + ")");
      }
    }

    if (sizeLimit > 0) {
      // limit result size if necessary
      params.put(TunnelConstants.SIZE_LIMIT, Long.toString(sizeLimit));
    }
  } else {
    params.put(TunnelConstants.DOWNLOADID, session.getId());
    params.put(TunnelConstants.ROW_RANGE, "(" + start + "," + count + ")");
  }

  Connection conn = null;
  // Becomes true only once a reader that holds the connection is about to be returned;
  // every other exit path must release the connection in the finally block.
  boolean handedOff = false;
  try {
    conn =
        restClient.connect(ResourceBuilder.buildInstanceResource(session.getProjectName(),
                                                                 session.getInstanceID()), "GET",
                           params, headers);
    Response resp = conn.getResponse();
    if (!resp.isOK()) {
      TunnelException err = new TunnelException(conn.getInputStream());
      err.setRequestId(resp.getHeader(HttpHeaders.HEADER_ODPS_REQUEST_ID));
      throw err;
    }

    // Map the encoding the server actually used back to a compress option (null = none).
    CompressOption option = null;
    String contentEncoding = resp.getHeader(Headers.CONTENT_ENCODING);
    if (contentEncoding != null) {
      if ("deflate".equals(contentEncoding)) {
        option = new CompressOption(CompressOption.CompressAlgorithm.ODPS_ZLIB,
                                    -1, 0);
      } else if ("x-snappy-framed".equals(contentEncoding)) {
        option = new CompressOption(CompressOption.CompressAlgorithm.ODPS_SNAPPY,
                                    -1, 0);
      } else if ("x-lz4-frame".equals(contentEncoding)) {
        option = new CompressOption(CompressOption.CompressAlgorithm.ODPS_LZ4_FRAME,
                                    -1, 0);
      } else {
        throw new TunnelException("invalid content encoding");
      }
    }

    RawTunnelRecordReader reader;
    if (!longPolling) {
      reader = new RawTunnelRecordReader(session.getSchema(), columns, conn, option);
    } else {
      long recordCount = 0;
      if (resp.getHeaders().containsKey(Headers.TUNNEL_RECORD_COUNT)) {
        recordCount = Long.parseLong(resp.getHeader(Headers.TUNNEL_RECORD_COUNT));
      }
      session.setRecordCount(recordCount);

      // tunnel server do not support schema in stream
      if (resp.getHeaders().containsKey(Headers.TUNNEL_SCHEMA)) {
        String schemaStr = resp.getHeader(Headers.TUNNEL_SCHEMA);
        if (StringUtils.isNullOrEmpty(schemaStr)) {
          throw new TunnelException("Invalid response schema in header:" + schemaStr);
        }
        JsonObject tree = new JsonParser().parse(schemaStr).getAsJsonObject();
        TableSchema schema = new TunnelTableSchema(tree);
        // in direct mode, schema in session is null, we need to set it back
        session.setSchema(schema);
        reader = new RawTunnelRecordReader(session.getSchema(), columns, conn, option);
      } else {
        reader = new RawTunnelRecordReader(columns, conn, option);
        // in direct mode, schema in session is null, we need to set it back
        session.setSchema(reader.getTableSchema());
      }
    }

    handedOff = true;
    return reader;
  } catch (IOException e) {
    throw new TunnelException(e.getMessage(), e);
  } catch (TunnelException e) {
    // Already the right type: propagate untouched (must precede the OdpsException arm).
    throw e;
  } catch (OdpsException e) {
    throw new TunnelException(e.getMessage(), e);
  } finally {
    // Release the connection on every failure path, including TunnelException and
    // unchecked exceptions (e.g. malformed record-count or schema headers).
    if (!handedOff && conn != null) {
      conn.disconnect();
    }
  }
}