in plugins/sensors/s3_metadata_sensor.py [0:0]
def get_wildcard_metadata_key(self, wildcard_key, bucket_name, metadata_key, metadata_values, context):
    """Find the first S3 key matching a wildcard that fails the tag check.

    Keys under the wildcard's literal prefix are listed, filtered with
    ``fnmatch`` against ``wildcard_key``, and the first one for which
    ``self.has_key_tags(...)`` returns a falsy value is selected. The
    selected bucket and key are pushed to XCom under
    ``filename_s3_bucket`` and ``filename_s3_key``.

    Args:
        wildcard_key: Key pattern containing ``*``. When ``bucket_name`` is
            falsy, this is passed to ``S3Hook.parse_s3_url`` to obtain both
            the bucket and the key pattern.
        bucket_name: Name of the bucket to search, or a falsy value to
            derive it from ``wildcard_key``.
        metadata_key: Forwarded unchanged to ``self.has_key_tags``.
        metadata_values: Forwarded unchanged to ``self.has_key_tags``.
        context: Task context; ``context['ti']`` is used for ``xcom_push``.

    Returns:
        The result of ``S3Hook.get_key`` for the selected key, or ``None``
        when the listing is empty or no key qualifies.
    """
    from airflow.hooks.S3_hook import S3Hook

    hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
    if not bucket_name:
        bucket_name, wildcard_key = hook.parse_s3_url(wildcard_key)

    # Only the literal text before the first '*' can narrow the listing.
    # str.split avoids re.split's positional maxsplit (deprecated in 3.13).
    prefix = wildcard_key.split("*", 1)[0]
    klist = hook.list_keys(bucket_name, prefix=prefix)
    if not klist:
        return None

    # Evaluate lazily: only the first qualifying key is ever used, so stop
    # there instead of running has_key_tags (presumably one S3 request per
    # key -- confirm) against every remaining match.
    key = next(
        (
            candidate
            for candidate in klist
            if fnmatch.fnmatch(candidate, wildcard_key)
            and not self.has_key_tags(candidate, bucket_name, metadata_key, metadata_values)
        ),
        None,
    )
    if key is None:
        return None

    print(f"Pushing key {key} to key 'filename_s3_key'.")
    task_instance = context['ti']
    task_instance.xcom_push(key="filename_s3_bucket", value=bucket_name)
    task_instance.xcom_push(key="filename_s3_key", value=key)
    return hook.get_key(key, bucket_name)