def _schedule_refresh()

in google/cloud/sql/connector/instance.py [0:0]


    def _schedule_refresh(self, delay: int) -> asyncio.Task:
        """
        Schedule task to sleep and then perform refresh to get ConnectionInfo.

        Args:
            delay (int): Time in seconds to sleep before performing a refresh.

        Returns:
            An asyncio.Task representing the scheduled refresh.
        """

        async def _refresh_task(self: RefreshAheadCache, delay: int) -> ConnectionInfo:
            """
            A coroutine that sleeps for the specified amount of time before
            running _perform_refresh.
            """
            refresh_task: asyncio.Task
            try:
                if delay > 0:
                    await asyncio.sleep(delay)
                refresh_task = asyncio.create_task(self._perform_refresh())
                refresh_data = await refresh_task
                # check that refresh is valid
                if not await _is_valid(refresh_task):
                    raise RefreshNotValidError(
                        f"['{self._conn_name}']: Invalid refresh operation. Certficate appears to be expired."
                    )
            except asyncio.CancelledError:
                logger.debug(
                    f"['{self._conn_name}']: Scheduled refresh" " operation cancelled"
                )
                raise
            # bad refresh attempt
            except Exception as e:
                logger.exception(
                    f"['{self._conn_name}']: "
                    "An error occurred while performing refresh. "
                    "Scheduling another refresh attempt immediately",
                    exc_info=e,
                )
                # check if current metadata is invalid (expired),
                # don't want to replace valid metadata with invalid refresh
                if not await _is_valid(self._current):
                    self._current = refresh_task
                # schedule new refresh attempt immediately
                self._next = self._schedule_refresh(0)
                raise
            # if valid refresh, replace current with valid metadata and schedule next refresh
            self._current = refresh_task
            # calculate refresh delay based on certificate expiration
            delay = _seconds_until_refresh(refresh_data.expiration)
            logger.debug(
                f"['{self._conn_name}']: Connection info refresh"
                " operation scheduled for "
                f"{(datetime.now(timezone.utc) + timedelta(seconds=delay)).isoformat(timespec='seconds')} "
                f"(now + {timedelta(seconds=delay)})"
            )
            self._next = self._schedule_refresh(delay)

            return refresh_data

        # schedule refresh task and return it
        scheduled_task = asyncio.create_task(_refresh_task(self, delay))
        return scheduled_task