awswrangler/cloudwatch.py (230 lines of code) (raw):
"""CloudWatch Logs module."""
from __future__ import annotations
import datetime
import logging
import time
from typing import Any, Dict, List, cast
import boto3
import awswrangler.pandas as pd
from awswrangler import _utils, exceptions
from awswrangler._config import apply_configs
# Module-level logger; every diagnostic emitted by the helpers in this module is at DEBUG level.
_logger: logging.Logger = logging.getLogger(__name__)
# Default pause between successive ``get_query_results`` polls in ``wait_query``
# (used as the default of its ``cloudwatch_query_wait_polling_delay`` parameter).
_QUERY_WAIT_POLLING_DELAY: float = 1.0  # SECONDS
def _validate_args(
    start_timestamp: int,
    end_timestamp: int,
) -> None:
    """Reject an invalid query time range.

    Both bounds are integer timestamps; ``start_query`` passes them in epoch milliseconds.

    Raises
    ------
    exceptions.InvalidArgument
        When ``start_timestamp`` is negative (i.e. before the Unix epoch).
    exceptions.InvalidArgumentCombination
        When ``start_timestamp`` is not strictly lower than ``end_timestamp``.
    """
    starts_before_epoch = start_timestamp < 0
    if starts_before_epoch:
        raise exceptions.InvalidArgument("`start_time` cannot be a negative value.")
    range_is_ordered = start_timestamp < end_timestamp
    if not range_is_ordered:
        raise exceptions.InvalidArgumentCombination("`start_time` must be inferior to `end_time`.")
def start_query(
    query: str,
    log_group_names: list[str],
    start_time: datetime.datetime | None = None,
    end_time: datetime.datetime | None = None,
    limit: int | None = None,
    boto3_session: boto3.Session | None = None,
) -> str:
    """Run a query against AWS CloudWatchLogs Insights.

    https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/CWL_QuerySyntax.html

    Parameters
    ----------
    query
        The query string.
    log_group_names
        The list of log group names or ARNs to be queried. You can include up to 50 log groups.
    start_time
        The beginning of the time range to query. Defaults to the Unix epoch (1970-01-01 UTC).
        Naive datetimes are interpreted as local time (``datetime.timestamp`` semantics).
    end_time
        The end of the time range to query. Defaults to the current time.
        Naive datetimes are interpreted as local time (``datetime.timestamp`` semantics).
    limit
        The maximum number of log events to return in the query.
    boto3_session
        The default boto3 session will be used if **boto3_session** is ``None``.

    Returns
    -------
        Query ID.

    Raises
    ------
    exceptions.InvalidArgument
        If ``start_time`` is before the Unix epoch.
    exceptions.InvalidArgumentCombination
        If ``start_time`` is not strictly before ``end_time``.

    Examples
    --------
    >>> import awswrangler as wr
    >>> query_id = wr.cloudwatch.start_query(
    ...     log_group_names=["loggroup"],
    ...     query="fields @timestamp, @message | sort @timestamp desc | limit 5",
    ... )
    """
    _logger.debug("log_group_names: %s", log_group_names)
    if start_time is None:
        start_time = datetime.datetime(year=1970, month=1, day=1, tzinfo=datetime.timezone.utc)
    if end_time is None:
        # Must be timezone-aware: ``utcnow()`` returns a *naive* datetime, and ``.timestamp()``
        # interprets naive values as local time, which would shift the default end of the
        # range by the host's UTC offset (dropping the most recent logs east of UTC).
        end_time = datetime.datetime.now(tz=datetime.timezone.utc)

    # CloudWatch Logs expects epoch milliseconds.
    start_timestamp: int = int(1000 * start_time.timestamp())
    end_timestamp: int = int(1000 * end_time.timestamp())
    _logger.debug("start_timestamp: %s", start_timestamp)
    _logger.debug("end_timestamp: %s", end_timestamp)
    _validate_args(
        start_timestamp=start_timestamp,
        end_timestamp=end_timestamp,
    )

    args: dict[str, Any] = {
        "logGroupIdentifiers": log_group_names,
        "startTime": start_timestamp,
        "endTime": end_timestamp,
        "queryString": query,
    }
    # Only send ``limit`` when requested so the service default applies otherwise.
    if limit is not None:
        args["limit"] = limit

    client_logs = _utils.client(service_name="logs", session=boto3_session)
    response = client_logs.start_query(**args)
    return response["queryId"]
