in tfx/dsl/component/experimental/decorators.py [0:0]
def Do(self, input_dict: Dict[str, List[tfx_types.Artifact]],
output_dict: Dict[str, List[tfx_types.Artifact]],
exec_properties: Dict[str, Any]) -> None:
function_args = {}
for name, arg_format in self._ARG_FORMATS.items():
if arg_format == function_parser.ArgFormats.INPUT_ARTIFACT:
input_list = input_dict.get(name, [])
if len(input_list) == 1:
function_args[name] = input_list[0]
elif not input_list and name in self._ARG_DEFAULTS:
# Do not pass the missing optional input.
pass
else:
raise ValueError((
'Expected input %r to %s to be a singleton ValueArtifact channel '
'(got %s instead).') % (name, self, input_list))
elif arg_format == function_parser.ArgFormats.OUTPUT_ARTIFACT:
output_list = output_dict.get(name, [])
if len(output_list) == 1:
function_args[name] = output_list[0]
else:
raise ValueError((
'Expected output %r to %s to be a singleton ValueArtifact channel '
'(got %s instead).') % (name, self, output_list))
elif arg_format == function_parser.ArgFormats.ARTIFACT_VALUE:
input_list = input_dict.get(name, [])
if len(input_list) == 1:
function_args[name] = input_list[0].value
elif not input_list and name in self._ARG_DEFAULTS:
# Do not pass the missing optional input.
pass
else:
raise ValueError((
'Expected input %r to %s to be a singleton ValueArtifact channel '
'(got %s instead).') % (name, self, input_list))
elif arg_format == function_parser.ArgFormats.PARAMETER:
if name in exec_properties:
function_args[name] = exec_properties[name]
elif name in self._ARG_DEFAULTS:
# Do not pass the missing optional input.
pass
else:
raise ValueError((
'Expected non-optional parameter %r of %s to be provided, but no '
'value was passed.') % (name, self))
else:
raise ValueError('Unknown argument format: %r' % (arg_format,))
# Call function and check returned values.
outputs = self._FUNCTION(**function_args)
outputs = outputs or {}
if not isinstance(outputs, dict):
raise ValueError(
('Expected component executor function %s to return a dict of '
'outputs (got %r instead).') % (self._FUNCTION, outputs))
# Assign returned ValueArtifact values.
for name, is_optional in self._RETURNED_VALUES.items():
if name not in outputs:
raise ValueError(
'Did not receive expected output %r as return value from '
'component executor function %s.' % (name, self._FUNCTION))
if not is_optional and outputs[name] is None:
raise ValueError(
'Non-nullable output %r received None return value from '
'component executor function %s.' % (name, self._FUNCTION))
try:
output_dict[name][0].value = outputs[name]
except TypeError:
raise TypeError(
('Return value %r for output %r is incompatible with output type '
'%r.') % (outputs[name], name, output_dict[name][0].__class__))