in pylib/vcsreplicator/vcsreplicator/snsnotifier.py [0:0]
def on_event(config, message_type, partition, message, created, data):
"""Called when a replication message should be handled."""
repo_url = data["repo_url"]
logger.warn(
"processing message %d: %s for %s" % (message.offset, message_type, repo_url)
)
c = config.c
if c.has_option("awsevents", "s3_endpoint_url"):
s3_endpoint_url = config.get("awsevents", "s3_endpoint_url")
else:
s3_endpoint_url = None
if c.has_option("awsevents", "sns_endpoint_url"):
sns_endpoint_url = config.get("awsevents", "sns_endpoint_url")
else:
sns_endpoint_url = None
access_key_id = config.get("awsevents", "access_key_id")
secret_access_key = config.get("awsevents", "secret_access_key")
region = config.get("awsevents", "region")
topic_arn = config.get("awsevents", "topic_arn")
bucket = config.get("awsevents", "bucket")
session = boto3.Session(
aws_access_key_id=access_key_id,
aws_secret_access_key=secret_access_key,
region_name=region,
)
s3 = session.client("s3", endpoint_url=s3_endpoint_url)
sns = session.client("sns", endpoint_url=sns_endpoint_url)
# We upload the event to S3 for later reference. To prevent multiple
# copies of the same object and to ensure decent ordering, we put
# the Kafka partition offset and the original message data in the
# key name.
dt = datetime.datetime.utcfromtimestamp(created)
key = "events/%s/%010d-%s.json" % (
dt.date().isoformat(),
message.offset,
dt.strftime("%Y%m%dT%H%M%S"),
)
s3_data = json.dumps(
{
"created": created,
"id": message.offset,
"type": message_type,
"data": data,
},
sort_keys=True,
)
s3_url = "%s/%s/%s" % (s3.meta.endpoint_url, bucket, key)
sns_data = json.dumps(
{
"type": message_type,
"data": data,
"data_url": s3_url,
},
sort_keys=True,
)
# SNS has a message size limit of 256 KB (262,144 bytes). If our message
# is too large for SNS, set a key indicating data is external and only
# available in S3.
if len(sns_data) > 260000:
logger.warn("message too large for SNS; dropping payload")
sns_data = json.dumps(
{
"type": message_type,
"data_url": s3_url,
"external": True,
"repo_url": repo_url,
},
sort_keys=True,
)
logger.warn("uploading to S3: %s" % s3_url)
s3.put_object(Bucket=bucket, Key=key, Body=s3_data, ContentType="application/json")
logger.warn("sending SNS notification to %s" % topic_arn)
sns.publish(
TopicArn=topic_arn,
Message=sns_data,
)
logger.warn("finished processing message %d" % message.offset)