@apply_configs
def wait_query(
    query_id: str,
    boto3_session: boto3.Session | None = None,
    cloudwatch_query_wait_polling_delay: float = _QUERY_WAIT_POLLING_DELAY,
) -> dict[str, Any]:
    """Wait until a CloudWatch Logs Insights query reaches a terminal state.

    https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/CWL_QuerySyntax.html

    Parameters
    ----------
    query_id
        Query ID.
    boto3_session
        The default boto3 session will be used if **boto3_session** is ``None``.
    cloudwatch_query_wait_polling_delay
        Interval in seconds for how often the function will check if the CloudWatch query has completed.

    Returns
    -------
        Query result payload (the ``get_query_results`` response of the completed query).

    Raises
    ------
    exceptions.QueryFailed
        If the query ends with status ``Failed`` or ``Timeout``.
    exceptions.QueryCancelled
        If the query ends with status ``Cancelled``.

    Examples
    --------
    >>> import awswrangler as wr
    >>> query_id = wr.cloudwatch.start_query(
    ...     log_group_names=["loggroup"],
    ...     query="fields @timestamp, @message | sort @timestamp desc | limit 5",
    ... )
    >>> response = wr.cloudwatch.wait_query(query_id=query_id)
    """
    # Statuses after which the query will not progress any further. GetQueryResults can also
    # report "Timeout"; it must be terminal here, otherwise the loop below would poll forever.
    final_states: list[str] = ["Complete", "Failed", "Cancelled", "Timeout"]
    client_logs = _utils.client(service_name="logs", session=boto3_session)
    response = client_logs.get_query_results(queryId=query_id)
    status = response["status"]
    while status not in final_states:
        time.sleep(cloudwatch_query_wait_polling_delay)
        response = client_logs.get_query_results(queryId=query_id)
        status = response["status"]
        _logger.debug("status: %s", status)
    if status == "Failed":
        raise exceptions.QueryFailed(f"query ID: {query_id}")
    if status == "Timeout":
        raise exceptions.QueryFailed(f"query ID: {query_id} (timed out)")
    if status == "Cancelled":
        raise exceptions.QueryCancelled(f"query ID: {query_id}")
    return cast(Dict[str, Any], response)
def run_query(
    query: str,
    log_group_names: list[str],
    start_time: datetime.datetime | None = None,
    end_time: datetime.datetime | None = None,
    limit: int | None = None,
    boto3_session: boto3.Session | None = None,
) -> list[list[dict[str, str]]]:
    """Run a query against AWS CloudWatchLogs Insights and block until its results are available.

    Starts the query with ``start_query``, then polls it with ``wait_query`` using the same
    boto3 session.

    https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/CWL_QuerySyntax.html

    Parameters
    ----------
    query
        The query string.
    log_group_names
        The list of log group names or ARNs to be queried. You can include up to 50 log groups.
    start_time
        The beginning of the time range to query.
    end_time
        The end of the time range to query.
    limit
        The maximum number of log events to return in the query.
    boto3_session
        The default boto3 session will be used if **boto3_session** is ``None``.

    Returns
    -------
        The ``results`` entry of the final query payload: one list per result record,
        each made of ``{"field": ..., "value": ...}`` dicts.

    Examples
    --------
    >>> import awswrangler as wr
    >>> result = wr.cloudwatch.run_query(
    ...     log_group_names=["loggroup"],
    ...     query="fields @timestamp, @message | sort @timestamp desc | limit 5",
    ... )
    """
    started_query_id: str = start_query(
        query=query,
        log_group_names=log_group_names,
        start_time=start_time,
        end_time=end_time,
        limit=limit,
        boto3_session=boto3_session,
    )
    final_payload: dict[str, Any] = wait_query(query_id=started_query_id, boto3_session=boto3_session)
    result_rows = final_payload["results"]
    return cast(List[List[Dict[str, str]]], result_rows)
