def process()

in atomresponder/master_importer.py [0:0]


    def process(self,record, approx_arrival, attempt=0):
        """
        Process a message from the kinesis stream.  Each record is a JSON document which contains keys for atomId, s3Key,
        projectId.  This will find an item with the given atom ID or create a new one, get a signed download URL from
        S3 for the media and then instruct Vidsipine to import it.
        Rather than wait for the job to complete here, we return immediately and rely on receiving a message from VS
        when the job terminates.
        :param record: JSON document in the form of a string
        :param approx_arrival:
        :param attempt: optional integer showing how many times this has been retried
        :return:
        """
        from .media_atom import request_atom_resend, HttpError
        content = json.loads(record)

        try:
            message_schema.validate_message(content)
        except ValueError as e:
            logger.error("Incoming message \'{}\' was not valid: {}".format(record, e))
            return
        except jsonschema.ValidationError as e:
            logger.error("Incoming message \'{}\' was not valid: {}".format(record, e))
            return

        logger.info("Valid message of type {}: {}".format(content["type"],content))

        #We get two types of message on the stream, one for incoming xml the other for incoming media.
        if content['type'] == const.MESSAGE_TYPE_MEDIA or content['type'] == const.MESSAGE_TYPE_RESYNC_MEDIA:
            if 'user' in content:
                atom_user = content['user']
            else:
                atom_user = None
            if "projectId" in content and content["projectId"] is not None:
                project_id = content["projectId"]
            else:
                project_id = None

            master_item, created = self.get_or_create_master_item(content['atomId'],
                                                                  title=content['title'],
                                                                  filename=content['s3Key'],
                                                                  project_id=project_id,
                                                                  user=atom_user)

            return self.import_new_item(master_item, content)
        elif content['type'] == const.MESSAGE_TYPE_PAC:
            logger.info("Got PAC form data message")
            record = self.register_pac_xml(content)
            self.ingest_pac_xml(record)
            logger.info("PAC form data message complete")
        elif content['type'] == const.MESSAGE_TYPE_PROJECT_ASSIGNED:
            logger.info("Got project (re-)assignment message: {0}".format(content))

            master_item = self.get_item_for_atomid(content['atomId'])
            if master_item is not None:
                logger.info("Master item for atom already exists at {0}, assigning".format(master_item.name))
                self.update_pluto_record(master_item.name, None, content, None)
            else:
                logger.warning("No master item exists for atom {0}.  Requesting a re-send from media atom tool".format(content['atomId']))
                try:
                    request_atom_resend(content['atomId'], settings.ATOM_TOOL_HOST, settings.ATOM_TOOL_SECRET)
                except HttpError as e:
                    if e.code == 404:
                        if attempt >= 10:
                            logger.error("{0}: still nothing after 10 attempts. Giving up.".format(content['atomId']))
                            raise
                        logger.warning("{0}: Media atom tool responded with a 404 on attempt {1}: {2}. Retry is NOT YET IMPLEMENTED.".format(content['atomId'], attempt, e.content))
                        # timed_request_resend.apply_async(args=(record, approx_arrival),
                        #                                  kwargs={'attempt': attempt+1},
                        #                                  countdown=60)
                    else:
                        logger.exception("{0}: Could not request resync".format(content['atomId']))

            logger.info("Project (re-)assignment complete")
        else:
            raise ValueError("Unrecognised message type: {0}".format(content['type']))