in sdks/python/apache_beam/runners/common.py [0:0]
def _get_arg_placeholders(
    method: MethodWrapper,
    input_args: Optional[List[Any]],
    input_kwargs: Optional[Dict[str, Any]]):
  """Builds the positional argument list for ``method`` with placeholders.

  Walks the parameters of ``method`` that declare a default value and, for
  every default that is one of the special ``core.DoFn`` parameters (element,
  key, window, windowed value, timestamp, pane info, state, timer, bundle
  finalizer, bundle context, setup context), inserts an ``ArgPlaceholder``
  wrapping that default. Every other defaulted parameter consumes the next
  value from ``input_args`` when one is still available.

  Args:
    method: Object exposing ``args`` (the parameter names) and ``defaults``
      (the default values, matched against the trailing entries of ``args``).
    input_args: Positional values supplied by the caller; ``None`` or an empty
      sequence is treated as no positional values.
    input_kwargs: Keyword values supplied by the caller; ``None`` or an empty
      mapping is replaced by a new empty dict.

  Returns:
    A tuple ``(placeholders, args_with_placeholders, input_kwargs)`` where
    ``placeholders`` is a list of ``(index, placeholder)`` pairs giving the
    position of each ``ArgPlaceholder`` in ``args_with_placeholders`` together
    with the value it wraps, ``args_with_placeholders`` is the positional
    argument list, and ``input_kwargs`` is the (possibly defaulted) keyword
    dict.

  Raises:
    ValueError: If a parameter whose default is ``core.DoFn.SideInputParam``
      has no positional value left and its name is not in ``input_kwargs``.
  """
  # Normalise to a list: the code below concatenates with a list and appends
  # to slices of input_args, which would fail for a tuple.
  input_args = list(input_args) if input_args else []
  input_kwargs = input_kwargs if input_kwargs else {}

  arg_names = method.args
  default_arg_values = method.defaults

  # Create placeholder for element parameter of DoFn.process() method.
  # Not to be confused with ArgumentPlaceHolder, which may be passed in
  # input_args and is a placeholder for side-inputs.
  class ArgPlaceholder(object):
    def __init__(self, placeholder):
      self.placeholder = placeholder

  if all(core.DoFn.ElementParam != arg for arg in default_arg_values):
    # No parameter defaults to ElementParam, so the element is taken to be the
    # first positional argument and gets the leading placeholder.
    # TODO(https://github.com/apache/beam/issues/19631): Handle cases in which
    #   len(arg_names) == len(default_arg_values).
    args_to_pick = len(arg_names) - len(default_arg_values) - 1
    # Positional argument values for process(), with placeholders for special
    # values such as the element, timestamp, etc.
    args_with_placeholders = ([ArgPlaceholder(core.DoFn.ElementParam)] +
                              input_args[:args_to_pick])
  else:
    args_to_pick = len(arg_names) - len(default_arg_values)
    args_with_placeholders = input_args[:args_to_pick]

  # Special parameters recognised by equality with the default value.
  equality_params = (
      core.DoFn.ElementParam,
      core.DoFn.KeyParam,
      core.DoFn.WindowParam,
      core.DoFn.WindowedValueParam,
      core.DoFn.TimestampParam,
      core.DoFn.PaneInfoParam,
  )
  # Special parameters recognised because the default is an instance of them.
  instance_params = (
      core.DoFn.StateParam,
      core.DoFn.TimerParam,
      core.DoFn.BundleContextParam,
      core.DoFn.SetupContextParam,
  )
  # Sentinel distinguishing "no positional value left" from a real value
  # (including None) yielded by the iterator.
  exhausted = object()

  # Fill the OtherPlaceholders for context, key, window or timestamp
  remaining_args_iter = iter(input_args[args_to_pick:])
  # With no defaults the slice is arg_names[-0:], i.e. every name, but zip()
  # stops immediately because default_arg_values is empty.
  for a, d in zip(arg_names[-len(default_arg_values):], default_arg_values):
    if any(param == d for param in equality_params):
      args_with_placeholders.append(ArgPlaceholder(d))
    elif core.DoFn.SideInputParam == d:
      # If no more args are present then the value must be passed via kwarg
      value = next(remaining_args_iter, exhausted)
      if value is not exhausted:
        args_with_placeholders.append(value)
      elif a not in input_kwargs:
        raise ValueError("Value for sideinput %s not provided" % a)
    elif (isinstance(d, instance_params) or
          (isinstance(d, type) and core.DoFn.BundleFinalizerParam == d)):
      args_with_placeholders.append(ArgPlaceholder(d))
    else:
      # Ordinary defaulted parameter: use the next positional value if there
      # is one. If no more args are present then the value must be passed via
      # kwarg, so nothing is appended here.
      value = next(remaining_args_iter, exhausted)
      if value is not exhausted:
        args_with_placeholders.append(value)
  # Any positional values not consumed above are passed through unchanged.
  args_with_placeholders.extend(remaining_args_iter)

  # Stash the list of placeholder positions for performance
  placeholders = [(i, x.placeholder)
                  for (i, x) in enumerate(args_with_placeholders)
                  if isinstance(x, ArgPlaceholder)]

  return placeholders, args_with_placeholders, input_kwargs