in src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java [1689:1795]
private void updateReusedModelsAndIndexPlans(String project, List<ModelRequest> modelRequestList) {
if (CollectionUtils.isEmpty(modelRequestList)) {
return;
}
if (modelRequestList.stream()
.anyMatch(modelRequest -> !FusionIndexService.checkUpdateIndexEnabled(project, modelRequest.getId()))) {
throw new KylinException(STREAMING_INDEX_UPDATE_DISABLE, MsgPicker.getMsg().getStreamingIndexesConvert());
}
for (ModelRequest modelRequest : modelRequestList) {
modelRequest.setProject(project);
semanticUpdater.expandModelRequest(modelRequest);
KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
NDataModelManager modelManager = NDataModelManager.getInstance(kylinConfig, project);
NIndexPlanManager indexPlanManager = NIndexPlanManager.getInstance(kylinConfig, project);
Map<String, NDataModel.NamedColumn> columnMap = Maps.newHashMap();
modelRequest.getAllNamedColumns().forEach(column -> {
Preconditions.checkArgument(!columnMap.containsKey(column.getAliasDotColumn()));
columnMap.put(column.getAliasDotColumn(), column);
});
BaseIndexUpdateHelper baseIndexUpdater = new BaseIndexUpdateHelper(
modelManager.getDataModelDesc(modelRequest.getId()), false);
// update model
List<LayoutRecDetailResponse> recItems = modelRequest.getRecItems();
NDataModel updated = modelManager.updateDataModel(modelRequest.getId(), copyForWrite -> {
copyForWrite.setJoinTables(modelRequest.getJoinTables());
List<NDataModel.NamedColumn> allNamedColumns = copyForWrite.getAllNamedColumns();
Map<Integer, NDataModel.NamedColumn> namedColumnMap = Maps.newHashMap();
allNamedColumns.forEach(col -> namedColumnMap.put(col.getId(), col));
Map<String, ComputedColumnDesc> newCCMap = Maps.newLinkedHashMap();
Map<String, NDataModel.NamedColumn> newDimMap = Maps.newLinkedHashMap();
Map<String, NDataModel.Measure> newMeasureMap = Maps.newLinkedHashMap();
recItems.forEach(recItem -> {
recItem.getComputedColumns().stream() //
.filter(LayoutRecDetailResponse.RecComputedColumn::isNew) //
.forEach(recCC -> {
ComputedColumnDesc cc = recCC.getCc();
newCCMap.putIfAbsent(cc.getFullName(), cc);
});
recItem.getDimensions().stream() //
.filter(LayoutRecDetailResponse.RecDimension::isNew) //
.forEach(recDim -> {
NDataModel.NamedColumn dim = recDim.getDimension();
newDimMap.putIfAbsent(dim.getAliasDotColumn(), dim);
});
recItem.getMeasures().stream() //
.filter(LayoutRecDetailResponse.RecMeasure::isNew) //
.forEach(recMeasure -> {
NDataModel.Measure measure = recMeasure.getMeasure();
newMeasureMap.putIfAbsent(measure.getName(), measure);
});
});
newCCMap.forEach((ccName, cc) -> {
copyForWrite.getComputedColumnDescs().add(cc);
NDataModel.NamedColumn column = columnMap.get(cc.getFullName());
allNamedColumns.add(column);
namedColumnMap.putIfAbsent(column.getId(), column);
});
newDimMap.forEach((colName, dim) -> {
if (namedColumnMap.containsKey(dim.getId())) {
namedColumnMap.get(dim.getId()).setStatus(NDataModel.ColumnStatus.DIMENSION);
} else {
allNamedColumns.add(dim);
}
});
Set<Integer> namedColumnIds = allNamedColumns.stream().map(NDataModel.NamedColumn::getId)
.collect(Collectors.toSet());
modelRequest.getAllNamedColumns().forEach(col -> {
if (!namedColumnIds.contains(col.getId())) {
allNamedColumns.add(col);
namedColumnIds.add(col.getId());
}
});
newMeasureMap.forEach((measureName, measure) -> copyForWrite.getAllMeasures().add(measure));
// keep order of all columns and measures
copyForWrite.keepColumnOrder();
copyForWrite.keepMeasureOrder();
});
// expand
semanticUpdater.deleteExpandableMeasureInternalMeasures(updated);
semanticUpdater.expandExpandableMeasure(updated);
preProcessBeforeModelSave(updated, project);
NDataModel expanded = getManager(NDataModelManager.class, project).updateDataModelDesc(updated);
// update IndexPlan
IndexPlan indexPlan = modelRequest.getIndexPlan();
indexPlanService.expandIndexPlanRequest(indexPlan, expanded);
Map<Long, LayoutEntity> layoutMap = Maps.newHashMap();
indexPlan.getAllLayouts().forEach(layout -> layoutMap.putIfAbsent(layout.getId(), layout));
indexPlanManager.updateIndexPlan(modelRequest.getId(), copyForWrite -> {
IndexPlan.IndexPlanUpdateHandler updateHandler = copyForWrite.createUpdateHandler();
for (LayoutRecDetailResponse recItem : recItems) {
long layoutId = recItem.getIndexId();
LayoutEntity layout = layoutMap.get(layoutId);
updateHandler.add(layout, IndexEntity.isAggIndex(recItem.getIndexId()));
}
updateHandler.complete();
});
modelChangeSupporters.forEach(listener -> listener.onUpdateSingle(project, modelRequest.getUuid()));
baseIndexUpdater.update(indexPlanService);
}
}