in runtime/src/main/java/com/google/cloud/verticals/foundations/dataharmonization/imports/impl/DefaultImportProcessor.java [74:127]
/**
 * Processes every import declared in {@code config}, resolving each one relative to
 * {@code currentPath}.
 *
 * <p>For each import the path expression is evaluated, resolved against {@code currentPath}, and
 * skipped if its absolute path has already been recorded in {@code seenSet}. Otherwise the bytes
 * are loaded through the loader registered for the resolved path and handed to the matching
 * parser; the resolved path is then appended to {@code importPaths}. ({@code seenSet},
 * {@code importPaths} and {@code SOURCE_META_KEY} are declared outside this method.)
 *
 * @param currentPath path of the file whose imports are being processed; also marked as seen
 * @param context runtime context; must be an {@link InitializationContext}
 * @param config pipeline config whose import list is iterated
 * @param importExceptions if non-null, load/parse failures are appended to this list instead of
 *     being thrown; if null, the first such failure is thrown as an {@link ImportException}
 * @throws IllegalStateException if {@code context} is not an {@link InitializationContext}
 * @throws IllegalArgumentException if no loader is registered for a resolved import path
 * @throws IOException if an I/O failure escapes the per-import handling (NOTE(review): the only
 *     visible candidate is unpacking the source metadata while recording a failure - confirm)
 */
void processImports(
    ImportPath currentPath,
    RuntimeContext context,
    PipelineConfig config,
    List<ImportException> importExceptions)
    throws IOException {
  if (!(context instanceof InitializationContext)) {
    String actualType = context.getClass().getName();
    throw new IllegalStateException(
        String.format("Expect an instance of InitializationContext but got %s.", actualType));
  }
  InitializationContext initContext = (InitializationContext) context;

  // Record the current file first so that an import pointing back at it is skipped below.
  seenSet.add(currentPath.getAbsPath());

  for (Import importDecl : config.getImportsList()) {
    String rawPath = initContext.evaluateImport(importDecl, config).asPrimitive().string();
    ImportPath resolved = ImportPath.resolve(currentPath, rawPath);

    // add() returns false when the path was already present, i.e. this import was seen before.
    boolean firstVisit = seenSet.add(resolved.getAbsPath());
    if (!firstVisit) {
      continue;
    }

    Registries registries = context.getRegistries();
    Loader loader = registries.getLoaderRegistry().get(resolved.getLoader());
    if (loader == null) {
      // A missing loader is always thrown directly; it is never collected in importExceptions.
      throw new IllegalArgumentException(
          String.format(
              "Loader %s for import %s not found. Do you need to import a plugin that contains"
                  + " it?",
              resolved.getLoader(), rawPath));
    }

    try {
      byte[] content = loader.load(resolved);
      Parser parser = ImportProcessor.matchParser(resolved, registries.getParserRegistry());
      // This processor is passed to the parser (presumably so nested imports are processed
      // through it - confirm against the Parser implementations).
      parser.parse(content, registries, context.getMetaData(), this, resolved);
      importPaths.add(resolved);
    } catch (RuntimeException | IOException cause) {
      boolean collecting = importExceptions != null;
      if (!collecting) {
        // Not storing ImportExceptions: fail on the first broken import.
        throw new ImportException(resolved, cause);
      }

      // Storing ImportExceptions: attach the import's source metadata when it is present.
      Map<String, Any> metaEntries = importDecl.getMeta().getEntriesMap();
      final Source source;
      if (metaEntries.containsKey(SOURCE_META_KEY)) {
        source = metaEntries.get(SOURCE_META_KEY).unpack(Source.class);
      } else {
        source = Source.getDefaultInstance();
      }
      importExceptions.add(
          new ImportException(resolved, cause, source, currentPath.toString()));
    }
  }
}