in apps/cloudwatch-dashboard/layers/lookoutequipment/python/l4ecwcw.py [0:0]
def get_matching_s3_keys(bucket, prefix='', suffix=''):
    """
    Generate the keys in an S3 bucket.

    Walks the paginated ``list_objects_v2`` listing of the bucket and yields,
    one at a time, every key that starts with `prefix` and ends with
    `suffix`. When a page of the listing carries no ``'Contents'`` entry, a
    "No object found" message is printed and nothing is yielded for that
    page.

    Parameters:
        bucket (string):
            Name of the S3 bucket
        prefix (string or tuple of strings):
            Only fetch keys that start with this prefix (optional). A single
            string is also passed to S3 as ``Prefix`` so the listing is
            filtered by the API; a tuple of strings is only matched locally
            with ``str.startswith``.
        suffix (string or tuple of strings):
            Only fetch keys that end with this suffix (optional). Matched
            locally with ``str.endswith``.

    Yields:
        string: Each object key matching both `prefix` and `suffix`.
    """
    s3 = boto3.client('s3')
    kwargs = {'Bucket': bucket}

    # If the prefix is a single string (not a tuple of strings), we can
    # do the filtering directly in the S3 API.
    if isinstance(prefix, str):
        kwargs['Prefix'] = prefix

    while True:
        # The S3 API response is a large blob of metadata.
        # 'Contents' contains information about the listed objects.
        resp = s3.list_objects_v2(**kwargs)

        # Only the absence of 'Contents' means "nothing listed". Checking for
        # the key explicitly (instead of wrapping the whole loop in a
        # try/except KeyError) keeps unrelated KeyErrors raised while
        # iterating from being swallowed and misreported as an empty listing.
        if 'Contents' in resp:
            for obj in resp['Contents']:
                key = obj['Key']
                if key.startswith(prefix) and key.endswith(suffix):
                    yield key
        else:
            print(f'No object found in s3://{bucket}/{prefix}/')

        # The S3 API is paginated, returning up to 1000 keys at a time.
        # Pass the continuation token into the next request, until we
        # reach the final page (when this field is missing).
        try:
            kwargs['ContinuationToken'] = resp['NextContinuationToken']
        except KeyError:
            break