in azure/durable_functions/models/Task.py [0:0]
def __init__(self, id_, action: CreateTimerAction, orchestration_context):
"""Initialize a LongTimerTask.
Parameters
----------
id_ : int
An ID for the task
action : CreateTimerAction
The action this task represents
orchestration_context: DurableOrchestrationContext
The orchestration context this task was created in
"""
current_time = orchestration_context.current_utc_datetime
final_fire_time = action.fire_at
duration_until_fire = final_fire_time - current_time
if duration_until_fire > orchestration_context._maximum_short_timer_duration:
next_fire_time = current_time + orchestration_context._long_timer_interval_duration
else:
next_fire_time = final_fire_time
next_timer_action = CreateTimerAction(next_fire_time)
next_timer_task = TimerTask(None, next_timer_action)
super().__init__([next_timer_task], orchestration_context._replay_schema)
self.id = id_
self.action = action
self._orchestration_context = orchestration_context
self._max_short_timer_duration = self._orchestration_context._maximum_short_timer_duration
self._long_timer_interval = self._orchestration_context._long_timer_interval_duration