in metaflow/plugins/events_decorator.py [0:0]
def process_parameters(self, parameters):
    """Normalize the *parameters* attribute of an event into a dict.

    The following shapes are handled:

    * a list/tuple whose items are each either a string (mapped to
      itself), a callable (wrapped in a ``DeployTimeField`` and mapped to
      itself), or a 2-item list/tuple ``(key, value)``;
    * a dict, whose keys and values are copied into the result;
    * a bare callable, which is wrapped in a ``DeployTimeField`` and
      returned directly instead of a dict.

    Wherever a key or value is a callable that is not already a
    ``DeployTimeField``, it is wrapped in one. The input is never
    modified.

    Parameters
    ----------
    parameters : list, tuple, dict, callable or any other object
        The raw value to normalize. Values of any other type (including
        ``None`` and an existing ``DeployTimeField``) produce an empty
        dict.

    Returns
    -------
    dict or DeployTimeField
        The normalized mapping, or a ``DeployTimeField`` wrapping
        *parameters* when *parameters* is itself a plain callable.

    Raises
    ------
    MetaflowException
        If an item of a list/tuple is not a string, a plain callable or a
        list/tuple of exactly two elements.
    """

    def _wrap_callable(candidate, field_name):
        # Plain callables are wrapped; everything else (strings, existing
        # DeployTimeField instances, ...) passes through unchanged.
        if callable(candidate) and not isinstance(candidate, DeployTimeField):
            return DeployTimeField(field_name, str, None, candidate, False)
        return candidate

    new_param_values = {}
    if isinstance(parameters, (list, tuple)):
        for mapping in parameters:
            if is_stringish(mapping):
                new_param_values[mapping] = mapping
            elif callable(mapping) and not isinstance(mapping, DeployTimeField):
                wrapped = _wrap_callable(mapping, "parameter_val")
                new_param_values[wrapped] = wrapped
            elif isinstance(mapping, (list, tuple)) and len(mapping) == 2:
                # Work on local names rather than assigning back into
                # `mapping`: item assignment raises TypeError when the pair
                # is a tuple and would mutate the caller's data when it is
                # a list.
                pair_key, pair_value = mapping
                pair_key = _wrap_callable(pair_key, "parameter_val")
                pair_value = _wrap_callable(pair_value, "parameter_val")
                new_param_values[pair_key] = pair_value
            else:
                raise MetaflowException(
                    "The *parameters* attribute for event is invalid. "
                    "It should be a list/tuple of strings and lists/tuples of size 2"
                )
    elif callable(parameters) and not isinstance(parameters, DeployTimeField):
        # The whole attribute is a callable: hand back a single wrapper.
        # NOTE(review): presumably resolved later to a list, dict or tuple,
        # per the type list passed here -- confirm against DeployTimeField.
        return DeployTimeField(
            "parameters", [list, dict, tuple], None, parameters, False
        )
    elif isinstance(parameters, dict):
        for key, value in parameters.items():
            key = _wrap_callable(key, "flow_parameter")
            value = _wrap_callable(value, "signal_parameter")
            new_param_values[key] = value
    return new_param_values