in zetasql-toolkit-core/src/main/java/com/google/zetasql/toolkit/tools/lineage/ColumnLineageExtractor.java [289:327]
private static Set<ColumnLineage> extractColumnLevelLineage(
Table targetTable, ResolvedMergeWhen mergeWhen, ResolvedMergeStmt originalStatement) {
List<ResolvedColumn> insertedColumns = mergeWhen.getInsertColumnList();
ResolvedInsertRow insertRow = mergeWhen.getInsertRow();
List<ResolvedUpdateItem> updateItems = mergeWhen.getUpdateItemList();
if (Objects.nonNull(insertRow)) {
// WHEN ... THEN INSERT
return IntStream.range(0, insertedColumns.size())
.mapToObj(
index ->
new SimpleEntry<>(
insertedColumns.get(index), insertRow.getValueList().get(index).getValue()))
.map(
entry ->
new SimpleEntry<>(
entry.getKey(),
ParentColumnFinder.findParentsForExpression(
originalStatement, entry.getValue())))
.map(
entry ->
buildColumnLineage(
targetTable.getFullName(), entry.getKey().getName(), entry.getValue()))
.collect(Collectors.toSet());
} else if (Objects.nonNull(updateItems)) {
// WHEN ... THEN UPDATE
return updateItems.stream()
.map(
updateItem ->
extractColumnLevelLineageForUpdateItem(
targetTable, updateItem, originalStatement))
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toSet());
}
return ImmutableSet.of();
}