def get_wildcard_metadata_key()

in plugins/sensors/s3_metadata_sensor.py [0:0]


    def get_wildcard_metadata_key(self, wildcard_key, bucket_name, metadata_key, metadata_values, context):
        """Return the first S3 object matching ``wildcard_key`` that ``has_key_tags`` rejects.

        Keys are listed under the literal prefix that precedes the first ``*``
        in ``wildcard_key``, then filtered with ``fnmatch``.  The first key, in
        listing order, that matches the pattern and for which
        ``self.has_key_tags(key, bucket_name, metadata_key, metadata_values)``
        is falsy is selected.  Its bucket and key are pushed to XCom under
        ``"filename_s3_bucket"`` and ``"filename_s3_key"``.

        NOTE(review): ``has_key_tags`` is defined outside this block; it
        presumably checks whether the object is already tagged with
        ``metadata_key`` set to one of ``metadata_values`` -- confirm.

        :param wildcard_key: key pattern containing ``*``; when ``bucket_name``
            is falsy it is passed through ``hook.parse_s3_url`` to obtain both
            the bucket and the key pattern.
        :param bucket_name: bucket to search, or a falsy value to derive it
            from ``wildcard_key``.
        :param metadata_key: forwarded unchanged to ``has_key_tags``.
        :param metadata_values: forwarded unchanged to ``has_key_tags``.
        :param context: mapping whose ``'ti'`` entry exposes ``xcom_push``.
        :return: the result of ``hook.get_key(key, bucket_name)`` for the
            selected key, or ``None`` when no key qualifies.
        """
        from airflow.hooks.S3_hook import S3Hook
        hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)

        if not bucket_name:
            bucket_name, wildcard_key = hook.parse_s3_url(wildcard_key)

        # Only the literal text before the first '*' can narrow the listing.
        # ``maxsplit`` is passed by keyword: the positional form is deprecated.
        prefix = re.split(r'[*]', wildcard_key, maxsplit=1)[0]
        klist = hook.list_keys(bucket_name, prefix=prefix)
        if not klist:
            return None

        # Lazily stop at the first qualifying key so ``has_key_tags`` is not
        # invoked for every remaining match when only the first one is used.
        key = next(
            (
                k for k in klist
                if fnmatch.fnmatch(k, wildcard_key)
                and not self.has_key_tags(k, bucket_name, metadata_key, metadata_values)
            ),
            None,
        )
        if key is None:
            return None

        print(f"Pushing key {key} to key 'filename_s3_key'.")
        task_instance = context['ti']
        task_instance.xcom_push(key="filename_s3_bucket", value=bucket_name)
        task_instance.xcom_push(key="filename_s3_key", value=key)
        return hook.get_key(key, bucket_name)