rabbitmq/media_atom.py (45 lines of code) (raw):
import logging
from .models import ProjectModel, CachedCommission
logger = logging.getLogger(__name__)
MSG_PROJECT_CREATED = 'project-created'
MSG_PROJECT_UPDATED = 'project-updated'
def update_kinesis(project_instance: ProjectModel, commission_instance: CachedCommission, message_type):
"""
notifies media atom of a project update or create by pushing a message onto its kinesis stream.
the kinesis stream is indicated in settings.
:param project_instance: model instance describing the project that has been created/updated
:param commission_instance: model instance describing the commission that it belongs to
:param message_type: either `media_atom.MSG_PROJECT_CREATED` or `media_atom.MSG_PROJECT_UPDATED`
:return:
"""
from boto import sts, kinesis
from django.conf import settings
import json
SESSION_NAME = 'pluto-media-atom-integration'
project_id = str(project_instance.id)
logger.info("{0}: Project updated, notifying {1} via role {2}".format(project_id,
settings.OUTGOING_KINESIS_STREAM,
settings.MEDIA_ATOM_ROLE_ARN
)
)
sts_connection = sts.STSConnection(
aws_access_key_id=settings.MEDIA_ATOM_AWS_ACCESS_KEY_ID,
aws_secret_access_key=settings.MEDIA_ATOM_AWS_SECRET_ACCESS_KEY)
assume_role_result = sts_connection.assume_role(
role_arn=settings.MEDIA_ATOM_ROLE_ARN,
role_session_name=SESSION_NAME)
credentials = assume_role_result.credentials
logger.debug("{0}: Got kinesis credentials".format(project_id))
kinesis_connection = kinesis.connect_to_region(
region_name='eu-west-1',
aws_access_key_id=credentials.access_key,
aws_secret_access_key=credentials.secret_key,
security_token=credentials.session_token)
message_content = {
'type': message_type,
'id': str(project_id),
'title': project_instance.title,
'status': project_instance.status,
'commissionId': str(project_instance.commissionId),
'commissionTitle': commission_instance.title,
'productionOffice': project_instance.productionOffice,
'created': project_instance.created.isoformat()
}
logger.debug("{0}: Message is {1}".format(project_id, message_content))
kinesis_connection.put_record(
stream_name=settings.OUTGOING_KINESIS_STREAM,
data=json.dumps(message_content),
partition_key=project_id)
logger.info("{0}: Project update sent".format(project_id))