def read_logs(
    query: str,
    log_group_names: list[str],
    start_time: datetime.datetime | None = None,
    end_time: datetime.datetime | None = None,
    limit: int | None = None,
    boto3_session: boto3.Session | None = None,
) -> pd.DataFrame:
    """Run a query against AWS CloudWatchLogs Insights and convert the results to Pandas DataFrame.

    https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/CWL_QuerySyntax.html

    Parameters
    ----------
    query
        The query string.
    log_group_names
        The list of log group names or ARNs to be queried. You can include up to 50 log groups.
    start_time
        The beginning of the time range to query.
    end_time
        The end of the time range to query.
    limit
        The maximum number of log events to return in the query.
    boto3_session
        The default boto3 session will be used if **boto3_session** is ``None``.

    Returns
    -------
        Result as a Pandas DataFrame: one row per result record, with a single leading ``@``
        removed from field names (``@message`` -> ``message``). All columns use the pandas
        ``string`` dtype, except ``timestamp`` which, when present, is parsed with ``pd.to_datetime``.

    Examples
    --------
    >>> import awswrangler as wr
    >>> df = wr.cloudwatch.read_logs(
    ...     log_group_names=["loggroup"],
    ...     query="fields @timestamp, @message | sort @timestamp desc | limit 5",
    ... )
    """

    def _column_name(field: str) -> str:
        # Fields such as ``@timestamp`` lose only their first ``@``.
        return field[1:] if field.startswith("@") else field

    raw_rows: list[list[dict[str, str]]] = run_query(
        query=query,
        log_group_names=log_group_names,
        start_time=start_time,
        end_time=end_time,
        limit=limit,
        boto3_session=boto3_session,
    )
    records: list[dict[str, str]] = [
        {_column_name(cell["field"]): cell["value"] for cell in raw_row} for raw_row in raw_rows
    ]
    frame: pd.DataFrame = pd.DataFrame(records, dtype="string")
    if "timestamp" in frame.columns:
        frame["timestamp"] = pd.to_datetime(frame["timestamp"])
    return frame
def describe_log_streams(
    log_group_name: str,
    log_stream_name_prefix: str | None = None,
    order_by: str | None = "LogStreamName",
    descending: bool | None = False,
    limit: int | None = 50,
    boto3_session: boto3.Session | None = None,
) -> pd.DataFrame:
    """List the log streams for the specified log group, return results as a Pandas DataFrame.

    https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs.html#CloudWatchLogs.Client.describe_log_streams

    Parameters
    ----------
    log_group_name
        The name of the log group.
    log_stream_name_prefix
        The prefix to match log streams' name.
        Cannot be combined with ``order_by="LastEventTime"``.
    order_by
        If the value is LogStreamName, the results are ordered by log stream name.
        If the value is LastEventTime, the results are ordered by the event time.
        The default value is LogStreamName. ``None`` lets the service apply its default.
    descending
        If the value is True, results are returned in descending order.
        If the value is False, results are returned in ascending order.
        The default value is False. ``None`` lets the service apply its default.
    limit
        Maximum number of log streams requested per ``DescribeLogStreams`` call (page size).
        All pages are always fetched, so this does not cap the total number of rows returned.
        ``None`` lets the service apply its default page size.
    boto3_session
        The default boto3 session will be used if **boto3_session** is ``None``.

    Returns
    -------
        Result as a Pandas DataFrame: one row per log stream with the fields returned by the
        API plus a ``logGroupName`` column; an empty DataFrame if the group has no streams.

    Raises
    ------
    exceptions.InvalidArgumentCombination
        If ``log_stream_name_prefix`` is given together with ``order_by="LastEventTime"``.

    Examples
    --------
    >>> import awswrangler as wr
    >>> df = wr.cloudwatch.describe_log_streams(
    ...     log_group_name="aws_sdk_pandas_log_group",
    ...     log_stream_name_prefix="aws_sdk_pandas_log_stream",
    ... )
    """
    if log_stream_name_prefix and order_by == "LastEventTime":
        raise exceptions.InvalidArgumentCombination(
            "Cannot call describe_log_streams with both `log_stream_name_prefix` and order_by equal 'LastEventTime'"
        )

    client_logs = _utils.client(service_name="logs", session=boto3_session)
    # Only forward options that are set: boto3 rejects explicit ``None`` parameter values,
    # and omitting a key lets the service use its own default.
    args: dict[str, Any] = {"logGroupName": log_group_name}
    if order_by is not None:
        args["orderBy"] = order_by
    if descending is not None:
        args["descending"] = descending
    if limit is not None:
        args["limit"] = limit
    if log_stream_name_prefix:
        args["logStreamNamePrefix"] = log_stream_name_prefix

    # Follow ``nextToken`` until the last page so every stream is collected.
    log_streams: list[dict[str, Any]] = []
    response = client_logs.describe_log_streams(**args)
    log_streams += cast(List[Dict[str, Any]], response["logStreams"])
    while "nextToken" in response:
        response = client_logs.describe_log_streams(
            **args,
            nextToken=response["nextToken"],
        )
        log_streams += cast(List[Dict[str, Any]], response["logStreams"])

    if log_streams:
        df: pd.DataFrame = pd.DataFrame(log_streams)
        df["logGroupName"] = log_group_name
        return df
    return pd.DataFrame()
