def _get_arg_placeholders()

in sdks/python/apache_beam/runners/common.py [0:0]


def _get_arg_placeholders(
    method: MethodWrapper,
    input_args: Optional[List[Any]],
    input_kwargs: Optional[Dict[str, Any]]):
  """Builds the positional-argument template used to invoke a DoFn method.

  Walks the defaults declared on ``method`` and, for every default that is a
  DoFn special parameter (element, key, window, timestamp, state, timer, ...),
  reserves a slot holding an ``ArgPlaceholder``. All other slots are filled, in
  order, from ``input_args``.

  Args:
    method: Wrapper exposing ``args`` (the argument names) and ``defaults``
      (the default values of the trailing arguments).
    input_args: Positional values supplied by the caller; ``None`` or empty is
      treated as no positional values. The sequence is copied, never mutated.
    input_kwargs: Keyword values supplied by the caller; ``None`` or empty is
      treated as an empty dict.

  Returns:
    A 3-tuple ``(placeholders, args_with_placeholders, input_kwargs)`` where
    ``placeholders`` is a list of ``(index, param)`` pairs locating every
    ``ArgPlaceholder`` inside ``args_with_placeholders``, and ``input_kwargs``
    is the (possibly defaulted) keyword dict.

  Raises:
    ValueError: If an argument defaults to ``DoFn.SideInputParam`` but no
      positional value is left for it and its name is not in ``input_kwargs``.
  """
  # Copy into a list so tuples are accepted and the caller's sequence is never
  # aliased by the result.
  input_args = list(input_args) if input_args else []
  input_kwargs = input_kwargs if input_kwargs else {}

  arg_names = method.args
  default_arg_values = method.defaults

  # Create placeholder for element parameter of DoFn.process() method.
  # Not to be confused with ArgumentPlaceHolder, which may be passed in
  # input_args and is a placeholder for side-inputs.
  class ArgPlaceholder(object):
    def __init__(self, placeholder):
      self.placeholder = placeholder

  # Special parameters that are recognised by equality with the default value.
  value_params = (
      core.DoFn.ElementParam,
      core.DoFn.KeyParam,
      core.DoFn.WindowParam,
      core.DoFn.WindowedValueParam,
      core.DoFn.TimestampParam,
      core.DoFn.PaneInfoParam)

  if all(core.DoFn.ElementParam != arg for arg in default_arg_values):
    # No default requests the element explicitly, so the first positional
    # slot is reserved for it.
    # TODO(https://github.com/apache/beam/issues/19631): Handle cases in which
    #   len(arg_names) == len(default_arg_values).
    args_to_pick = len(arg_names) - len(default_arg_values) - 1
    # Positional argument values for process(), with placeholders for special
    # values such as the element, timestamp, etc.
    args_with_placeholders = ([ArgPlaceholder(core.DoFn.ElementParam)] +
                              input_args[:args_to_pick])
  else:
    args_to_pick = len(arg_names) - len(default_arg_values)
    args_with_placeholders = input_args[:args_to_pick]

  # Fill the OtherPlaceholders for context, key, window or timestamp
  remaining_args_iter = iter(input_args[args_to_pick:])
  for a, d in zip(arg_names[-len(default_arg_values):], default_arg_values):
    if any(param == d for param in value_params):
      args_with_placeholders.append(ArgPlaceholder(d))
    elif core.DoFn.SideInputParam == d:
      # If no more args are present then the value must be passed via kwarg
      try:
        args_with_placeholders.append(next(remaining_args_iter))
      except StopIteration:
        if a not in input_kwargs:
          raise ValueError("Value for sideinput %s not provided" % a)
    elif (isinstance(d, (core.DoFn.StateParam, core.DoFn.TimerParam)) or
          # BundleFinalizerParam is matched as the class itself, not an
          # instance of it.
          (isinstance(d, type) and core.DoFn.BundleFinalizerParam == d) or
          isinstance(
              d, (core.DoFn.BundleContextParam, core.DoFn.SetupContextParam))):
      args_with_placeholders.append(ArgPlaceholder(d))
    else:
      # If no more args are present then the value must be passed via kwarg
      # (or the declared default applies), so running out is not an error.
      try:
        args_with_placeholders.append(next(remaining_args_iter))
      except StopIteration:
        pass
  # Any positional values not consumed above are passed through unchanged.
  args_with_placeholders.extend(remaining_args_iter)

  # Stash the list of placeholder positions for performance
  placeholders = [(i, x.placeholder)
                  for (i, x) in enumerate(args_with_placeholders)
                  if isinstance(x, ArgPlaceholder)]

  return placeholders, args_with_placeholders, input_kwargs