def call_activity_with_retry()

in azure/durable_functions/models/DurableOrchestrationContext.py [0:0]


    def call_activity_with_retry(self,
                                 name: Union[str, Callable], retry_options: RetryOptions,
                                 input_: Optional[Any] = None) -> TaskBase:
        """Schedule an activity for execution with retry options.

        Parameters
        ----------
        name: str | Callable
            Either the name of the activity function to call, as a string or,
            in the Python V2 programming model, the activity function itself.
        retry_options: RetryOptions
            The retry options for the activity function.
        input_: Optional[Any]
            The JSON-serializable input to pass to the activity function.

        Returns
        -------
        Task
            A Durable Task that completes when the called activity function completes or
            fails completely.
        """
        if isinstance(name, Callable) and not isinstance(name, FunctionBuilder):
            error_message = "The `call_activity` API received a `Callable` without an "\
                "associated Azure Functions trigger-type. "\
                "Please ensure you're using the Python programming model V2 "\
                "and that your activity function is annotated with the `activity_trigger`"\
                "decorator. Otherwise, provide in the name of the activity as a string."
            raise ValueError(error_message)

        if isinstance(name, FunctionBuilder):
            name = self._get_function_name(name, ActivityTrigger)

        action = CallActivityWithRetryAction(name, retry_options, input_)
        task = self._generate_task(action, retry_options)
        return task