in src/main/java/org/apache/doris/kafka/connector/decode/avro/DorisAvroConverter.java [61:105]
public void parseTopic2Schema(final Map<String, ?> configs) {
Object avroSchemaPath = configs.get(AVRO_TOPIC_SCHEMA_FILEPATH);
if (avroSchemaPath == null) {
LOG.error(AVRO_TOPIC_SCHEMA_FILEPATH + " can not be empty in DorisAvroConverter.class");
throw new DataDecodeException(
AVRO_TOPIC_SCHEMA_FILEPATH + " can not be empty in DorisAvroConverter.class");
}
if (avroSchemaPath instanceof String) {
Map<String, String> topic2SchemaFileMap = parseTopicSchemaPath((String) avroSchemaPath);
for (Map.Entry<String, String> entry : topic2SchemaFileMap.entrySet()) {
String topic = entry.getKey();
String schemaPath = entry.getValue();
Schema schema;
try {
schema = new Schema.Parser().parse(new File(schemaPath));
} catch (SchemaParseException | IOException e) {
LOG.error(
"the provided for "
+ AVRO_TOPIC_SCHEMA_FILEPATH
+ " is no valid, failed to parse {} {}",
topic,
schemaPath,
e);
throw new DataDecodeException(
"the provided for "
+ AVRO_TOPIC_SCHEMA_FILEPATH
+ " is no valid, failed to parse "
+ topic
+ " "
+ schemaPath
+ ".\n",
e);
}
topic2SchemaMap.put(topic, schema);
}
} else {
LOG.error(AVRO_TOPIC_SCHEMA_FILEPATH + " must be a string.");
throw new DataDecodeException(
"The "
+ AVRO_TOPIC_SCHEMA_FILEPATH
+ " is provided, but can not be parsed as an Avro schema.");
}
}