in plugins/sensors/s3_metadata_sensor.py [0:0]
def __init__(self,
bucket_key,
bucket_name=None,
metadata_key: str = '',
metadata_values: List[str] = '',
wildcard_match=False,
aws_conn_id='aws_default',
verify=None,
*args,
**kwargs):
super(S3MetadataSensor, self).__init__(*args, **kwargs)
# Parse
if bucket_name is None:
parsed_url = urlparse(bucket_key)
if parsed_url.netloc == '':
raise AirflowException('Please provide a bucket_name')
else:
bucket_name = parsed_url.netloc
if parsed_url.path[0] == '/':
bucket_key = parsed_url.path[1:]
else:
bucket_key = parsed_url.path
self.bucket_name = bucket_name
self.bucket_key = bucket_key
self.wildcard_match = wildcard_match
self.aws_conn_id = aws_conn_id
self.verify = verify
self.metadata_key = metadata_key
self.metadata_values = metadata_values