in presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java [381:408]
static List<ColumnMetadata> getPulsarColumnsFromStructSchema(TopicName topicName,
SchemaInfo schemaInfo,
boolean withInternalColumns,
PulsarColumnHandle.HandleKeyValueType handleKeyValueType) {
String schemaJson = new String(schemaInfo.getSchema());
if (StringUtils.isBlank(schemaJson)) {
throw new PrestoException(NOT_SUPPORTED, "Topic " + topicName.toString()
+ " does not have a valid schema");
}
Schema schema;
try {
schema = PulsarConnectorUtils.parseSchema(schemaJson);
} catch (SchemaParseException ex) {
throw new PrestoException(NOT_SUPPORTED, "Topic " + topicName.toString()
+ " does not have a valid schema");
}
ImmutableList.Builder<ColumnMetadata> builder = ImmutableList.builder();
builder.addAll(getColumns(null, schema, new HashSet<>(), new Stack<>(), new Stack<>(), handleKeyValueType));
if (withInternalColumns) {
PulsarInternalColumn.getInternalFields()
.stream()
.forEach(pulsarInternalColumn -> builder.add(pulsarInternalColumn.getColumnMetadata(false)));
}
return builder.build();
}