def _execute_query_job()

in gcpdiag/queries/logs.py [0:0]


def _execute_query_job(job: _LogsQueryJob):
  """Run one aggregated Cloud Logging query and collect the matching entries.

  Builds a filter from the job (time window derived from the `within_days`
  setting, resource type, log name, and the OR-combination of all filters
  registered on the job), pages through `entries.list` and stores every
  returned entry in a temporary deque.

  Fetching stops early, with a warning, once more than
  `logging_fetch_max_entries` entries have been collected or once
  `logging_fetch_max_time_seconds` have elapsed. The entries fetched up to
  that point are still returned.

  Args:
    job: the query job; its `project_id`, `resource_type`, `log_name` and
      `filters` attributes are read.

  Returns:
    The deque obtained from `caching.get_tmp_deque`. Entries are requested
    with `orderBy: timestamp desc` and pushed on the left, so the deque holds
    them in the reverse of the order in which the API returned them.
  """
  # Imported locally so this function does not rely on the module's import
  # list already providing `time`.
  import time  # pylint: disable=import-outside-toplevel

  thread = threading.current_thread()
  thread.name = f'log_query:{job.log_name}'
  logging_api = apis.get_api('logging', 'v2', job.project_id)

  # Convert "within" relative time to an absolute timestamp.
  start_time = datetime.datetime.now(
      datetime.timezone.utc) - datetime.timedelta(
          days=config.get('within_days'))
  filter_lines = ['timestamp>"%s"' % start_time.isoformat(timespec='seconds')]
  filter_lines.append('resource.type="%s"' % job.resource_type)
  if job.log_name.startswith('log_id('):
    # Special case: log_id(logname)
    # https://cloud.google.com/logging/docs/view/logging-query-language#functions
    filter_lines.append(job.log_name)
  else:
    filter_lines.append('logName="%s"' % job.log_name)
  if len(job.filters) == 1:
    filter_lines.append('(' + next(iter(job.filters)) + ')')
  else:
    # Sorted so that the generated filter text is deterministic.
    filter_lines.append(
        '(' + ' OR '.join(['(' + val + ')' for val in sorted(job.filters)]) +
        ')')
  filter_str = '\n'.join(filter_lines)
  # Single-line rendering of the filter, used only in log messages.
  filter_for_log = filter_str.replace('\n', ' AND ')
  logging.debug('searching logs in project %s (resource type: %s)',
                job.project_id, job.resource_type)
  # Fetch all logs and put the results in temporary storage (diskcache.Deque)
  deque = caching.get_tmp_deque('tmp-logs-')
  req = logging_api.entries().list(
      body={
          'resourceNames': [f'projects/{job.project_id}'],
          'filter': filter_str,
          'orderBy': 'timestamp desc',
          'pageSize': config.get('logging_page_size')
      })

  # The limits do not change while the query runs; look them up once.
  max_entries = config.get('logging_fetch_max_entries')
  max_seconds = config.get('logging_fetch_max_time_seconds')

  fetched_entries_count = 0
  query_pages = 0
  # Use a monotonic clock for elapsed time: differences of naive local
  # datetimes jump on DST changes and system clock adjustments, which would
  # make the runtime limit trigger too early or too late.
  query_started = time.monotonic()
  while req is not None:
    query_pages += 1
    res = _ratelimited_execute(req)
    if 'entries' in res:
      for e in res['entries']:
        fetched_entries_count += 1
        deque.appendleft(e)

    # Verify that we aren't above limits, exit otherwise.
    if fetched_entries_count > max_entries:
      logging.warning(
          'maximum number of log entries (%d) reached (project: %s, query: %s).',
          max_entries, job.project_id, filter_for_log)
      return deque
    run_time = time.monotonic() - query_started
    if run_time >= max_seconds:
      logging.warning(
          'maximum query runtime for log query reached (project: %s, query: %s).',
          job.project_id, filter_for_log)
      return deque
    req = logging_api.entries().list_next(req, res)
    if req is not None:
      logging.debug(
          'still fetching logs (project: %s, resource type: %s, max wait: %ds)',
          job.project_id, job.resource_type, max_seconds - run_time)

  # Rendered as a timedelta so the message keeps its H:MM:SS.ffffff format.
  total_run_time = datetime.timedelta(seconds=time.monotonic() - query_started)
  logging.debug('logging query run time: %s, pages: %d, query: %s',
                total_run_time, query_pages, filter_for_log)

  return deque