in gcpdiag/queries/logs.py [0:0]
def _execute_query_job(job: _LogsQueryJob):
  thread = threading.current_thread()
  thread.name = f'log_query:{job.log_name}'
  logging_api = apis.get_api('logging', 'v2', job.project_id)

  # Convert "within" relative time to an absolute timestamp.
  start_time = datetime.datetime.now(
      datetime.timezone.utc) - datetime.timedelta(
          days=config.get('within_days'))
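  # For illustration (values are assumptions, not from the source): with
  # within_days=3 this produces a clause like
  #   timestamp>"2024-05-01T00:00:00+00:00"
  # restricting the query to entries from the last three days.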
  filter_lines = ['timestamp>"%s"' % start_time.isoformat(timespec='seconds')]
  filter_lines.append('resource.type="%s"' % job.resource_type)
  if job.log_name.startswith('log_id('):
    # Special case: log_id(logname)
    # https://cloud.google.com/logging/docs/view/logging-query-language#functions
    filter_lines.append(job.log_name)
  else:
    filter_lines.append('logName="%s"' % job.log_name)
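  # The two forms side by side (values are illustrative assumptions):
  #   log_id("cloudaudit.googleapis.com/activity")  -> matches by log ID,
  #     regardless of the resource-name prefix
  #   logName="projects/my-project/logs/my-log"     -> matches one exact log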
  if len(job.filters) == 1:
    filter_lines.append('(' + next(iter(job.filters)) + ')')
  else:
    filter_lines.append(
        '(' + ' OR '.join(['(' + val + ')' for val in sorted(job.filters)]) +
        ')')
  filter_str = '\n'.join(filter_lines)
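  # The Logging query language treats newline-separated expressions as an
  # implicit AND, so filter_str becomes one conjunctive query. With
  # illustrative values (assumptions, not from the source) it looks like:
  #   timestamp>"2024-05-01T00:00:00+00:00"
  #   resource.type="gce_instance"
  #   logName="projects/my-project/logs/my-log"
  #   ((severity=ERROR) OR (severity=WARNING))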
  logging.debug('searching logs in project %s (resource type: %s)',
                job.project_id, job.resource_type)

  # Fetch all logs and put the results in temporary storage (diskcache.Deque)
  deque = caching.get_tmp_deque('tmp-logs-')
  req = logging_api.entries().list(
      body={
          'resourceNames': [f'projects/{job.project_id}'],
          'filter': filter_str,
          'orderBy': 'timestamp desc',
          'pageSize': config.get('logging_page_size')
      })
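  # entries.list is the Cloud Logging API v2 method
  # (https://cloud.google.com/logging/docs/reference/v2/rest/v2/entries/list);
  # each response page is a dict with optional 'entries' and 'nextPageToken'
  # keys, which the pagination loop below relies on.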
  fetched_entries_count = 0
  query_pages = 0
  query_start_time = datetime.datetime.now()
  while req is not None:
    query_pages += 1
    res = _ratelimited_execute(req)
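    # Entries arrive newest-first ('timestamp desc'), so appendleft below
    # leaves the deque in ascending timestamp order (an inference from the
    # code, not stated in the source).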
    if 'entries' in res:
      for e in res['entries']:
        fetched_entries_count += 1
        deque.appendleft(e)

    # Check that we aren't above limits; bail out early if we are.
    if fetched_entries_count > config.get('logging_fetch_max_entries'):
      logging.warning(
          'maximum number of log entries (%d) reached (project: %s, query: %s).',
          config.get('logging_fetch_max_entries'), job.project_id,
          filter_str.replace('\n', ' AND '))
      return deque
    run_time = (datetime.datetime.now() - query_start_time).total_seconds()
    if run_time >= config.get('logging_fetch_max_time_seconds'):
      logging.warning(
          'maximum query runtime for log query reached (project: %s, query: %s).',
          job.project_id, filter_str.replace('\n', ' AND '))
      return deque
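    # googleapiclient's list_next() builds the request for the next page from
    # the previous request and response, and returns None once no
    # 'nextPageToken' remains, which terminates the loop.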
    req = logging_api.entries().list_next(req, res)
    if req is not None:
      logging.debug(
          'still fetching logs (project: %s, resource type: %s, max wait: %ds)',
          job.project_id, job.resource_type,
          config.get('logging_fetch_max_time_seconds') - run_time)

  query_end_time = datetime.datetime.now()
  logging.debug('logging query run time: %s, pages: %d, query: %s',
                query_end_time - query_start_time, query_pages,
                filter_str.replace('\n', ' AND '))
  return deque
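
# A minimal sketch of how this job function might be driven; the executor
# setup and the 'jobs' list are illustrative assumptions, not the actual
# gcpdiag call sites:
#
#   import concurrent.futures
#
#   jobs: list[_LogsQueryJob] = [...]  # built elsewhere in this module
#   with concurrent.futures.ThreadPoolExecutor() as executor:
#     futures = [executor.submit(_execute_query_job, j) for j in jobs]
#     for future in concurrent.futures.as_completed(futures):
#       entries = future.result()  # temporary diskcache.Deque of log entries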