def _filter_log_events(
    log_group_name: str,
    log_stream_names: list[str],
    start_timestamp: int | None = None,
    end_timestamp: int | None = None,
    filter_pattern: str | None = None,
    limit: int | None = 10000,
    boto3_session: boto3.Session | None = None,
) -> list[dict[str, Any]]:
    """Collect every matching event from the given log streams, following pagination.

    Parameters
    ----------
    log_group_name
        The name of the log group.
    log_stream_names
        Names of the log streams to search.
    start_timestamp, end_timestamp
        Optional time bounds in epoch milliseconds; ``None`` means unbounded.
    filter_pattern
        Optional filter pattern; omitted (match everything) when empty or ``None``.
    limit
        Events requested per ``FilterLogEvents`` call (page size); all pages are fetched.
        ``None`` lets the service apply its default.
    boto3_session
        The default boto3 session will be used if **boto3_session** is ``None``.

    Returns
    -------
        The ``events`` of all pages concatenated in the order they were returned.
    """
    client_logs = _utils.client(service_name="logs", session=boto3_session)
    args: dict[str, Any] = {
        "logGroupName": log_group_name,
        "logStreamNames": log_stream_names,
    }
    # Compare against ``None`` rather than truthiness so an explicit 0 timestamp is still
    # honoured, and never send ``None`` because boto3 rejects it as a parameter value.
    if limit is not None:
        args["limit"] = limit
    if start_timestamp is not None:
        args["startTime"] = start_timestamp
    if end_timestamp is not None:
        args["endTime"] = end_timestamp
    if filter_pattern:
        args["filterPattern"] = filter_pattern

    events: list[dict[str, Any]] = []
    response = client_logs.filter_log_events(**args)
    events += cast(List[Dict[str, Any]], response["events"])
    while "nextToken" in response:
        response = client_logs.filter_log_events(
            **args,
            nextToken=response["nextToken"],
        )
        events += cast(List[Dict[str, Any]], response["events"])
    return events
