in presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSchemaHandlers.java [35:64]
/**
 * Creates the {@link SchemaHandler} that matches the schema type reported by {@code schemaInfo}.
 *
 * <p>Dispatch rules:
 * <ul>
 *   <li>primitive types are handled by {@link PulsarPrimitiveSchemaHandler};</li>
 *   <li>struct types are handled by {@link JSONSchemaHandler} ({@code JSON}) or
 *       {@link AvroSchemaHandler} ({@code AVRO});</li>
 *   <li>{@code KEY_VALUE} is handled by {@link KeyValueSchemaHandler}.</li>
 * </ul>
 *
 * @param topicName topic name forwarded to the Avro and key/value handlers and reported in the
 *                  failure message when a struct handler cannot be created
 * @param pulsarConnectorConfig connector configuration forwarded to the Avro and key/value handlers
 * @param schemaInfo schema description whose type selects the handler
 * @param columnHandles column handles forwarded to the JSON, Avro and key/value handlers
 * @return a handler for the given schema type
 * @throws PrestoException with {@code NOT_SUPPORTED} when the schema type is a struct other than
 *                         JSON/AVRO, or is neither primitive, struct nor {@code KEY_VALUE}
 * @throws RuntimeException when creating a struct handler raises a {@link PulsarClientException};
 *                          that exception is attached directly as the cause
 */
static SchemaHandler newPulsarSchemaHandler(TopicName topicName,
                                            PulsarConnectorConfig pulsarConnectorConfig,
                                            SchemaInfo schemaInfo,
                                            List<PulsarColumnHandle> columnHandles) throws RuntimeException {
    // Read the type once; every branch below dispatches on it.
    final SchemaType type = schemaInfo.getType();

    if (type.isPrimitive()) {
        return new PulsarPrimitiveSchemaHandler(schemaInfo);
    }

    if (type.isStruct()) {
        try {
            switch (type) {
                case JSON:
                    return new JSONSchemaHandler(columnHandles);
                case AVRO:
                    return new AvroSchemaHandler(topicName, pulsarConnectorConfig, schemaInfo, columnHandles);
                default:
                    throw new PrestoException(NOT_SUPPORTED, "Not supported schema type: " + type);
            }
        } catch (PulsarClientException e) {
            // Chain the client exception directly instead of hiding it behind an intermediate raw
            // Throwable, so getMessage() carries the context and getCause() is the real failure.
            // Plain concatenation (no explicit toString()) avoids masking the failure with an NPE
            // should topicName be null.
            throw new RuntimeException(
                    "PulsarAdmin gets version schema fail, topicName : " + topicName, e);
        }
    }

    if (type == SchemaType.KEY_VALUE) {
        return new KeyValueSchemaHandler(topicName, pulsarConnectorConfig, schemaInfo, columnHandles);
    }

    throw new PrestoException(
            NOT_SUPPORTED,
            "Schema `" + type + "` is not supported by presto yet : " + schemaInfo);
}