in odps-sdk/odps-sdk-table-api/src/main/java/com/aliyun/odps/table/write/impl/batch/TableBatchWriteSessionImpl.java [368:482]
/**
 * Populates this session's state from the JSON response returned by the
 * table-tunnel service when a write session is created or reloaded.
 *
 * <p>All top-level keys are optional; each one present overwrites the
 * corresponding field. Recognized keys: {@code SessionId},
 * {@code ExpirationTime}, {@code SessionType}, {@code SessionStatus},
 * {@code Message}, {@code DataSchema} (with {@code DataColumns} and
 * {@code PartitionColumns}), {@code SupportedDataFormat},
 * {@code MaxBlockNumber}, {@code RequiredOrdering},
 * {@code RequiredDistribution}, and {@code EnhanceWriteCheck}.
 *
 * @param json raw JSON body of the session response
 * @throws TunnelException if the payload cannot be parsed, or any value is
 *         malformed (the original failure is preserved as the cause and the
 *         offending JSON is included in the message)
 */
private void loadResultFromJson(String json) throws TunnelException {
    try {
        // JsonParser.parseString replaces the deprecated `new JsonParser().parse`
        // (deprecated since Gson 2.8.6); parsing semantics are identical.
        JsonObject tree = JsonParser.parseString(json).getAsJsonObject();
        // session id
        if (tree.has("SessionId")) {
            sessionId = tree.get("SessionId").getAsString();
        }
        // expiration time
        if (tree.has("ExpirationTime")) {
            expirationTime = tree.get("ExpirationTime").getAsLong();
        }
        // session type: must match this implementation's own type.
        // Locale.ROOT keeps the case fold locale-independent (e.g. the Turkish
        // dotless-i would otherwise corrupt the comparison).
        if (tree.has("SessionType")) {
            String sessionType = tree.get("SessionType").getAsString();
            if (!getType().toString().equals(sessionType.toLowerCase(java.util.Locale.ROOT))) {
                throw new UnsupportedOperationException("Unsupported session type: " + sessionType);
            }
        }
        // status: upper-cased with Locale.ROOT so Enum.valueOf is not affected
        // by the JVM's default locale.
        if (tree.has("SessionStatus")) {
            String status = tree.get("SessionStatus").getAsString().toUpperCase(java.util.Locale.ROOT);
            sessionStatus = SessionStatus.valueOf(status);
        }
        // error message
        if (tree.has("Message")) {
            errorMessage = tree.get("Message").getAsString();
        }
        // schema: data columns first, then partition columns appended; the
        // partition column names double as the partition keys.
        if (tree.has("DataSchema")) {
            JsonObject dataSchema = tree.get("DataSchema").getAsJsonObject();
            List<Column> schemaColumns = new ArrayList<>();
            List<String> partitionKeys = new ArrayList<>();
            if (dataSchema.has("DataColumns")) {
                JsonArray dataColumns = dataSchema.get("DataColumns").getAsJsonArray();
                for (int i = 0; i < dataColumns.size(); ++i) {
                    JsonObject column = dataColumns.get(i).getAsJsonObject();
                    schemaColumns.add(SchemaUtils.parseColumn(column));
                }
            }
            if (dataSchema.has("PartitionColumns")) {
                JsonArray partitionColumns = dataSchema.get("PartitionColumns").getAsJsonArray();
                for (int i = 0; i < partitionColumns.size(); ++i) {
                    JsonObject column = partitionColumns.get(i).getAsJsonObject();
                    Column partitionCol = SchemaUtils.parseColumn(column);
                    schemaColumns.add(partitionCol);
                    partitionKeys.add(partitionCol.getName());
                }
            }
            requiredSchema = DataSchema.newBuilder()
                .columns(schemaColumns)
                .partitionBy(partitionKeys)
                .build();
        }
        // data formats the service accepts for this session
        if (tree.has("SupportedDataFormat")) {
            supportDataFormats = new HashSet<>();
            JsonArray formats = tree.get("SupportedDataFormat").getAsJsonArray();
            formats.forEach(format -> supportDataFormats.add(
                SessionUtils.parseDataFormat(format.getAsJsonObject())));
        }
        // max block num
        if (tree.has("MaxBlockNumber")) {
            maxBlockNumber = tree.get("MaxBlockNumber").getAsLong();
        }
        // sort order the writer must produce, if any
        if (tree.has("RequiredOrdering")) {
            JsonArray orders = tree.get("RequiredOrdering").getAsJsonArray();
            List<SortOrder> sortOrders = new ArrayList<>();
            orders.forEach(order -> sortOrders.add(parseOrders(order.getAsJsonObject())));
            requiredSortOrders = sortOrders.toArray(new SortOrder[0]);
        }
        // required distribution: UNSPECIFIED maps to Distributions.unspecified();
        // HASH/RANGE carry optional cluster keys and bucket count (-1 = unset).
        // Any other type leaves requiredDistribution untouched.
        if (tree.has("RequiredDistribution")) {
            JsonObject distribution = tree.get("RequiredDistribution").getAsJsonObject();
            Distribution.Type type = Distribution.Type.UNSPECIFIED;
            if (distribution.has("Type")) {
                type = Distribution.Type.valueOf(
                    distribution.get("Type").getAsString().toUpperCase(java.util.Locale.ROOT));
            }
            if (type.equals(Distribution.Type.UNSPECIFIED)) {
                requiredDistribution = Distributions.unspecified();
            } else if (type.equals(Distribution.Type.HASH) ||
                       type.equals(Distribution.Type.RANGE)) {
                List<String> clusterKeys = new ArrayList<>();
                if (distribution.has("ClusterKeys")) {
                    JsonArray keys = distribution.get("ClusterKeys").getAsJsonArray();
                    keys.forEach(key -> clusterKeys.add(key.getAsString()));
                }
                int bucketsNumber = -1;
                if (distribution.has("BucketsNumber")) {
                    bucketsNumber = distribution.get("BucketsNumber").getAsInt();
                }
                requiredDistribution = Distributions.clustered(clusterKeys, type, bucketsNumber);
            }
        }
        if (tree.has("EnhanceWriteCheck")) {
            enhanceWriteCheck = tree.get("EnhanceWriteCheck").getAsBoolean();
        }
    } catch (Exception e) {
        // Any failure (including malformed enum values above) is surfaced as a
        // TunnelException carrying the raw payload and the original cause.
        throw new TunnelException("Invalid session response: \n" + json, e);
    }
}