in src/main/java/com/aliyun/odps/kafka/connect/MaxComputeSinkTask.java [574:599]
/**
 * Builds a {@link TableSchema} from its JSON description.
 *
 * <p>The input must be a JSON object. Two optional members are read:
 * <ul>
 *   <li>{@code "columns"} - an array of objects, each converted with {@code parseColumn}
 *       and added as a regular column;</li>
 *   <li>{@code "partitionKeys"} - an array of objects, each converted with
 *       {@code parseColumn} and added as a partition column.</li>
 * </ul>
 * A member that is missing or explicitly {@code null} is skipped, so an empty JSON object
 * yields an empty schema.
 *
 * @param json the JSON text describing the schema
 * @return the schema populated from {@code json}
 * @throws RuntimeException if the text cannot be parsed, is not a JSON object, a present
 *     member is not an array of objects, or {@code parseColumn} fails; the original
 *     exception is kept as the cause
 */
protected static TableSchema parseSchema(String json) {
  TableSchema schema = new TableSchema();
  try {
    JsonObject tree = new JsonParser().parse(json).getAsJsonObject();

    // Gson models an explicit JSON null as a JsonNull element, never as a Java null, so
    // isJsonNull() (rather than a "!= null" comparison) is what detects "columns": null.
    if (tree.has("columns") && !tree.get("columns").isJsonNull()) {
      JsonArray columnsNode = tree.get("columns").getAsJsonArray();
      for (int i = 0; i < columnsNode.size(); ++i) {
        JsonObject n = columnsNode.get(i).getAsJsonObject();
        schema.addColumn(parseColumn(n));
      }
    }

    if (tree.has("partitionKeys") && !tree.get("partitionKeys").isJsonNull()) {
      JsonArray partitionKeysNode = tree.get("partitionKeys").getAsJsonArray();
      for (int i = 0; i < partitionKeysNode.size(); ++i) {
        JsonObject n = partitionKeysNode.get(i).getAsJsonObject();
        schema.addPartitionColumn(parseColumn(n));
      }
    }
  } catch (Exception e) {
    // Surface every parsing/conversion failure as an unchecked exception, keeping the cause.
    throw new RuntimeException(e.getMessage(), e);
  }
  return schema;
}