def cached_api_call()

in gcpdiag/caching.py [0:0]


def cached_api_call(expire=None, in_memory=False):
  """Caching decorator optimized for API calls.

  This is very similar to functools.lru_cache, with the following differences:
  - uses diskcache so that the memory footprint doesn't grow uncontrollably (the
    API results might be big).
  - uses a per-key lock around the cache lookup. In in_memory mode the wrapped
    function itself runs while the lock is held, so two threads asking for the
    same key trigger a single call and the second one waits for the result.
  - googleapiclient HttpError exceptions raised by the wrapped function are
    stored in the disk cache like regular results and raised again on every
    later lookup of the same key.

  Can be applied with or without parentheses: `@cached_api_call` or
  `@cached_api_call(expire=60)`.

  Parameters:
  - expire: number of seconds until the key expires (default: expire when the
    process ends)
  - in_memory: if true the result will be kept in memory, similarly to
    lru_cache (but with the locking).

  Returns:
    The decorated function when used without parentheses, otherwise a
    decorator to apply to the function.

  Raises (from the decorated function):
    Whatever the wrapped function raises; an HttpError is always re-raised to
    the caller, whether or not caching is enabled.
  """

  # Unique marker for "key not in cache". Comparing by identity allows any
  # value (None, or even the string 'no data') to be cached, and never invokes
  # __eq__ on the cached object.
  cache_miss = object()

  def _cached_api_call_decorator(func):
    # One lock per cache key, created lazily on first use.
    lockdict = collections.defaultdict(threading.Lock)
    if in_memory:
      lru_cached_func = functools.lru_cache()(func)

    @functools.wraps(func)
    def _cached_api_call_wrapper(*args, **kwargs):
      key = None
      # Stays None when caching is disabled; doubles as the "is there a cache
      # to store into" flag after the call below.
      api_cache = None
      if _use_cache:
        logging.debug('looking up cache for %s', func.__name__)
        key = _make_key(func, args, kwargs)
        lock = lockdict[key]
        with _acquire_timeout(lock, config.CACHE_LOCK_TIMEOUT, func.__name__):
          if in_memory:
            if _get_bypass_cache():
              logging.debug('bypassing cache for %s, fetching fresh data.',
                            func.__name__)
              # Drops every memoized entry of this function, not only `key`.
              lru_cached_func.cache_clear()
            return lru_cached_func(*args, **kwargs)
          api_cache = get_disk_cache()
          if _get_bypass_cache():
            logging.debug('bypassing cache for %s, fetching fresh data.',
                          func.__name__)
          else:
            cached_result = api_cache.get(key, default=cache_miss)
            if cached_result is not cache_miss:
              logging.debug('returning cached result for %s', func.__name__)
              if isinstance(cached_result, Exception):
                raise cached_result
              return cached_result
        # NOTE(review): the per-key lock is released here, before func is
        # called, so concurrent disk-cache misses for the same key can each
        # call func. Confirm whether the call should happen under the lock.
      else:
        logging.debug('caching is disabled for %s', func.__name__)

      # Cache miss, bypass, or caching disabled: call the function.
      logging.debug('calling function %s (expire=%s, key=%s)', func.__name__,
                    expire, key)
      try:
        result = func(*args, **kwargs)
        logging.debug('DONE calling function %s (expire=%s, key=%s)',
                      func.__name__, expire, key)
      except googleapiclient.errors.HttpError as err:
        # cache API errors as well
        result = err

      if api_cache is not None:
        if expire:
          api_cache.set(key, result, expire=expire)
        else:
          # No explicit lifetime: store the entry under the 'tmp' tag.
          api_cache.set(key, result, tag='tmp')

      # Must run regardless of caching, otherwise a caught HttpError would be
      # handed back to the caller as a normal return value.
      if isinstance(result, Exception):
        raise result
      return result

    return _cached_api_call_wrapper

  # Decorator without parens -> called with function as first parameter
  if callable(expire):
    func = expire
    # Rebinding is visible to the wrapper closure, so it sees expire=None.
    expire = None
    return _cached_api_call_decorator(func)
  else:
    return _cached_api_call_decorator