in presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java [277:336]
private ConnectorTableMetadata getTableMetadata(SchemaTableName schemaTableName, boolean withInternalColumns) {
if (schemaTableName.getSchemaName().equals(INFORMATION_SCHEMA)) {
return null;
}
String namespace = restoreNamespaceDelimiterIfNeeded(schemaTableName.getSchemaName(), pulsarConnectorConfig);
TopicName topicName = TopicName.get(
String.format("%s/%s", namespace, schemaTableName.getTableName()));
List<String> topics;
try {
if (!PulsarConnectorUtils.isPartitionedTopic(topicName, this.pulsarAdmin)) {
topics = this.pulsarAdmin.topics().getList(namespace);
} else {
topics = this.pulsarAdmin.topics().getPartitionedTopicList(namespace);
}
} catch (PulsarAdminException e) {
if (e.getStatusCode() == 404) {
throw new PrestoException(NOT_FOUND, "Schema " + namespace + " does not exist");
} else if (e.getStatusCode() == 401) {
throw new PrestoException(QUERY_REJECTED,
String.format("Failed to get topics in schema %s: Unauthorized", namespace));
}
throw new RuntimeException("Failed to get topics in schema " + namespace
+ ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
}
if (!topics.contains(topicName.toString())) {
log.error("Table %s not found",
String.format("%s/%s", namespace,
schemaTableName.getTableName()));
throw new TableNotFoundException(schemaTableName);
}
SchemaInfo schemaInfo;
try {
schemaInfo = this.pulsarAdmin.schemas().getSchemaInfo(
String.format("%s/%s", namespace, schemaTableName.getTableName()));
} catch (PulsarAdminException e) {
if (e.getStatusCode() == 404) {
// use default schema because there is no schema
schemaInfo = PulsarSchemaHandlers.defaultSchema();
} else if (e.getStatusCode() == 401) {
throw new PrestoException(QUERY_REJECTED,
String.format("Failed to get pulsar topic schema information for topic %s/%s: Unauthorized",
namespace, schemaTableName.getTableName()));
} else {
throw new RuntimeException("Failed to get pulsar topic schema information for topic "
+ String.format("%s/%s", namespace, schemaTableName.getTableName())
+ ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
}
}
List<ColumnMetadata> handles = getPulsarColumns(
topicName, schemaInfo, withInternalColumns, PulsarColumnHandle.HandleKeyValueType.NONE
);
return new ConnectorTableMetadata(schemaTableName, handles);
}