in src/main/java/org/apache/flink/connector/rocketmq/catalog/RocketMQCatalog.java [179:198]
/**
 * Looks up the table identified by {@code tablePath} and builds its catalog representation
 * from the schema registered under the table's object name.
 *
 * <p>The object name of {@code tablePath} is used as the schema registry subject. Only
 * schemas of type {@code SchemaType.AVRO} are accepted.
 *
 * @param tablePath path of the table to look up
 * @return the catalog table produced by {@code getCatalogTableForSchema} for the subject
 * @throws TableNotExistException if {@code tableExists(tablePath)} reports false
 * @throws CatalogException if the registered schema is not an Avro schema, or if fetching or
 *     converting the schema fails for any other reason
 */
public CatalogBaseTable getTable(ObjectPath tablePath)
        throws TableNotExistException, CatalogException {
    if (!tableExists(tablePath)) {
        throw new TableNotExistException(getName(), tablePath);
    }
    String subject = tablePath.getObjectName();
    try {
        GetSchemaResponse getSchemaResponse = schemaRegistryClient.getSchemaBySubject(subject);
        if (getSchemaResponse.getType() != SchemaType.AVRO) {
            throw new CatalogException(
                    String.format(
                            "Only support avro schema, but schema of table %s is of type %s.",
                            tablePath.getFullName(), getSchemaResponse.getType()));
        }
        return getCatalogTableForSchema(subject, getSchemaResponse);
    } catch (CatalogException e) {
        // Already a catalog-level error with a precise message (e.g. unsupported schema
        // type); rethrow as-is so it is not buried under the generic
        // "failed to get schema" wrapper below.
        throw e;
    } catch (Exception e) {
        throw new CatalogException(
                String.format(
                        "Failed to get schema of table %s from schema registry client.",
                        tablePath.getFullName()),
                e);
    }
}