def get_stream_name_by_prefix()

in aws_codeseeder/services/cloudwatch.py [0:0]


def get_stream_name_by_prefix(group_name: str, prefix: str) -> Optional[str]:
    """Get the CloudWatch Logs stream name

    Parameters
    ----------
    group_name : str
        Name of the CloudWatch Logs group
    prefix : str
        Naming prefix of the CloudWatch Logs Stream

    Returns
    -------
    Optional[str]
        Name of the CloudWatch Logs Stream (if found)
    """
    client = boto3_client("logs")
    response: Dict[str, Union[str, List[Dict[str, Union[float, str]]]]] = client.describe_log_streams(
        logGroupName=group_name,
        logStreamNamePrefix=prefix,
        orderBy="LogStreamName",
        descending=True,
        limit=1,
    )
    streams = cast(List[Dict[str, Union[float, str]]], response.get("logStreams", []))
    if streams:
        return str(streams[0]["logStreamName"])
    return None