in maestro-engine/src/main/java/com/netflix/maestro/engine/params/ParamsManager.java [133:238]
public Map<String, Parameter> generateMergedStepParams(
WorkflowSummary workflowSummary,
Step stepDefinition,
StepRuntime stepRuntime,
StepRuntimeSummary runtimeSummary) {
Map<String, ParamDefinition> allParamDefs = new LinkedHashMap<>();
// Start with default step level params if present
Map<String, ParamDefinition> globalDefault = defaultParamManager.getDefaultStepParams();
if (globalDefault != null) {
ParamsMergeHelper.mergeParams(
allParamDefs,
globalDefault,
ParamsMergeHelper.MergeContext.stepCreate(ParamSource.SYSTEM_DEFAULT));
}
// Merge in injected params returned by step if present (template schema)
Map<String, ParamDefinition> injectedParams =
stepRuntime.injectRuntimeParams(workflowSummary, stepDefinition);
maybeOverrideParamType(allParamDefs);
if (injectedParams != null) {
maybeOverrideParamType(injectedParams);
ParamsMergeHelper.mergeParams(
allParamDefs,
injectedParams,
ParamsMergeHelper.MergeContext.stepCreate(ParamSource.TEMPLATE_SCHEMA));
}
// Merge in params applicable to step type
Optional<Map<String, ParamDefinition>> defaultStepTypeParams =
defaultParamManager.getDefaultParamsForType(stepDefinition.getType());
if (defaultStepTypeParams.isPresent()) {
LOG.debug("Merging step level default for {}", stepDefinition.getType());
ParamsMergeHelper.mergeParams(
allParamDefs,
defaultStepTypeParams.get(),
ParamsMergeHelper.MergeContext.stepCreate(ParamSource.SYSTEM_DEFAULT));
}
// Merge in workflow and step info
ParamsMergeHelper.mergeParams(
allParamDefs,
injectWorkflowAndStepInfoParams(workflowSummary, runtimeSummary),
ParamsMergeHelper.MergeContext.stepCreate(ParamSource.SYSTEM_INJECTED));
// merge step run param and user provided restart step run params
// first to get undefined params from both run param and restart params
Map<String, ParamDefinition> undefinedRestartParams = new LinkedHashMap<>();
Optional<Map<String, ParamDefinition>> stepRestartParams =
getUserStepRestartParam(workflowSummary, runtimeSummary);
stepRestartParams.ifPresent(undefinedRestartParams::putAll);
Optional<Map<String, ParamDefinition>> stepRunParams =
getStepRunParams(workflowSummary, runtimeSummary);
Map<String, ParamDefinition> systemInjectedRestartRunParams = new LinkedHashMap<>();
stepRunParams.ifPresent(
params -> {
params.forEach(
(key, val) -> {
if (runtimeSummary.getRestartConfig() != null
&& Constants.RESERVED_PARAM_NAMES.contains(key)
&& val.getMode() == ParamMode.CONSTANT
&& val.getSource() == ParamSource.SYSTEM_INJECTED) {
((AbstractParamDefinition) val)
.getMeta()
.put(Constants.METADATA_SOURCE_KEY, ParamSource.RESTART.name());
systemInjectedRestartRunParams.put(key, val);
}
});
systemInjectedRestartRunParams.keySet().forEach(params::remove);
});
stepRunParams.ifPresent(undefinedRestartParams::putAll);
Optional.ofNullable(stepDefinition.getParams())
.ifPresent(
stepDefParams ->
stepDefParams.keySet().stream()
.filter(undefinedRestartParams::containsKey)
.forEach(undefinedRestartParams::remove));
// Then merge undefined restart params
if (!undefinedRestartParams.isEmpty()) {
mergeUserProvidedStepParams(allParamDefs, undefinedRestartParams, workflowSummary);
}
// Final merge from step definition
if (stepDefinition.getParams() != null) {
maybeOverrideParamType(stepDefinition.getParams());
ParamsMergeHelper.mergeParams(
allParamDefs,
stepDefinition.getParams(),
ParamsMergeHelper.MergeContext.stepCreate(ParamSource.DEFINITION));
}
// merge step run params
stepRunParams.ifPresent(
stepParams -> mergeUserProvidedStepParams(allParamDefs, stepParams, workflowSummary));
// merge all user provided restart step run params
stepRestartParams.ifPresent(
stepParams -> mergeUserProvidedStepParams(allParamDefs, stepParams, workflowSummary));
// merge all system injected restart step run params with mode and source already set.
allParamDefs.putAll(systemInjectedRestartRunParams);
// Cleanup any params that are missing and convert to params
return ParamsMergeHelper.convertToParameters(ParamsMergeHelper.cleanupParams(allParamDefs));
}