atomresponder/message_schema.py (70 lines of code) (raw):

import jsonschema # This file contains useful constants and interface definitions from # https://github.com/guardian/media-atom-maker/blob/main/common/src/main/scala/com/gu/media/model/PlutoIntegrationData.scala MESSAGE_TYPE_ASSIGNED_PROJECT = "project-assigned" MESSAGE_TYPE_PAC_FILE = "pac-file-upload" MESSAGE_TYPE_VIDEO_UPLOAD = "video-upload" MESSAGE_TYPE_RESYNC = "video-upload-resync" AtomAssignedProjectMessageSchema = { "type": "object", "properties": { "type": {"type": "string"}, "atomId": {"type": "string"}, "commissionId": {"type": "string"}, "projectId": {"type": "string"}, "title": {"type": "string"}, "user": {"type": ["string", "null"]} }, "required": [ "type","atomId","projectId" ] } PacFileMessageSchema = { "type": "object", "properties": { "type": {"type": "string"}, "atomId": {"type": "string"}, "s3Bucket": {"type": "string"}, "s3Path": {"type": "string"}, }, "required": ["type", "atomId", "s3Bucket", "s3Path"] } VideoUploadMessageSchema = { "type": "object", "properties": { "type": {"type": "string"}, "projectId": {"type": ["string","null"]}, "s3Key": {"type": "string"}, "atomId": {"type": "string"}, "title": {"type": "string"}, "user": {"type": "string"}, "posterImageUrl": {"type": ["string", "null"]} }, "required": ["type", "s3Key", "atomId"] } VideoUploadResyncMessageSchema = { "type": "object", "properties": { "type": {"type": "string"}, "projectId": {"type": ["string","null"]}, "s3Key": {"type": "string"}, "atomId": {"type": "string"}, "title": {"type": "string"}, "posterImageUrl": {"type": ["string", "null"]} }, "required": ["type", "s3Key", "atomId"] } def validate_message(raw_message: dict) -> dict: """ validates parsed data as a Media Atom message. :param raw_message: the parsed data to check, as a dictionary :return: a dictionary of validated data. If the data does not validate, then either a jsonschema.ValidationError or a ValueError is raised with a descriptive string. """ if "type" not in raw_message: raise ValueError("Message was missing the \"type\" field") schema = None if raw_message["type"]==MESSAGE_TYPE_ASSIGNED_PROJECT: schema = AtomAssignedProjectMessageSchema elif raw_message["type"]==MESSAGE_TYPE_PAC_FILE: schema = PacFileMessageSchema elif raw_message["type"]==MESSAGE_TYPE_VIDEO_UPLOAD: schema = VideoUploadMessageSchema elif raw_message["type"]==MESSAGE_TYPE_RESYNC: schema = VideoUploadResyncMessageSchema else: raise ValueError("Message had an invalid \"type\" field: \"{}\".".format(raw_message["type"])) jsonschema.validate(raw_message, schema) #raises ValidationError if it fails return raw_message