in azure/durable_functions/models/DurableOrchestrationContext.py [0:0]
def call_activity(self, name: Union[str, Callable], input_: Optional[Any] = None) -> TaskBase:
"""Schedule an activity for execution.
Parameters
----------
name: str | Callable
Either the name of the activity function to call, as a string or,
in the Python V2 programming model, the activity function itself.
input_: Optional[Any]
The JSON-serializable input to pass to the activity function.
Returns
-------
Task
A Durable Task that completes when the called activity function completes or fails.
"""
if isinstance(name, Callable) and not isinstance(name, FunctionBuilder):
error_message = "The `call_activity` API received a `Callable` without an "\
"associated Azure Functions trigger-type. "\
"Please ensure you're using the Python programming model V2 "\
"and that your activity function is annotated with the `activity_trigger`"\
"decorator. Otherwise, provide in the name of the activity as a string."
raise ValueError(error_message)
if isinstance(name, FunctionBuilder):
name = self._get_function_name(name, ActivityTrigger)
action = CallActivityAction(name, input_)
task = self._generate_task(action)
return task