sdks/python/apache_beam/io/gcp/gcsio_retry.py (56 lines of code) (raw):
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Throttling Handler for GCSIO
"""
import inspect
import logging
import math
from itertools import tee
from google.api_core import __version__ as _api_core_version
from google.api_core import exceptions as api_exceptions
from google.api_core import retry
from google.cloud.storage.retry import DEFAULT_RETRY
from google.cloud.storage.retry import _should_retry # pylint: disable=protected-access
from packaging import version
from apache_beam.metrics.metric import Metrics
from apache_beam.options.pipeline_options import GoogleCloudOptions
# Module-level logger.
_LOGGER = logging.getLogger(__name__)
# Only the throttling-aware retry policy is listed as public API.
__all__ = ['DEFAULT_RETRY_WITH_THROTTLING_COUNTER']
# First google-api-core version in which the retry helper exposes the upcoming
# back-off delay as an iterator ("sleep_iterator") instead of a plain value
# ("next_sleep").
_MIN_SLEEP_ARG_SWITCH_VERSION = version.parse("2.25.0rc0")
# Local-variable name looked up for google-api-core < 2.25.0rc0.
_LEGACY_SLEEP_ARG_NAME = "next_sleep"
# Local-variable name looked up for google-api-core >= 2.25.0rc0.
_CURRENT_SLEEP_ARG_NAME = "sleep_iterator"
class ThrottlingHandler(object):
  """Retry ``on_error`` callback that counts GCS throttling time.

  Instances are callable with the exception raised by a failed attempt. For
  ``TooManyRequests`` errors the handler looks up the upcoming back-off delay
  in the local variables of the frame that called it and adds that delay,
  rounded up to whole seconds, to the ``gcsio`` /
  ``cumulativeThrottlingSeconds`` counter. Any other exception is ignored.
  """
  _THROTTLED_SECS = Metrics.counter('gcsio', "cumulativeThrottlingSeconds")

  def __init__(self):
    """Select the caller-local variable name that carries the delay."""
    # The name depends on the installed google-api-core version.
    try:
      core_ver = version.parse(_api_core_version)
    except Exception:
      # Deliberate best effort: an unparsable version string is treated as
      # the oldest possible release, which selects the legacy name below.
      core_ver = version.parse("0")
    if core_ver < _MIN_SLEEP_ARG_SWITCH_VERSION:
      self._sleep_arg = _LEGACY_SLEEP_ARG_NAME
    else:
      self._sleep_arg = _CURRENT_SLEEP_ARG_NAME

  def __call__(self, exc):
    """Record the pending back-off delay when ``exc`` is a quota error.

    Args:
      exc: the exception raised by the failed attempt. Only
        ``TooManyRequests`` instances are acted upon.

    Returns:
      None in every case.
    """
    if not isinstance(exc, api_exceptions.TooManyRequests):
      return
    _LOGGER.debug('Caught GCS quota error (%s), retrying.', exc.reason)
    # TODO: revisit the logic here when gcs client library supports error
    # callbacks
    frame = inspect.currentframe()
    if frame is None:
      _LOGGER.warning('cannot inspect the current stack frame')
      return
    caller_frame = frame.f_back
    # A frame kept in one of its own local variables forms a reference cycle
    # (see the note on frame objects in the `inspect` documentation), so the
    # reference is dropped as soon as the caller frame has been fetched.
    del frame
    if caller_frame is None:
      _LOGGER.warning('cannot inspect the caller stack frame')
      return
    sleep_seconds = self._peek_sleep_seconds(caller_frame.f_locals)
    ThrottlingHandler._THROTTLED_SECS.inc(math.ceil(sleep_seconds))

  def _peek_sleep_seconds(self, caller_locals):
    """Return the upcoming delay found in ``caller_locals`` (0 if absent)."""
    # Which variable to inspect in google/api_core/retry/retry_base.py's
    # _retry_error_helper():
    #   - versions <  2.25.0rc0 use "next_sleep"
    #   - versions >= 2.25.0rc0 use "sleep_iterator"
    if self._sleep_arg == _LEGACY_SLEEP_ARG_NAME:
      return caller_locals.get(self._sleep_arg, 0)
    # NOTE(review): tee() reads from the very iterator object the caller still
    # holds, so this look-ahead presumably consumes one delay value from the
    # caller's sequence -- confirm against google-api-core's
    # _retry_error_helper().
    _, lookahead = tee(caller_locals.get(self._sleep_arg, iter([])))
    # An exhausted (or missing) iterator counts as a zero-second delay.
    return next(lookahead, 0)
# Retry policy built with the `_should_retry` predicate imported from
# google.cloud.storage.retry; every error is additionally reported to a
# ThrottlingHandler instance through the `on_error` callback.
DEFAULT_RETRY_WITH_THROTTLING_COUNTER = retry.Retry(
    predicate=_should_retry, on_error=ThrottlingHandler())
def get_retry(pipeline_options):
  """Choose the retry policy to use, based on the pipeline options.

  Args:
    pipeline_options: options object that is viewed as ``GoogleCloudOptions``
      to read its ``no_gcsio_throttling_counter`` setting.

  Returns:
    ``DEFAULT_RETRY`` when ``no_gcsio_throttling_counter`` is truthy,
    otherwise ``DEFAULT_RETRY_WITH_THROTTLING_COUNTER``.
  """
  gcp_options = pipeline_options.view_as(GoogleCloudOptions)
  counter_disabled = gcp_options.no_gcsio_throttling_counter
  return (
      DEFAULT_RETRY
      if counter_disabled else DEFAULT_RETRY_WITH_THROTTLING_COUNTER)