def on_event()

in pylib/vcsreplicator/vcsreplicator/snsnotifier.py [0:0]


import datetime
import json
import logging

import boto3

# Module-level logger (the real module defines its own; the name here is
# an assumption).
logger = logging.getLogger(__name__)


def on_event(config, message_type, partition, message, created, data):
    """Called when a replication message should be handled."""
    repo_url = data["repo_url"]

    logger.warning(
        "processing message %d: %s for %s" % (message.offset, message_type, repo_url)
    )

    c = config.c

    if c.has_option("awsevents", "s3_endpoint_url"):
        s3_endpoint_url = config.get("awsevents", "s3_endpoint_url")
    else:
        s3_endpoint_url = None

    if c.has_option("awsevents", "sns_endpoint_url"):
        sns_endpoint_url = config.get("awsevents", "sns_endpoint_url")
    else:
        sns_endpoint_url = None

    access_key_id = config.get("awsevents", "access_key_id")
    secret_access_key = config.get("awsevents", "secret_access_key")
    region = config.get("awsevents", "region")
    topic_arn = config.get("awsevents", "topic_arn")
    bucket = config.get("awsevents", "bucket")
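
    # The reads above assume an [awsevents] section in the vcsreplicator
    # config file along these lines (values illustrative only, not taken
    # from any real deployment):
    #
    #   [awsevents]
    #   access_key_id = AKIA...
    #   secret_access_key = ...
    #   region = us-west-2
    #   topic_arn = arn:aws:sns:us-west-2:123456789012:vcs-events
    #   bucket = vcs-events-bucket
    #   ; optional; point these at a local emulator during testing
    #   s3_endpoint_url = http://localhost:4566
    #   sns_endpoint_url = http://localhost:4566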

    session = boto3.Session(
        aws_access_key_id=access_key_id,
        aws_secret_access_key=secret_access_key,
        region_name=region,
    )

    s3 = session.client("s3", endpoint_url=s3_endpoint_url)
    sns = session.client("sns", endpoint_url=sns_endpoint_url)
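
    # Passing endpoint_url=None makes boto3 fall back to the default AWS
    # endpoint for the configured region, so the endpoint options only need
    # to be set when targeting a non-standard endpoint (e.g. an emulator).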

    # We upload the event to S3 for later reference. To prevent multiple
    # copies of the same object and to ensure decent ordering, we put
    # the Kafka partition offset and the time of the original message in
    # the key name.
    dt = datetime.datetime.utcfromtimestamp(created)
    key = "events/%s/%010d-%s.json" % (
        dt.date().isoformat(),
        message.offset,
        dt.strftime("%Y%m%dT%H%M%S"),
    )
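
    # For example, a message with offset 12345 created at
    # 2024-01-15 10:30:00 UTC produces the key:
    #   events/2024-01-15/0000012345-20240115T103000.json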

    s3_data = json.dumps(
        {
            "created": created,
            "id": message.offset,
            "type": message_type,
            "data": data,
        },
        sort_keys=True,
    )
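
    # With sort_keys=True the serialized document is deterministic, e.g.
    # (values illustrative):
    #   {"created": 1705314600.0, "data": {...}, "id": 12345, "type": "..."}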

    s3_url = "%s/%s/%s" % (s3.meta.endpoint_url, bucket, key)

    sns_data = json.dumps(
        {
            "type": message_type,
            "data": data,
            "data_url": s3_url,
        },
        sort_keys=True,
    )

    # SNS has a message size limit of 256 KB (262,144 bytes). We compare
    # against a slightly lower threshold (260,000 bytes) to leave headroom.
    # If our message is too large for SNS, publish a stub with a key
    # indicating the data is external and only available in S3.
    if len(sns_data) > 260000:
        logger.warn("message too large for SNS; dropping payload")
        sns_data = json.dumps(
            {
                "type": message_type,
                "data_url": s3_url,
                "external": True,
                "repo_url": repo_url,
            },
            sort_keys=True,
        )
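
        # Consumers seeing "external": true must fetch the full payload from
        # data_url; repo_url stays inline, which lets them filter by
        # repository without an extra S3 round trip.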

    logger.warn("uploading to S3: %s" % s3_url)
    s3.put_object(Bucket=bucket, Key=key, Body=s3_data, ContentType="application/json")

    logger.warn("sending SNS notification to %s" % topic_arn)
    sns.publish(
        TopicArn=topic_arn,
        Message=sns_data,
    )

    logger.warn("finished processing message %d" % message.offset)