in presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java [468:560]
static List<PulsarColumnMetadata> getColumns(String fieldName, Schema fieldSchema,
Set<String> fieldTypes,
Stack<String> fieldNames,
Stack<Integer> positionIndices,
PulsarColumnHandle.HandleKeyValueType handleKeyValueType) {
List<PulsarColumnMetadata> columnMetadataList = new LinkedList<>();
if (isPrimitiveType(fieldSchema.getType())) {
columnMetadataList.add(new PulsarColumnMetadata(
PulsarColumnMetadata.getColumnName(handleKeyValueType, fieldName),
convertType(fieldSchema.getType(), fieldSchema.getLogicalType()),
null, null, false, false,
fieldNames.toArray(new String[fieldNames.size()]),
positionIndices.toArray(new Integer[positionIndices.size()]), handleKeyValueType));
} else if (fieldSchema.getType() == Schema.Type.UNION) {
boolean canBeNull = false;
for (Schema type : fieldSchema.getTypes()) {
if (isPrimitiveType(type.getType())) {
PulsarColumnMetadata columnMetadata;
if (type.getType() != Schema.Type.NULL) {
if (!canBeNull) {
columnMetadata = new PulsarColumnMetadata(
PulsarColumnMetadata.getColumnName(handleKeyValueType, fieldName),
convertType(type.getType(), type.getLogicalType()),
null, null, false, false,
fieldNames.toArray(new String[fieldNames.size()]),
positionIndices.toArray(new Integer[positionIndices.size()]), handleKeyValueType);
} else {
columnMetadata = new PulsarColumnMetadata(
PulsarColumnMetadata.getColumnName(handleKeyValueType, fieldName),
convertType(type.getType(), type.getLogicalType()),
"field can be null", null, false, false,
fieldNames.toArray(new String[fieldNames.size()]),
positionIndices.toArray(new Integer[positionIndices.size()]), handleKeyValueType);
}
columnMetadataList.add(columnMetadata);
} else {
canBeNull = true;
}
} else {
List<PulsarColumnMetadata> columns = getColumns(fieldName, type, fieldTypes, fieldNames,
positionIndices, handleKeyValueType);
columnMetadataList.addAll(columns);
}
}
} else if (fieldSchema.getType() == Schema.Type.RECORD) {
// check if we have seen this type before to prevent cyclic class definitions.
if (!fieldTypes.contains(fieldSchema.getFullName())) {
// add to types seen so far in traversal
fieldTypes.add(fieldSchema.getFullName());
List<Schema.Field> fields = fieldSchema.getFields();
for (int i = 0; i < fields.size(); i++) {
Schema.Field field = fields.get(i);
fieldNames.push(field.name());
positionIndices.push(i);
List<PulsarColumnMetadata> columns;
if (fieldName == null) {
columns = getColumns(field.name(), field.schema(), fieldTypes, fieldNames, positionIndices,
handleKeyValueType);
} else {
columns = getColumns(String.format("%s.%s", fieldName, field.name()), field.schema(),
fieldTypes, fieldNames, positionIndices, handleKeyValueType);
}
positionIndices.pop();
fieldNames.pop();
columnMetadataList.addAll(columns);
}
fieldTypes.remove(fieldSchema.getFullName());
} else {
log.debug("Already seen type: %s", fieldSchema.getFullName());
}
} else if (fieldSchema.getType() == Schema.Type.ARRAY) {
} else if (fieldSchema.getType() == Schema.Type.MAP) {
} else if (fieldSchema.getType() == Schema.Type.ENUM) {
PulsarColumnMetadata columnMetadata = new PulsarColumnMetadata(
PulsarColumnMetadata.getColumnName(handleKeyValueType, fieldName),
convertType(fieldSchema.getType(), fieldSchema.getLogicalType()),
null, null, false, false,
fieldNames.toArray(new String[fieldNames.size()]),
positionIndices.toArray(new Integer[positionIndices.size()]), handleKeyValueType);
columnMetadataList.add(columnMetadata);
} else if (fieldSchema.getType() == Schema.Type.FIXED) {
} else {
log.error("Unknown column type: {}", fieldSchema);
}
return columnMetadataList;
}