in cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/schemastore/LocalTableSchemaStore.java [103:173]
protected void loadFromResource()
{
FileSystem jarFs = null;
try
{
URL url = getClass().getClassLoader().getResource("table_schemas");
if (url == null)
{
throw new RuntimeException("Resource table_schemas not found");
}
URI schemas = url.toURI();
Path path;
if (schemas.getScheme().equals("jar"))
{
jarFs = FileSystems.newFileSystem(schemas, new HashMap<>());
path = jarFs.getPath("table_schemas");
}
else
{
path = Paths.get(schemas);
}
try (Stream<Path> paths = Files.walk(path, 1))
{
Schema.Parser parser = new Schema.Parser();
paths.forEach(p -> {
if (!p.toString().endsWith(".avsc"))
{
return;
}
try
{
InputStream is = Files.newInputStream(p);
Schema schema = parser.parse(is);
String key = schema.getNamespace();
LOGGER.info("Loading schema namespace={}", key);
cache.put(key, schema);
writers.put(key, new GenericDatumWriter<>(schema));
readers.put(key, new GenericDatumReader<>(schema));
}
catch (IOException e)
{
throw new RuntimeException(e);
}
});
}
}
catch (URISyntaxException e)
{
throw new RuntimeException(e);
}
catch (IOException e)
{
throw new RuntimeException("Unable to read from table_schemas", e);
}
finally
{
if (jarFs != null)
{
try
{
jarFs.close();
}
catch (IOException e)
{
throw new RuntimeException("Unable to close jar", e);
}
}
}
}