def filter_log_events(
    log_group_name: str,
    log_stream_name_prefix: str | None = None,
    log_stream_names: list[str] | None = None,
    filter_pattern: str | None = None,
    start_time: datetime.datetime | None = None,
    end_time: datetime.datetime | None = None,
    boto3_session: boto3.Session | None = None,
) -> pd.DataFrame:
    """List log events from the specified log group. The results are returned as Pandas DataFrame.

    https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs.html#CloudWatchLogs.Client.filter_log_events

    When ``log_stream_names`` is not given (or is empty), the streams of the group, optionally
    restricted by ``log_stream_name_prefix``, are first listed with ``describe_log_streams``.
    The streams are then searched in batches of 50 names per request.

    Note
    ----
    Cannot call ``filter_log_events`` with both ``log_stream_names`` and ``log_stream_name_prefix``.

    Parameters
    ----------
    log_group_name
        The name of the log group.
    log_stream_name_prefix
        Filters the results to include only events from log streams that have names starting with this prefix.
    log_stream_names
        Filters the results to only logs from the log streams in this list.
    filter_pattern
        The filter pattern to use. If not provided, all the events are matched.
    start_time
        Events with a timestamp before this time are not returned.
    end_time
        Events with a timestamp later than this time are not returned.
    boto3_session
        The default boto3 session will be used if **boto3_session** is ``None``.

    Returns
    -------
        Result as a Pandas DataFrame: one row per event plus a ``logGroupName`` column;
        an empty DataFrame when nothing matches.

    Raises
    ------
    exceptions.InvalidArgumentCombination
        If both ``log_stream_names`` and ``log_stream_name_prefix`` are given.

    Examples
    --------
    Get all log events from log group 'aws_sdk_pandas_log_group' that have log stream prefix 'aws_sdk_pandas_log_stream'

    >>> import awswrangler as wr
    >>> df = wr.cloudwatch.filter_log_events(
    ...     log_group_name="aws_sdk_pandas_log_group",
    ...     log_stream_name_prefix="aws_sdk_pandas_log_stream",
    ... )

    Get all log events contains 'REPORT' from log stream
    'aws_sdk_pandas_log_stream_one' and 'aws_sdk_pandas_log_stream_two'
    from log group 'aws_sdk_pandas_log_group'

    >>> import awswrangler as wr
    >>> df = wr.cloudwatch.filter_log_events(
    ...     log_group_name="aws_sdk_pandas_log_group",
    ...     log_stream_names=["aws_sdk_pandas_log_stream_one","aws_sdk_pandas_log_stream_two"],
    ...     filter_pattern="REPORT",
    ... )
    """
    if log_stream_name_prefix and log_stream_names:
        raise exceptions.InvalidArgumentCombination(
            "Cannot call `filter_log_events` with both `log_stream_names` and `log_stream_name_prefix`"
        )
    _logger.debug("log_group_name: %s", log_group_name)

    if not log_stream_names:
        # Resolve the stream names ourselves, since each request needs an explicit list.
        lookup_kwargs: dict[str, Any] = {"log_group_name": log_group_name}
        if boto3_session:
            lookup_kwargs["boto3_session"] = boto3_session
        if log_stream_name_prefix:
            lookup_kwargs["log_stream_name_prefix"] = log_stream_name_prefix
        streams_frame = describe_log_streams(**lookup_kwargs)
        # An empty result has no ``logStreamName`` column at all, hence the length guard.
        log_stream_names = streams_frame["logStreamName"].tolist() if len(streams_frame.index) else []

    request_kwargs: dict[str, Any] = {"log_group_name": log_group_name}
    if start_time:
        request_kwargs["start_timestamp"] = int(1000 * start_time.timestamp())
    if end_time:
        request_kwargs["end_timestamp"] = int(1000 * end_time.timestamp())
    if filter_pattern:
        request_kwargs["filter_pattern"] = filter_pattern
    if boto3_session:
        request_kwargs["boto3_session"] = boto3_session

    batch_size: int = 50
    collected: list[dict[str, Any]] = []
    for offset in range(0, len(log_stream_names), batch_size):
        batch = log_stream_names[offset : offset + batch_size]
        collected.extend(_filter_log_events(**request_kwargs, log_stream_names=batch))

    if not collected:
        return pd.DataFrame()
    result: pd.DataFrame = pd.DataFrame(collected)
    result["logGroupName"] = log_group_name
    return result