in ingestion-core/src/main/java/com/mozilla/telemetry/ingestion/core/schema/SchemaStore.java [117:151]
/**
 * Rebuilds the {@code schemas} and {@code dirs} collections from the gzip-compressed tar
 * archive opened from {@code schemasLocation}.
 *
 * <p>Only entries whose path has more than two {@code /}-separated components and whose second
 * component is {@code "schemas"} are considered; the components after that prefix are passed to
 * {@code getAndCacheNormalizedPath} to obtain the entry's name. Directory entries are recorded
 * in {@code dirs}; file entries whose name ends with {@code schemaSuffix()} are parsed with
 * {@code loadSchemaFromArchive} and recorded in {@code schemas}. Entries whose data cannot be
 * read are logged and skipped.
 *
 * <p>The fields are reassigned only after the whole archive has been processed, so an
 * exception thrown part-way through leaves the previously loaded values in place.
 *
 * @throws IOException if the schemas location cannot be opened or the archive cannot be read
 */
private void loadAllSchemas() throws IOException {
  InputStream rawStream;
  try {
    rawStream = open.apply(schemasLocation);
  } catch (IOException e) {
    throw new IOException("Exception thrown while fetching from configured schemasLocation", e);
  }

  // Accumulate into fresh collections so the fields are swapped only on full success.
  final Map<String, T> loadedSchemas = new HashMap<>();
  final Set<String> loadedDirs = new HashSet<>();

  try (InputStream buffered = new BufferedInputStream(rawStream);
      InputStream gunzipped = new GzipCompressorInputStream(buffered);
      ArchiveInputStream tar = new TarArchiveInputStream(gunzipped)) {
    for (ArchiveEntry entry = tar.getNextEntry(); entry != null; entry = tar.getNextEntry()) {
      if (!tar.canReadEntryData(entry)) {
        LOG.warn("Unable to read tar file entry: " + entry.getName());
        continue;
      }

      final String[] components = entry.getName().split("/");
      // Skip anything that is not strictly below "<first component>/schemas/".
      if (components.length <= 2 || !"schemas".equals(components[1])) {
        continue;
      }

      final String name = getAndCacheNormalizedPath(
          Arrays.copyOfRange(components, 2, components.length));
      if (entry.isDirectory()) {
        loadedDirs.add(name);
      } else if (name.endsWith(schemaSuffix())) {
        // The tar stream is positioned at this entry's data, so the schema is read from it.
        loadedSchemas.put(name, loadSchemaFromArchive(tar));
      }
    }
  }

  schemas = loadedSchemas;
  dirs = loadedDirs;
}